123456789101112131415161718192021222324252627282930313233343536373839404142 |
- from httpx import AsyncClient, URL
- from app.core.config import settings
- from app.services.service import Service
- from app.utils.date import get_time_str
class RealWorldService(Service):
    """Client for the platform's ``/rwd/instance/object/query`` endpoint.

    Every request carries the ``groupCode`` and ``projectId`` given at
    construction time as common parameters.
    """

    def __init__(
        self,
        client: AsyncClient,
        group_code: str,
        project_id: str,
        app_id: str | None,
        user_id: str | None,
        server_settings=settings,
    ):
        """Bind the service to one group/project and resolve the base URL.

        Args:
            client: HTTP client handed to the ``Service`` base class.
            group_code: Value sent as ``groupCode`` with each request.
            project_id: Value sent as ``projectId`` with each request.
            app_id: Stored on the instance; not read by the methods
                visible in this class.
            user_id: Stored on the instance; not read by the methods
                visible in this class.
            server_settings: Object exposing ``PLATFORM_HOST``; defaults
                to the application-wide ``settings``.
        """
        super().__init__(client)

        # Identity of the caller / tenant.
        self._group_code = group_code
        self._project_id = project_id
        self._app_id = app_id
        self._user_id = user_id

        # Root URL that every endpoint path is joined onto.
        self._base_url = URL(server_settings.PLATFORM_HOST)
        # Timestamp string captured once, when the service is created.
        self._now_time = get_time_str()

    def _common_parameters(self) -> dict:
        """Return the parameters shared by every request of this service."""
        return dict(groupCode=self._group_code, projectId=self._project_id)

    async def query_instance(
        self, object_id: list | None, class_code: str | None
    ):
        """Query object instances, optionally filtered by id and class code.

        Args:
            object_id: Sent as criteria key ``id``; left out when falsy
                (``None`` or empty).
            class_code: Sent as criteria key ``classCode``; left out when
                falsy (``None`` or empty).

        Returns:
            Whatever ``self._post`` yields for the request, unmodified.
            NOTE(review): ``_post`` is not defined in this class and is
            presumably inherited from ``Service`` - confirm its return shape.
        """
        endpoint = self._base_url.join("/rwd/instance/object/query")
        query = self._common_parameters()

        # Only filters that were actually supplied end up in the criteria.
        candidates = (("id", object_id), ("classCode", class_code))
        filters = {key: value for key, value in candidates if value}

        return await self._post(endpoint, query, {"criteria": filters})
|