1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
- from operator import attrgetter
- from httpx import AsyncClient
- from loguru import logger
- from app.schemas.bluetooth import IBeaconBase
- from app.services.platform import DataPlatformService
class SpacePositioningController:
    """Resolve a space id from a set of real-time iBeacon readings.

    Readings are ranked by ``rssi`` (highest value first) and the first one
    whose ``"<major>-<minor>"`` key has a non-empty entry in the
    beacon-to-space mapping decides the result.
    """

    def __init__(self, realtime_ibeacon_list: list[IBeaconBase], ibeacon_space_dict: dict[str, str]):
        """Store the inputs used by :meth:`run`.

        Args:
            realtime_ibeacon_list: Beacon readings; each one must expose
                ``rssi``, ``major`` and ``minor`` attributes.
            ibeacon_space_dict: Mapping from ``"<major>-<minor>"`` to a
                space id.
        """
        super().__init__()
        self.realtime_ibeacon_list = realtime_ibeacon_list
        self.ibeacon_space_dict = ibeacon_space_dict

    def run(self) -> str:
        """Return the space id of the strongest mapped beacon.

        Returns:
            The mapped space id, or ``""`` when no reading with a non-zero
            ``rssi`` maps to a non-empty space id.
        """
        ranked = sorted(
            self.realtime_ibeacon_list, key=attrgetter("rssi"), reverse=True
        )
        for beacon in ranked:
            # A reading of exactly 0 is ignored; presumably it marks a
            # missing measurement rather than a real signal -- TODO confirm.
            if beacon.rssi == 0:
                continue
            logger.debug(beacon)
            beacon_key = f"{beacon.major}-{beacon.minor}"
            space_id = self.ibeacon_space_dict.get(beacon_key)
            if space_id:
                logger.debug(beacon_key)
                return space_id
        # Nothing matched: always hand back a string, never None.
        return ""
@logger.catch()
async def get_space_location(project_id: str, realtime_ibeacon_list: list[IBeaconBase]) -> str:
    """Locate the space that matches a project's real-time iBeacon readings.

    Fetches the project's items of category ``"Sp"`` from the data platform,
    builds a ``"<major>-<minor>" -> space id`` mapping from each item's
    ``infos`` and delegates the decision to ``SpacePositioningController``.

    Args:
        project_id: Project identifier passed to ``DataPlatformService``.
        realtime_ibeacon_list: Beacon readings forwarded to the controller.

    Returns:
        Whatever ``SpacePositioningController.run()`` yields for the readings.

    Note:
        The function is wrapped in ``logger.catch()``, so exceptions are
        logged by loguru instead of propagating; with loguru's default
        settings the call then presumably evaluates to ``None`` -- confirm
        against the loguru documentation before relying on it.
    """
    # Hold the HTTP client only for as long as the fetch needs it.
    async with AsyncClient() as client:
        platform = DataPlatformService(client, project_id)
        spaces = await platform.get_items_by_category("Sp")

    ibeacon_space_dict: dict[str, str] = {}
    for sp in spaces:
        try:
            infos = sp["infos"]
            raw_major = infos["ctm-iBeacon-Major"]
            raw_minor = infos["ctm-iBeacon-Minor"]
            space_id = sp["id"]
        except KeyError:
            # Deliberate best-effort: spaces without beacon info (or without
            # an id) simply do not take part in positioning.
            continue

        # str(None) would be "None" and register a bogus "None-None" key,
        # so treat a null major/minor the same as a missing one.
        if raw_major is None or raw_minor is None:
            continue

        major, minor = str(raw_major), str(raw_minor)
        if major and minor:
            ibeacon_space_dict[f"{major}-{minor}"] = space_id

    logger.debug(ibeacon_space_dict)
    controller = SpacePositioningController(realtime_ibeacon_list, ibeacon_space_dict)
    return controller.run()
|