rwd.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. from httpx import AsyncClient, URL
  2. from app.core.config import settings
  3. from app.services.service import Service
  4. from app.utils.date import get_time_str
  5. class RealWorldService(Service):
  6. def __init__(
  7. self,
  8. client: AsyncClient,
  9. group_code: str,
  10. project_id: str,
  11. app_id: str | None,
  12. user_id: str | None,
  13. server_settings=settings,
  14. ):
  15. super(RealWorldService, self).__init__(client)
  16. self._group_code = group_code
  17. self._project_id = project_id
  18. self._app_id = app_id
  19. self._user_id = user_id
  20. self._base_url = URL(server_settings.PLATFORM_HOST)
  21. self._now_time = get_time_str()
  22. def _common_parameters(self) -> dict:
  23. return {"groupCode": self._group_code, "projectId": self._project_id}
  24. async def query_instance(
  25. self, object_id: list | None, class_code: str | None
  26. ):
  27. url = self._base_url.join("/rwd/instance/object/query")
  28. params = self._common_parameters()
  29. criteria = dict()
  30. if object_id:
  31. criteria.update({"id": object_id})
  32. if class_code:
  33. criteria.update({"classCode": class_code})
  34. payload = {"criteria": criteria}
  35. raw_info = await self._post(url, params, payload)
  36. return raw_info