# -*- coding: utf-8 -*-
"""
data_platform_api
~~~~~~~~~~~~~~~~~

This module handles data platform API requests: querying equipment data,
creating, querying and recovering alarms, and recording sent alarms in Redis.
"""

import datetime
import logging

import redis
import requests
from requests import exceptions

logger = logging.getLogger(__name__)

# NOTE(review): the Redis host/password and the API secret below are
# hard-coded in source; consider loading them from configuration instead.
pool = redis.ConnectionPool(
    host='172.16.34.43',
    port=6379,
    db=12,
    password='deepsaga',
    decode_responses=True,
)
r = redis.Redis(connection_pool=pool)

DATA_PLATFORM_SERVER = 'http://api.sagacloud.cn/data-platform-3/'
ALARM_SERVER = 'http://47.93.33.207:8890/alarm-system/'

# Shared secret sent as a query parameter with every platform request.
_PLATFORM_SECRET = 'saga123456'
# Timestamp layout used in every request payload built by this module.
_TIME_FORMAT = '%Y%m%d%H%M%S'


def _auth_params(project_id):
    """Build the query-string parameters sent with every request."""
    return {'projectId': project_id, 'secret': _PLATFORM_SECRET}


def _minutes_ago(minutes):
    """Return the local time ``minutes`` minutes ago as a formatted string."""
    moment = datetime.datetime.now() - datetime.timedelta(minutes=minutes)
    return moment.strftime(_TIME_FORMAT)


def _content_of(raw_result):
    """Return the ``'Content'`` list of a response, or ``[]`` when absent.

    A failed request yields an empty dict, so ``'Content'`` may be missing;
    falling back to an empty list keeps callers from iterating over ``None``.
    """
    return raw_result.get('Content') or []


class PlatformRequest:
    """The base request for the platform.

    :param server: base URL of the target server.
    :param url: path appended directly to ``server``.
    :param params: query-string parameters.
    :param payload: object sent as the JSON request body.
    :param timeout: request timeout in seconds passed to ``requests``.
    """

    def __init__(self, server, url, params, payload, timeout=5):
        self._url = server + url
        self._params = params
        self._payload = payload
        self._timeout = timeout

    def send_request(self):
        """POST the payload and return the decoded JSON body.

        Failures never propagate to the caller: a transport error, a
        non-200 status or an undecodable body is logged and an empty dict
        is returned instead.

        :return: the decoded response dict, or ``{}`` on any failure.
        """
        try:
            response = requests.post(
                self._url,
                params=self._params,
                json=self._payload,
                timeout=self._timeout,
            )
        except exceptions.RequestException as e:
            # Catch the exception base class; ``exceptions`` itself is a
            # module and cannot be used in an ``except`` clause.
            logger.error('Could not connect to platform server %s', e)
            return {}

        if response.status_code != 200:
            logger.warning('request error %s, %s', response.status_code, response.reason)
            return {}

        try:
            body = response.json()
        except ValueError as e:
            logger.error('Could not decode platform response: %s', e)
            return {}

        # Callers rely on ``.get``; anything other than a dict is unusable.
        return body if isinstance(body, dict) else {}


def query_real_time_data(project_id, equip_id, code_list):
    """Query the current value of several parameters of one equipment.

    :param project_id: project identifier sent as ``projectId``.
    :param equip_id: equipment identifier sent as ``id`` of each criterion.
    :param code_list: iterable of parameter codes to query.
    :return: dict mapping each returned item's ``'code'`` to its ``'data'``;
        empty when the request failed or returned no content.
    """
    url = 'parameter/batch_query_param'
    payload = {
        'criterias': [{'id': equip_id, 'code': item} for item in code_list],
    }
    porter = PlatformRequest(DATA_PLATFORM_SERVER, url, _auth_params(project_id), payload)
    raw_result = porter.send_request()
    return {item.get('code'): item.get('data') for item in _content_of(raw_result)}


