497 lines
20 KiB
Python
497 lines
20 KiB
Python
# uncompyle6 version 3.9.2
|
|
# Python bytecode version base 3.7.0 (3394)
|
|
# Decompiled from: Python 3.8.19 (default, Mar 20 2024, 15:27:52)
|
|
# [Clang 14.0.6 ]
|
|
# Embedded file name: /var/user/app/device_supervisorbak/device_supervisor/lib/linkkit/h2client.py
|
|
# Compiled at: 2024-04-18 03:12:57
|
|
# Size of source mod 2**32: 19761 bytes
|
|
import hyper, time, hmac, ssl, logging, os, threading, crcmod, concurrent.futures, hashlib
|
|
|
|
def _assert_value(condition, error_msg):
|
|
if not condition:
|
|
raise ValueError(error_msg)
|
|
|
|
|
|
_H2_OPT_HEART_BEAT_TIME_DEFAULT = 25
|
|
_H2_OPT_PORT_DEFAULT = 443
|
|
_H2_MAX_FILE_SIZE = 1073741824
|
|
|
|
def h2_set_option(opt, value):
|
|
global _H2_MAX_FILE_SIZE
|
|
global _H2_OPT_HEART_BEAT_TIME_DEFAULT
|
|
global _H2_OPT_PORT_DEFAULT
|
|
if "heart_beat_interval" == opt:
|
|
_H2_OPT_HEART_BEAT_TIME_DEFAULT = value
|
|
else:
|
|
if "port" == opt:
|
|
_H2_OPT_PORT_DEFAULT = value
|
|
else:
|
|
if "max_file_size" == opt:
|
|
_H2_MAX_FILE_SIZE = value
|
|
|
|
|
|
class StreamHandler:
|
|
|
|
def __init__(self):
|
|
pass
|
|
|
|
def __enter__(self):
|
|
pass
|
|
|
|
def __exit__(self, type, value, trace):
|
|
pass
|
|
|
|
def get_content_length(self):
|
|
pass
|
|
|
|
def next(self):
|
|
pass
|
|
|
|
def has_next(self):
|
|
return False
|
|
|
|
|
|
class FileStreamHandler(StreamHandler):
|
|
|
|
def __init__(self, filename, block_size=524288, opt_crc64=False):
|
|
self._FileStreamHandler__filename = filename
|
|
self._FileStreamHandler__block_size = block_size
|
|
self._FileStreamHandler__size = os.stat(filename).st_size
|
|
self._FileStreamHandler__opt_crc64 = opt_crc64
|
|
self._FileStreamHandler__last_crc = 0
|
|
self._FileStreamHandler__read_size = 0
|
|
|
|
def get_content_length(self):
|
|
return self._FileStreamHandler__size
|
|
|
|
def __enter__(self):
|
|
logging.debug("open the file, filename:%s" % self._FileStreamHandler__filename)
|
|
self._FileStreamHandler__f = open(self._FileStreamHandler__filename, "rb")
|
|
self._FileStreamHandler__read_size = 0
|
|
|
|
def __exit__(self, type, value, trace):
|
|
if self._FileStreamHandler__f:
|
|
self._FileStreamHandler__f.close()
|
|
self._FileStreamHandler__f = None
|
|
|
|
def next(self):
|
|
if not self._FileStreamHandler__f or self._FileStreamHandler__read_size >= self._FileStreamHandler__size:
|
|
return
|
|
data = self._FileStreamHandler__f.read(self._FileStreamHandler__block_size)
|
|
if data:
|
|
self._FileStreamHandler__read_size += len(data)
|
|
if self._FileStreamHandler__opt_crc64:
|
|
do_crc64 = crcmod.mkCrcFun(23270347676907615891L, initCrc=(self._FileStreamHandler__last_crc), xorOut=18446744073709551615L, rev=True)
|
|
self._FileStreamHandler__last_crc = do_crc64(data)
|
|
return data
|
|
|
|
def has_next(self):
|
|
return self._FileStreamHandler__f.tell() < self._FileStreamHandler__size
|
|
|
|
def get_crc64(self):
|
|
return self._FileStreamHandler__last_crc
|
|
|
|
def get_read_size(self):
|
|
return self._FileStreamHandler__read_size
|
|
|
|
|
|
class H2Exception(Exception):
|
|
|
|
def __init__(self, code, msg):
|
|
Exception.__init__(self, msg)
|
|
self._H2Exception__code = code
|
|
self._H2Exception__msg = msg
|
|
|
|
def get_code(self):
|
|
return self._H2Exception__code
|
|
|
|
def get_msg(self):
|
|
return self._H2Exception__msg
|
|
|
|
def __name__(self):
|
|
return "H2Exception"
|
|
|
|
|
|
class UploadFileInfo:
|
|
|
|
def __init__(self, local_filename, remote_filename=None, overwrite=True):
|
|
self.local_filename = local_filename
|
|
self.opt_overwrite = overwrite
|
|
if not remote_filename:
|
|
self.remote_filename = os.path.basename(local_filename)
|
|
else:
|
|
self.remote_filename = remote_filename
|
|
|
|
def __name__(self):
|
|
return "UploadFileInfo"
|
|
|
|
|
|
class UploadFileResult:
|
|
|
|
def __init__(self, code=None, exception=None, upload_size=None, total_size=None, file_store_id=None):
|
|
self.upload_size = upload_size
|
|
self.total_size = total_size
|
|
self.file_store_id = file_store_id
|
|
self.code = code
|
|
self.exception = exception
|
|
|
|
def __name__(self):
|
|
return "UploadFileResult"
|
|
|
|
|
|
class H2FileUploadSink:
|
|
|
|
def on_file_upload_start(self, id, upload_file_info, user_data):
|
|
pass
|
|
|
|
def on_file_upload_end(self, id, upload_file_info, upload_file_result, user_data):
|
|
pass
|
|
|
|
def on_file_upload_progress(self, id, upload_file_info, upload_file_result, user_data):
|
|
pass
|
|
|
|
|
|
class H2FileTask:
|
|
|
|
def __init__(self, id, file_info, future_result):
|
|
self._H2FileTask__file_info = file_info
|
|
self._H2FileTask__future_result = future_result
|
|
self._H2FileTask__id = id
|
|
|
|
def get_file_info(self):
|
|
return self._H2FileTask__file_info
|
|
|
|
def get_future_result(self):
|
|
return self._H2FileTask__future_result
|
|
|
|
def result(self, timeout=None):
|
|
return self._H2FileTask__future_result.result(timeout)
|
|
|
|
def cancel(self):
|
|
self._H2FileTask__future_result.call()
|
|
|
|
def get_id(self):
|
|
return self._H2FileTask__id
|
|
|
|
def __name__(self):
|
|
return "H2FileTask"
|
|
|
|
|
|
class H2Stream:
|
|
|
|
def __init__(self, client, id):
|
|
self._H2Stream__client = client
|
|
self._H2Stream__conn = None
|
|
self._H2Stream__total_sent_size = 0
|
|
self._H2Stream__path = None
|
|
self._H2Stream__id = id
|
|
self._H2Stream__stream_id = None
|
|
self._H2Stream__x_request_id = None
|
|
self._H2Stream__x_data_stream_id = None
|
|
|
|
def __name__(self):
|
|
return "H2Stream"
|
|
|
|
def get_id(self):
|
|
return self._H2Stream__id
|
|
|
|
def open(self, path, header):
|
|
_assert_value(path, "path is required")
|
|
with self._H2Stream__client._get_auth_lock():
|
|
url = "/stream/open" + path
|
|
self._H2Stream__conn = self._H2Stream__client.get_connect()
|
|
self._H2Stream__total_sent_size = 0
|
|
self._H2Stream__path = path
|
|
logging.debug("request url: %s" % url)
|
|
conn_header = self._H2Stream__client.get_default_header()
|
|
if header:
|
|
conn_header.update(header)
|
|
req_id = self._H2Stream__conn.request("GET", url, None, conn_header)
|
|
response = self._H2Stream__conn.get_response(req_id)
|
|
self._H2Stream__check_response(response)
|
|
self._H2Stream__x_request_id = response.headers["x-request-id"][0]
|
|
self._H2Stream__x_data_stream_id = response.headers["x-data-stream-id"][0]
|
|
logging.debug("x_request_id: %s" % self._H2Stream__x_request_id)
|
|
logging.debug("x_data_stream_id: %s" % self._H2Stream__x_data_stream_id)
|
|
return response
|
|
|
|
def close(self, header):
|
|
logging.debug("close the stream")
|
|
final_header = {'x-request-id':self._H2Stream__x_request_id, 'x-data-stream-id':self._H2Stream__x_data_stream_id}
|
|
final_header.update(header)
|
|
req_id = self._H2Stream__conn.request("GET", "/stream/close/" + self._H2Stream__path, None, final_header)
|
|
response = self._H2Stream__conn.get_response(req_id)
|
|
self._H2Stream__check_response(response)
|
|
return response
|
|
|
|
def send(self, headers, data_handler):
|
|
with self._H2Stream__client._get_auth_lock():
|
|
url = "/stream/send" + self._H2Stream__path
|
|
logging.debug("request url: %s" % url)
|
|
self._H2Stream__stream_id = self._H2Stream__conn.putrequest("GET", url)
|
|
self._H2Stream__conn.putheader("x-request-id", (self._H2Stream__x_request_id), stream_id=(self._H2Stream__stream_id))
|
|
self._H2Stream__conn.putheader("x-data-stream-id", (self._H2Stream__x_data_stream_id), stream_id=(self._H2Stream__stream_id))
|
|
content_length = data_handler.get_content_length()
|
|
if content_length:
|
|
self._H2Stream__conn.putheader("content-length", "%s" % content_length, self._H2Stream__stream_id)
|
|
for k, v in headers.items():
|
|
self._H2Stream__conn.putheader(k, v, self._H2Stream__stream_id)
|
|
|
|
self._H2Stream__conn.endheaders(stream_id=(self._H2Stream__stream_id))
|
|
with data_handler:
|
|
final = False
|
|
while not final:
|
|
data = data_handler.next()
|
|
if data == None or len(data) == 0:
|
|
break
|
|
final = not data_handler.has_next()
|
|
self._H2Stream__conn.send(data, final, stream_id=(self._H2Stream__stream_id))
|
|
|
|
response = self._H2Stream__conn.get_response(self._H2Stream__stream_id)
|
|
self._H2Stream__check_response(response)
|
|
return response
|
|
|
|
def __check_response(self, response, msg=None):
|
|
if response.status != 200:
|
|
raise H2Exception(response.status, msg if msg else "fail to request http/2, code:%d" % response.status)
|
|
|
|
def __str__(self):
|
|
return "H2Stream(id=%s,stream_x_id=%s,x_request_id=%s,x_data_stream_id:%s" % (self._H2Stream__id,
|
|
self._H2Stream__stream_id, self._H2Stream__x_request_id, self._H2Stream__x_data_stream_id)
|
|
|
|
|
|
class H2Client:
|
|
|
|
def __init__(self, region, product_key, device_name, device_secret, client_id=None, opt_max_thread_num=4, endpoint=None):
|
|
_assert_value(region, "region is not empty")
|
|
_assert_value(product_key, "product_key is not empty")
|
|
_assert_value(device_name, "device_name is not empty")
|
|
self._H2Client__product_key = product_key
|
|
self._H2Client__device_name = device_name
|
|
self._H2Client__client_id = client_id
|
|
self._H2Client__device_secret = device_secret
|
|
self._H2Client__region = region
|
|
self._H2Client__endpoint = endpoint
|
|
self._H2Client__opt_free_idle_connect = False
|
|
self._H2Client__connected = False
|
|
self._H2Client__port = _H2_OPT_PORT_DEFAULT
|
|
self._H2Client__conn = None
|
|
self._H2Client__opt_heart_beat_time = _H2_OPT_HEART_BEAT_TIME_DEFAULT
|
|
self._H2Client__conn_lock = threading.RLock()
|
|
self._H2Client__lock = threading.RLock()
|
|
self._H2Client__stream_list = []
|
|
self._H2Client__stream_list_lock = threading.RLock()
|
|
self._H2Client__thread_executor = concurrent.futures.ThreadPoolExecutor(max_workers=opt_max_thread_num)
|
|
self._H2Client__auth_lock = threading.RLock()
|
|
self._H2Client__id = 0
|
|
self._H2Client__heart_beat_lock = threading.RLock()
|
|
self._H2Client__timer = None
|
|
|
|
def get_endpoint(self):
|
|
return self._H2Client__endpoint
|
|
|
|
def get_actual_endpoint(self):
|
|
return self._H2Client__generate_endpoint()
|
|
|
|
def __generate_endpoint(self):
|
|
if self._H2Client__endpoint:
|
|
return self._H2Client__endpoint
|
|
return self._H2Client__product_key + ".iot-as-http2.%s.aliyuncs.com" % self._H2Client__region
|
|
|
|
def open(self):
|
|
with self._H2Client__conn_lock:
|
|
if self._H2Client__conn:
|
|
logging.info("the client is opened")
|
|
return -1
|
|
return self._H2Client__connect()
|
|
|
|
def close(self):
|
|
with self._H2Client__conn_lock:
|
|
return self._H2Client__close_connect()
|
|
self._H2Client__close_all_streams()
|
|
|
|
def upload_file_async(self, local_filename, remote_filename=None, over_write=True, upload_file_sink=None, upload_sink_user_data=None):
|
|
_assert_value(local_filename, "local_filename is required")
|
|
self._H2Client__check_file(local_filename)
|
|
file_info = UploadFileInfo(local_filename, remote_filename, over_write)
|
|
future_result = self._H2Client__thread_executor.submit(self._H2Client__post_file_task, file_info, upload_file_sink, upload_sink_user_data)
|
|
return H2FileTask(id, file_info, future_result)
|
|
|
|
def upload_file_sync(self, local_filename, remote_filename=None, over_write=True, timeout=None, upload_file_sink=None, upload_sink_user_data=None):
|
|
self._H2Client__check_file(local_filename)
|
|
f = self.upload_file_async(local_filename, remote_filename, over_write, upload_file_sink, upload_sink_user_data)
|
|
return f.result(timeout)
|
|
|
|
def __create_stream_id(self):
|
|
with self._H2Client__lock:
|
|
self._H2Client__id += 1
|
|
return self._H2Client__id
|
|
|
|
def new_stream(self):
|
|
return H2Stream(self, self._H2Client__create_stream_id())
|
|
|
|
def _get_auth_lock(self):
|
|
return self._H2Client__auth_lock
|
|
|
|
def __crc_equal(self, value1, value2):
|
|
if value1 == value2:
|
|
return True
|
|
return self._H2Client__to_unsign(value1) == self._H2Client__to_unsign(value2)
|
|
|
|
def __to_unsign(self, value):
|
|
if value > 0:
|
|
return value
|
|
return 18446744073709551616L + value
|
|
|
|
def __check_file(self, path):
|
|
stat_info = os.stat(path)
|
|
if stat_info.st_size >= _H2_MAX_FILE_SIZE:
|
|
raise ValueError("maximum file size exceeded")
|
|
|
|
def __post_file_task(self, file_info, sink=None, user_data=None):
|
|
local_filename = file_info.local_filename
|
|
remote_filename = file_info.remote_filename
|
|
over_write = file_info.opt_overwrite
|
|
fs = None
|
|
file_store_id = None
|
|
exception = None
|
|
code = 0
|
|
x_file_upload_id = None
|
|
stream = self.new_stream()
|
|
self._H2Client__on_new_stream(stream)
|
|
try:
|
|
try:
|
|
logging.info("start to post file, local_filename:%s, remote:%s, over_write:%d" % (local_filename, remote_filename, over_write))
|
|
if sink:
|
|
sink.on_file_upload_start(stream.get_id(), file_info, user_data)
|
|
header = {'x-file-name':remote_filename,
|
|
'x-file-overwrite':"1" if over_write else "0"}
|
|
response = stream.open("/c/iot/sys/thing/file/upload", header)
|
|
x_file_upload_id = response.headers["x-file-upload-id"][0]
|
|
header = {"x-file-upload-id": x_file_upload_id}
|
|
fs = FileStreamHandler(local_filename, opt_crc64=True)
|
|
stream.send(header, fs)
|
|
response = stream.close(header)
|
|
remote_crc64 = int(response.headers["x-file-crc64ecma"][0])
|
|
logging.info("crc64, local:%ld, remote:%ld" % (fs.get_crc64(), remote_crc64))
|
|
if not self._H2Client__crc_equal(fs.get_crc64(), remote_crc64):
|
|
raise Exception("fail to check crc64, local:%ld, remote:%ld" % (fs.get_crc64(), remote_crc64))
|
|
file_store_id = response.headers["x-file-store-id"][0]
|
|
logging.info("finish uploading file, local_filename:%s, remote:%s, over_write:%d, file_store_id:%s" % (
|
|
local_filename, remote_filename, over_write, file_store_id))
|
|
return UploadFileResult(code, exception, fs.get_read_size(), fs.get_content_length, file_store_id)
|
|
except H2Exception as e:
|
|
try:
|
|
logging.error("fail to upload the file, local_filename:%s, remote:%s, over_write:%d, x_file_upload_id:%s, stream:%s, code:%s, error:%s" % (
|
|
local_filename, remote_filename, over_write, x_file_upload_id, stream, e.get_code(), e))
|
|
return UploadFileResult(e.get_code(), exception, fs.get_read_size() if fs else -1, fs.get_content_length() if fs else -1, file_store_id)
|
|
finally:
|
|
e = None
|
|
del e
|
|
|
|
except Exception as e:
|
|
try:
|
|
logging.error("fail to upload the file, local_filename:%s, remote:%s, over_write:%d, x_file_upload_id:%s, stream:%s, error:%s" % (
|
|
local_filename, remote_filename, over_write, x_file_upload_id, stream, e))
|
|
return UploadFileResult(-1, exception, fs.get_read_size() if fs else -1, fs.get_content_length() if fs else -1, file_store_id)
|
|
finally:
|
|
e = None
|
|
del e
|
|
|
|
finally:
|
|
self._H2Client__on_free_stream(stream)
|
|
if sink:
|
|
result = UploadFileResult(code, exception, fs.get_read_size() if fs else -1, fs.get_content_length() if fs else -1, file_store_id)
|
|
sink.on_file_upload_end(stream.get_id(), file_info, result, user_data)
|
|
|
|
def __connect(self):
|
|
with self._H2Client__conn_lock:
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
|
h2_endpoint = self._H2Client__generate_endpoint()
|
|
logging.debug("http/2 endpoint:%s" % h2_endpoint)
|
|
self._H2Client__conn = hyper.HTTP20Connection(h2_endpoint, port=(self._H2Client__port), force_proto=(hyper.tls.NPN_PROTOCOL),
|
|
ssl_context=ctx)
|
|
return 0
|
|
|
|
def get_connect(self):
|
|
with self._H2Client__conn_lock:
|
|
if self._H2Client__conn:
|
|
return self._H2Client__conn
|
|
return self._H2Client__connect()
|
|
|
|
def __fill_auth_header(self, header):
|
|
client_id = self._H2Client__client_id or self._H2Client__device_name
|
|
timestamp = str(int(time.time() * 1000))
|
|
sign_content = "clientId" + client_id + "deviceName" + self._H2Client__device_name + "productKey" + self._H2Client__product_key + "timestamp" + timestamp
|
|
sign = hmac.new(self._H2Client__device_secret.encode("utf-8"), sign_content.encode("utf-8"), hashlib.sha256).hexdigest()
|
|
header["x-auth-param-timestamp"] = timestamp
|
|
header["x-auth-param-signmethod"] = "hmacsha256"
|
|
header["x-auth-param-sign"] = sign
|
|
header["x-auth-param-product-key"] = self._H2Client__product_key
|
|
header["x-auth-param-device-name"] = self._H2Client__device_name
|
|
header["x-auth-param-client-id"] = client_id
|
|
header["x-auth-name"] = "devicename"
|
|
return header
|
|
|
|
def __fill_sdk_header(self, header):
|
|
header["x-sdk-version"] = "1.2.0"
|
|
header["x-sdk-version-name"] = "1.2.0"
|
|
header["x-sdk-platform"] = "python"
|
|
return header
|
|
|
|
def get_default_header(self):
|
|
header = {}
|
|
self._H2Client__fill_auth_header(header)
|
|
self._H2Client__fill_sdk_header(header)
|
|
return header
|
|
|
|
def __close_connect(self):
|
|
with self._H2Client__conn_lock:
|
|
if self._H2Client__conn:
|
|
self._H2Client__conn.close(0)
|
|
return 0
|
|
|
|
def __close_all_streams(self):
|
|
with self._H2Client__stream_list_lock:
|
|
self._H2Client__stream_list.clear()
|
|
self._H2Client__stream_list = None
|
|
self._H2Client__stop_heart_beat()
|
|
|
|
def __on_new_stream(self, stream):
|
|
with self._H2Client__stream_list_lock:
|
|
self._H2Client__stream_list.append(stream)
|
|
if len(self._H2Client__stream_list) == 1:
|
|
self._H2Client__start_heart_beat()
|
|
|
|
def __on_free_stream(self, stream):
|
|
with self._H2Client__stream_list_lock:
|
|
self._H2Client__stream_list.remove(stream)
|
|
if len(self._H2Client__stream_list) == 0:
|
|
self._H2Client__stop_heart_beat()
|
|
|
|
def __start_heart_beat(self):
|
|
logging.debug("start heart_beat")
|
|
self._H2Client__schedule_heart_beat()
|
|
|
|
def __handle_heart_beat(self):
|
|
logging.debug("heart...")
|
|
self._H2Client__conn.ping(b'PINGPONG')
|
|
self._H2Client__schedule_heart_beat()
|
|
|
|
def __stop_heart_beat(self):
|
|
logging.debug("stop heart")
|
|
self._H2Client__cancel_heart_beat()
|
|
|
|
def __schedule_heart_beat(self):
|
|
with self._H2Client__heart_beat_lock:
|
|
if self._H2Client__opt_heart_beat_time:
|
|
if self._H2Client__opt_heart_beat_time > 0:
|
|
self._H2Client__timer = threading.Timer(self._H2Client__opt_heart_beat_time, self._H2Client__handle_heart_beat)
|
|
self._H2Client__timer.start()
|
|
|
|
def __cancel_heart_beat(self):
|
|
with self._H2Client__heart_beat_lock:
|
|
if self._H2Client__timer:
|
|
self._H2Client__timer.cancel()
|
|
self._H2Client__timer = None
|