Files
2025-04-30 08:48:49 -05:00

497 lines
20 KiB
Python

# uncompyle6 version 3.9.2
# Python bytecode version base 3.7.0 (3394)
# Decompiled from: Python 3.8.19 (default, Mar 20 2024, 15:27:52)
# [Clang 14.0.6 ]
# Embedded file name: /var/user/app/device_supervisorbak/device_supervisor/lib/linkkit/h2client.py
# Compiled at: 2024-04-18 03:12:57
# Size of source mod 2**32: 19761 bytes
import hyper, time, hmac, ssl, logging, os, threading, crcmod, concurrent.futures, hashlib
def _assert_value(condition, error_msg):
if not condition:
raise ValueError(error_msg)
_H2_OPT_HEART_BEAT_TIME_DEFAULT = 25
_H2_OPT_PORT_DEFAULT = 443
_H2_MAX_FILE_SIZE = 1073741824
def h2_set_option(opt, value):
global _H2_MAX_FILE_SIZE
global _H2_OPT_HEART_BEAT_TIME_DEFAULT
global _H2_OPT_PORT_DEFAULT
if "heart_beat_interval" == opt:
_H2_OPT_HEART_BEAT_TIME_DEFAULT = value
else:
if "port" == opt:
_H2_OPT_PORT_DEFAULT = value
else:
if "max_file_size" == opt:
_H2_MAX_FILE_SIZE = value
class StreamHandler:
def __init__(self):
pass
def __enter__(self):
pass
def __exit__(self, type, value, trace):
pass
def get_content_length(self):
pass
def next(self):
pass
def has_next(self):
return False
class FileStreamHandler(StreamHandler):
def __init__(self, filename, block_size=524288, opt_crc64=False):
self._FileStreamHandler__filename = filename
self._FileStreamHandler__block_size = block_size
self._FileStreamHandler__size = os.stat(filename).st_size
self._FileStreamHandler__opt_crc64 = opt_crc64
self._FileStreamHandler__last_crc = 0
self._FileStreamHandler__read_size = 0
def get_content_length(self):
return self._FileStreamHandler__size
def __enter__(self):
logging.debug("open the file, filename:%s" % self._FileStreamHandler__filename)
self._FileStreamHandler__f = open(self._FileStreamHandler__filename, "rb")
self._FileStreamHandler__read_size = 0
def __exit__(self, type, value, trace):
if self._FileStreamHandler__f:
self._FileStreamHandler__f.close()
self._FileStreamHandler__f = None
def next(self):
if not self._FileStreamHandler__f or self._FileStreamHandler__read_size >= self._FileStreamHandler__size:
return
data = self._FileStreamHandler__f.read(self._FileStreamHandler__block_size)
if data:
self._FileStreamHandler__read_size += len(data)
if self._FileStreamHandler__opt_crc64:
do_crc64 = crcmod.mkCrcFun(23270347676907615891L, initCrc=(self._FileStreamHandler__last_crc), xorOut=18446744073709551615L, rev=True)
self._FileStreamHandler__last_crc = do_crc64(data)
return data
def has_next(self):
return self._FileStreamHandler__f.tell() < self._FileStreamHandler__size
def get_crc64(self):
return self._FileStreamHandler__last_crc
def get_read_size(self):
return self._FileStreamHandler__read_size
class H2Exception(Exception):
def __init__(self, code, msg):
Exception.__init__(self, msg)
self._H2Exception__code = code
self._H2Exception__msg = msg
def get_code(self):
return self._H2Exception__code
def get_msg(self):
return self._H2Exception__msg
def __name__(self):
return "H2Exception"
class UploadFileInfo:
def __init__(self, local_filename, remote_filename=None, overwrite=True):
self.local_filename = local_filename
self.opt_overwrite = overwrite
if not remote_filename:
self.remote_filename = os.path.basename(local_filename)
else:
self.remote_filename = remote_filename
def __name__(self):
return "UploadFileInfo"
class UploadFileResult:
def __init__(self, code=None, exception=None, upload_size=None, total_size=None, file_store_id=None):
self.upload_size = upload_size
self.total_size = total_size
self.file_store_id = file_store_id
self.code = code
self.exception = exception
def __name__(self):
return "UploadFileResult"
class H2FileUploadSink:
def on_file_upload_start(self, id, upload_file_info, user_data):
pass
def on_file_upload_end(self, id, upload_file_info, upload_file_result, user_data):
pass
def on_file_upload_progress(self, id, upload_file_info, upload_file_result, user_data):
pass
class H2FileTask:
def __init__(self, id, file_info, future_result):
self._H2FileTask__file_info = file_info
self._H2FileTask__future_result = future_result
self._H2FileTask__id = id
def get_file_info(self):
return self._H2FileTask__file_info
def get_future_result(self):
return self._H2FileTask__future_result
def result(self, timeout=None):
return self._H2FileTask__future_result.result(timeout)
def cancel(self):
self._H2FileTask__future_result.call()
def get_id(self):
return self._H2FileTask__id
def __name__(self):
return "H2FileTask"
class H2Stream:
def __init__(self, client, id):
self._H2Stream__client = client
self._H2Stream__conn = None
self._H2Stream__total_sent_size = 0
self._H2Stream__path = None
self._H2Stream__id = id
self._H2Stream__stream_id = None
self._H2Stream__x_request_id = None
self._H2Stream__x_data_stream_id = None
def __name__(self):
return "H2Stream"
def get_id(self):
return self._H2Stream__id
def open(self, path, header):
_assert_value(path, "path is required")
with self._H2Stream__client._get_auth_lock():
url = "/stream/open" + path
self._H2Stream__conn = self._H2Stream__client.get_connect()
self._H2Stream__total_sent_size = 0
self._H2Stream__path = path
logging.debug("request url: %s" % url)
conn_header = self._H2Stream__client.get_default_header()
if header:
conn_header.update(header)
req_id = self._H2Stream__conn.request("GET", url, None, conn_header)
response = self._H2Stream__conn.get_response(req_id)
self._H2Stream__check_response(response)
self._H2Stream__x_request_id = response.headers["x-request-id"][0]
self._H2Stream__x_data_stream_id = response.headers["x-data-stream-id"][0]
logging.debug("x_request_id: %s" % self._H2Stream__x_request_id)
logging.debug("x_data_stream_id: %s" % self._H2Stream__x_data_stream_id)
return response
def close(self, header):
logging.debug("close the stream")
final_header = {'x-request-id':self._H2Stream__x_request_id, 'x-data-stream-id':self._H2Stream__x_data_stream_id}
final_header.update(header)
req_id = self._H2Stream__conn.request("GET", "/stream/close/" + self._H2Stream__path, None, final_header)
response = self._H2Stream__conn.get_response(req_id)
self._H2Stream__check_response(response)
return response
def send(self, headers, data_handler):
with self._H2Stream__client._get_auth_lock():
url = "/stream/send" + self._H2Stream__path
logging.debug("request url: %s" % url)
self._H2Stream__stream_id = self._H2Stream__conn.putrequest("GET", url)
self._H2Stream__conn.putheader("x-request-id", (self._H2Stream__x_request_id), stream_id=(self._H2Stream__stream_id))
self._H2Stream__conn.putheader("x-data-stream-id", (self._H2Stream__x_data_stream_id), stream_id=(self._H2Stream__stream_id))
content_length = data_handler.get_content_length()
if content_length:
self._H2Stream__conn.putheader("content-length", "%s" % content_length, self._H2Stream__stream_id)
for k, v in headers.items():
self._H2Stream__conn.putheader(k, v, self._H2Stream__stream_id)
self._H2Stream__conn.endheaders(stream_id=(self._H2Stream__stream_id))
with data_handler:
final = False
while not final:
data = data_handler.next()
if data == None or len(data) == 0:
break
final = not data_handler.has_next()
self._H2Stream__conn.send(data, final, stream_id=(self._H2Stream__stream_id))
response = self._H2Stream__conn.get_response(self._H2Stream__stream_id)
self._H2Stream__check_response(response)
return response
def __check_response(self, response, msg=None):
if response.status != 200:
raise H2Exception(response.status, msg if msg else "fail to request http/2, code:%d" % response.status)
def __str__(self):
return "H2Stream(id=%s,stream_x_id=%s,x_request_id=%s,x_data_stream_id:%s" % (self._H2Stream__id,
self._H2Stream__stream_id, self._H2Stream__x_request_id, self._H2Stream__x_data_stream_id)
class H2Client:
def __init__(self, region, product_key, device_name, device_secret, client_id=None, opt_max_thread_num=4, endpoint=None):
_assert_value(region, "region is not empty")
_assert_value(product_key, "product_key is not empty")
_assert_value(device_name, "device_name is not empty")
self._H2Client__product_key = product_key
self._H2Client__device_name = device_name
self._H2Client__client_id = client_id
self._H2Client__device_secret = device_secret
self._H2Client__region = region
self._H2Client__endpoint = endpoint
self._H2Client__opt_free_idle_connect = False
self._H2Client__connected = False
self._H2Client__port = _H2_OPT_PORT_DEFAULT
self._H2Client__conn = None
self._H2Client__opt_heart_beat_time = _H2_OPT_HEART_BEAT_TIME_DEFAULT
self._H2Client__conn_lock = threading.RLock()
self._H2Client__lock = threading.RLock()
self._H2Client__stream_list = []
self._H2Client__stream_list_lock = threading.RLock()
self._H2Client__thread_executor = concurrent.futures.ThreadPoolExecutor(max_workers=opt_max_thread_num)
self._H2Client__auth_lock = threading.RLock()
self._H2Client__id = 0
self._H2Client__heart_beat_lock = threading.RLock()
self._H2Client__timer = None
def get_endpoint(self):
return self._H2Client__endpoint
def get_actual_endpoint(self):
return self._H2Client__generate_endpoint()
def __generate_endpoint(self):
if self._H2Client__endpoint:
return self._H2Client__endpoint
return self._H2Client__product_key + ".iot-as-http2.%s.aliyuncs.com" % self._H2Client__region
def open(self):
with self._H2Client__conn_lock:
if self._H2Client__conn:
logging.info("the client is opened")
return -1
return self._H2Client__connect()
def close(self):
with self._H2Client__conn_lock:
return self._H2Client__close_connect()
self._H2Client__close_all_streams()
def upload_file_async(self, local_filename, remote_filename=None, over_write=True, upload_file_sink=None, upload_sink_user_data=None):
_assert_value(local_filename, "local_filename is required")
self._H2Client__check_file(local_filename)
file_info = UploadFileInfo(local_filename, remote_filename, over_write)
future_result = self._H2Client__thread_executor.submit(self._H2Client__post_file_task, file_info, upload_file_sink, upload_sink_user_data)
return H2FileTask(id, file_info, future_result)
def upload_file_sync(self, local_filename, remote_filename=None, over_write=True, timeout=None, upload_file_sink=None, upload_sink_user_data=None):
self._H2Client__check_file(local_filename)
f = self.upload_file_async(local_filename, remote_filename, over_write, upload_file_sink, upload_sink_user_data)
return f.result(timeout)
def __create_stream_id(self):
with self._H2Client__lock:
self._H2Client__id += 1
return self._H2Client__id
def new_stream(self):
return H2Stream(self, self._H2Client__create_stream_id())
def _get_auth_lock(self):
return self._H2Client__auth_lock
def __crc_equal(self, value1, value2):
if value1 == value2:
return True
return self._H2Client__to_unsign(value1) == self._H2Client__to_unsign(value2)
def __to_unsign(self, value):
if value > 0:
return value
return 18446744073709551616L + value
def __check_file(self, path):
stat_info = os.stat(path)
if stat_info.st_size >= _H2_MAX_FILE_SIZE:
raise ValueError("maximum file size exceeded")
def __post_file_task(self, file_info, sink=None, user_data=None):
local_filename = file_info.local_filename
remote_filename = file_info.remote_filename
over_write = file_info.opt_overwrite
fs = None
file_store_id = None
exception = None
code = 0
x_file_upload_id = None
stream = self.new_stream()
self._H2Client__on_new_stream(stream)
try:
try:
logging.info("start to post file, local_filename:%s, remote:%s, over_write:%d" % (local_filename, remote_filename, over_write))
if sink:
sink.on_file_upload_start(stream.get_id(), file_info, user_data)
header = {'x-file-name':remote_filename,
'x-file-overwrite':"1" if over_write else "0"}
response = stream.open("/c/iot/sys/thing/file/upload", header)
x_file_upload_id = response.headers["x-file-upload-id"][0]
header = {"x-file-upload-id": x_file_upload_id}
fs = FileStreamHandler(local_filename, opt_crc64=True)
stream.send(header, fs)
response = stream.close(header)
remote_crc64 = int(response.headers["x-file-crc64ecma"][0])
logging.info("crc64, local:%ld, remote:%ld" % (fs.get_crc64(), remote_crc64))
if not self._H2Client__crc_equal(fs.get_crc64(), remote_crc64):
raise Exception("fail to check crc64, local:%ld, remote:%ld" % (fs.get_crc64(), remote_crc64))
file_store_id = response.headers["x-file-store-id"][0]
logging.info("finish uploading file, local_filename:%s, remote:%s, over_write:%d, file_store_id:%s" % (
local_filename, remote_filename, over_write, file_store_id))
return UploadFileResult(code, exception, fs.get_read_size(), fs.get_content_length, file_store_id)
except H2Exception as e:
try:
logging.error("fail to upload the file, local_filename:%s, remote:%s, over_write:%d, x_file_upload_id:%s, stream:%s, code:%s, error:%s" % (
local_filename, remote_filename, over_write, x_file_upload_id, stream, e.get_code(), e))
return UploadFileResult(e.get_code(), exception, fs.get_read_size() if fs else -1, fs.get_content_length() if fs else -1, file_store_id)
finally:
e = None
del e
except Exception as e:
try:
logging.error("fail to upload the file, local_filename:%s, remote:%s, over_write:%d, x_file_upload_id:%s, stream:%s, error:%s" % (
local_filename, remote_filename, over_write, x_file_upload_id, stream, e))
return UploadFileResult(-1, exception, fs.get_read_size() if fs else -1, fs.get_content_length() if fs else -1, file_store_id)
finally:
e = None
del e
finally:
self._H2Client__on_free_stream(stream)
if sink:
result = UploadFileResult(code, exception, fs.get_read_size() if fs else -1, fs.get_content_length() if fs else -1, file_store_id)
sink.on_file_upload_end(stream.get_id(), file_info, result, user_data)
def __connect(self):
with self._H2Client__conn_lock:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
h2_endpoint = self._H2Client__generate_endpoint()
logging.debug("http/2 endpoint:%s" % h2_endpoint)
self._H2Client__conn = hyper.HTTP20Connection(h2_endpoint, port=(self._H2Client__port), force_proto=(hyper.tls.NPN_PROTOCOL),
ssl_context=ctx)
return 0
def get_connect(self):
with self._H2Client__conn_lock:
if self._H2Client__conn:
return self._H2Client__conn
return self._H2Client__connect()
def __fill_auth_header(self, header):
client_id = self._H2Client__client_id or self._H2Client__device_name
timestamp = str(int(time.time() * 1000))
sign_content = "clientId" + client_id + "deviceName" + self._H2Client__device_name + "productKey" + self._H2Client__product_key + "timestamp" + timestamp
sign = hmac.new(self._H2Client__device_secret.encode("utf-8"), sign_content.encode("utf-8"), hashlib.sha256).hexdigest()
header["x-auth-param-timestamp"] = timestamp
header["x-auth-param-signmethod"] = "hmacsha256"
header["x-auth-param-sign"] = sign
header["x-auth-param-product-key"] = self._H2Client__product_key
header["x-auth-param-device-name"] = self._H2Client__device_name
header["x-auth-param-client-id"] = client_id
header["x-auth-name"] = "devicename"
return header
def __fill_sdk_header(self, header):
header["x-sdk-version"] = "1.2.0"
header["x-sdk-version-name"] = "1.2.0"
header["x-sdk-platform"] = "python"
return header
def get_default_header(self):
header = {}
self._H2Client__fill_auth_header(header)
self._H2Client__fill_sdk_header(header)
return header
def __close_connect(self):
with self._H2Client__conn_lock:
if self._H2Client__conn:
self._H2Client__conn.close(0)
return 0
def __close_all_streams(self):
with self._H2Client__stream_list_lock:
self._H2Client__stream_list.clear()
self._H2Client__stream_list = None
self._H2Client__stop_heart_beat()
def __on_new_stream(self, stream):
with self._H2Client__stream_list_lock:
self._H2Client__stream_list.append(stream)
if len(self._H2Client__stream_list) == 1:
self._H2Client__start_heart_beat()
def __on_free_stream(self, stream):
with self._H2Client__stream_list_lock:
self._H2Client__stream_list.remove(stream)
if len(self._H2Client__stream_list) == 0:
self._H2Client__stop_heart_beat()
def __start_heart_beat(self):
logging.debug("start heart_beat")
self._H2Client__schedule_heart_beat()
def __handle_heart_beat(self):
logging.debug("heart...")
self._H2Client__conn.ping(b'PINGPONG')
self._H2Client__schedule_heart_beat()
def __stop_heart_beat(self):
logging.debug("stop heart")
self._H2Client__cancel_heart_beat()
def __schedule_heart_beat(self):
with self._H2Client__heart_beat_lock:
if self._H2Client__opt_heart_beat_time:
if self._H2Client__opt_heart_beat_time > 0:
self._H2Client__timer = threading.Timer(self._H2Client__opt_heart_beat_time, self._H2Client__handle_heart_beat)
self._H2Client__timer.start()
def __cancel_heart_beat(self):
with self._H2Client__heart_beat_lock:
if self._H2Client__timer:
self._H2Client__timer.cancel()
self._H2Client__timer = None