# space.py

from operator import attrgetter

from httpx import AsyncClient
from loguru import logger

from app.schemas.bluetooth import IBeaconBase
from app.services.platform import DataPlatformService


  6. class SpacePositioningController:
  7. def __init__(self, realtime_ibeacon_list: list[IBeaconBase], ibeacon_space_dict: dict[str, str]):
  8. super(SpacePositioningController, self).__init__()
  9. self.realtime_ibeacon_list = realtime_ibeacon_list
  10. self.ibeacon_space_dict = ibeacon_space_dict
  11. def run(self) -> str:
  12. ibeacon_list = sorted(
  13. self.realtime_ibeacon_list, key=attrgetter("rssi"), reverse=True
  14. )
  15. space = ""
  16. for ibeacon in ibeacon_list:
  17. if ibeacon.rssi != 0:
  18. strongest = ibeacon
  19. logger.debug(strongest)
  20. strongest_id = str(strongest.major) + "-" + str(strongest.minor)
  21. space = self.ibeacon_space_dict.get(strongest_id)
  22. if space:
  23. logger.debug(strongest_id)
  24. break
  25. else:
  26. space = ""
  27. return space
  28. @logger.catch()
  29. async def get_space_location(project_id: str, realtime_ibeacon_list: list[IBeaconBase]) -> str:
  30. async with AsyncClient() as client:
  31. platform = DataPlatformService(client, project_id)
  32. spaces = await platform.get_items_by_category("Sp")
  33. ibeacon_space_dict = dict()
  34. for sp in spaces:
  35. try:
  36. major = str(sp["infos"]["ctm-iBeacon-Major"])
  37. minor = str(sp["infos"]["ctm-iBeacon-Minor"])
  38. if len(major) > 0 and len(minor) > 0:
  39. k = major + "-" + minor
  40. v = sp["id"]
  41. ibeacon_space_dict.update({k: v})
  42. except KeyError:
  43. pass
  44. logger.debug(ibeacon_space_dict)
  45. controller = SpacePositioningController(realtime_ibeacon_list, ibeacon_space_dict)
  46. return controller.run()