Files
2025-04-30 08:48:49 -05:00

1964 lines
92 KiB
Python

# uncompyle6 version 3.9.2
# Python bytecode version base 3.7.0 (3394)
# Decompiled from: Python 3.8.19 (default, Mar 20 2024, 15:27:52)
# [Clang 14.0.6 ]
# Embedded file name: /var/user/app/device_supervisorbak/device_supervisor/lib/linkkit/linkkit.py
# Compiled at: 2024-04-18 03:12:57
# Size of source mod 2**32: 100637 bytes
import os, logging, threading, queue, urllib.request, urllib.parse, json, hashlib, hmac, random, ssl, socket, string, time, re, sys
from enum import Enum
import paho.mqtt.client as mqtt
from paho.mqtt.client import MQTTMessage
from linkkit import h2client
REQUIRED_MAJOR_VERSION = 3
REQUIRED_MINOR_VERSION = 5
def lk_check_python_version(major_version, minor_version):
version = sys.version_info
if (version[0] < major_version or version[0]) == major_version:
if version[1] < minor_version:
print("WARNING: linkit requires Python %d.%d or higher, and the current version is %s" % (major_version, minor_version, sys.version))
lk_check_python_version(REQUIRED_MAJOR_VERSION, REQUIRED_MINOR_VERSION)
class LinkKit(object):
TAG_KEY = "attrKey"
TAG_VALUE = "attrValue"
class LinkKitState(Enum):
INITIALIZED = 1
CONNECTING = 2
CONNECTED = 3
DISCONNECTING = 4
DISCONNECTED = 5
DESTRUCTING = 6
DESTRUCTED = 7
class StateError(Exception):
def __init__(self, err):
Exception.__init__(self, err)
class Shadow(object):
def __init__(self):
self._Shadow__version = None
self._Shadow__timestamp = None
self._Shadow__state = None
self._Shadow__metadata = None
self._Shadow__latest_shadow_lock = threading.Lock()
self._Shadow__latest_received_time = None
self._Shadow__lastest_received_payload = None
def get_version(self):
with self._Shadow__latest_shadow_lock:
return self._Shadow__version
def get_metadata(self):
with self._Shadow__latest_shadow_lock:
return self._Shadow__metadata
def get_state(self):
with self._Shadow__latest_shadow_lock:
return self._Shadow__state
def set_state(self, state):
with self._Shadow__latest_shadow_lock:
self._Shadow__state = state
def set_metadata(self, metadata):
with self._Shadow__latest_shadow_lock:
self._Shadow__metadata = metadata
def set_version(self, version):
with self._Shadow__latest_shadow_lock:
self._Shadow__version = version
def set_timestamp(self, timestamp):
with self._Shadow__latest_shadow_lock:
self._Shadow__timestamp = timestamp
def set_latest_recevied_time(self, timestamp):
with self._Shadow__latest_shadow_lock:
self._Shadow__latest_received_time = timestamp
def get_latest_recevied_time(self):
with self._Shadow__latest_shadow_lock:
return self._Shadow__latest_received_time
def set_latest_recevied_payload(self, payload):
with self._Shadow__latest_shadow_lock:
self._Shadow__latest_received_payload = payload
def get_latest_recevied_payload(self):
with self._Shadow__latest_shadow_lock:
return self._Shadow__latest_received_payload
def to_dict(self):
return {'state':self._Shadow__state, 'metadata':self._Shadow__metadata, 'version':self._Shadow__version, 'timestamp':self._Shadow__timestamp}
def to_json_string(self):
return json.dumps(self.to_dict())
class __HandlerTask(object):
def __init__(self, logger=None):
self._HandlerTask__logger = logger
if self._HandlerTask__logger is not None:
self._HandlerTask__logger.info("HandlerTask init enter")
self._HandlerTask__message_queue = queue.Queue(20)
self._HandlerTask__cmd_callback = {}
self._HandlerTask__started = False
self._HandlerTask__exited = False
self._HandlerTask__thread = None
def register_cmd_callback(self, cmd, callback):
if self._HandlerTask__started is False:
if cmd != "req_exit":
self._HandlerTask__cmd_callback[cmd] = callback
return 0
return 1
else:
return 2
def post_message(self, cmd, value):
self._HandlerTask__logger.debug("post_message :%r " % cmd)
if self._HandlerTask__started:
if self._HandlerTask__exited is False:
try:
self._HandlerTask__message_queue.put((cmd, value), timeout=5)
except queue.Full as e:
try:
self._HandlerTask__logger.error("queue full: %r" % e)
return False
finally:
e = None
del e
self._HandlerTask__logger.debug("post_message success")
return True
self._HandlerTask__logger.debug("post_message fail started:%r,exited:%r" % (self._HandlerTask__started, self._HandlerTask__exited))
return False
def start(self):
if self._HandlerTask__logger is not None:
self._HandlerTask__logger.info("HandlerTask start")
if self._HandlerTask__started is False:
if self._HandlerTask__logger is not None:
self._HandlerTask__logger.info("HandlerTask try start")
self._HandlerTask__exited = False
self._HandlerTask__started = True
self._HandlerTask__message_queue = queue.Queue(20)
self._HandlerTask__thread = threading.Thread(target=(self._HandlerTask__thread_runnable))
self._HandlerTask__thread.daemon = True
self._HandlerTask__thread.start()
return 0
return 1
def stop(self):
if self._HandlerTask__started:
if self._HandlerTask__exited is False:
self._HandlerTask__exited = True
self._HandlerTask__message_queue.put(('req_exit', None))
def wait_stop(self):
if self._HandlerTask__started is True:
self._HandlerTask__thread.join()
def __thread_runnable(self):
if self._HandlerTask__logger is not None:
self._HandlerTask__logger.debug("thread runnable enter")
while 1:
cmd, value = self._HandlerTask__message_queue.get()
self._HandlerTask__logger.debug("thread runnable pop cmd:%r" % cmd)
if cmd == "req_exit":
break
if self._HandlerTask__cmd_callback[cmd] is not None:
try:
self._HandlerTask__cmd_callback[cmd](value)
except Exception as e:
try:
if self._HandlerTask__logger is not None:
self._HandlerTask__logger.error("thread runnable raise exception:%s" % e)
finally:
e = None
del e
self._HandlerTask__started = False
if self._HandlerTask__logger is not None:
self._HandlerTask__logger.debug("thread runnable exit")
class LoopThread(object):
def __init__(self, logger=None):
self._LoopThread__logger = logger
if logger is not None:
self._LoopThread__logger.info("LoopThread init enter")
self._LoopThread__callback = None
self._LoopThread__thread = None
self._LoopThread__started = False
self._LoopThread__req_wait = threading.Event()
if logger is not None:
self._LoopThread__logger.info("LoopThread init exit")
def start(self, callback):
if self._LoopThread__started is True:
self._LoopThread__logger.info("LoopThread already ")
return 1
self._LoopThread__callback = callback
self._LoopThread__thread = threading.Thread(target=(self._LoopThread__thread_main))
self._LoopThread__thread.daemon = True
self._LoopThread__thread.start()
return 0
def stop(self):
self._LoopThread__req_wait.wait()
self._LoopThread__req_wait.clear()
def __thread_main(self):
self._LoopThread__started = True
try:
if self._LoopThread__logger is not None:
self._LoopThread__logger.debug("LoopThread thread enter")
if self._LoopThread__callback is not None:
self._LoopThread__callback()
if self._LoopThread__logger is not None:
self._LoopThread__logger.debug("LoopThread thread exit")
except Exception as e:
try:
self._LoopThread__logger.error("LoopThread thread Exception:" + str(e))
finally:
e = None
del e
self._LoopThread__started = False
self._LoopThread__req_wait.set()
class __H2FileUploadSink(h2client.H2FileUploadSink):
def __init__(self, linkkit_instance):
self._H2FileUploadSink__lk_instance = linkkit_instance
def on_file_upload_start(self, id, upload_file_info, user_data):
self._H2FileUploadSink__lk_instance._on_file_upload_start(id, upload_file_info, user_data)
def on_file_upload_end(self, id, upload_file_info, upload_file_result, user_data):
self._H2FileUploadSink__lk_instance._on_file_upload_end(id, upload_file_info, upload_file_result, user_data)
def on_file_upload_progress(self, id, upload_file_result, upload_file_info, user_data):
self._H2FileUploadSink__lk_instance._on_file_upload_progress(id, upload_file_result, upload_file_info, user_data)
def _on_file_upload_start(self, id, upload_file_info, user_data):
if self._LinkKit__on_file_upload_begin != None:
self._LinkKit__on_file_upload_begin(id, upload_file_info, self._LinkKit__user_data)
def _on_file_upload_end(self, id, upload_file_info, upload_file_result, user_data):
if self._LinkKit__on_file_upload_end != None:
self._LinkKit__on_file_upload_end(id, upload_file_info, upload_file_result, self._LinkKit__user_data)
def _on_file_upload_progress(self, id, upload_file_result, upload_file_info, user_data):
pass
class __LinkKitLog(object):
def __init__(self):
self._LinkKitLog__logger = logging.getLogger("linkkit")
self._LinkKitLog__enabled = False
def enable_logger(self):
self._LinkKitLog__enabled = True
def disable_logger(self):
self._LinkKitLog__enabled = False
def is_enabled(self):
return self._LinkKitLog__enabled
def config_logger(self, level):
self._LinkKitLog__logger.setLevel(level)
def debug(self, fmt, *args):
if self._LinkKitLog__enabled:
(self._LinkKitLog__logger.debug)(fmt, *args)
def warring(self, fmt, *args):
if self._LinkKitLog__enabled:
(self._LinkKitLog__logger.warning)(fmt, *args)
def info(self, fmt, *args):
if self._LinkKitLog__enabled:
(self._LinkKitLog__logger.info)(fmt, *args)
def error(self, fmt, *args):
if self._LinkKitLog__enabled:
(self._LinkKitLog__logger.error)(fmt, *args)
def critical(self, fmt, *args):
if self._LinkKitLog__enabled:
(self._LinkKitLog__logger.critical)(fmt, *args)
_LinkKit__USER_TOPIC_PREFIX = "/%s/%s/%s"
_LinkKit__ALIYUN_BROKER_CA_DATA = "-----BEGIN CERTIFICATE-----\nMIIDdTCCAl2gAwIBAgILBAAAAAABFUtaw5QwDQYJKoZIhvcNAQEFBQAwVzELMAkGA1UEBhMCQkUxGTAXBgNVBAoTEEdsb2JhbFNpZ24gbnYtc2ExEDAOBgNVBAsTB1Jvb3QgQ0ExGzAZBgNVBAMTEkdsb2JhbFNpZ24gUm9vdCBDQTAeFw05ODA5MDExMjAwMDBaFw0yODAxMjgxMjAwMDBaMFcxCzAJBgNVBAYTAkJFMRkwFwYDVQQKExBHbG9iYWxTaWduIG52LXNhMRAwDgYDVQQLEwdSb290IENBMRswGQYDVQQDExJHbG9iYWxTaWduIFJvb3QgQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDaDuaZjc6j40+Kfvvxi4Mla+pIH/EqsLmVEQS98GPR4mdmzxzdzxtIK+6NiY6arymAZavpxy0Sy6scTHAHoT0KMM0VjU/43dSMUBUc71DuxC73/OlS8pF94G3VNTCOXkNz8kHp1Wrjsok6Vjk4bwY8iGlbKk3Fp1S4bInMm/k8yuX9ifUSPJJ4ltbcdG6TRGHRjcdGsnUOhugZitVtbNV4FpWi6cgKOOvyJBNPc1STE4U6G7weNLWLBYy5d4ux2x8gkasJU26Qzns3dLlwR5EiUWMWea6xrkEmCMgZK9FGqkjWZCrXgzT/LCrBbBlDSgeF59N89iFo7+ryUp9/k5DPAgMBAAGjQjBAMA4GA1UdDwEB/wQEAwIBBjAPBgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBRge2YaRQ2XyolQL30EzTSo//z9SzANBgkqhkiG9w0BAQUFAAOCAQEA1nPnfE920I2/7LqivjTFKDK1fPxsnCwrvQmeU79rXqoRSLblCKOzyj1hTdNGCbM+w6DjY1Ub8rrvrTnhQ7k4o+YviiY776BQVvnGCv04zcQLcFGUl5gE38NflNUVyRRBnMRddWQVDf9VMOyGj/8N7yy5Y0b2qvzfvGn9LhJIZJrglfCm7ymPAbEVtQwdpf5pLGkkeB6zpxxxYu7KyJesF12KwvhHhm4qxFYxldBniYUr+WymXUadDKqC5JlR3XC321Y9YeRq4VzW9v493kHMB65jUr9TU/Qr6cf9tveCX4XSQRjbgbMEHMUfpIBvFSDJ3gyICh3WZlXi/EjJKSZp4A==\n-----END CERTIFICATE-----"
def __init__(self, host_name, product_key, device_name, device_secret, product_secret=None, user_data=None):
self._LinkKit__just_for_pycharm_autocomplete = False
def __str_is_empty(value):
if value is None or value == "":
return True
return False
if _LinkKit__str_is_empty(host_name):
raise ValueError("host_name wrong")
if _LinkKit__str_is_empty(product_key):
raise ValueError("product key wrong")
if _LinkKit__str_is_empty(device_name):
raise ValueError("device name wrong")
if _LinkKit__str_is_empty(device_secret):
if _LinkKit__str_is_empty(product_secret):
raise ValueError("device secret & product secret are both empty")
self._LinkKit__link_log = LinkKit._LinkKit__LinkKitLog()
self._LinkKit__PahoLog = logging.getLogger("Paho")
self._LinkKit__PahoLog.setLevel(logging.DEBUG)
self._LinkKit__host_name = host_name
self._LinkKit__product_key = product_key
self._LinkKit__device_name = device_name
self._LinkKit__device_secret = device_secret
self._LinkKit__product_secret = product_secret
self._LinkKit__user_data = user_data
self._LinkKit__device_interface_info = ""
self._LinkKit__device_mac = None
self._LinkKit__cellular_IMEI = None
self._LinkKit__cellular_ICCID = None
self._LinkKit__cellular_IMSI = None
self._LinkKit__cellular_MSISDN = None
self._LinkKit__mqtt_client = None
self._LinkKit__sdk_version = "1.2.0"
self._LinkKit__sdk_program_language = "Python"
self._LinkKit__endpoint = None
self._LinkKit__h2_endpoint = None
self._LinkKit__mqtt_port = 1883
self._LinkKit__mqtt_protocol = "MQTTv311"
self._LinkKit__mqtt_transport = "TCP"
self._LinkKit__mqtt_secure = "TLS"
self._LinkKit__mqtt_keep_alive = 60
self._LinkKit__mqtt_clean_session = True
self._LinkKit__mqtt_max_inflight_message = 20
self._LinkKit__mqtt_max_queued_message = 40
self._LinkKit__mqtt_auto_reconnect_min_sec = 1
self._LinkKit__mqtt_auto_reconnect_max_sec = 60
self._LinkKit__mqtt_auto_reconnect_sec = 0
self._LinkKit__mqtt_request_timeout = 10
self._LinkKit__linkkit_state = LinkKit.LinkKitState.INITIALIZED
self._LinkKit__aliyun_broker_ca_data = self._LinkKit__ALIYUN_BROKER_CA_DATA
self._LinkKit__latest_shadow = LinkKit.Shadow()
self._LinkKit__on_device_dynamic_register = None
self._LinkKit__on_connect = None
self._LinkKit__on_disconnect = None
self._LinkKit__on_publish_topic = None
self._LinkKit__on_subscribe_topic = None
self._LinkKit__on_unsubscribe_topic = None
self._LinkKit__on_topic_message = None
self._LinkKit__on_topic_rrpc_message = None
self._LinkKit__on_subscribe_rrpc_topic = None
self._LinkKit__on_unsubscribe_rrpc_topic = None
self._LinkKit__on_thing_create = None
self._LinkKit__on_thing_enable = None
self._LinkKit__on_thing_disable = None
self._LinkKit__on_thing_raw_data_arrived = None
self._LinkKit__on_thing_raw_data_post = None
self._LinkKit__on_thing_call_service = None
self._LinkKit__on_thing_prop_changed = None
self._LinkKit__on_thing_event_post = None
self._LinkKit__on_thing_prop_post = None
self._LinkKit__on_thing_shadow_get = None
self._LinkKit__on_thing_device_info_update = None
self._LinkKit__on_thing_device_info_delete = None
self._LinkKit__on_file_upload_begin = None
self._LinkKit__on_file_upload_end = None
self._LinkKit__user_topics = {}
self._LinkKit__user_topics_subscribe_request = {}
self._LinkKit__user_topics_unsubscribe_request = {}
self._LinkKit__user_topics_request_lock = threading.Lock()
self._LinkKit__user_topics_unsubscribe_request_lock = threading.Lock()
self._LinkKit__user_rrpc_topics = {}
self._LinkKit__user_rrpc_topics_lock = threading.RLock()
self._LinkKit__user_rrpc_topics_subscribe_request = {}
self._LinkKit__user_rrpc_topics_unsubscribe_request = {}
self._LinkKit__user_rrpc_topics_subscribe_request_lock = threading.RLock()
self._LinkKit__user_rrpc_topics_unsubscribe_request_lock = threading.RLock()
self._LinkKit__user_rrpc_request_ids = []
self._LinkKit__user_rrpc_request_id_index_map = {}
self._LinkKit__user_rrpc_request_ids_lock = threading.RLock()
self._LinkKit__user_rrpc_request_max_len = 100
self._LinkKit__thing_topic_prop_post = "/sys/%s/%s/thing/event/property/post" % (
self._LinkKit__product_key, self._LinkKit__device_name)
self._LinkKit__thing_topic_prop_post_reply = self._LinkKit__thing_topic_prop_post + "_reply"
self._LinkKit__thing_topic_prop_set = "/sys/%s/%s/thing/service/property/set" % (
self._LinkKit__product_key, self._LinkKit__device_name)
self._LinkKit__thing_topic_prop_set_reply = self._LinkKit__thing_topic_prop_set + "_reply"
self._LinkKit__thing_topic_prop_get = "/sys/%s/%s/thing/service/property/get" % (
self._LinkKit__product_key, self._LinkKit__device_name)
self._LinkKit__thing_topic_event_post_pattern = "/sys/%s/%s/thing/event/%s/post"
self._LinkKit__thing_prop_post_mid = {}
self._LinkKit__thing_prop_post_mid_lock = threading.Lock()
self._LinkKit__thing_prop_set_reply_mid = {}
self._LinkKit__thing_prop_set_reply_mid_lock = threading.Lock()
self._LinkKit__thing_topic_event_post = {}
self._LinkKit__thing_topic_event_post_reply = set()
self._LinkKit__thing_events = set()
self._LinkKit__thing_request_id_max = 1000000
self._LinkKit__thing_request_value = 0
self._LinkKit__thing_request_id = {}
self._LinkKit__thing_request_id_lock = threading.Lock()
self._LinkKit__thing_event_post_mid = {}
self._LinkKit__thing_event_post_mid_lock = threading.Lock()
self._LinkKit__thing_topic_shadow_get = "/shadow/get/%s/%s" % (
self._LinkKit__product_key, self._LinkKit__device_name)
self._LinkKit__thing_topic_shadow_update = "/shadow/update/%s/%s" % (
self._LinkKit__product_key, self._LinkKit__device_name)
self._LinkKit__thing_shadow_mid = {}
self._LinkKit__thing_shadow_mid_lock = threading.Lock()
self._LinkKit__thing_topic_service_pattern = "/sys/%s/%s/thing/service/%s"
self._LinkKit__thing_topic_services = set()
self._LinkKit__thing_topic_services_reply = set()
self._LinkKit__thing_services = set()
self._LinkKit__thing_answer_service_mid = {}
self._LinkKit__thing_answer_service_mid_lock = threading.Lock()
self._LinkKit__thing_topic_raw_up = "/sys/%s/%s/thing/model/up_raw" % (self._LinkKit__product_key, self._LinkKit__device_name)
self._LinkKit__thing_topic_raw_up_reply = self._LinkKit__thing_topic_raw_up + "_reply"
self._LinkKit__thing_topic_raw_down = "/sys/%s/%s/thing/model/down_raw" % (self._LinkKit__product_key, self._LinkKit__device_name)
self._LinkKit__thing_topic_raw_down_reply = self._LinkKit__thing_topic_raw_down + "_reply"
self._LinkKit__thing_raw_up_mid = {}
self._LinkKit__thing_raw_up_mid_lock = threading.Lock()
self._LinkKit__thing_raw_down_reply_mid = {}
self._LinkKit__thing_raw_down_reply_mid_lock = threading.Lock()
self._LinkKit__thing_topic_update_device_info_up = "/sys/%s/%s/thing/deviceinfo/update" % (self._LinkKit__product_key, self._LinkKit__device_name)
self._LinkKit__thing_topic_update_device_info_reply = self._LinkKit__thing_topic_update_device_info_up + "_reply"
self._LinkKit__thing_topic_delete_device_info_up = "/sys/%s/%s/thing/deviceinfo/delete" % (self._LinkKit__product_key, self._LinkKit__device_name)
self._LinkKit__thing_topic_delete_device_info_reply = self._LinkKit__thing_topic_delete_device_info_up + "_reply"
self._LinkKit__thing_update_device_info_up_mid = {}
self._LinkKit__thing_update_device_info_up_mid_lock = threading.Lock()
self._LinkKit__thing_delete_device_info_up_mid = {}
self._LinkKit__thing_delete_device_info_up_mid_lock = threading.Lock()
self._LinkKit__thing_properties_set = set()
self._LinkKit__thing_properties_get = set()
self._LinkKit__thing_properties_post = set()
self._LinkKit__thing_subscribe_sys_request = False
self._LinkKit__thing_subscribe_sys_request_mid = {}
self._LinkKit__thing_subscribe_sys_request_lock = threading.Lock()
self._LinkKit__thing_setup_state = False
self._LinkKit__thing_raw_only = False
self._LinkKit__thing_enable_state = False
if self._LinkKit__just_for_pycharm_autocomplete:
self._LinkKit__mqtt_client = mqtt.Client()
self._LinkKit__device_info_topic = "/sys/%s/%s/thing/deviceinfo/update" % (self._LinkKit__product_key, self._LinkKit__device_name)
self._LinkKit__device_info_topic_reply = self._LinkKit__device_info_topic + "_reply"
self._LinkKit__device_info_mid_lock = threading.Lock()
self._LinkKit__device_info_mid = {}
self._LinkKit__connect_async_req = False
self._LinkKit__worker_loop_exit_req = False
self._LinkKit__worker_loop_runing_state = False
self._LinkKit__worker_loop_exit_req_lock = threading.Lock()
self._LinkKit__loop_thread = LinkKit.LoopThread(self._LinkKit__link_log)
self._LinkKit__handler_task = LinkKit._LinkKit__HandlerTask(self._LinkKit__link_log)
self._LinkKit__handler_task_cmd_on_connect = "on_connect"
self._LinkKit__handler_task_cmd_on_disconnect = "on_disconnect"
self._LinkKit__handler_task_cmd_on_message = "on_message"
self._LinkKit__handler_task_cmd_on_publish = "on_publish"
self._LinkKit__handler_task_cmd_on_subscribe = "on_subscribe"
self._LinkKit__handler_task_cmd_on_unsubscribe = "on_unsubscribe"
self._LinkKit__handler_task.register_cmd_callback(self._LinkKit__handler_task_cmd_on_connect, self._LinkKit__handler_task_on_connect_callback)
self._LinkKit__handler_task.register_cmd_callback(self._LinkKit__handler_task_cmd_on_disconnect, self._LinkKit__handler_task_on_disconnect_callback)
self._LinkKit__handler_task.register_cmd_callback(self._LinkKit__handler_task_cmd_on_message, self._LinkKit__handler_task_on_message_callback)
self._LinkKit__handler_task.register_cmd_callback(self._LinkKit__handler_task_cmd_on_publish, self._LinkKit__handler_task_on_publish_callback)
self._LinkKit__handler_task.register_cmd_callback(self._LinkKit__handler_task_cmd_on_subscribe, self._LinkKit__handler_task_on_subscribe_callback)
self._LinkKit__handler_task.register_cmd_callback(self._LinkKit__handler_task_cmd_on_unsubscribe, self._LinkKit__handler_task_on_unsubscribe_callback)
self._LinkKit__handler_task.start()
self._LinkKit__h2_client = None
self._LinkKit__h2_client_lock = threading.RLock()
@property
def on_device_dynamic_register(self):
pass
@on_device_dynamic_register.setter
def on_device_dynamic_register(self, value):
self._LinkKit__on_device_dynamic_register = value
@property
def on_connect(self):
return self._LinkKit__on_connect
@on_connect.setter
def on_connect(self, value):
self._LinkKit__on_connect = value
@property
def on_disconnect(self):
return self._LinkKit__on_disconnect
@on_disconnect.setter
def on_disconnect(self, value):
self._LinkKit__on_disconnect = value
@property
def on_publish_topic(self):
pass
@on_publish_topic.setter
def on_publish_topic(self, value):
self._LinkKit__on_publish_topic = value
@property
def on_subscribe_topic(self):
pass
@on_subscribe_topic.setter
def on_subscribe_topic(self, value):
self._LinkKit__on_subscribe_topic = value
@property
def on_unsubscribe_topic(self):
pass
@on_unsubscribe_topic.setter
def on_unsubscribe_topic(self, value):
self._LinkKit__on_unsubscribe_topic = value
@property
def on_topic_message(self):
pass
@on_topic_message.setter
def on_topic_message(self, value):
self._LinkKit__on_topic_message = value
@property
def on_topic_rrpc_message(self):
pass
@on_topic_rrpc_message.setter
def on_topic_rrpc_message(self, value):
self._LinkKit__on_topic_rrpc_message = value
@property
def on_thing_create(self):
pass
@on_thing_create.setter
def on_thing_create(self, value):
self._LinkKit__on_thing_create = value
@property
def on_thing_enable(self):
pass
@on_thing_enable.setter
def on_thing_enable(self, value):
self._LinkKit__on_thing_enable = value
@property
def on_thing_disable(self):
pass
@on_thing_disable.setter
def on_thing_disable(self, value):
self._LinkKit__on_thing_disable = value
@property
def on_thing_raw_data_arrived(self):
pass
@on_thing_raw_data_arrived.setter
def on_thing_raw_data_arrived(self, value):
self._LinkKit__on_thing_raw_data_arrived = value
@property
def on_thing_raw_data_post(self):
return self._LinkKit__on_thing_raw_data_post
@property
def on_thing_device_info_update(self):
return self._LinkKit__on_thing_device_info_update
@on_thing_device_info_update.setter
def on_thing_device_info_update(self, value):
self._LinkKit__on_thing_device_info_update = value
@property
def on_thing_device_info_delete(self):
return self._LinkKit__on_thing_device_info_delete
@on_thing_device_info_delete.setter
def on_thing_device_info_delete(self, value):
self._LinkKit__on_thing_device_info_delete = value
@on_thing_raw_data_post.setter
def on_thing_raw_data_post(self, value):
self._LinkKit__on_thing_raw_data_post = value
@property
def on_thing_call_service(self):
pass
@on_thing_call_service.setter
def on_thing_call_service(self, value):
self._LinkKit__on_thing_call_service = value
@property
def on_thing_prop_changed(self):
pass
@on_thing_prop_changed.setter
def on_thing_prop_changed(self, value):
self._LinkKit__on_thing_prop_changed = value
@property
def on_thing_event_post(self):
return self._LinkKit__on_thing_event_post
@on_thing_event_post.setter
def on_thing_event_post(self, value):
self._LinkKit__on_thing_event_post = value
@property
def on_thing_prop_post(self):
return self._LinkKit__on_thing_prop_post
@on_thing_prop_post.setter
def on_thing_prop_post(self, value):
self._LinkKit__on_thing_prop_post = value
@property
def on_thing_shadow_get(self):
return self._LinkKit__on_thing_shadow_get
@on_thing_shadow_get.setter
def on_thing_shadow_get(self, value):
self._LinkKit__on_thing_shadow_get = value
@property
def on_file_upload_begin(self):
return self._LinkKit__on_file_upload_begin
@on_file_upload_begin.setter
def on_file_upload_begin(self, value):
self._LinkKit__on_file_upload_begin = value
@property
def on_file_upload_end(self):
return self._LinkKit__on_file_upload_end
@on_file_upload_end.setter
def on_file_upload_end(self, value):
self._LinkKit__on_file_upload_end = value
def enable_logger(self, level):
self._LinkKit__link_log.config_logger(level)
self._LinkKit__link_log.enable_logger()
if self._LinkKit__mqtt_client is not None:
self._LinkKit__mqtt_client.enable_logger(self._LinkKit__PahoLog)
self._LinkKit__PahoLog.setLevel(level)
def disable_logger(self):
self._LinkKit__link_log.disable_logger()
if self._LinkKit__mqtt_client is not None:
self._LinkKit__mqtt_client.disable_logger()
def config_logger(self, level):
self._LinkKit__link_log.config_logger(level)
if self._LinkKit__mqtt_client is not None:
self._LinkKit__PahoLog.setLevel(level)
def config_http2(self, endpoint=None):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.INITIALIZED:
raise LinkKit.StateError("not in INITIALIZED state")
self._LinkKit__h2_endpoint = endpoint
def config_mqtt(self, port=1883, protocol='MQTTv311', transport='TCP', secure='TLS', keep_alive=60, clean_session=True, max_inflight_message=20, max_queued_message=40, auto_reconnect_min_sec=1, auto_reconnect_max_sec=60, cadata=None, endpoint=None):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.INITIALIZED:
raise LinkKit.StateError("not in INITIALIZED state")
else:
if port < 1 or port > 65535:
raise ValueError("port wrong")
elif protocol != "MQTTv311":
if protocol != "MQTTv31":
raise ValueError("protocol wrong")
if transport != "TCP":
raise ValueError("transport wrong")
if secure != "TLS" and secure != "":
raise ValueError("secure wrong")
if keep_alive < 60 or keep_alive > 180:
raise ValueError("keep_alive range wrong")
if clean_session is not True and clean_session is not False:
raise ValueError("clean session wrong")
if max_queued_message < 0:
raise ValueError("max_queued_message wrong")
if max_inflight_message < 0:
raise ValueError("max_inflight_message wrong")
if auto_reconnect_min_sec < 1 or auto_reconnect_min_sec > 7200:
raise ValueError("auto_reconnect_min_sec wrong")
if auto_reconnect_max_sec < 1 or auto_reconnect_max_sec > 7200:
raise ValueError("auto_reconnect_max_sec wrong")
if auto_reconnect_min_sec > auto_reconnect_max_sec:
raise ValueError("auto_reconnect_max_sec less than auto_reconnect_min_sec")
self._LinkKit__link_log.info("config_mqtt enter")
if self._LinkKit__linkkit_state == LinkKit.LinkKitState.INITIALIZED:
if port is not None:
self._LinkKit__mqtt_port = port
if protocol is not None:
self._LinkKit__mqtt_protocol = protocol
if transport is not None:
self._LinkKit__mqtt_transport = transport
if secure is not None:
self._LinkKit__mqtt_secure = secure
if keep_alive is not None:
self._LinkKit__mqtt_keep_alive = keep_alive
if clean_session is not None:
self._LinkKit__mqtt_clean_session = clean_session
if max_inflight_message is not None:
self._LinkKit__mqtt_max_inflight_message = max_inflight_message
if max_queued_message is not None:
self._LinkKit__mqtt_max_queued_message = max_queued_message
if auto_reconnect_min_sec is not None:
self._LinkKit__mqtt_auto_reconnect_min_sec = auto_reconnect_min_sec
if auto_reconnect_max_sec is not None:
self._LinkKit__mqtt_auto_reconnect_max_sec = auto_reconnect_max_sec
if cadata is not None:
self._LinkKit__aliyun_broker_ca_data = cadata
self._LinkKit__endpoint = endpoint
def config_device_info(self, interface_info):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.INITIALIZED:
raise LinkKit.StateError("LinkKit object not in INITIALIZED")
if not isinstance(interface_info, str):
raise ValueError("interface info must be string")
if len(interface_info) > 160:
return 1
self._LinkKit__device_interface_info = interface_info
return 0
def get_product(self):
return self._LinkKit__product_key
def get_device_name(self):
return self._LinkKit__device_name
def get_endpoint(self):
return self._LinkKit__endpoint
def get_h2_endpoint(self):
return self._LinkKit__h2_endpoint
def get_actual_endpoint(self):
return self._LinkKit__generate_endpoint()
def get_actual_h2_endpoint(self):
if self._LinkKit__h2_client:
return self.get_actual_endpoint()
return self._LinkKit__try_generate_custom_h2_endpoint()
def __load_json(self, payload):
return json.loads(self._LinkKit__to_str(payload))
def __to_str(self, payload):
if type(payload) is bytes:
return str(payload, "utf-8")
return payload
def __try_open_h2_client(self):
with self._LinkKit__h2_client_lock:
if not self._LinkKit__h2_client:
self._LinkKit__h2_client = h2client.H2Client((self._LinkKit__host_name), (self._LinkKit__product_key),
(self._LinkKit__device_name),
(self._LinkKit__device_secret),
endpoint=(self._LinkKit__try_generate_custom_h2_endpoint()))
self._LinkKit__h2_client.open()
def __try_generate_custom_h2_endpoint(self):
if self._LinkKit__h2_endpoint:
return self._LinkKit__h2_endpoint
if self._LinkKit__endpoint:
if self._LinkKit__endpoint.find(".iot-as-mqtt.") > 0:
return self._LinkKit__endpoint.replace(".iot-as-mqtt.", ".iot-as-http2.")
return
def __try_close_h2_client(self):
with self._LinkKit__h2_client_lock:
if self._LinkKit__h2_client:
self._LinkKit__h2_client.close()
self._LinkKit__h2_client = None
def _get_h2_client(self):
self._LinkKit__try_open_h2_client()
return self._LinkKit__h2_client
def upload_file_sync(self, local_filename, remote_filename=None, over_write=True, timeout=None):
"""
upload a file to the cloud and block the request
Parameters
----------
local_filename : str
the path of local file
remote_filename: str, optional
the filename on the cloud
over_write: boolean, optional
if true, overwrite the file. The default value is True
timeout: int or float, optional
timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
Returns
----------
UploadFileResult
upload_size, total_size, file_store_id, code, exception
code is 0 if success.
Exceptions
----------
ValueError
Exception
"""
sink = self._LinkKit__H2FileUploadSink(self)
self._LinkKit__try_open_h2_client()
return self._LinkKit__h2_client.upload_file_sync(local_filename, remote_filename, over_write, timeout, sink, None)
def upload_file_async(self, local_filename, remote_filename=None, over_write=True):
sink = self._LinkKit__H2FileUploadSink(self)
self._LinkKit__try_open_h2_client()
return self._LinkKit__h2_client.upload_file_async(local_filename, remote_filename, over_write, sink, None)
def __upload_device_interface_info(self):
request_id = self._LinkKit__get_thing_request_id()
payload = {'id':request_id,
'version':"1.0",
'params':[
{'domain':"SYSTEM",
'attrKey':"SYS_SDK_LANGUAGE",
'attrValue':self._LinkKit__sdk_program_language},
{'domain':"SYSTEM",
'attrKey':"SYS_LP_SDK_VERSION",
'attrValue':self._LinkKit__sdk_version},
{'domain':"SYSTEM",
'attrKey':"SYS_SDK_IF_INFO",
'attrValue':self._LinkKit__device_interface_info}],
'method':"thing.deviceinfo.update"}
with self._LinkKit__device_info_mid_lock:
rc, mid = self._LinkKit__mqtt_client.publish(self._LinkKit__device_info_topic, json.dumps(payload), 0)
if rc == mqtt.MQTT_ERR_SUCCESS:
self._LinkKit__device_info_mid[mid] = self._LinkKit__timestamp()
return 0
return 1
def destruct(self):
self._LinkKit__try_close_h2_client()
if self._LinkKit__linkkit_state is LinkKit.LinkKitState.DESTRUCTED:
raise LinkKit.StateError("LinkKit object has already destructed")
else:
self._LinkKit__link_log.debug("destruct enter")
if self._LinkKit__linkkit_state == LinkKit.LinkKitState.CONNECTED or self._LinkKit__linkkit_state == LinkKit.LinkKitState.CONNECTING:
self._LinkKit__linkkit_state = LinkKit.LinkKitState.DESTRUCTING
if self._LinkKit__connect_async_req:
with self._LinkKit__worker_loop_exit_req_lock:
self._LinkKit__worker_loop_exit_req = True
if self._LinkKit__mqtt_client is not None:
self._LinkKit__mqtt_client.disconnect()
self._LinkKit__handler_task.wait_stop()
else:
self._LinkKit__linkkit_state = LinkKit.LinkKitState.DESTRUCTING
if self._LinkKit__connect_async_req:
with self._LinkKit__worker_loop_exit_req_lock:
self._LinkKit__worker_loop_exit_req = True
self._LinkKit__handler_task.stop()
self._LinkKit__handler_task.wait_stop()
self._LinkKit__linkkit_state = LinkKit.LinkKitState.DESTRUCTED
def destroy(self):
self.destruct()
def check_state(self):
return self._LinkKit__linkkit_state
@staticmethod
def __generate_random_str(randomlength=16):
"""
generate radom string
"""
random_str = ""
for i in range(randomlength):
random_str += random.choice(string.digits + string.ascii_letters)
return random_str
def __dynamic_register_device(self):
pk = self._LinkKit__product_key
ps = self._LinkKit__product_secret
dn = self._LinkKit__device_name
random_str = self._LinkKit__generate_random_str(15)
context = ssl.create_default_context((ssl.Purpose.CLIENT_AUTH), cadata=(self._LinkKit__aliyun_broker_ca_data))
sign_content = "deviceName%sproductKey%srandom%s" % (dn, pk, random_str)
sign = hmac.new(ps.encode("utf-8"), sign_content.encode("utf-8"), hashlib.sha256).hexdigest()
post_data = {
'productKey': pk,
'deviceName': dn,
'random': random_str,
'sign': sign,
'signMethod': '"hmacsha256"'}
data = urllib.parse.urlencode(post_data)
data = data.encode("ascii")
request_url = "https://iot-auth.%s.aliyuncs.com/auth/register/device" % self._LinkKit__host_name
with urllib.request.urlopen(request_url, data, context=context) as f:
reply_data = f.read().decode("utf-8")
reply_obj = self._LinkKit__load_json(reply_data)
if reply_obj["code"] == 200:
reply_obj_data = reply_obj["data"]
if reply_obj_data is not None:
return (
0, reply_obj_data["deviceSecret"])
else:
return (
1, reply_obj["message"])
def __config_mqtt_client_internal(self):
self._LinkKit__link_log.info("start connect")
timestamp = str(int(time.time()))
if self._LinkKit__mqtt_secure == "TLS":
securemode = 2
else:
securemode = 3
if self._LinkKit__device_interface_info:
sii_option = "sii=%s," % self._LinkKit__device_interface_info
else:
sii_option = ""
client_id = "%s&%s|securemode=%d,signmethod=hmacsha1,ext=1,lan=%s,_v=%s,%stimestamp=%s|" % (
self._LinkKit__product_key, self._LinkKit__device_name, securemode, self._LinkKit__sdk_program_language,
self._LinkKit__sdk_version, sii_option, timestamp)
username = self._LinkKit__device_name + "&" + self._LinkKit__product_key
sign_content = "clientId%sdeviceName%sproductKey%stimestamp%s" % (
self._LinkKit__product_key + "&" + self._LinkKit__device_name,
self._LinkKit__device_name,
self._LinkKit__product_key,
timestamp)
password = hmac.new(self._LinkKit__device_secret.encode("utf-8"), sign_content.encode("utf-8"), hashlib.sha1).hexdigest()
mqtt_protocol_version = mqtt.MQTTv311
if self._LinkKit__mqtt_protocol == "MQTTv311":
mqtt_protocol_version = mqtt.MQTTv311
else:
if self._LinkKit__mqtt_protocol == "MQTTv31":
mqtt_protocol_version = mqtt.MQTTv31
self._LinkKit__mqtt_client = mqtt.Client(client_id=client_id, clean_session=(self._LinkKit__mqtt_clean_session),
protocol=mqtt_protocol_version)
if self._LinkKit__link_log.is_enabled():
self._LinkKit__mqtt_client.enable_logger(self._LinkKit__PahoLog)
self._LinkKit__mqtt_client.username_pw_set(username, password)
self._LinkKit__mqtt_client.on_connect = self._LinkKit__on_internal_connect
self._LinkKit__mqtt_client.on_disconnect = self._LinkKit__on_internal_disconnect
self._LinkKit__mqtt_client.on_message = self._LinkKit__on_internal_message
self._LinkKit__mqtt_client.on_publish = self._LinkKit__on_internal_publish
self._LinkKit__mqtt_client.on_subscribe = self._LinkKit__on_internal_subscribe
self._LinkKit__mqtt_client.on_unsubscribe = self._LinkKit__on_internal_unsubscribe
self._LinkKit__mqtt_client.reconnect_delay_set(self._LinkKit__mqtt_auto_reconnect_min_sec, self._LinkKit__mqtt_auto_reconnect_max_sec)
self._LinkKit__mqtt_client.max_queued_messages_set(self._LinkKit__mqtt_max_queued_message)
self._LinkKit__mqtt_client.max_inflight_messages_set(self._LinkKit__mqtt_max_inflight_message)
self._LinkKit__link_log.debug("current working directory:" + os.getcwd())
if self._LinkKit__mqtt_secure == "TLS":
context = ssl.create_default_context((ssl.Purpose.CLIENT_AUTH), cadata=(self._LinkKit__aliyun_broker_ca_data))
self._LinkKit__mqtt_client.tls_set_context(context)
self._LinkKit__host_name_internal = self._LinkKit__generate_endpoint()
def __generate_endpoint(self):
if self._LinkKit__endpoint:
return self._LinkKit__endpoint
if self._LinkKit__host_name == "127.0.0.1" or self._LinkKit__host_name == "localhost":
return self._LinkKit__host_name
return "%s.iot-as-mqtt.%s.aliyuncs.com" % (
self._LinkKit__product_key, self._LinkKit__host_name)
def connect(self):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.INITIALIZED:
raise LinkKit.StateError("not in INITIALIZED state")
if self._LinkKit__device_secret is None or self._LinkKit__device_secret == "":
if not self._LinkKit__product_secret is None:
if self._LinkKit__product_secret == "":
raise ValueError("device Secret & product secret both empty")
rc, value = self._LinkKit__dynamic_register_device()
if self._LinkKit__on_device_dynamic_register is None:
raise Exception("user not give on_device_dynamic_register")
else:
try:
self._LinkKit__on_device_dynamic_register(rc, value, self._LinkKit__user_data)
if rc == 0:
self._LinkKit__device_secret = value
else:
self._LinkKit__link_log.error("dynamic register device fail:" + value)
return 1
except Exception as e:
try:
self._LinkKit__link_log.error(e)
return 2
finally:
e = None
del e
self._LinkKit__config_mqtt_client_internal()
self._LinkKit__mqtt_client.connect(host=(self._LinkKit__host_name_internal), port=(self._LinkKit__mqtt_port), keepalive=(self._LinkKit__mqtt_keep_alive))
return 0
def __connect_async_internal(self):
if self._LinkKit__device_secret is None or self._LinkKit__device_secret == "":
if not self._LinkKit__product_secret is None:
if self._LinkKit__product_secret == "":
raise ValueError("device Secret & product secret both empty")
rc, value = self._LinkKit__dynamic_register_device()
if self._LinkKit__on_device_dynamic_register is None:
raise Exception("user not give on_device_dynamic_register")
else:
try:
self._LinkKit__on_device_dynamic_register(rc, value, self._LinkKit__user_data)
if rc == 0:
self._LinkKit__device_secret = value
else:
self._LinkKit__link_log.error("dynamic register device fail:" + value)
return 1
except Exception as e:
try:
self._LinkKit__link_log.error(e)
return 2
finally:
e = None
del e
self._LinkKit__config_mqtt_client_internal()
self._LinkKit__mqtt_client.connect_async(host=(self._LinkKit__host_name_internal), port=(self._LinkKit__mqtt_port), keepalive=(self._LinkKit__mqtt_keep_alive))
self._LinkKit__mqtt_client.loop_start()
def connect_async(self):
self._LinkKit__link_log.debug("connect_async")
if self._LinkKit__linkkit_state not in (LinkKit.LinkKitState.INITIALIZED, LinkKit.LinkKitState.DISCONNECTED):
raise LinkKit.StateError("not in INITIALIZED or DISCONNECTED state")
self._LinkKit__connect_async_req = True
with self._LinkKit__worker_loop_exit_req_lock:
self._LinkKit__worker_loop_exit_req = False
return self._LinkKit__loop_thread.start(self._LinkKit__loop_forever_internal)
def disconnect(self):
self._LinkKit__link_log.debug("disconnect")
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
raise LinkKit.StateError("not in CONNECTED state")
self._LinkKit__linkkit_state = LinkKit.LinkKitState.DISCONNECTING
if self._LinkKit__connect_async_req:
with self._LinkKit__worker_loop_exit_req_lock:
self._LinkKit__worker_loop_exit_req = True
self._LinkKit__mqtt_client.disconnect()
self._LinkKit__loop_thread.stop()
@staticmethod
def __check_topic_string(topic):
if len(topic) > 128 or len(topic) == 0:
raise ValueError("topic string length too long,need decrease %d bytes" % (128 - len(topic)))
def publish_topic(self, topic, payload=None, qos=1):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
raise LinkKit.StateError("not in CONNECTED state")
elif topic is None or len(topic) == 0:
raise ValueError("Invalid topic.")
if qos != 0 and qos != 1:
raise ValueError("Invalid qos.")
self._LinkKit__check_topic_string(topic)
rc, mid = self._LinkKit__mqtt_client.publish(topic, payload, qos)
if rc == 0:
return (
0, mid)
return (1, None)
def subscribe_topicParse error at or near `COME_FROM' instruction at offset 340_0
def unsubscribe_topic(self, topic):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
raise LinkKit.StateError("not in CONNECTED state")
else:
unsubscribe_topics = []
if not topic is None:
if topic == "":
raise ValueError("Invalid topic.")
if isinstance(topic, str):
self._LinkKit__check_topic_string(topic)
if topic not in self._LinkKit__user_topics:
return (1, None)
unsubscribe_topics.append(topic)
elif isinstance(topic, list):
for one_topic in topic:
self._LinkKit__check_topic_string(one_topic)
if one_topic in self._LinkKit__user_topics:
unsubscribe_topics.append(one_topic)
continue
with self._LinkKit__user_topics_unsubscribe_request_lock:
if len(unsubscribe_topics) == 0:
return (2, None)
ret = self._LinkKit__mqtt_client.unsubscribe(unsubscribe_topics)
rc, mid = ret
if rc == mqtt.MQTT_ERR_SUCCESS:
self._LinkKit__user_topics_unsubscribe_request[mid] = unsubscribe_topics
return ret
return (1, None)
def __make_rrpc_topic(self, topic):
return "/ext/rrpc/+%s" % topic
def subscribe_rrpc_topicParse error at or near `COME_FROM' instruction at offset 340_0
def unsubscribe_rrpc_topicParse error at or near `POP_BLOCK' instruction at offset 252
def __on_internal_connect_safe(self, client, user_data, session_flag, rc):
if rc == 0:
self._LinkKit__reset_reconnect_wait()
session_flag_internal = {"session present": session_flag}
self._LinkKit__handler_task.post_message(self._LinkKit__handler_task_cmd_on_connect, (
client, user_data, session_flag_internal, rc))
def __loop_forever_internal(self):
self._LinkKit__link_log.debug("enter")
self._LinkKit__linkkit_state = LinkKit.LinkKitState.CONNECTING
if self._LinkKit__device_secret is None or self._LinkKit__device_secret == "":
rc, value = self._LinkKit__dynamic_register_device()
try:
self._LinkKit__on_device_dynamic_register(rc, value, self._LinkKit__user_data)
if rc == 0:
self._LinkKit__device_secret = value
else:
self._LinkKit__link_log.error("dynamic register device fail:" + value)
self._LinkKit__linkkit_state = LinkKit.LinkKitState.INITIALIZED
return 1
except Exception as e:
try:
self._LinkKit__link_log.error(e)
self._LinkKit__linkkit_state = LinkKit.LinkKitState.INITIALIZED
return 2
finally:
e = None
del e
try:
self._LinkKit__config_mqtt_client_internal()
except ssl.SSLError as e:
try:
self._LinkKit__link_log.error("config mqtt raise exception:" + str(e))
self._LinkKit__linkkit_state = LinkKit.LinkKitState.INITIALIZED
self._LinkKit__on_internal_connect_safe(None, None, 0, 6)
return
finally:
e = None
del e
try:
self._LinkKit__mqtt_client.connect_async(host=(self._LinkKit__host_name_internal), port=(self._LinkKit__mqtt_port), keepalive=(self._LinkKit__mqtt_keep_alive))
except Exception as e:
try:
self._LinkKit__link_log.error("__loop_forever_internal connect raise exception:" + str(e))
self._LinkKit__linkkit_state = LinkKit.LinkKitState.INITIALIZED
self._LinkKit__on_internal_connect_safe(None, None, 0, 7)
return
finally:
e = None
del e
while True:
if self._LinkKit__worker_loop_exit_req:
if self._LinkKit__linkkit_state == LinkKit.LinkKitState.DESTRUCTING:
self._LinkKit__handler_task.stop()
self._LinkKit__linkkit_state = LinkKit.LinkKitState.DESTRUCTED
break
try:
self._LinkKit__linkkit_state = LinkKit.LinkKitState.CONNECTING
self._LinkKit__mqtt_client.reconnect()
except (socket.error, OSError) as e:
try:
self._LinkKit__link_log.error(e)
if self._LinkKit__linkkit_state == LinkKit.LinkKitState.CONNECTING:
self._LinkKit__linkkit_state = LinkKit.LinkKitState.DISCONNECTED
self._LinkKit__on_internal_connect_safe(None, None, 0, 9)
if self._LinkKit__linkkit_state == LinkKit.LinkKitState.DESTRUCTING:
self._LinkKit__handler_task.stop()
self._LinkKit__linkkit_state = LinkKit.LinkKitState.DESTRUCTED
break
self._LinkKit__reconnect_wait()
continue
finally:
e = None
del e
rc = mqtt.MQTT_ERR_SUCCESS
while rc == mqtt.MQTT_ERR_SUCCESS:
rc = self._LinkKit__mqtt_client.loop(self._LinkKit__mqtt_request_timeout, 1)
self._LinkKit__clean_timeout_message()
self._LinkKit__clean_thing_timeout_request_id()
if self._LinkKit__linkkit_state == LinkKit.LinkKitState.CONNECTED:
self._LinkKit__on_internal_disconnect(None, None, 1)
self._LinkKit__link_log.info("loop return:%r" % rc)
if self._LinkKit__worker_loop_exit_req:
if self._LinkKit__linkkit_state == LinkKit.LinkKitState.DESTRUCTING:
self._LinkKit__handler_task.stop()
self._LinkKit__linkkit_state = LinkKit.LinkKitState.DESTRUCTED
break
self._LinkKit__reconnect_wait()
def __clean_timeout_message(self):
expire_timestamp = self._LinkKit__timestamp() - self._LinkKit__mqtt_request_timeout * 1000
with self._LinkKit__thing_prop_post_mid_lock:
self._LinkKit__clean_timeout_message_item(self._LinkKit__thing_prop_post_mid, expire_timestamp)
with self._LinkKit__thing_event_post_mid_lock:
self._LinkKit__clean_timeout_message_item(self._LinkKit__thing_event_post_mid, expire_timestamp)
with self._LinkKit__thing_answer_service_mid_lock:
self._LinkKit__clean_timeout_message_item(self._LinkKit__thing_answer_service_mid, expire_timestamp)
with self._LinkKit__thing_raw_up_mid_lock:
self._LinkKit__clean_timeout_message_item(self._LinkKit__thing_raw_up_mid, expire_timestamp)
with self._LinkKit__thing_raw_down_reply_mid_lock:
self._LinkKit__clean_timeout_message_item(self._LinkKit__thing_raw_down_reply_mid, expire_timestamp)
with self._LinkKit__thing_prop_set_reply_mid_lock:
self._LinkKit__clean_timeout_message_item(self._LinkKit__thing_prop_set_reply_mid, expire_timestamp)
self._LinkKit__clean_timeout_message_item(self._LinkKit__thing_subscribe_sys_request_mid, expire_timestamp)
self._LinkKit__clean_timeout_message_item(self._LinkKit__device_info_mid, expire_timestamp)
def __clean_timeout_message_item(self, mids, expire_time):
for mid in list(mids.keys()):
if mids[mid] < expire_time:
timestamp = mids.pop(mid)
self._LinkKit__link_log.error("__clean_timeout_message_item pop:%r,timestamp:%r", mid, timestamp)
def __reconnect_wait(self):
if self._LinkKit__mqtt_auto_reconnect_sec == 0:
self._LinkKit__mqtt_auto_reconnect_sec = self._LinkKit__mqtt_auto_reconnect_min_sec
else:
self._LinkKit__mqtt_auto_reconnect_sec = min(self._LinkKit__mqtt_auto_reconnect_sec * 2, self._LinkKit__mqtt_auto_reconnect_max_sec)
self._LinkKit__mqtt_auto_reconnect_sec += random.randint(1, self._LinkKit__mqtt_auto_reconnect_sec)
time.sleep(self._LinkKit__mqtt_auto_reconnect_sec)
def __reset_reconnect_wait(self):
self._LinkKit__mqtt_auto_reconnect_sec = 0
def start_worker_loop(self):
pass
def thing_setup(self, file=None):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.INITIALIZED:
raise LinkKit.StateError("not in INITIALIZED state")
elif self._LinkKit__thing_setup_state:
return 1
elif file is None:
self._LinkKit__thing_raw_only = True
self._LinkKit__thing_setup_state = True
return 0
try:
with open(file, encoding="utf-8") as f:
tsl = json.load(f)
index = 0
while index < len(tsl["events"]):
identifier = tsl["events"][index]["identifier"]
if identifier == "post":
output_data = tsl["events"][index]["outputData"]
output_data_index = 0
while output_data_index < len(output_data):
output_data_item = output_data[output_data_index]["identifier"]
self._LinkKit__thing_properties_post.add(output_data_item)
output_data_index += 1
else:
self._LinkKit__thing_events.add(identifier)
index += 1
index = 0
while index < len(tsl["services"]):
identifier = tsl["services"][index]["identifier"]
if identifier == "set":
input_data = tsl["services"][index]["inputData"]
input_data_index = 0
while input_data_index < len(input_data):
input_data_item = input_data[input_data_index]
self._LinkKit__thing_properties_set.add(input_data_item["identifier"])
input_data_index += 1
else:
if identifier == "get":
output_data = tsl["services"][index]["outputData"]
output_data_index = 0
while output_data_index < len(output_data):
output_data_item = output_data[output_data_index]
self._LinkKit__thing_properties_get.add(output_data_item["identifier"])
output_data_index += 1
else:
self._LinkKit__thing_services.add(identifier)
service_reply_topic = self._LinkKit__thing_topic_service_pattern % (self._LinkKit__product_key,
self._LinkKit__device_name,
identifier + "_reply")
self._LinkKit__thing_topic_services_reply.add(service_reply_topic)
index += 1
for event in self._LinkKit__thing_events:
post_topic = self._LinkKit__thing_topic_event_post_pattern % (
self._LinkKit__product_key, self._LinkKit__device_name, event)
self._LinkKit__thing_topic_event_post[event] = post_topic
self._LinkKit__thing_topic_event_post_reply.add(post_topic + "_reply")
for service in self._LinkKit__thing_services:
self._LinkKit__thing_topic_services.add(self._LinkKit__thing_topic_service_pattern % (
self._LinkKit__product_key, self._LinkKit__device_name, service))
except Exception as e:
try:
self._LinkKit__link_log.info("file open error:" + str(e))
return 2
finally:
e = None
del e
self._LinkKit__thing_setup_state = True
return 0
def __subscribe_sys_topic(self):
subscribe_sys_topics = [
(
self._LinkKit__device_info_topic_reply, 0)]
if self._LinkKit__thing_setup_state:
if self._LinkKit__thing_raw_only:
thing_subscribe_topics = [
(
self._LinkKit__thing_topic_raw_down, 0),
(
self._LinkKit__thing_topic_raw_up_reply, 0)]
else:
thing_subscribe_topics = [
(
self._LinkKit__thing_topic_prop_set, 0),
(
self._LinkKit__thing_topic_prop_get, 0),
(
self._LinkKit__thing_topic_raw_down, 0),
(
self._LinkKit__thing_topic_prop_post_reply, 0),
(
self._LinkKit__thing_topic_raw_up_reply, 0),
(
self._LinkKit__thing_topic_update_device_info_reply, 0),
(
self._LinkKit__thing_topic_delete_device_info_reply, 0),
(
self._LinkKit__thing_topic_shadow_get, 0)]
for topic in self._LinkKit__thing_topic_services:
thing_subscribe_topics.append((topic, 0))
for topic in self._LinkKit__thing_topic_event_post_reply:
thing_subscribe_topics.append((topic, 0))
subscribe_sys_topics += thing_subscribe_topics
with self._LinkKit__thing_subscribe_sys_request_lock:
rc, mid = self._LinkKit__mqtt_client.subscribe(subscribe_sys_topics)
if rc == mqtt.MQTT_ERR_SUCCESS:
self._LinkKit__thing_subscribe_sys_request = True
self._LinkKit__thing_subscribe_sys_request_mid[mid] = self._LinkKit__timestamp()
return 0
return 1
def thing_raw_post_data(self, payload):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
raise LinkKit.StateError("not in CONNECTED state")
with self._LinkKit__thing_raw_up_mid_lock:
rc, mid = self._LinkKit__mqtt_client.publish(self._LinkKit__thing_topic_raw_up, payload, 0)
if rc == mqtt.MQTT_ERR_SUCCESS:
self._LinkKit__thing_raw_up_mid[mid] = self._LinkKit__timestamp()
return 0
return 1
def thing_raw_data_reply(self, payload):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
raise LinkKit.StateError("not in CONNECTED state")
with self._LinkKit__thing_raw_down_reply_mid_lock:
rc, mid = self._LinkKit__mqtt_client.publish(self._LinkKit__thing_topic_raw_down_reply, payload, 0)
if rc == mqtt.MQTT_ERR_SUCCESS:
self._LinkKit__thing_raw_down_reply_mid[mid] = self._LinkKit__timestamp()
return 0
return 1
def thing_update_device_info(self, payload):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
raise LinkKit.StateError("not in CONNECTED state")
else:
raise self._LinkKit__thing_setup_state and self._LinkKit__thing_enable_state or LinkKit.StateError("not in SETUP & ENABLE state")
return (1, None)
request_id = self._LinkKit__get_thing_request_id()
with self._LinkKit__thing_update_device_info_up_mid_lock:
rc, mid = self._LinkKit__mqtt_client.publish(self._LinkKit__thing_topic_update_device_info_up, self._LinkKit__pack_alink_request(request_id, "thing.deviceinfo.update", payload), 0)
if rc == mqtt.MQTT_ERR_SUCCESS:
self._LinkKit__thing_update_device_info_up_mid[mid] = self._LinkKit__timestamp()
return (rc, request_id)
return (1, None)
def thing_delete_device_info(self, payload):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
raise LinkKit.StateError("not in CONNECTED state")
else:
return self._LinkKit__thing_setup_state and self._LinkKit__thing_enable_state or 1
request_id = self._LinkKit__get_thing_request_id()
with self._LinkKit__thing_delete_device_info_up_mid_lock:
rc, mid = self._LinkKit__mqtt_client.publish(self._LinkKit__thing_topic_delete_device_info_up, self._LinkKit__pack_alink_request(request_id, "thing.deviceinfo.delete", payload), 0)
if rc == mqtt.MQTT_ERR_SUCCESS:
self._LinkKit__thing_delete_device_info_up_mid[mid] = self._LinkKit__timestamp()
return (rc, request_id)
return (1, None)
def thing_update_tags(self, tagMap):
if not isinstance(tagMap, dict):
raise ValueError("tagMap must be a dictionary")
return (1, None)
payload = []
for k, v in tagMap.items():
payload.append({(LinkKit.TAG_KEY): k, (LinkKit.TAG_VALUE): v})
return self.thing_update_device_info(payload)
def thing_remove_tags(self, tagKeys):
if not isinstance(tagKeys, list):
if not isinstance(tagKeys, tuple):
raise ValueError("tagKeys must be a list or tuple")
return (1, None)
payload = []
for tagKey in tagKeys:
payload.append({(LinkKit.TAG_KEY): tagKey})
return self.thing_delete_device_info(payload)
def __pack_alink_request(self, request_id, method, params):
request = {
'id': request_id,
'version': '"1.0"',
'params': params,
'method': method}
return json.dumps(request)
def thing_answer_service(self, identifier, request_id, code, data=None):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
raise LinkKit.StateError("not in CONNECTED state")
elif self._LinkKit__thing_setup_state:
if not self._LinkKit__thing_enable_state:
return 1
if data is None:
data = {}
response = {'id':request_id,
'code':code,
'data':data}
item = self._LinkKit__pop_rrpc_service("alink_" + str(request_id))
if item:
service_reply_topic = item["topic"]
else:
service_reply_topic = self._LinkKit__thing_topic_service_pattern % (self._LinkKit__product_key,
self._LinkKit__device_name,
identifier + "_reply")
with self._LinkKit__thing_answer_service_mid_lock:
rc, mid = self._LinkKit__mqtt_client.publish(service_reply_topic, json.dumps(response), 0)
if rc == mqtt.MQTT_ERR_SUCCESS:
self._LinkKit__thing_answer_service_mid[mid] = self._LinkKit__timestamp()
return 0
return 1
def __get_thing_request_id(self):
with self._LinkKit__thing_request_id_lock:
self._LinkKit__thing_request_value += 1
if self._LinkKit__thing_request_value > self._LinkKit__thing_request_id_max:
self._LinkKit__thing_request_value = 0
if len(self._LinkKit__thing_request_id) > self._LinkKit__mqtt_max_queued_message:
return
if self._LinkKit__thing_request_value not in self._LinkKit__thing_request_id:
self._LinkKit__thing_request_id[self._LinkKit__thing_request_value] = self._LinkKit__timestamp()
self._LinkKit__link_log.debug("__get_thing_request_id pop:%r" % self._LinkKit__thing_request_value)
return str(self._LinkKit__thing_request_value)
return
def __back_thing_request_id(self, post_id):
with self._LinkKit__thing_request_id_lock:
try:
self._LinkKit__thing_request_id.pop(int(post_id))
except Exception as e:
try:
self._LinkKit__link_log.error("__back_thing_request_id pop:%r,%r" % (post_id, e))
finally:
e = None
del e
def __reset_thing_request_id(self):
with self._LinkKit__thing_request_id_lock:
self._LinkKit__thing_request_value = 0
self._LinkKit__thing_request_id.clear()
def __clean_thing_timeout_request_id(self):
with self._LinkKit__thing_request_id_lock:
expire_timestamp = self._LinkKit__timestamp() - self._LinkKit__mqtt_request_timeout * 1000
for request_id in list(self._LinkKit__thing_request_id.keys()):
if self._LinkKit__thing_request_id[request_id] < expire_timestamp:
timestamp = self._LinkKit__thing_request_id.pop(request_id)
self._LinkKit__link_log.error("__clean_thing_timeout_request_id pop:%r,timestamp:%r", request_id, timestamp)
def thing_trigger_event(self, event_tuple):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
raise LinkKit.StateError("not in CONNECTED state")
else:
return self._LinkKit__thing_setup_state and self._LinkKit__thing_enable_state or (1,
None)
if isinstance(event_tuple, tuple):
event, params = event_tuple
else:
return (1, None)
if event not in self._LinkKit__thing_topic_event_post.keys():
return (1, None)
request_id = self._LinkKit__get_thing_request_id()
if request_id is None:
return 1
request = {'id':request_id, 'version':"1.0",
'params':{"value": params},
'method':"thing.event.%s.post" % event}
with self._LinkKit__thing_event_post_mid_lock:
event_topic = self._LinkKit__thing_topic_event_post[event]
self._LinkKit__link_log.debug("thing_trigger_event publish topic")
rc, mid = self._LinkKit__mqtt_client.publish(event_topic, json.dumps(request), 0)
self._LinkKit__link_log.debug("thing_trigger_event publish done")
if rc == mqtt.MQTT_ERR_SUCCESS:
self._LinkKit__thing_event_post_mid[mid] = self._LinkKit__timestamp()
return (0, request_id)
return (1, None)
def thing_post_property(self, property_data):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
raise LinkKit.StateError("not in CONNECTED state")
else:
return self._LinkKit__thing_setup_state and self._LinkKit__thing_enable_state or (1,
None)
request_params = property_data
request_id = self._LinkKit__get_thing_request_id()
if request_id is None:
return (1, None)
request = {'id': request_id,
'version': '"1.0"',
'params': request_params,
'method': '"thing.event.property.post"'}
with self._LinkKit__thing_prop_post_mid_lock:
rc, mid = self._LinkKit__mqtt_client.publish(self._LinkKit__thing_topic_prop_post, json.dumps(request), 1)
if rc == mqtt.MQTT_ERR_SUCCESS:
self._LinkKit__thing_prop_post_mid[mid] = self._LinkKit__timestamp()
return (0, request_id)
return (1, None)
def __on_internal_async_messageParse error at or near `COME_FROM' instruction at offset 1624_2
def __parse_raw_topic(self, topic):
return re.search("/ext/rrpc/.*?(/.*)", topic).group(1)
def __tidy_topic(self, topic):
if topic == None:
return
topic = topic.strip()
if len(topic) == 0:
return
if topic[0] != "/":
topic = "/" + topic
return topic
def __push_rrpc_service(self, item):
with self._LinkKit__user_rrpc_request_ids_lock:
if len(self._LinkKit__user_rrpc_request_ids) > self._LinkKit__user_rrpc_request_max_len:
removed_item = self._LinkKit__user_rrpc_request_ids.pop(0)
del self._LinkKit__user_rrpc_request_id_index_map[removed_item["id"]]
self._LinkKit__user_rrpc_request_ids.append(item)
self._LinkKit__user_rrpc_request_id_index_map[item["id"]] = 0
def __pop_rrpc_service(self, id):
with self._LinkKit__user_rrpc_request_ids_lock:
if id not in self._LinkKit__user_rrpc_request_id_index_map:
return
del self._LinkKit__user_rrpc_request_id_index_map[id]
for index in range(0, len(self._LinkKit__user_rrpc_request_ids)):
item = self._LinkKit__user_rrpc_request_ids[index]
if item["id"] == id:
del self._LinkKit__user_rrpc_request_ids[index]
return item
return
def thing_answer_rrpc(self, id, response):
item = self._LinkKit__pop_rrpc_service("rrpc_" + id)
if item == None:
self._LinkKit__link_log.error("answer_rrpc_topic, the id does not exist: %s" % id)
return (1, None)
rc, mid = self._LinkKit__mqtt_client.publish(item["topic"], response, 0)
self._LinkKit__link_log.debug("reply topic:%s" % item["topic"])
return (rc, mid)
def __try_parse_rrpc_topic(self, message):
self._LinkKit__link_log.debug("receive a rrpc topic:%s" % message.topic)
raw_topic = self._LinkKit__parse_raw_topic(message.topic)
if raw_topic.startswith("/sys"):
if raw_topic in self._LinkKit__thing_topic_services:
identifier = raw_topic.split("/", 6)[6]
payload = self._LinkKit__load_json(self._LinkKit__to_str(message.payload))
try:
request_id = payload["id"]
params = payload["params"]
item_id = "alink_" + request_id
item = {'id':item_id, 'request_id':request_id, 'payload':payload, 'identifier':identifier, 'topic':message.topic}
self._LinkKit__push_rrpc_service(item)
self._LinkKit__on_thing_call_service(identifier, request_id, params, self._LinkKit__user_data)
except Exception as e:
try:
self._LinkKit__link_log.error("on_thing_call_service raise exception: %s" % e)
finally:
e = None
del e
return
else:
with self._LinkKit__user_rrpc_topics_subscribe_request_lock:
with self._LinkKit__user_rrpc_topics_lock:
if raw_topic not in self._LinkKit__user_rrpc_topics:
self._LinkKit__link_log.error("%s is not in the rrpc-subscribed list" % raw_topic)
return
return self._LinkKit__on_topic_rrpc_message or None
try:
rrpc_id = message.topic.split("/", 4)[3]
item_id = "rrpc_" + rrpc_id
item = {'id':item_id, 'payload':message.payload, 'topic':message.topic}
self._LinkKit__push_rrpc_service(item)
self._LinkKit__on_topic_rrpc_message(rrpc_id, message.topic, message.payload, message.qos, self._LinkKit__user_data)
except Exception as e:
try:
self._LinkKit__link_log.error("on_topic_rrpc_message process raise exception:%r" % e)
finally:
e = None
del e
def __try_parse_try_shadow(self, payload):
try:
self._LinkKit__latest_shadow.set_latest_recevied_time(self._LinkKit__timestamp())
self._LinkKit__latest_shadow.set_latest_recevied_payload(payload)
msg = self._LinkKit__load_json(payload)
if "version" in msg:
self._LinkKit__latest_shadow.set_version(msg["version"])
else:
if "payload" in msg:
if "version" in msg["payload"]:
self._LinkKit__latest_shadow.set_version(msg["payload"]["version"])
elif "timestamp" in msg:
self._LinkKit__latest_shadow.set_timestamp(msg["timestamp"])
else:
if "payload" in msg:
if "timestamp" in msg["payload"]:
self._LinkKit__latest_shadow.set_timestamp(msg["payload"]["timestamp"])
if "payload" in msg:
if msg["payload"]["status"] == "success":
if "state" in msg["payload"]:
self._LinkKit__latest_shadow.set_state(msg["payload"]["state"])
if "metadata" in msg["payload"]:
self._LinkKit__latest_shadow.set_metadata(msg["payload"]["metadata"])
except Exception as e:
try:
pass
finally:
e = None
del e
def thing_update_shadow(self, reported, version):
request = {'state':{"reported": reported},
'method':"update",
'version':version}
return self._LinkKit__thing_update_shadow(request)
def thing_get_shadow(self):
request = {"method": "get"}
return self._LinkKit__thing_update_shadow(request)
def local_get_latest_shadow(self):
return self._LinkKit__latest_shadow
def __thing_update_shadow(self, request):
if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
raise LinkKit.StateError("not in CONNECTED state")
else:
return self._LinkKit__thing_setup_state and self._LinkKit__thing_enable_state or (1,
None)
with self._LinkKit__thing_shadow_mid_lock:
rc, mid = self._LinkKit__mqtt_client.publish(self._LinkKit__thing_topic_shadow_update, json.dumps(request), 1)
if rc == mqtt.MQTT_ERR_SUCCESS:
self._LinkKit__thing_shadow_mid[mid] = self._LinkKit__timestamp()
return (0, mid)
return (1, None)
def __on_internal_message(self, client, user_data, message):
self._LinkKit__link_log.info("__on_internal_message")
self._LinkKit__handler_task.post_message(self._LinkKit__handler_task_cmd_on_message, (client, user_data, message))
def __handler_task_on_message_callback(self, value):
client, user_data, message = value
self._LinkKit__on_internal_async_message(message)
def __on_internal_connect(self, client, user_data, session_flag, rc):
self._LinkKit__link_log.info("__on_internal_connect")
if rc == 0:
self._LinkKit__reset_reconnect_wait()
self._LinkKit__subscribe_sys_topic()
self._LinkKit__handler_task.post_message(self._LinkKit__handler_task_cmd_on_connect, (client, user_data, session_flag, rc))
def __handler_task_on_connect_callback(self, value):
client, user_data, session_flag, rc = value
self._LinkKit__link_log.info("__on_internal_connect enter")
self._LinkKit__link_log.debug("session:%d, return code:%d" % (session_flag["session present"], rc))
if rc == 0:
self._LinkKit__linkkit_state = LinkKit.LinkKitState.CONNECTED
if self._LinkKit__on_connect is not None:
try:
self._LinkKit__on_connect(session_flag["session present"], rc, self._LinkKit__user_data)
except Exception as e:
try:
self._LinkKit__link_log.error("on_connect process raise exception:%r" % e)
finally:
e = None
del e
def __on_internal_disconnect(self, client, user_data, rc):
self._LinkKit__link_log.info("__on_internal_disconnect enter")
if self._LinkKit__linkkit_state == LinkKit.LinkKitState.DESTRUCTING:
self._LinkKit__linkkit_state = LinkKit.LinkKitState.DESTRUCTED
else:
if self._LinkKit__linkkit_state == LinkKit.LinkKitState.DISCONNECTING:
self._LinkKit__linkkit_state = LinkKit.LinkKitState.DISCONNECTED
else:
if self._LinkKit__linkkit_state == LinkKit.LinkKitState.CONNECTED:
self._LinkKit__linkkit_state = LinkKit.LinkKitState.DISCONNECTED
else:
if self._LinkKit__linkkit_state == LinkKit.LinkKitState.DISCONNECTED:
self._LinkKit__link_log.error("__on_internal_disconnect enter from wrong state:%r" % self._LinkKit__linkkit_state)
return
self._LinkKit__link_log.error("__on_internal_disconnect enter from wrong state:%r" % self._LinkKit__linkkit_state)
return
self._LinkKit__user_topics.clear()
self._LinkKit__user_topics_subscribe_request.clear()
self._LinkKit__user_topics_unsubscribe_request.clear()
self._LinkKit__user_rrpc_topics.clear()
self._LinkKit__user_rrpc_topics_subscribe_request.clear()
self._LinkKit__user_rrpc_topics_unsubscribe_request.clear()
self._LinkKit__thing_prop_post_mid.clear()
self._LinkKit__thing_event_post_mid.clear()
self._LinkKit__thing_answer_service_mid.clear()
self._LinkKit__thing_raw_down_reply_mid.clear()
self._LinkKit__thing_raw_up_mid.clear()
self._LinkKit__thing_shadow_mid.clear()
self._LinkKit__device_info_mid.clear()
self._LinkKit__thing_update_device_info_up_mid.clear()
self._LinkKit__thing_delete_device_info_up_mid.clear()
self._LinkKit__handler_task.post_message(self._LinkKit__handler_task_cmd_on_disconnect, (client, user_data, rc))
if self._LinkKit__linkkit_state == LinkKit.LinkKitState.DESTRUCTED:
self._LinkKit__handler_task.stop()
def __handler_task_on_disconnect_callback(self, value):
self._LinkKit__link_log.info("__handler_task_on_disconnect_callback enter")
client, user_data, rc = value
if self._LinkKit__thing_setup_state:
if self._LinkKit__thing_enable_state:
self._LinkKit__thing_enable_state = False
if self._LinkKit__on_thing_disable is not None:
try:
self._LinkKit__on_thing_disable(self._LinkKit__user_data)
except Exception as e:
try:
self._LinkKit__link_log.error("on_thing_disable process raise exception:%r" % e)
finally:
e = None
del e
if self._LinkKit__on_disconnect is not None:
try:
self._LinkKit__on_disconnect(rc, self._LinkKit__user_data)
except Exception as e:
try:
self._LinkKit__link_log.error("on_disconnect process raise exception:%r" % e)
finally:
e = None
del e
def __on_internal_publish(self, client, user_data, mid):
self._LinkKit__handler_task.post_message(self._LinkKit__handler_task_cmd_on_publish, (client, user_data, mid))
def __handler_task_on_publish_callback(self, value):
client, user_data, mid = value
self._LinkKit__link_log.debug("__on_internal_publish message:%d" % mid)
with self._LinkKit__thing_event_post_mid_lock:
if mid in self._LinkKit__thing_event_post_mid:
self._LinkKit__thing_event_post_mid.pop(mid)
self._LinkKit__link_log.debug("__on_internal_publish event post mid removed")
return
with self._LinkKit__thing_prop_post_mid_lock:
if mid in self._LinkKit__thing_prop_post_mid:
self._LinkKit__thing_prop_post_mid.pop(mid)
self._LinkKit__link_log.debug("__on_internal_publish prop post mid removed")
return
with self._LinkKit__thing_prop_set_reply_mid_lock:
if mid in self._LinkKit__thing_prop_set_reply_mid:
self._LinkKit__thing_prop_set_reply_mid.pop(mid)
self._LinkKit__link_log.debug("__on_internal_publish prop set reply mid removed")
return
with self._LinkKit__thing_answer_service_mid_lock:
if mid in self._LinkKit__thing_answer_service_mid:
self._LinkKit__thing_answer_service_mid.pop(mid)
self._LinkKit__link_log.debug("__thing_answer_service_mid mid removed")
return
with self._LinkKit__thing_raw_up_mid_lock:
if mid in self._LinkKit__thing_raw_up_mid:
self._LinkKit__thing_raw_up_mid.pop(mid)
self._LinkKit__link_log.debug("__thing_raw_up_mid mid removed")
return
with self._LinkKit__thing_raw_down_reply_mid_lock:
if mid in self._LinkKit__thing_raw_down_reply_mid:
self._LinkKit__thing_raw_down_reply_mid.pop(mid)
self._LinkKit__link_log.debug("__thing_raw_down_reply_mid mid removed")
return
with self._LinkKit__device_info_mid_lock:
if mid in self._LinkKit__device_info_mid:
self._LinkKit__device_info_mid.pop(mid)
self._LinkKit__link_log.debug("__device_info_mid mid removed")
return
with self._LinkKit__thing_shadow_mid_lock:
if mid in self._LinkKit__thing_shadow_mid:
self._LinkKit__thing_shadow_mid.pop(mid)
self._LinkKit__link_log.debug("__thing_shadow_mid mid removed")
return
with self._LinkKit__thing_update_device_info_up_mid_lock:
if mid in self._LinkKit__thing_update_device_info_up_mid:
self._LinkKit__thing_update_device_info_up_mid.pop(mid)
self._LinkKit__link_log.debug("__thing_update_device_info_up_mid mid removed")
return
with self._LinkKit__thing_delete_device_info_up_mid_lock:
if mid in self._LinkKit__thing_delete_device_info_up_mid:
self._LinkKit__thing_delete_device_info_up_mid.pop(mid)
self._LinkKit__link_log.debug("__thing_delete_device_info_up_mid mid removed")
return
if self._LinkKit__on_publish_topic is not None:
self._LinkKit__on_publish_topic(mid, self._LinkKit__user_data)
def __on_internal_subscribe(self, client, user_data, mid, granted_qos):
self._LinkKit__handler_task.post_message(self._LinkKit__handler_task_cmd_on_subscribe, (client, user_data, mid, granted_qos))
def __handler_task_on_subscribe_callback(self, value):
client, user_data, mid, granted_qos = value
self._LinkKit__link_log.debug("__on_internal_subscribe mid:%d granted_qos:%s" % (
mid, str(",".join(("%s" % it for it in granted_qos)))))
if self._LinkKit__thing_subscribe_sys_request:
if mid in self._LinkKit__thing_subscribe_sys_request_mid:
self._LinkKit__thing_subscribe_sys_request_mid.pop(mid)
self._LinkKit__thing_subscribe_sys_request = False
if self._LinkKit__thing_setup_state:
self._LinkKit__thing_enable_state = True
if self._LinkKit__on_thing_enable:
self._LinkKit__on_thing_enable(self._LinkKit__user_data)
return
with self._LinkKit__user_rrpc_topics_subscribe_request_lock:
if mid in self._LinkKit__user_rrpc_topics_subscribe_request:
self._LinkKit__user_rrpc_topics_subscribe_request.pop(mid)
if self._LinkKit__on_subscribe_rrpc_topic:
try:
self._LinkKit__on_subscribe_rrpc_topic(mid, granted_qos, self._LinkKit__user_data)
except Exception as err:
try:
self._LinkKit__link_log.error("Caught exception in on_subscribe_topic: %s", err)
finally:
err = None
del err
return
topics_requests = None
self._LinkKit__user_topics_request_lock.acquire()
if mid in self._LinkKit__user_topics_subscribe_request:
topics_requests = self._LinkKit__user_topics_subscribe_request.pop(mid)
self._LinkKit__user_topics_request_lock.release()
if topics_requests is not None:
return_topics = []
for index in range(len(topics_requests)):
if granted_qos[index] < 0 or granted_qos[index] > 1:
self._LinkKit__link_log.error("topics:%s, granted wrong:%d" % (
topics_requests[index], granted_qos[index]))
else:
self._LinkKit__user_topics[topics_requests[index][0]] = granted_qos[index]
return_topics.append((topics_requests[index], granted_qos[index]))
if self._LinkKit__on_subscribe_topic is not None:
try:
self._LinkKit__on_subscribe_topic(mid, granted_qos, self._LinkKit__user_data)
except Exception as err:
try:
self._LinkKit__link_log.error("Caught exception in on_subscribe_topic: %s", err)
finally:
err = None
del err
def __on_internal_unsubscribe(self, client, user_data, mid):
self._LinkKit__handler_task.post_message(self._LinkKit__handler_task_cmd_on_unsubscribe, (client, user_data, mid))
def __handler_task_on_unsubscribe_callback(self, value):
client, user_data, mid = value
self._LinkKit__link_log.debug("__on_internal_unsubscribe mid:%d" % mid)
unsubscribe_request = None
with self._LinkKit__user_rrpc_topics_unsubscribe_request_lock:
if mid in self._LinkKit__user_rrpc_topics_unsubscribe_request:
self._LinkKit__user_rrpc_topics_unsubscribe_request.pop(mid)
if self._LinkKit__on_unsubscribe_rrpc_topic:
try:
self._LinkKit__on_unsubscribe_rrpc_topic(mid, self._LinkKit__user_data)
except Exception as err:
try:
self._LinkKit__link_log.error("Caught exception in on_unsubscribe_rrpc_topic: %s", err)
finally:
err = None
del err
return
with self._LinkKit__user_topics_unsubscribe_request_lock:
if mid in self._LinkKit__user_topics_unsubscribe_request:
unsubscribe_request = self._LinkKit__user_topics_unsubscribe_request.pop(mid)
if unsubscribe_request is not None:
for t in unsubscribe_request:
self._LinkKit__link_log.debug("__user_topics:%s" % str(self._LinkKit__user_topics))
try:
self._LinkKit__user_topics.pop(t)
except Exception as e:
try:
self._LinkKit__link_log.error("__on_internal_unsubscribe e:" + str(e))
return
finally:
e = None
del e
if self._LinkKit__on_unsubscribe_topic is not None:
try:
self._LinkKit__on_unsubscribe_topic(mid, self._LinkKit__user_data)
except Exception as err:
try:
self._LinkKit__link_log.error("Caught exception in on_unsubscribe_topic: %s", err)
finally:
err = None
del err
def dump_user_topics(self):
return self._LinkKit__user_topics
@staticmethod
def to_user_topic(topic):
topic_section = topic.split("/", 3)
user_topic = topic_section[3]
return user_topic
def to_full_topic(self, topic):
return self._LinkKit__USER_TOPIC_PREFIX % (self._LinkKit__product_key, self._LinkKit__device_name, topic)
@staticmethod
def __timestamp():
return int(time.time() * 1000)