|
@@ -1,110 +0,0 @@
|
|
|
-
|
|
|
-import base64
|
|
|
-import hmac
|
|
|
-import json
|
|
|
-import sys
|
|
|
-import threading
|
|
|
-import time
|
|
|
-import uuid
|
|
|
-from hashlib import sha256
|
|
|
-
|
|
|
-if (sys.version_info.major == 3):
|
|
|
- import urllib.request as urllib2
|
|
|
-else:
|
|
|
- import urllib2
|
|
|
-
|
|
|
-import ssl
|
|
|
-
|
|
|
-
|
|
|
-ssl._create_default_https_context = ssl._create_unverified_context
|
|
|
-
|
|
|
-host = "https://192.168.64.44:443"
|
|
|
-appKey = "25227103"
|
|
|
-appSecret = "LMGm1LNhAE7zF6Alzqdz"
|
|
|
-
|
|
|
-
|
|
|
-class HkSecure():
|
|
|
- def __init__(self, host, appKey, appSecret):
|
|
|
- self.host = host
|
|
|
- self.appKey = appKey
|
|
|
- self.appSecret = appSecret
|
|
|
- self.token = ""
|
|
|
- flushTokenThread = threading.Thread(target=self.update_token)
|
|
|
- flushTokenThread.start()
|
|
|
- while 1:
|
|
|
- if self.token:
|
|
|
- break
|
|
|
- time.sleep(1)
|
|
|
-
|
|
|
- def update_token(self):
|
|
|
- while 1:
|
|
|
- try:
|
|
|
- print("update token every 11 hours")
|
|
|
- self.get_token()
|
|
|
- time.sleep(3600 * 11)
|
|
|
- except Exception as e:
|
|
|
- print("update token error: wait for 60 seconds")
|
|
|
- time.sleep(60)
|
|
|
-
|
|
|
-
|
|
|
- def get_token(self):
|
|
|
- path = "/artemis/api/v1/oauth/token"
|
|
|
- base_headers = {
|
|
|
- "Accept": "*/*",
|
|
|
- "Content-Type": "application/json",
|
|
|
- }
|
|
|
- headers = self.build_signature("POST", path, base_headers)
|
|
|
- print(headers)
|
|
|
- post1 = self.post(self.host + path, "", headers)
|
|
|
- self.token = json.loads(post1)["data"]["access_token"]
|
|
|
- print(self.token)
|
|
|
-
|
|
|
-
|
|
|
- def base_request(self, url, data):
|
|
|
- headers = {
|
|
|
- "access_token": self.token,
|
|
|
- "Content-Type": "application/json"
|
|
|
- }
|
|
|
- post = self.post(self.host + url, data, headers)
|
|
|
- return post
|
|
|
-
|
|
|
- def post(self, url, postData, headers):
|
|
|
- if isinstance(postData, dict):
|
|
|
- postData = json.dumps(postData)
|
|
|
- postData = postData.encode("utf-8")
|
|
|
- req = urllib2.Request(url, data=postData,
|
|
|
- headers=headers)
|
|
|
- res = urllib2.urlopen(req, timeout=60).read().decode("utf-8")
|
|
|
- return res
|
|
|
-
|
|
|
-
|
|
|
- def get_millisecond(self):
|
|
|
- millis = int(round(time.time() * 1000))
|
|
|
- return millis
|
|
|
-
|
|
|
- def get_sha256(self, data, key):
|
|
|
- key = key.encode("utf-8")
|
|
|
- data = data.encode("utf-8")
|
|
|
- return base64.b64encode(hmac.new(key, data, digestmod=sha256).digest()).decode("utf-8")
|
|
|
-
|
|
|
- def build_signature(self, httpType, url, headers):
|
|
|
- text = httpType + "\n"
|
|
|
- for header in sorted(headers):
|
|
|
- value = str(headers[header])
|
|
|
- if "x-ca-" in header:
|
|
|
- value = header + ":" + value
|
|
|
- text += value
|
|
|
- text += "\n"
|
|
|
- text += url
|
|
|
- sha_ = self.get_sha256(text, self.appSecret)
|
|
|
- headers["x-ca-signature"] = sha_
|
|
|
- headers["x-ca-key"] = self.appKey
|
|
|
- headers["x-ca-nonce"] = str(uuid.uuid4())
|
|
|
- return headers
|
|
|
-
|
|
|
-
|
|
|
-hksecure = HkSecure(host, appKey, appSecret)
|
|
|
-url = "/artemis/api/acs/v1/door/states"
|
|
|
-data = {"doorIndexCodes": ["f8f6ec5b4030477ebc9c838dc5ac670d"]}
|
|
|
-re = hksecure.base_request(url, data)
|
|
|
-print(re)
|