"""Resolve which space a device is in from the iBeacons it currently sees."""

from operator import attrgetter

from httpx import AsyncClient
from loguru import logger

from app.schemas.bluetooth import IBeaconBase
from app.services.platform import DataPlatformService


class SpacePositioningController:
    """Pick a space id from a list of iBeacon readings and a beacon-to-space map."""

    def __init__(self, realtime_ibeacon_list: list[IBeaconBase], ibeacon_space_dict: dict[str, str]):
        """Store the inputs used by :meth:`run`.

        Args:
            realtime_ibeacon_list: Beacon readings; ``run`` reads their
                ``rssi``, ``major`` and ``minor`` attributes.
            ibeacon_space_dict: Mapping from ``"<major>-<minor>"`` to a space id.
        """
        super().__init__()
        self.realtime_ibeacon_list = realtime_ibeacon_list
        self.ibeacon_space_dict = ibeacon_space_dict

    def run(self) -> str:
        """Return the space id of the highest-RSSI beacon that has a mapping.

        Beacons are visited in descending ``rssi`` order. Readings whose
        ``rssi`` equals 0 are skipped (presumably a "no reading" placeholder;
        confirm with the data producer).

        Returns:
            The mapped space id of the first visited beacon whose
            ``"<major>-<minor>"`` key has a truthy entry in
            ``ibeacon_space_dict``, or ``""`` when no beacon qualifies.
        """
        ranked = list(self.realtime_ibeacon_list)
        ranked.sort(key=attrgetter("rssi"), reverse=True)

        usable = (beacon for beacon in ranked if beacon.rssi != 0)
        for beacon in usable:
            logger.debug(beacon)
            beacon_key = f"{beacon.major!s}-{beacon.minor!s}"
            matched_space = self.ibeacon_space_dict.get(beacon_key)
            if matched_space:
                logger.debug(beacon_key)
                return matched_space

        # Nothing matched: callers receive an empty string rather than None.
        return ""


def _index_spaces_by_beacon(spaces) -> dict[str, str]:
    """Build the ``"<major>-<minor>" -> space id`` map from platform space items.

    Items lacking the ``infos``, ``ctm-iBeacon-Major``, ``ctm-iBeacon-Minor``
    or ``id`` key are skipped, as are items whose major or minor value
    stringifies to an empty string. Later items overwrite earlier ones that
    share the same key.
    """
    beacon_to_space: dict[str, str] = {}
    for item in spaces:
        try:
            infos = item["infos"]
            major = str(infos["ctm-iBeacon-Major"])
            minor = str(infos["ctm-iBeacon-Minor"])
            if not (major and minor):
                continue
            beacon_to_space[f"{major}-{minor}"] = item["id"]
        except KeyError:
            # Space item without complete beacon information; ignore it.
            continue
    return beacon_to_space


@logger.catch()
async def get_space_location(project_id: str, realtime_ibeacon_list: list[IBeaconBase]) -> str:
    """Look up the space matching the given beacon readings for a project.

    Fetches the items of category ``"Sp"`` from the data platform, builds the
    beacon-to-space map from them, and delegates the choice to
    ``SpacePositioningController.run``.

    Args:
        project_id: Project identifier passed to ``DataPlatformService``.
        realtime_ibeacon_list: Beacon readings to resolve.

    Returns:
        The resolved space id, or ``""`` when no beacon maps to a space.
        NOTE(review): the function is wrapped in ``@logger.catch()``, so an
        exception raised here is handled by loguru instead of propagating;
        confirm what callers receive in that case.
    """
    async with AsyncClient() as client:
        platform = DataPlatformService(client, project_id)
        spaces = await platform.get_items_by_category("Sp")

    ibeacon_space_dict = _index_spaces_by_beacon(spaces)
    logger.debug(ibeacon_space_dict)

    return SpacePositioningController(realtime_ibeacon_list, ibeacon_space_dict).run()