def query_history_time_data(project_id, equip_id, code, period):
    """Query the history of one parameter over the last ``period`` minutes.

    :param project_id: project identifier sent as ``projectId``.
    :param equip_id: equipment identifier.
    :param code: parameter code.
    :param period: size of the look-back window, in minutes.
    :return: list with the ``'data'`` field of every returned item; empty
        when the request failed or returned no content.
    """
    url = 'hisdata/query_by_obj'
    payload = {
        'criteria': {
            'id': equip_id,
            'code': code,
            'receivetime': {'$gte': _minutes_ago(period)},
        },
    }
    porter = PlatformRequest(DATA_PLATFORM_SERVER, url, _auth_params(project_id), payload)
    raw_result = porter.send_request()
    return [item.get('data') for item in _content_of(raw_result)]


def send_alarm(project_id, object_id, type_code, description):
    """Create an alarm for an object and record it in Redis.

    :param project_id: project identifier sent as ``projectId``.
    :param object_id: identifier of the object the alarm is raised for.
    :param type_code: alarm type code.
    :param description: alarm description text.
    :return: the ``'id'`` field of the response, or ``None`` when the
        response carried no id (for example because the request failed).
    """
    url = 'alarm/create'
    time_stamp = datetime.datetime.now().strftime(_TIME_FORMAT)
    payload = {
        'status': 'alarm',
        'objects': [object_id],
        'ctime': time_stamp,
        'level': 'S',
        'type_code': type_code,
        'description': description,
    }
    porter = PlatformRequest(ALARM_SERVER, url, _auth_params(project_id), payload)
    raw_result = porter.send_request()
    result = raw_result.get('id')
    logger.info('Send alarm: %s', result)
    # Without an id there is nothing to record, and a None hash field
    # cannot be written to Redis.
    if result is not None:
        save_alarm_info(project_id, object_id, result, time_stamp)
    return result


def is_alarmed(project_id, object_id, alarm_type):
    """Tell whether the object has an alarm of this type still in 'alarm' status.

    Only alarms created within the last 30 minutes are queried.

    :param project_id: project identifier sent as ``projectId``.
    :param object_id: identifier of the object to check.
    :param alarm_type: alarm type code to match.
    :return: ``True`` if any returned alarm has status ``'alarm'``,
        ``False`` otherwise (including when the request failed).
    """
    logger.info('Try to send alarm - %s', object_id)
    url = 'alarm/query'
    payload = {
        'criteria': {
            'object': object_id,
            'type_code': alarm_type,
            'ctime': {'$gte': _minutes_ago(30)},
        },
    }
    porter = PlatformRequest(ALARM_SERVER, url, _auth_params(project_id), payload)
    raw_result = porter.send_request()
    return any(item.get('status') == 'alarm' for item in _content_of(raw_result))


def recover_alarm(project_id, alarm_id):
    """Request recovery of a single alarm.

    :param project_id: project identifier sent as ``projectId``.
    :param alarm_id: identifier of the alarm to recover.
    :return: the ``'Result'`` field of the response, or ``None`` if absent.
    """
    url = 'alarm/batchRecover'
    payload = {'alarmId': [alarm_id]}
    porter = PlatformRequest(ALARM_SERVER, url, _auth_params(project_id), payload)
    raw_result = porter.send_request()
    logger.info('Recover alarm: %s', alarm_id)
    return raw_result.get('Result')


def save_alarm_info(project_id, object_id, alarm_id, time_stamp):
    """Store ``alarm_id -> time_stamp`` in the object's Redis alarm hash."""
    name = f'alarm_{project_id}_{object_id}'
    r.hset(name, alarm_id, time_stamp)


def query_redis_alarm(project_id, object_id):
    """Return every ``alarm_id -> time_stamp`` entry stored for the object."""
    name = f'alarm_{project_id}_{object_id}'
    return r.hgetall(name)