1964 lines
92 KiB
Python
1964 lines
92 KiB
Python
# uncompyle6 version 3.9.2
|
|
# Python bytecode version base 3.7.0 (3394)
|
|
# Decompiled from: Python 3.8.19 (default, Mar 20 2024, 15:27:52)
|
|
# [Clang 14.0.6 ]
|
|
# Embedded file name: /var/user/app/device_supervisorbak/device_supervisor/lib/linkkit/linkkit.py
|
|
# Compiled at: 2024-04-18 03:12:57
|
|
# Size of source mod 2**32: 100637 bytes
|
|
import os, logging, threading, queue, urllib.request, urllib.parse, json, hashlib, hmac, random, ssl, socket, string, time, re, sys
|
|
from enum import Enum
|
|
import paho.mqtt.client as mqtt
|
|
from paho.mqtt.client import MQTTMessage
|
|
from linkkit import h2client
|
|
# Oldest interpreter version this module is meant to run on.
REQUIRED_MAJOR_VERSION = 3
REQUIRED_MINOR_VERSION = 5


def lk_check_python_version(major_version, minor_version):
    """Print a warning when the running interpreter is older than required.

    The (major, minor) pair of ``sys.version_info`` is compared as a tuple,
    so an older major version triggers the warning as well as an older
    minor version within the same major version.

    :param major_version: minimum acceptable major version.
    :param minor_version: minimum acceptable minor version.
    :return: None. The warning is written to stdout; nothing is raised.
    """
    running = tuple(sys.version_info[:2])
    if running < (major_version, minor_version):
        print("WARNING: linkit requires Python %d.%d or higher, and the current version is %s" % (major_version, minor_version, sys.version))


# Executed at import time so the warning shows up as early as possible.
lk_check_python_version(REQUIRED_MAJOR_VERSION, REQUIRED_MINOR_VERSION)
|
|
|
|
class LinkKit(object):
|
|
TAG_KEY = "attrKey"
|
|
TAG_VALUE = "attrValue"
|
|
|
|
class LinkKitState(Enum):
    """States a LinkKit instance can be in.

    A new instance starts in ``INITIALIZED``; the ``config_*`` methods
    refuse to run in any other state.
    """

    INITIALIZED = 1
    CONNECTING = 2
    CONNECTED = 3
    DISCONNECTING = 4
    DISCONNECTED = 5
    DESTRUCTING = 6
    DESTRUCTED = 7
|
|
|
|
class StateError(Exception):
    """Raised when a call is made while the instance is in the wrong state."""

    def __init__(self, err):
        # ``err`` becomes the sole exception argument (and thus ``str(exc)``).
        super().__init__(err)
|
|
|
|
class Shadow(object):
    """Lock-protected holder for the most recent device-shadow data.

    Every accessor takes the same internal lock, so individual reads and
    writes are atomic with respect to each other.
    """

    def __init__(self):
        self._Shadow__version = None
        self._Shadow__timestamp = None
        self._Shadow__state = None
        self._Shadow__metadata = None
        self._Shadow__latest_shadow_lock = threading.Lock()
        self._Shadow__latest_received_time = None
        # Must use the same attribute name as the payload getter/setter;
        # otherwise reading the payload before the first set raises
        # AttributeError.
        self._Shadow__latest_received_payload = None

    def get_version(self):
        """Return the stored shadow version (None until set)."""
        with self._Shadow__latest_shadow_lock:
            return self._Shadow__version

    def get_metadata(self):
        """Return the stored shadow metadata (None until set)."""
        with self._Shadow__latest_shadow_lock:
            return self._Shadow__metadata

    def get_state(self):
        """Return the stored shadow state (None until set)."""
        with self._Shadow__latest_shadow_lock:
            return self._Shadow__state

    def set_state(self, state):
        """Replace the stored shadow state."""
        with self._Shadow__latest_shadow_lock:
            self._Shadow__state = state

    def set_metadata(self, metadata):
        """Replace the stored shadow metadata."""
        with self._Shadow__latest_shadow_lock:
            self._Shadow__metadata = metadata

    def set_version(self, version):
        """Replace the stored shadow version."""
        with self._Shadow__latest_shadow_lock:
            self._Shadow__version = version

    def set_timestamp(self, timestamp):
        """Replace the stored shadow timestamp."""
        with self._Shadow__latest_shadow_lock:
            self._Shadow__timestamp = timestamp

    def set_latest_recevied_time(self, timestamp):
        """Record the time at which the latest shadow payload was received."""
        with self._Shadow__latest_shadow_lock:
            self._Shadow__latest_received_time = timestamp

    def get_latest_recevied_time(self):
        """Return the recorded receive time (None until set)."""
        with self._Shadow__latest_shadow_lock:
            return self._Shadow__latest_received_time

    def set_latest_recevied_payload(self, payload):
        """Record the latest received shadow payload."""
        with self._Shadow__latest_shadow_lock:
            self._Shadow__latest_received_payload = payload

    def get_latest_recevied_payload(self):
        """Return the recorded payload (None until set)."""
        with self._Shadow__latest_shadow_lock:
            return self._Shadow__latest_received_payload

    def to_dict(self):
        """Return state, metadata, version and timestamp as one dict.

        The four fields are read under the lock so the snapshot cannot mix
        values from before and after a concurrent setter call.
        """
        with self._Shadow__latest_shadow_lock:
            return {
                'state': self._Shadow__state,
                'metadata': self._Shadow__metadata,
                'version': self._Shadow__version,
                'timestamp': self._Shadow__timestamp,
            }

    def to_json_string(self):
        """Return ``to_dict()`` serialised with ``json.dumps``."""
        return json.dumps(self.to_dict())
|
|
|
|
class __HandlerTask(object):
    """Runs registered command callbacks on a daemon worker thread.

    Messages are ``(cmd, value)`` tuples pushed through a bounded queue
    (capacity 20). The reserved command ``"req_exit"`` stops the worker.
    ``logger`` is optional; every log call is skipped when it is None.
    """

    def __init__(self, logger=None):
        self._HandlerTask__logger = logger
        self._log("info", "HandlerTask init enter")
        self._HandlerTask__message_queue = queue.Queue(20)
        self._HandlerTask__cmd_callback = {}
        self._HandlerTask__started = False
        self._HandlerTask__exited = False
        self._HandlerTask__thread = None

    def _log(self, level, message):
        """Forward ``message`` to the logger method ``level`` if a logger exists."""
        logger = self._HandlerTask__logger
        if logger is not None:
            getattr(logger, level)(message)

    def register_cmd_callback(self, cmd, callback):
        """Associate ``callback`` with ``cmd``; only allowed before ``start()``.

        :return: 0 on success, 1 if ``cmd`` is the reserved ``"req_exit"``,
                 2 if the task has already been started.
        """
        if self._HandlerTask__started is not False:
            return 2
        if cmd == "req_exit":
            return 1
        self._HandlerTask__cmd_callback[cmd] = callback
        return 0

    def post_message(self, cmd, value):
        """Queue ``(cmd, value)`` for the worker thread.

        Waits up to 5 seconds for queue space.

        :return: True when queued; False when the task is not running,
                 is stopping, or the queue stayed full.
        """
        self._log("debug", "post_message :%r " % cmd)
        if self._HandlerTask__started and self._HandlerTask__exited is False:
            try:
                self._HandlerTask__message_queue.put((cmd, value), timeout=5)
            except queue.Full as e:
                self._log("error", "queue full: %r" % e)
                return False
            self._log("debug", "post_message success")
            return True
        self._log("debug", "post_message fail started:%r,exited:%r" % (self._HandlerTask__started, self._HandlerTask__exited))
        return False

    def start(self):
        """Start the worker thread with a fresh queue.

        :return: 0 if the thread was started, 1 if it was already running.
        """
        self._log("info", "HandlerTask start")
        if self._HandlerTask__started is not False:
            return 1
        self._log("info", "HandlerTask try start")
        self._HandlerTask__exited = False
        self._HandlerTask__started = True
        self._HandlerTask__message_queue = queue.Queue(20)
        self._HandlerTask__thread = threading.Thread(target=self._HandlerTask__thread_runnable)
        self._HandlerTask__thread.daemon = True
        self._HandlerTask__thread.start()
        return 0

    def stop(self):
        """Ask the worker to exit after the messages already queued."""
        if self._HandlerTask__started and self._HandlerTask__exited is False:
            self._HandlerTask__exited = True
            # No timeout here: this blocks while the queue is full.
            self._HandlerTask__message_queue.put(('req_exit', None))

    def wait_stop(self):
        """Join the worker thread if it is still marked as started."""
        if self._HandlerTask__started is True:
            self._HandlerTask__thread.join()

    def __thread_runnable(self):
        """Worker loop: pop messages and dispatch them until ``req_exit``."""
        self._log("debug", "thread runnable enter")
        while True:
            cmd, value = self._HandlerTask__message_queue.get()
            self._log("debug", "thread runnable pop cmd:%r" % cmd)
            if cmd == "req_exit":
                break
            # .get() so an unregistered command is skipped instead of
            # raising KeyError and silently killing the worker thread.
            callback = self._HandlerTask__cmd_callback.get(cmd)
            if callback is None:
                self._log("debug", "thread runnable no callback for cmd:%r" % cmd)
                continue
            try:
                callback(value)
            except Exception as e:
                # A failing callback must not take the worker down.
                self._log("error", "thread runnable raise exception:%s" % e)
        self._HandlerTask__started = False
        self._log("debug", "thread runnable exit")
|
|
|
|
class LoopThread(object):
    """Runs a single callback on a daemon thread and lets callers wait for it.

    ``logger`` is optional; every log call is skipped when it is None.
    """

    def __init__(self, logger=None):
        self._LoopThread__logger = logger
        self._log("info", "LoopThread init enter")
        self._LoopThread__callback = None
        self._LoopThread__thread = None
        self._LoopThread__started = False
        # Set by the thread when the callback has finished.
        self._LoopThread__req_wait = threading.Event()
        self._log("info", "LoopThread init exit")

    def _log(self, level, message):
        """Forward ``message`` to the logger method ``level`` if a logger exists."""
        logger = self._LoopThread__logger
        if logger is not None:
            getattr(logger, level)(message)

    def start(self, callback):
        """Run ``callback`` on a new daemon thread.

        :return: 0 if a thread was started, 1 if one is already running.
        """
        if self._LoopThread__started is True:
            self._log("info", "LoopThread already ")
            return 1
        self._LoopThread__callback = callback
        self._LoopThread__thread = threading.Thread(target=self._LoopThread__thread_main)
        self._LoopThread__thread.daemon = True
        self._LoopThread__thread.start()
        return 0

    def stop(self):
        """Block until the running callback has finished, then reset the event.

        This does not interrupt the callback; it only waits for it. It blocks
        indefinitely if no thread ever completes.
        """
        self._LoopThread__req_wait.wait()
        self._LoopThread__req_wait.clear()

    def __thread_main(self):
        """Thread body: invoke the callback and always signal completion."""
        self._LoopThread__started = True
        try:
            self._log("debug", "LoopThread thread enter")
            if self._LoopThread__callback is not None:
                self._LoopThread__callback()
            self._log("debug", "LoopThread thread exit")
        except Exception as e:
            self._log("error", "LoopThread thread Exception:" + str(e))
        finally:
            # In ``finally`` so that stop() can never be left waiting, no
            # matter how the callback terminated.
            self._LoopThread__started = False
            self._LoopThread__req_wait.set()
|
|
|
|
class __H2FileUploadSink(h2client.H2FileUploadSink):
    """Sink that relays h2client upload notifications to a LinkKit instance.

    Each ``on_file_upload_*`` call is forwarded unchanged to the
    ``_on_file_upload_*`` method of the same suffix on the wrapped instance.
    The base-class initializer is not called here.
    """

    def __init__(self, linkkit_instance):
        self._H2FileUploadSink__lk_instance = linkkit_instance

    def on_file_upload_start(self, id, upload_file_info, user_data):
        owner = self._H2FileUploadSink__lk_instance
        owner._on_file_upload_start(id, upload_file_info, user_data)

    def on_file_upload_end(self, id, upload_file_info, upload_file_result, user_data):
        owner = self._H2FileUploadSink__lk_instance
        owner._on_file_upload_end(id, upload_file_info, upload_file_result, user_data)

    def on_file_upload_progress(self, id, upload_file_result, upload_file_info, user_data):
        owner = self._H2FileUploadSink__lk_instance
        owner._on_file_upload_progress(id, upload_file_result, upload_file_info, user_data)
|
|
|
|
def _on_file_upload_start(self, id, upload_file_info, user_data):
    """Invoke the user's ``on_file_upload_begin`` callback, if one is set.

    The ``user_data`` argument is ignored; the callback receives the
    ``user_data`` given to the LinkKit constructor instead.
    """
    callback = self._LinkKit__on_file_upload_begin
    # Identity test: "is a callback registered" must not depend on the
    # callback object's own __ne__/__eq__ implementation.
    if callback is not None:
        callback(id, upload_file_info, self._LinkKit__user_data)
|
|
|
|
def _on_file_upload_end(self, id, upload_file_info, upload_file_result, user_data):
    """Invoke the user's ``on_file_upload_end`` callback, if one is set.

    The ``user_data`` argument is ignored; the callback receives the
    ``user_data`` given to the LinkKit constructor instead.
    """
    callback = self._LinkKit__on_file_upload_end
    # Identity test: "is a callback registered" must not depend on the
    # callback object's own __ne__/__eq__ implementation.
    if callback is not None:
        callback(id, upload_file_info, upload_file_result, self._LinkKit__user_data)
|
|
|
|
def _on_file_upload_progress(self, id, upload_file_result, upload_file_info, user_data):
    """Accept an upload progress notification and do nothing with it."""
    return None
|
|
|
|
class __LinkKitLog(object):
    """Switchable front end for the ``"linkkit"`` standard-library logger.

    Logging starts disabled; while disabled every log call is dropped.
    """

    def __init__(self):
        self._LinkKitLog__logger = logging.getLogger("linkkit")
        self._LinkKitLog__enabled = False

    def enable_logger(self):
        """Start forwarding log calls to the underlying logger."""
        self._LinkKitLog__enabled = True

    def disable_logger(self):
        """Drop all subsequent log calls."""
        self._LinkKitLog__enabled = False

    def is_enabled(self):
        """Return True when log calls are being forwarded."""
        return self._LinkKitLog__enabled

    def config_logger(self, level):
        """Set the level of the underlying ``"linkkit"`` logger."""
        self._LinkKitLog__logger.setLevel(level)

    def debug(self, fmt, *args):
        if self._LinkKitLog__enabled:
            self._LinkKitLog__logger.debug(fmt, *args)

    def warning(self, fmt, *args):
        """Log at WARNING level (correctly spelled entry point)."""
        if self._LinkKitLog__enabled:
            self._LinkKitLog__logger.warning(fmt, *args)

    # Historical misspelling, kept so existing callers keep working.
    warring = warning

    def info(self, fmt, *args):
        if self._LinkKitLog__enabled:
            self._LinkKitLog__logger.info(fmt, *args)

    def error(self, fmt, *args):
        if self._LinkKitLog__enabled:
            self._LinkKitLog__logger.error(fmt, *args)

    def critical(self, fmt, *args):
        if self._LinkKitLog__enabled:
            self._LinkKitLog__logger.critical(fmt, *args)
|
|
|
|
_LinkKit__USER_TOPIC_PREFIX = "/%s/%s/%s"
|
|
_LinkKit__ALIYUN_BROKER_CA_DATA = "-----BEGIN CERTIFICATE-----\nMIIDdTCCAl2gAwIBAgILBAAAAAABFUtaw5QwDQYJKoZIhvcNAQEFBQAwVzELMAkGA1UEBhMCQkUxGTAXBgNVBAoTEEdsb2JhbFNpZ24gbnYtc2ExEDAOBgNVBAsTB1Jvb3QgQ0ExGzAZBgNVBAMTEkdsb2JhbFNpZ24gUm9vdCBDQTAeFw05ODA5MDExMjAwMDBaFw0yODAxMjgxMjAwMDBaMFcxCzAJBgNVBAYTAkJFMRkwFwYDVQQKExBHbG9iYWxTaWduIG52LXNhMRAwDgYDVQQLEwdSb290IENBMRswGQYDVQQDExJHbG9iYWxTaWduIFJvb3QgQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDaDuaZjc6j40+Kfvvxi4Mla+pIH/EqsLmVEQS98GPR4mdmzxzdzxtIK+6NiY6arymAZavpxy0Sy6scTHAHoT0KMM0VjU/43dSMUBUc71DuxC73/OlS8pF94G3VNTCOXkNz8kHp1Wrjsok6Vjk4bwY8iGlbKk3Fp1S4bInMm/k8yuX9ifUSPJJ4ltbcdG6TRGHRjcdGsnUOhugZitVtbNV4FpWi6cgKOOvyJBNPc1STE4U6G7weNLWLBYy5d4ux2x8gkasJU26Qzns3dLlwR5EiUWMWea6xrkEmCMgZK9FGqkjWZCrXgzT/LCrBbBlDSgeF59N89iFo7+ryUp9/k5DPAgMBAAGjQjBAMA4GA1UdDwEB/wQEAwIBBjAPBgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBRge2YaRQ2XyolQL30EzTSo//z9SzANBgkqhkiG9w0BAQUFAAOCAQEA1nPnfE920I2/7LqivjTFKDK1fPxsnCwrvQmeU79rXqoRSLblCKOzyj1hTdNGCbM+w6DjY1Ub8rrvrTnhQ7k4o+YviiY776BQVvnGCv04zcQLcFGUl5gE38NflNUVyRRBnMRddWQVDf9VMOyGj/8N7yy5Y0b2qvzfvGn9LhJIZJrglfCm7ymPAbEVtQwdpf5pLGkkeB6zpxxxYu7KyJesF12KwvhHhm4qxFYxldBniYUr+WymXUadDKqC5JlR3XC321Y9YeRq4VzW9v493kHMB65jUr9TU/Qr6cf9tveCX4XSQRjbgbMEHMUfpIBvFSDJ3gyICh3WZlXi/EjJKSZp4A==\n-----END CERTIFICATE-----"
|
|
|
|
def __init__(self, host_name, product_key, device_name, device_secret, product_secret=None, user_data=None):
    """Validate the identity arguments and set up all instance state.

    :param host_name: required, non-empty.
    :param product_key: required, non-empty.
    :param device_name: required, non-empty.
    :param device_secret: may be empty only if ``product_secret`` is given.
    :param product_secret: optional.
    :param user_data: opaque value stored for later use by callbacks.
    :raises ValueError: when a required argument is None or ``""``, or when
        both secrets are empty.

    Side effects: sets the ``"Paho"`` logger to DEBUG and starts the
    handler-task worker thread.
    """
    self._LinkKit__just_for_pycharm_autocomplete = False

    def _is_blank(value):
        return True if (value is None or value == "") else False

    # --- argument validation (order determines which error is raised) ---
    required = (
        (host_name, "host_name wrong"),
        (product_key, "product key wrong"),
        (device_name, "device name wrong"),
    )
    for candidate, complaint in required:
        if _is_blank(candidate):
            raise ValueError(complaint)
    if _is_blank(device_secret) and _is_blank(product_secret):
        raise ValueError("device secret & product secret are both empty")

    # --- logging ---
    self._LinkKit__link_log = LinkKit._LinkKit__LinkKitLog()
    self._LinkKit__PahoLog = logging.getLogger("Paho")
    self._LinkKit__PahoLog.setLevel(logging.DEBUG)

    # --- identity and device description ---
    self._LinkKit__host_name = host_name
    self._LinkKit__product_key = product_key
    self._LinkKit__device_name = device_name
    self._LinkKit__device_secret = device_secret
    self._LinkKit__product_secret = product_secret
    self._LinkKit__user_data = user_data
    self._LinkKit__device_interface_info = ""
    self._LinkKit__device_mac = None
    self._LinkKit__cellular_IMEI = None
    self._LinkKit__cellular_ICCID = None
    self._LinkKit__cellular_IMSI = None
    self._LinkKit__cellular_MSISDN = None
    self._LinkKit__mqtt_client = None
    self._LinkKit__sdk_version = "1.2.0"
    self._LinkKit__sdk_program_language = "Python"
    self._LinkKit__endpoint = None
    self._LinkKit__h2_endpoint = None

    # --- MQTT defaults (same values as the config_mqtt defaults) ---
    self._LinkKit__mqtt_port = 1883
    self._LinkKit__mqtt_protocol = "MQTTv311"
    self._LinkKit__mqtt_transport = "TCP"
    self._LinkKit__mqtt_secure = "TLS"
    self._LinkKit__mqtt_keep_alive = 60
    self._LinkKit__mqtt_clean_session = True
    self._LinkKit__mqtt_max_inflight_message = 20
    self._LinkKit__mqtt_max_queued_message = 40
    self._LinkKit__mqtt_auto_reconnect_min_sec = 1
    self._LinkKit__mqtt_auto_reconnect_max_sec = 60
    self._LinkKit__mqtt_auto_reconnect_sec = 0
    self._LinkKit__mqtt_request_timeout = 10
    self._LinkKit__linkkit_state = LinkKit.LinkKitState.INITIALIZED
    self._LinkKit__aliyun_broker_ca_data = self._LinkKit__ALIYUN_BROKER_CA_DATA
    self._LinkKit__latest_shadow = LinkKit.Shadow()

    # --- user callback slots, all unset initially ---
    self._LinkKit__on_device_dynamic_register = None
    self._LinkKit__on_connect = None
    self._LinkKit__on_disconnect = None
    self._LinkKit__on_publish_topic = None
    self._LinkKit__on_subscribe_topic = None
    self._LinkKit__on_unsubscribe_topic = None
    self._LinkKit__on_topic_message = None
    self._LinkKit__on_topic_rrpc_message = None
    self._LinkKit__on_subscribe_rrpc_topic = None
    self._LinkKit__on_unsubscribe_rrpc_topic = None
    self._LinkKit__on_thing_create = None
    self._LinkKit__on_thing_enable = None
    self._LinkKit__on_thing_disable = None
    self._LinkKit__on_thing_raw_data_arrived = None
    self._LinkKit__on_thing_raw_data_post = None
    self._LinkKit__on_thing_call_service = None
    self._LinkKit__on_thing_prop_changed = None
    self._LinkKit__on_thing_event_post = None
    self._LinkKit__on_thing_prop_post = None
    self._LinkKit__on_thing_shadow_get = None
    self._LinkKit__on_thing_device_info_update = None
    self._LinkKit__on_thing_device_info_delete = None
    self._LinkKit__on_file_upload_begin = None
    self._LinkKit__on_file_upload_end = None

    # --- user topic / RRPC bookkeeping ---
    self._LinkKit__user_topics = {}
    self._LinkKit__user_topics_subscribe_request = {}
    self._LinkKit__user_topics_unsubscribe_request = {}
    self._LinkKit__user_topics_request_lock = threading.Lock()
    self._LinkKit__user_topics_unsubscribe_request_lock = threading.Lock()
    self._LinkKit__user_rrpc_topics = {}
    self._LinkKit__user_rrpc_topics_lock = threading.RLock()
    self._LinkKit__user_rrpc_topics_subscribe_request = {}
    self._LinkKit__user_rrpc_topics_unsubscribe_request = {}
    self._LinkKit__user_rrpc_topics_subscribe_request_lock = threading.RLock()
    self._LinkKit__user_rrpc_topics_unsubscribe_request_lock = threading.RLock()
    self._LinkKit__user_rrpc_request_ids = []
    self._LinkKit__user_rrpc_request_id_index_map = {}
    self._LinkKit__user_rrpc_request_ids_lock = threading.RLock()
    self._LinkKit__user_rrpc_request_max_len = 100

    # Every per-device topic is formatted with this (product, device) pair.
    ident = (self._LinkKit__product_key, self._LinkKit__device_name)

    # --- thing model: properties and events ---
    self._LinkKit__thing_topic_prop_post = "/sys/%s/%s/thing/event/property/post" % ident
    self._LinkKit__thing_topic_prop_post_reply = self._LinkKit__thing_topic_prop_post + "_reply"
    self._LinkKit__thing_topic_prop_set = "/sys/%s/%s/thing/service/property/set" % ident
    self._LinkKit__thing_topic_prop_set_reply = self._LinkKit__thing_topic_prop_set + "_reply"
    self._LinkKit__thing_topic_prop_get = "/sys/%s/%s/thing/service/property/get" % ident
    self._LinkKit__thing_topic_event_post_pattern = "/sys/%s/%s/thing/event/%s/post"
    self._LinkKit__thing_prop_post_mid = {}
    self._LinkKit__thing_prop_post_mid_lock = threading.Lock()
    self._LinkKit__thing_prop_set_reply_mid = {}
    self._LinkKit__thing_prop_set_reply_mid_lock = threading.Lock()
    self._LinkKit__thing_topic_event_post = {}
    self._LinkKit__thing_topic_event_post_reply = set()
    self._LinkKit__thing_events = set()
    self._LinkKit__thing_request_id_max = 1000000
    self._LinkKit__thing_request_value = 0
    self._LinkKit__thing_request_id = {}
    self._LinkKit__thing_request_id_lock = threading.Lock()
    self._LinkKit__thing_event_post_mid = {}
    self._LinkKit__thing_event_post_mid_lock = threading.Lock()

    # --- thing model: shadow ---
    self._LinkKit__thing_topic_shadow_get = "/shadow/get/%s/%s" % ident
    self._LinkKit__thing_topic_shadow_update = "/shadow/update/%s/%s" % ident
    self._LinkKit__thing_shadow_mid = {}
    self._LinkKit__thing_shadow_mid_lock = threading.Lock()

    # --- thing model: services ---
    self._LinkKit__thing_topic_service_pattern = "/sys/%s/%s/thing/service/%s"
    self._LinkKit__thing_topic_services = set()
    self._LinkKit__thing_topic_services_reply = set()
    self._LinkKit__thing_services = set()
    self._LinkKit__thing_answer_service_mid = {}
    self._LinkKit__thing_answer_service_mid_lock = threading.Lock()

    # --- thing model: raw data ---
    self._LinkKit__thing_topic_raw_up = "/sys/%s/%s/thing/model/up_raw" % ident
    self._LinkKit__thing_topic_raw_up_reply = self._LinkKit__thing_topic_raw_up + "_reply"
    self._LinkKit__thing_topic_raw_down = "/sys/%s/%s/thing/model/down_raw" % ident
    self._LinkKit__thing_topic_raw_down_reply = self._LinkKit__thing_topic_raw_down + "_reply"
    self._LinkKit__thing_raw_up_mid = {}
    self._LinkKit__thing_raw_up_mid_lock = threading.Lock()
    self._LinkKit__thing_raw_down_reply_mid = {}
    self._LinkKit__thing_raw_down_reply_mid_lock = threading.Lock()

    # --- thing model: device info ---
    self._LinkKit__thing_topic_update_device_info_up = "/sys/%s/%s/thing/deviceinfo/update" % ident
    self._LinkKit__thing_topic_update_device_info_reply = self._LinkKit__thing_topic_update_device_info_up + "_reply"
    self._LinkKit__thing_topic_delete_device_info_up = "/sys/%s/%s/thing/deviceinfo/delete" % ident
    self._LinkKit__thing_topic_delete_device_info_reply = self._LinkKit__thing_topic_delete_device_info_up + "_reply"
    self._LinkKit__thing_update_device_info_up_mid = {}
    self._LinkKit__thing_update_device_info_up_mid_lock = threading.Lock()
    self._LinkKit__thing_delete_device_info_up_mid = {}
    self._LinkKit__thing_delete_device_info_up_mid_lock = threading.Lock()

    # --- thing model: misc state ---
    self._LinkKit__thing_properties_set = set()
    self._LinkKit__thing_properties_get = set()
    self._LinkKit__thing_properties_post = set()
    self._LinkKit__thing_subscribe_sys_request = False
    self._LinkKit__thing_subscribe_sys_request_mid = {}
    self._LinkKit__thing_subscribe_sys_request_lock = threading.Lock()
    self._LinkKit__thing_setup_state = False
    self._LinkKit__thing_raw_only = False
    self._LinkKit__thing_enable_state = False

    # Never true at runtime (the flag is set to False above).
    if self._LinkKit__just_for_pycharm_autocomplete:
        self._LinkKit__mqtt_client = mqtt.Client()

    self._LinkKit__device_info_topic = "/sys/%s/%s/thing/deviceinfo/update" % ident
    self._LinkKit__device_info_topic_reply = self._LinkKit__device_info_topic + "_reply"
    self._LinkKit__device_info_mid_lock = threading.Lock()
    self._LinkKit__device_info_mid = {}

    # --- worker loop and handler task ---
    self._LinkKit__connect_async_req = False
    self._LinkKit__worker_loop_exit_req = False
    self._LinkKit__worker_loop_runing_state = False
    self._LinkKit__worker_loop_exit_req_lock = threading.Lock()
    self._LinkKit__loop_thread = LinkKit.LoopThread(self._LinkKit__link_log)
    self._LinkKit__handler_task = LinkKit._LinkKit__HandlerTask(self._LinkKit__link_log)
    self._LinkKit__handler_task_cmd_on_connect = "on_connect"
    self._LinkKit__handler_task_cmd_on_disconnect = "on_disconnect"
    self._LinkKit__handler_task_cmd_on_message = "on_message"
    self._LinkKit__handler_task_cmd_on_publish = "on_publish"
    self._LinkKit__handler_task_cmd_on_subscribe = "on_subscribe"
    self._LinkKit__handler_task_cmd_on_unsubscribe = "on_unsubscribe"

    # Callbacks must be registered before start(); registration is refused
    # once the task is running.
    dispatch = (
        (self._LinkKit__handler_task_cmd_on_connect, self._LinkKit__handler_task_on_connect_callback),
        (self._LinkKit__handler_task_cmd_on_disconnect, self._LinkKit__handler_task_on_disconnect_callback),
        (self._LinkKit__handler_task_cmd_on_message, self._LinkKit__handler_task_on_message_callback),
        (self._LinkKit__handler_task_cmd_on_publish, self._LinkKit__handler_task_on_publish_callback),
        (self._LinkKit__handler_task_cmd_on_subscribe, self._LinkKit__handler_task_on_subscribe_callback),
        (self._LinkKit__handler_task_cmd_on_unsubscribe, self._LinkKit__handler_task_on_unsubscribe_callback),
    )
    for command, handler in dispatch:
        self._LinkKit__handler_task.register_cmd_callback(command, handler)
    self._LinkKit__handler_task.start()

    # --- HTTP/2 client, created lazily ---
    self._LinkKit__h2_client = None
    self._LinkKit__h2_client_lock = threading.RLock()
|
|
|
|
@property
def on_device_dynamic_register(self):
    """User callback slot ``on_device_dynamic_register``.

    Returns the callable last assigned (None if never assigned), in line
    with sibling properties such as ``on_connect``.
    """
    return self._LinkKit__on_device_dynamic_register

@on_device_dynamic_register.setter
def on_device_dynamic_register(self, value):
    self._LinkKit__on_device_dynamic_register = value
|
|
|
|
@property
def on_connect(self):
    """User callback slot ``on_connect``: the callable last assigned, or None."""
    registered = self._LinkKit__on_connect
    return registered

@on_connect.setter
def on_connect(self, value):
    self._LinkKit__on_connect = value
|
|
|
|
@property
def on_disconnect(self):
    """User callback slot ``on_disconnect``: the callable last assigned, or None."""
    registered = self._LinkKit__on_disconnect
    return registered

@on_disconnect.setter
def on_disconnect(self, value):
    self._LinkKit__on_disconnect = value
|
|
|
|
@property
def on_publish_topic(self):
    """User callback slot ``on_publish_topic``.

    Returns the callable last assigned (None if never assigned), in line
    with sibling properties such as ``on_connect``.
    """
    return self._LinkKit__on_publish_topic

@on_publish_topic.setter
def on_publish_topic(self, value):
    self._LinkKit__on_publish_topic = value
|
|
|
|
@property
def on_subscribe_topic(self):
    """User callback slot ``on_subscribe_topic``.

    Returns the callable last assigned (None if never assigned), in line
    with sibling properties such as ``on_connect``.
    """
    return self._LinkKit__on_subscribe_topic

@on_subscribe_topic.setter
def on_subscribe_topic(self, value):
    self._LinkKit__on_subscribe_topic = value
|
|
|
|
@property
def on_unsubscribe_topic(self):
    """User callback slot ``on_unsubscribe_topic``.

    Returns the callable last assigned (None if never assigned), in line
    with sibling properties such as ``on_connect``.
    """
    return self._LinkKit__on_unsubscribe_topic

@on_unsubscribe_topic.setter
def on_unsubscribe_topic(self, value):
    self._LinkKit__on_unsubscribe_topic = value
|
|
|
|
@property
def on_topic_message(self):
    """User callback slot ``on_topic_message``.

    Returns the callable last assigned (None if never assigned), in line
    with sibling properties such as ``on_connect``.
    """
    return self._LinkKit__on_topic_message

@on_topic_message.setter
def on_topic_message(self, value):
    self._LinkKit__on_topic_message = value
|
|
|
|
@property
def on_topic_rrpc_message(self):
    """User callback slot ``on_topic_rrpc_message``.

    Returns the callable last assigned (None if never assigned), in line
    with sibling properties such as ``on_connect``.
    """
    return self._LinkKit__on_topic_rrpc_message

@on_topic_rrpc_message.setter
def on_topic_rrpc_message(self, value):
    self._LinkKit__on_topic_rrpc_message = value
|
|
|
|
@property
def on_thing_create(self):
    """User callback slot ``on_thing_create``.

    Returns the callable last assigned (None if never assigned), in line
    with sibling properties such as ``on_connect``.
    """
    return self._LinkKit__on_thing_create

@on_thing_create.setter
def on_thing_create(self, value):
    self._LinkKit__on_thing_create = value
|
|
|
|
@property
def on_thing_enable(self):
    """User callback slot ``on_thing_enable``.

    Returns the callable last assigned (None if never assigned), in line
    with sibling properties such as ``on_connect``.
    """
    return self._LinkKit__on_thing_enable

@on_thing_enable.setter
def on_thing_enable(self, value):
    self._LinkKit__on_thing_enable = value
|
|
|
|
@property
def on_thing_disable(self):
    """User callback slot ``on_thing_disable``.

    Returns the callable last assigned (None if never assigned), in line
    with sibling properties such as ``on_connect``.
    """
    return self._LinkKit__on_thing_disable

@on_thing_disable.setter
def on_thing_disable(self, value):
    self._LinkKit__on_thing_disable = value
|
|
|
|
@property
def on_thing_raw_data_arrived(self):
    """User callback slot ``on_thing_raw_data_arrived``.

    Returns the callable last assigned (None if never assigned), in line
    with sibling properties such as ``on_connect``.
    """
    return self._LinkKit__on_thing_raw_data_arrived

@on_thing_raw_data_arrived.setter
def on_thing_raw_data_arrived(self, value):
    self._LinkKit__on_thing_raw_data_arrived = value
|
|
|
|
@property
def on_thing_raw_data_post(self):
    """User callback slot ``on_thing_raw_data_post``: the callable last assigned, or None."""
    registered = self._LinkKit__on_thing_raw_data_post
    return registered

@on_thing_raw_data_post.setter
def on_thing_raw_data_post(self, value):
    self._LinkKit__on_thing_raw_data_post = value

@property
def on_thing_device_info_update(self):
    """User callback slot ``on_thing_device_info_update``: the callable last assigned, or None."""
    registered = self._LinkKit__on_thing_device_info_update
    return registered

@on_thing_device_info_update.setter
def on_thing_device_info_update(self, value):
    self._LinkKit__on_thing_device_info_update = value

@property
def on_thing_device_info_delete(self):
    """User callback slot ``on_thing_device_info_delete``: the callable last assigned, or None."""
    registered = self._LinkKit__on_thing_device_info_delete
    return registered

@on_thing_device_info_delete.setter
def on_thing_device_info_delete(self, value):
    self._LinkKit__on_thing_device_info_delete = value
|
|
|
|
@property
def on_thing_call_service(self):
    """User callback slot ``on_thing_call_service``.

    Returns the callable last assigned (None if never assigned), in line
    with sibling properties such as ``on_connect``.
    """
    return self._LinkKit__on_thing_call_service

@on_thing_call_service.setter
def on_thing_call_service(self, value):
    self._LinkKit__on_thing_call_service = value
|
|
|
|
@property
def on_thing_prop_changed(self):
    """User callback slot ``on_thing_prop_changed``.

    Returns the callable last assigned (None if never assigned), in line
    with sibling properties such as ``on_connect``.
    """
    return self._LinkKit__on_thing_prop_changed

@on_thing_prop_changed.setter
def on_thing_prop_changed(self, value):
    self._LinkKit__on_thing_prop_changed = value
|
|
|
|
@property
def on_thing_event_post(self):
    """User callback slot ``on_thing_event_post``: the callable last assigned, or None."""
    registered = self._LinkKit__on_thing_event_post
    return registered

@on_thing_event_post.setter
def on_thing_event_post(self, value):
    self._LinkKit__on_thing_event_post = value
|
|
|
|
@property
def on_thing_prop_post(self):
    """User callback slot ``on_thing_prop_post``: the callable last assigned, or None."""
    registered = self._LinkKit__on_thing_prop_post
    return registered

@on_thing_prop_post.setter
def on_thing_prop_post(self, value):
    self._LinkKit__on_thing_prop_post = value
|
|
|
|
@property
def on_thing_shadow_get(self):
    """User callback slot ``on_thing_shadow_get``: the callable last assigned, or None."""
    registered = self._LinkKit__on_thing_shadow_get
    return registered

@on_thing_shadow_get.setter
def on_thing_shadow_get(self, value):
    self._LinkKit__on_thing_shadow_get = value
|
|
|
|
@property
def on_file_upload_begin(self):
    """User callback slot ``on_file_upload_begin``: the callable last assigned, or None."""
    registered = self._LinkKit__on_file_upload_begin
    return registered

@on_file_upload_begin.setter
def on_file_upload_begin(self, value):
    self._LinkKit__on_file_upload_begin = value
|
|
|
|
@property
def on_file_upload_end(self):
    """User callback slot ``on_file_upload_end``: the callable last assigned, or None."""
    registered = self._LinkKit__on_file_upload_end
    return registered

@on_file_upload_end.setter
def on_file_upload_end(self, value):
    self._LinkKit__on_file_upload_end = value
|
|
|
|
def enable_logger(self, level):
    """Turn SDK logging on at ``level``.

    Configures and enables the LinkKit log wrapper, attaches the Paho
    logger to the MQTT client when one exists, and sets the Paho logger
    to ``level``.
    """
    link_log = self._LinkKit__link_log
    link_log.config_logger(level)
    link_log.enable_logger()
    paho_log = self._LinkKit__PahoLog
    client = self._LinkKit__mqtt_client
    if client is not None:
        client.enable_logger(paho_log)
    paho_log.setLevel(level)
|
|
|
|
def disable_logger(self):
    """Turn SDK logging off, including on the MQTT client when one exists."""
    self._LinkKit__link_log.disable_logger()
    client = self._LinkKit__mqtt_client
    if client is None:
        return
    client.disable_logger()
|
|
|
|
def config_logger(self, level):
    """Set the log level without changing whether logging is enabled.

    The Paho logger level is only updated when an MQTT client exists.
    """
    self._LinkKit__link_log.config_logger(level)
    if self._LinkKit__mqtt_client is None:
        return
    self._LinkKit__PahoLog.setLevel(level)
|
|
|
|
def config_http2(self, endpoint=None):
    """Store a custom HTTP/2 endpoint.

    :param endpoint: value to store; None clears any previous value.
    :raises LinkKit.StateError: when the instance is no longer INITIALIZED.
    """
    current_state = self._LinkKit__linkkit_state
    if current_state is not LinkKit.LinkKitState.INITIALIZED:
        raise LinkKit.StateError("not in INITIALIZED state")
    self._LinkKit__h2_endpoint = endpoint
|
|
|
|
def config_mqtt(self, port=1883, protocol='MQTTv311', transport='TCP', secure='TLS', keep_alive=60, clean_session=True, max_inflight_message=20, max_queued_message=40, auto_reconnect_min_sec=1, auto_reconnect_max_sec=60, cadata=None, endpoint=None):
    """Validate and store the MQTT connection settings.

    Accepted values:
      * ``port``: 1..65535
      * ``protocol``: ``"MQTTv311"`` or ``"MQTTv31"``
      * ``transport``: ``"TCP"``
      * ``secure``: ``"TLS"`` or ``""``
      * ``keep_alive``: 60..180
      * ``clean_session``: exactly True or False
      * ``max_queued_message`` / ``max_inflight_message``: >= 0
      * ``auto_reconnect_min_sec`` / ``auto_reconnect_max_sec``: 1..7200,
        with min <= max
      * ``cadata``: stored only when not None
      * ``endpoint``: always stored (None clears it)

    :raises LinkKit.StateError: when the instance is no longer INITIALIZED.
    :raises ValueError: on the first argument that fails validation; nothing
        is stored in that case.
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.INITIALIZED:
        raise LinkKit.StateError("not in INITIALIZED state")

    # Checks run in this fixed order, which decides the error reported when
    # several arguments are invalid.
    if port < 1 or port > 65535:
        raise ValueError("port wrong")
    if protocol not in ("MQTTv311", "MQTTv31"):
        raise ValueError("protocol wrong")
    if transport != "TCP":
        raise ValueError("transport wrong")
    if secure not in ("TLS", ""):
        raise ValueError("secure wrong")
    if keep_alive < 60 or keep_alive > 180:
        raise ValueError("keep_alive range wrong")
    if not (clean_session is True or clean_session is False):
        raise ValueError("clean session wrong")
    if max_queued_message < 0:
        raise ValueError("max_queued_message wrong")
    if max_inflight_message < 0:
        raise ValueError("max_inflight_message wrong")
    if auto_reconnect_min_sec < 1 or auto_reconnect_min_sec > 7200:
        raise ValueError("auto_reconnect_min_sec wrong")
    if auto_reconnect_max_sec < 1 or auto_reconnect_max_sec > 7200:
        raise ValueError("auto_reconnect_max_sec wrong")
    if auto_reconnect_min_sec > auto_reconnect_max_sec:
        raise ValueError("auto_reconnect_max_sec less than auto_reconnect_min_sec")

    self._LinkKit__link_log.info("config_mqtt enter")

    # Every validated argument is necessarily non-None at this point (a None
    # would have failed a comparison or a membership test above), so the
    # values are stored unconditionally.
    self._LinkKit__mqtt_port = port
    self._LinkKit__mqtt_protocol = protocol
    self._LinkKit__mqtt_transport = transport
    self._LinkKit__mqtt_secure = secure
    self._LinkKit__mqtt_keep_alive = keep_alive
    self._LinkKit__mqtt_clean_session = clean_session
    self._LinkKit__mqtt_max_inflight_message = max_inflight_message
    self._LinkKit__mqtt_max_queued_message = max_queued_message
    self._LinkKit__mqtt_auto_reconnect_min_sec = auto_reconnect_min_sec
    self._LinkKit__mqtt_auto_reconnect_max_sec = auto_reconnect_max_sec
    if cadata is not None:
        self._LinkKit__aliyun_broker_ca_data = cadata
    self._LinkKit__endpoint = endpoint
|
|
|
|
def config_device_info(self, interface_info):
    """Store the device interface description string.

    :param interface_info: a ``str`` of at most 160 characters.
    :return: 0 when stored, 1 when the string is longer than 160 characters
        (nothing is stored in that case).
    :raises LinkKit.StateError: when the instance is no longer INITIALIZED.
    :raises ValueError: when ``interface_info`` is not a ``str``.
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.INITIALIZED:
        raise LinkKit.StateError("LinkKit object not in INITIALIZED")
    if not isinstance(interface_info, str):
        raise ValueError("interface info must be string")
    too_long = len(interface_info) > 160
    if not too_long:
        self._LinkKit__device_interface_info = interface_info
    return 1 if too_long else 0
|
|
|
|
def get_product(self):
    """Return the product key given to the constructor."""
    product_key = self._LinkKit__product_key
    return product_key
|
|
|
|
def get_device_name(self):
    """Return the device name given to the constructor."""
    device_name = self._LinkKit__device_name
    return device_name
|
|
|
|
def get_endpoint(self):
    """Return the configured custom MQTT endpoint (None when not configured)."""
    endpoint = self._LinkKit__endpoint
    return endpoint
|
|
|
|
def get_h2_endpoint(self):
    """Return the configured custom HTTP/2 endpoint (None when not configured)."""
    endpoint = self._LinkKit__h2_endpoint
    return endpoint
|
|
|
|
def get_actual_endpoint(self):
    """Return whatever the private endpoint generator produces."""
    generated = self._LinkKit__generate_endpoint()
    return generated
|
|
|
|
def get_actual_h2_endpoint(self):
    """Return the endpoint to report for HTTP/2.

    With an HTTP/2 client present this returns ``get_actual_endpoint()``;
    otherwise it returns the custom/derived HTTP/2 endpoint (possibly None).
    """
    # NOTE(review): the first branch delegates to this object's own
    # get_actual_endpoint(), not to the HTTP/2 client - confirm this is
    # the intended source.
    if not self._LinkKit__h2_client:
        return self._LinkKit__try_generate_custom_h2_endpoint()
    return self.get_actual_endpoint()
|
|
|
|
def __load_json(self, payload):
    """Parse ``payload`` as JSON after passing it through ``__to_str``."""
    text = self._LinkKit__to_str(payload)
    return json.loads(text)
|
|
|
|
def __to_str(self, payload):
    """Return ``payload`` as text.

    ``bytes`` and ``bytearray`` (including subclasses) are decoded as
    UTF-8; any other value is returned unchanged.

    :raises UnicodeDecodeError: when the bytes are not valid UTF-8.
    """
    # isinstance rather than an exact type comparison, so bytes subclasses
    # and bytearray are decoded too instead of being passed through raw.
    if isinstance(payload, (bytes, bytearray)):
        return str(payload, "utf-8")
    return payload
|
|
|
|
def __try_open_h2_client(self):
    """Create and open the HTTP/2 client unless one already exists.

    Runs under the HTTP/2 client lock. The client is stored on the
    instance before ``open()`` is called.
    """
    with self._LinkKit__h2_client_lock:
        if self._LinkKit__h2_client:
            return
        custom_endpoint = self._LinkKit__try_generate_custom_h2_endpoint()
        self._LinkKit__h2_client = h2client.H2Client(
            self._LinkKit__host_name,
            self._LinkKit__product_key,
            self._LinkKit__device_name,
            self._LinkKit__device_secret,
            endpoint=custom_endpoint,
        )
        self._LinkKit__h2_client.open()
|
|
|
|
def __try_generate_custom_h2_endpoint(self):
    """Return the HTTP/2 endpoint to use, or None when there is none.

    An explicitly configured HTTP/2 endpoint wins. Otherwise, when the MQTT
    endpoint contains ``".iot-as-mqtt."`` at an index greater than 0, the
    result is that endpoint with the marker replaced by
    ``".iot-as-http2."``. In every other case the result is None.
    """
    explicit = self._LinkKit__h2_endpoint
    if explicit:
        return explicit
    mqtt_endpoint = self._LinkKit__endpoint
    marker = ".iot-as-mqtt."
    # ``> 0`` (not ``>= 0``): a marker at the very start does not count.
    if mqtt_endpoint and mqtt_endpoint.find(marker) > 0:
        return mqtt_endpoint.replace(marker, ".iot-as-http2.")
    return None
|
|
|
|
def __try_close_h2_client(self):
    """Close and forget the HTTP/2 client if one is currently held."""
    with self._LinkKit__h2_client_lock:
        if not self._LinkKit__h2_client:
            return
        self._LinkKit__h2_client.close()
        self._LinkKit__h2_client = None
|
|
|
|
def _get_h2_client(self):
    """Return the HTTP/2 client, opening it first when necessary."""
    self._LinkKit__try_open_h2_client()
    client = self._LinkKit__h2_client
    return client
|
|
|
|
def upload_file_sync(self, local_filename, remote_filename=None, over_write=True, timeout=None):
    """
    Upload a file to the cloud and block until the request finishes.

    Parameters
    ----------
    local_filename : str
        path of the local file
    remote_filename : str, optional
        file name to use on the cloud
    over_write : boolean, optional
        overwrite an existing remote file when true (default True)
    timeout : int or float, optional
        wait limit; None (the default) waits without limit

    Returns
    ----------
    UploadFileResult
        upload_size, total_size, file_store_id, code, exception;
        code is 0 on success

    Exceptions
    ----------
    ValueError
    Exception
    """
    progress_sink = self._LinkKit__H2FileUploadSink(self)
    self._LinkKit__try_open_h2_client()
    h2 = self._LinkKit__h2_client
    return h2.upload_file_sync(local_filename, remote_filename, over_write,
                               timeout, progress_sink, None)
|
|
|
|
def upload_file_async(self, local_filename, remote_filename=None, over_write=True):
    """
    Start a file upload through the HTTP/2 client without waiting for it.

    Takes the same file arguments as upload_file_sync and returns whatever
    the HTTP/2 client's upload_file_async returns.
    """
    progress_sink = self._LinkKit__H2FileUploadSink(self)
    self._LinkKit__try_open_h2_client()
    h2 = self._LinkKit__h2_client
    return h2.upload_file_async(local_filename, remote_filename, over_write,
                                progress_sink, None)
|
|
|
|
def __upload_device_interface_info(self):
    """
    Publish the SDK language, SDK version and interface info to the cloud.

    Returns 0 when the publish was accepted by the MQTT client (the message
    id is then remembered with a timestamp), 1 otherwise.
    """
    request_id = self._LinkKit__get_thing_request_id()
    system_attributes = (
        ("SYS_SDK_LANGUAGE", self._LinkKit__sdk_program_language),
        ("SYS_LP_SDK_VERSION", self._LinkKit__sdk_version),
        ("SYS_SDK_IF_INFO", self._LinkKit__device_interface_info),
    )
    params = [{"domain": "SYSTEM", "attrKey": key, "attrValue": value}
              for key, value in system_attributes]
    message = json.dumps({"id": request_id,
                          "version": "1.0",
                          "params": params,
                          "method": "thing.deviceinfo.update"})
    with self._LinkKit__device_info_mid_lock:
        rc, mid = self._LinkKit__mqtt_client.publish(self._LinkKit__device_info_topic, message, 0)
        if rc != mqtt.MQTT_ERR_SUCCESS:
            return 1
        self._LinkKit__device_info_mid[mid] = self._LinkKit__timestamp()
        return 0
|
|
|
|
def destruct(self):
    """
    Tear the object down: close the HTTP/2 client, stop MQTT and the handler.

    When connected or connecting, the MQTT client is disconnected and the
    handler task is awaited.  Otherwise the handler task is stopped here and
    the state is set to DESTRUCTED directly.

    Exceptions
    ----------
    LinkKit.StateError
        the object has already been destructed
    """
    states = LinkKit.LinkKitState
    self._LinkKit__try_close_h2_client()
    if self._LinkKit__linkkit_state is states.DESTRUCTED:
        raise LinkKit.StateError("LinkKit object has already destructed")
    self._LinkKit__link_log.debug("destruct enter")
    was_active = (self._LinkKit__linkkit_state == states.CONNECTED
                  or self._LinkKit__linkkit_state == states.CONNECTING)
    self._LinkKit__linkkit_state = states.DESTRUCTING
    if self._LinkKit__connect_async_req:
        # Ask the worker loop to exit on its next iteration.
        with self._LinkKit__worker_loop_exit_req_lock:
            self._LinkKit__worker_loop_exit_req = True
    if was_active:
        if self._LinkKit__mqtt_client is not None:
            self._LinkKit__mqtt_client.disconnect()
        self._LinkKit__handler_task.wait_stop()
    else:
        self._LinkKit__handler_task.stop()
        self._LinkKit__handler_task.wait_stop()
        self._LinkKit__linkkit_state = states.DESTRUCTED
|
|
|
|
def destroy(self):
    """Alias of destruct(); the result of destruct() is not returned."""
    self.destruct()
    return None
|
|
|
|
def check_state(self):
    """Return the current LinkKit state value."""
    current_state = self._LinkKit__linkkit_state
    return current_state
|
|
|
|
@staticmethod
def __generate_random_str(randomlength=16):
    """
    Generate a random string of digits and ASCII letters.

    One random.choice() draw is made per character.
    """
    alphabet = string.digits + string.ascii_letters
    return "".join(random.choice(alphabet) for _ in range(randomlength))
|
|
|
|
def __dynamic_register_device(self):
    """
    Request a device secret from the cloud using the product secret.

    Sends a signed (HMAC-SHA256) form POST to the region's register endpoint.

    Returns
    ----------
    tuple
        (0, device_secret) on success, (1, message) on failure.  A tuple is
        always returned so callers can unpack the result.

    Exceptions
    ----------
    urllib.error.URLError, ValueError, KeyError
        network failure or a malformed reply
    """
    pk = self._LinkKit__product_key
    ps = self._LinkKit__product_secret
    dn = self._LinkKit__device_name
    random_str = self._LinkKit__generate_random_str(15)
    # NOTE(review): ssl.Purpose.CLIENT_AUTH builds a context meant for
    # server-side sockets, which does not verify the peer certificate by
    # default; ssl.Purpose.SERVER_AUTH is the documented choice for clients.
    # Left unchanged because switching turns on hostname/certificate checks
    # that existing deployments may not pass -- confirm before changing.
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,
                                         cadata=self._LinkKit__aliyun_broker_ca_data)
    sign_content = "deviceName%sproductKey%srandom%s" % (dn, pk, random_str)
    sign = hmac.new(ps.encode("utf-8"), sign_content.encode("utf-8"), hashlib.sha256).hexdigest()
    post_data = {
        "productKey": pk,
        "deviceName": dn,
        "random": random_str,
        "sign": sign,
        # Plain value: the decompiled '"hmacsha256"' carried literal quotes.
        "signMethod": "hmacsha256",
    }
    data = urllib.parse.urlencode(post_data).encode("ascii")
    request_url = "https://iot-auth.%s.aliyuncs.com/auth/register/device" % self._LinkKit__host_name
    with urllib.request.urlopen(request_url, data, context=context) as f:
        reply_data = f.read().decode("utf-8")
    reply_obj = self._LinkKit__load_json(reply_data)
    if reply_obj["code"] == 200:
        reply_obj_data = reply_obj["data"]
        if reply_obj_data is not None:
            return (0, reply_obj_data["deviceSecret"])
        # Previously fell through and returned None, which broke the
        # "rc, value = ..." unpacking in every caller.
        return (1, "dynamic register reply carries no data")
    return (1, reply_obj["message"])
|
|
|
|
def __config_mqtt_client_internal(self):
    """
    Create and configure the paho MQTT client for this device.

    Builds the client id, user name and HMAC-SHA1 password from the product
    key, device name and device secret, creates the client, wires the
    internal callbacks, applies reconnect and queue limits, installs a TLS
    context when the secure mode is "TLS", and stores the resolved broker
    host name for the later connect call.
    """
    log = self._LinkKit__link_log
    log.info("start connect")
    product_key = self._LinkKit__product_key
    device_name = self._LinkKit__device_name
    timestamp = str(int(time.time()))
    securemode = 2 if self._LinkKit__mqtt_secure == "TLS" else 3
    interface_info = self._LinkKit__device_interface_info
    sii_option = ("sii=%s," % interface_info) if interface_info else ""

    client_id = "%s&%s|securemode=%d,signmethod=hmacsha1,ext=1,lan=%s,_v=%s,%stimestamp=%s|" % (
        product_key, device_name, securemode, self._LinkKit__sdk_program_language,
        self._LinkKit__sdk_version, sii_option, timestamp)
    username = device_name + "&" + product_key
    sign_content = "clientId%sdeviceName%sproductKey%stimestamp%s" % (
        product_key + "&" + device_name, device_name, product_key, timestamp)
    password = hmac.new(self._LinkKit__device_secret.encode("utf-8"),
                        sign_content.encode("utf-8"), hashlib.sha1).hexdigest()

    # Anything other than "MQTTv31" uses MQTT 3.1.1.
    if self._LinkKit__mqtt_protocol == "MQTTv31":
        mqtt_protocol_version = mqtt.MQTTv31
    else:
        mqtt_protocol_version = mqtt.MQTTv311

    client = mqtt.Client(client_id=client_id,
                         clean_session=self._LinkKit__mqtt_clean_session,
                         protocol=mqtt_protocol_version)
    self._LinkKit__mqtt_client = client
    if log.is_enabled():
        client.enable_logger(self._LinkKit__PahoLog)
    client.username_pw_set(username, password)

    client.on_connect = self._LinkKit__on_internal_connect
    client.on_disconnect = self._LinkKit__on_internal_disconnect
    client.on_message = self._LinkKit__on_internal_message
    client.on_publish = self._LinkKit__on_internal_publish
    client.on_subscribe = self._LinkKit__on_internal_subscribe
    client.on_unsubscribe = self._LinkKit__on_internal_unsubscribe

    client.reconnect_delay_set(self._LinkKit__mqtt_auto_reconnect_min_sec,
                               self._LinkKit__mqtt_auto_reconnect_max_sec)
    client.max_queued_messages_set(self._LinkKit__mqtt_max_queued_message)
    client.max_inflight_messages_set(self._LinkKit__mqtt_max_inflight_message)

    log.debug("current working directory:" + os.getcwd())
    if self._LinkKit__mqtt_secure == "TLS":
        # NOTE(review): Purpose.CLIENT_AUTH yields a context intended for
        # server-side sockets; confirm whether SERVER_AUTH was meant.
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,
                                             cadata=self._LinkKit__aliyun_broker_ca_data)
        client.tls_set_context(context)
    self._LinkKit__host_name_internal = self._LinkKit__generate_endpoint()
|
|
|
|
def __generate_endpoint(self):
    """
    Resolve the MQTT broker host name.

    A configured endpoint wins; a loopback host name is used as-is;
    otherwise the name is built from the product key and the host name.
    """
    configured = self._LinkKit__endpoint
    if configured:
        return configured
    host = self._LinkKit__host_name
    if host == "127.0.0.1" or host == "localhost":
        return host
    return "%s.iot-as-mqtt.%s.aliyuncs.com" % (self._LinkKit__product_key, host)
|
|
|
|
def connect(self):
    """
    Connect to the broker synchronously, registering the device first if needed.

    When no device secret is configured, the device is registered dynamically
    with the product secret and the result is reported through the
    on_device_dynamic_register callback.

    Returns
    ----------
    int
        0 once the MQTT connect call has been issued, 1 when dynamic
        registration was rejected, 2 when the registration callback handling
        raised

    Exceptions
    ----------
    LinkKit.StateError
        the object is not in the INITIALIZED state
    ValueError
        neither a device secret nor a product secret is configured
    Exception
        dynamic registration is needed but no callback was supplied
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.INITIALIZED:
        raise LinkKit.StateError("not in INITIALIZED state")
    if self._LinkKit__device_secret is None or self._LinkKit__device_secret == "":
        if self._LinkKit__product_secret is None or self._LinkKit__product_secret == "":
            raise ValueError("device Secret & product secret both empty")
        # Check the callback before the network call: the issued secret can
        # only be handed over through it.
        if self._LinkKit__on_device_dynamic_register is None:
            raise Exception("user not give on_device_dynamic_register")
        rc, value = self._LinkKit__dynamic_register_device()
        try:
            self._LinkKit__on_device_dynamic_register(rc, value, self._LinkKit__user_data)
            if rc != 0:
                self._LinkKit__link_log.error("dynamic register device fail:" + value)
                return 1
            self._LinkKit__device_secret = value
        except Exception as e:
            self._LinkKit__link_log.error(e)
            return 2
    self._LinkKit__config_mqtt_client_internal()
    self._LinkKit__mqtt_client.connect(host=self._LinkKit__host_name_internal,
                                       port=self._LinkKit__mqtt_port,
                                       keepalive=self._LinkKit__mqtt_keep_alive)
    return 0
|
|
|
|
def __connect_async_internal(self):
    """
    Start an asynchronous MQTT connection, registering the device if needed.

    Returns 1 when dynamic registration was rejected, 2 when the
    registration callback handling raised, and None after the asynchronous
    connect and the paho network loop have been started.

    Exceptions
    ----------
    ValueError
        neither a device secret nor a product secret is configured
    Exception
        dynamic registration is needed but no callback was supplied
    """
    if self._LinkKit__device_secret is None or self._LinkKit__device_secret == "":
        if self._LinkKit__product_secret is None or self._LinkKit__product_secret == "":
            raise ValueError("device Secret & product secret both empty")
        # Check the callback before the network call: the issued secret can
        # only be handed over through it.
        if self._LinkKit__on_device_dynamic_register is None:
            raise Exception("user not give on_device_dynamic_register")
        rc, value = self._LinkKit__dynamic_register_device()
        try:
            self._LinkKit__on_device_dynamic_register(rc, value, self._LinkKit__user_data)
            if rc != 0:
                self._LinkKit__link_log.error("dynamic register device fail:" + value)
                return 1
            self._LinkKit__device_secret = value
        except Exception as e:
            self._LinkKit__link_log.error(e)
            return 2
    self._LinkKit__config_mqtt_client_internal()
    self._LinkKit__mqtt_client.connect_async(host=self._LinkKit__host_name_internal,
                                             port=self._LinkKit__mqtt_port,
                                             keepalive=self._LinkKit__mqtt_keep_alive)
    self._LinkKit__mqtt_client.loop_start()
|
|
|
|
def connect_async(self):
    """
    Start the worker loop thread that connects to the broker.

    Returns whatever the loop thread's start() returns.

    Exceptions
    ----------
    LinkKit.StateError
        the object is neither INITIALIZED nor DISCONNECTED
    """
    self._LinkKit__link_log.debug("connect_async")
    startable_states = (LinkKit.LinkKitState.INITIALIZED, LinkKit.LinkKitState.DISCONNECTED)
    if self._LinkKit__linkkit_state not in startable_states:
        raise LinkKit.StateError("not in INITIALIZED or DISCONNECTED state")
    self._LinkKit__connect_async_req = True
    # Clear any exit request left over from an earlier disconnect.
    with self._LinkKit__worker_loop_exit_req_lock:
        self._LinkKit__worker_loop_exit_req = False
    worker_body = self._LinkKit__loop_forever_internal
    return self._LinkKit__loop_thread.start(worker_body)
|
|
|
|
def disconnect(self):
    """
    Disconnect from the broker and stop the worker loop thread.

    Exceptions
    ----------
    LinkKit.StateError
        the object is not in the CONNECTED state
    """
    states = LinkKit.LinkKitState
    self._LinkKit__link_log.debug("disconnect")
    if self._LinkKit__linkkit_state is not states.CONNECTED:
        raise LinkKit.StateError("not in CONNECTED state")
    self._LinkKit__linkkit_state = states.DISCONNECTING
    if self._LinkKit__connect_async_req:
        # Ask the worker loop to exit instead of reconnecting.
        with self._LinkKit__worker_loop_exit_req_lock:
            self._LinkKit__worker_loop_exit_req = True
    self._LinkKit__mqtt_client.disconnect()
    self._LinkKit__loop_thread.stop()
|
|
|
|
@staticmethod
def __check_topic_string(topic):
    """
    Validate the length of a topic string (1..128 characters).

    Exceptions
    ----------
    ValueError
        the topic is empty or longer than 128 characters; for an over-long
        topic the message states how many characters must be removed
    """
    max_length = 128
    length = len(topic)
    if length == 0:
        # Previously reported as "too long", which was misleading.
        raise ValueError("topic string is empty")
    if length > max_length:
        # Previously printed 128 - len(topic), i.e. a negative number.
        raise ValueError("topic string length too long,need decrease %d bytes" % (length - max_length))
|
|
|
|
def publish_topic(self, topic, payload=None, qos=1):
    """
    Publish a payload on a user topic.

    Parameters
    ----------
    topic : str
        non-empty topic string
    payload : optional
        message payload passed to the MQTT client
    qos : int, optional
        0 or 1 (default 1)

    Returns
    ----------
    tuple
        (0, mid) when the MQTT client accepted the message, else (1, None)

    Exceptions
    ----------
    LinkKit.StateError
        the object is not in the CONNECTED state
    ValueError
        invalid topic or qos
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
        raise LinkKit.StateError("not in CONNECTED state")
    if topic is None or len(topic) == 0:
        raise ValueError("Invalid topic.")
    if qos not in (0, 1):
        raise ValueError("Invalid qos.")
    self._LinkKit__check_topic_string(topic)
    rc, mid = self._LinkKit__mqtt_client.publish(topic, payload, qos)
    return (0, mid) if rc == 0 else (1, None)
|
|
|
|
# NOTE: uncompyle6 could not decompile subscribe_topic ("Parse error at or near `COME_FROM' instruction at offset 340_0"); the method body is missing from this listing.
|
|
|
|
def unsubscribe_topic(self, topic):
    """
    Unsubscribe from one user topic or a list of user topics.

    Parameters
    ----------
    topic : str or list of str
        topic(s) to unsubscribe; only topics currently present in the user
        topic table are sent to the broker

    Returns
    ----------
    tuple
        the MQTT client's (rc, mid) on success; (1, None) when a single
        topic is not subscribed or the request was refused; (2, None) when
        nothing is left to unsubscribe

    Exceptions
    ----------
    LinkKit.StateError
        the object is not in the CONNECTED state
    ValueError
        topic is None, empty, or fails the length check
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
        raise LinkKit.StateError("not in CONNECTED state")
    # The decompiled form only rejected "" and let None slip through.
    if topic is None or topic == "":
        raise ValueError("Invalid topic.")
    unsubscribe_topics = []
    if isinstance(topic, str):
        self._LinkKit__check_topic_string(topic)
        if topic not in self._LinkKit__user_topics:
            return (1, None)
        unsubscribe_topics.append(topic)
    elif isinstance(topic, list):
        for one_topic in topic:
            self._LinkKit__check_topic_string(one_topic)
            if one_topic in self._LinkKit__user_topics:
                unsubscribe_topics.append(one_topic)
    with self._LinkKit__user_topics_unsubscribe_request_lock:
        if not unsubscribe_topics:
            return (2, None)
        ret = self._LinkKit__mqtt_client.unsubscribe(unsubscribe_topics)
        rc, mid = ret
        if rc == mqtt.MQTT_ERR_SUCCESS:
            # Remembered under the message id for later lookup.
            self._LinkKit__user_topics_unsubscribe_request[mid] = unsubscribe_topics
            return ret
        return (1, None)
|
|
|
|
def __make_rrpc_topic(self, topic):
    """Return the RRPC wildcard topic for the given topic."""
    rrpc_topic = "/ext/rrpc/+%s" % topic
    return rrpc_topic
|
|
|
|
# NOTE: uncompyle6 could not decompile subscribe_rrpc_topic ("Parse error at or near `COME_FROM' instruction at offset 340_0"); the method body is missing from this listing.
|
|
|
|
# NOTE: uncompyle6 could not decompile unsubscribe_rrpc_topic ("Parse error at or near `POP_BLOCK' instruction at offset 252"); the method body is missing from this listing.
|
|
|
|
def __on_internal_connect_safe(self, client, user_data, session_flag, rc):
    """
    Post a connect result to the handler task.

    A zero rc also resets the reconnect back-off.  The session flag is
    wrapped into a {"session present": ...} dict before posting.
    """
    if rc == 0:
        self._LinkKit__reset_reconnect_wait()
    flags = {"session present": session_flag}
    message_args = (client, user_data, flags, rc)
    self._LinkKit__handler_task.post_message(self._LinkKit__handler_task_cmd_on_connect, message_args)
|
|
|
|
def __loop_forever_internal(self):
    """
    Worker-thread body: register if needed, connect, then pump the MQTT loop.

    Returns 1 or 2 when dynamic registration fails, and None otherwise
    (configuration/connect errors are reported through the connect callback
    with rc 6 or 7; reconnect errors with rc 9).  The loop keeps
    reconnecting with a back-off until an exit request is seen.
    """
    states = LinkKit.LinkKitState
    log = self._LinkKit__link_log
    log.debug("enter")
    self._LinkKit__linkkit_state = states.CONNECTING

    if self._LinkKit__device_secret is None or self._LinkKit__device_secret == "":
        rc, value = self._LinkKit__dynamic_register_device()
        try:
            self._LinkKit__on_device_dynamic_register(rc, value, self._LinkKit__user_data)
            if rc != 0:
                log.error("dynamic register device fail:" + value)
                self._LinkKit__linkkit_state = states.INITIALIZED
                return 1
            self._LinkKit__device_secret = value
        except Exception as e:
            log.error(e)
            self._LinkKit__linkkit_state = states.INITIALIZED
            return 2

    try:
        self._LinkKit__config_mqtt_client_internal()
    except ssl.SSLError as e:
        log.error("config mqtt raise exception:" + str(e))
        self._LinkKit__linkkit_state = states.INITIALIZED
        self._LinkKit__on_internal_connect_safe(None, None, 0, 6)
        return

    try:
        self._LinkKit__mqtt_client.connect_async(host=self._LinkKit__host_name_internal,
                                                 port=self._LinkKit__mqtt_port,
                                                 keepalive=self._LinkKit__mqtt_keep_alive)
    except Exception as e:
        log.error("__loop_forever_internal connect raise exception:" + str(e))
        self._LinkKit__linkkit_state = states.INITIALIZED
        self._LinkKit__on_internal_connect_safe(None, None, 0, 7)
        return

    while True:
        if self._LinkKit__worker_loop_exit_req:
            if self._LinkKit__linkkit_state == states.DESTRUCTING:
                self._LinkKit__handler_task.stop()
                self._LinkKit__linkkit_state = states.DESTRUCTED
            break

        try:
            self._LinkKit__linkkit_state = states.CONNECTING
            self._LinkKit__mqtt_client.reconnect()
        except (socket.error, OSError) as e:
            log.error(e)
            # NOTE(review): the nesting of the three steps below was
            # reconstructed from a listing without indentation -- confirm.
            if self._LinkKit__linkkit_state == states.CONNECTING:
                self._LinkKit__linkkit_state = states.DISCONNECTED
                self._LinkKit__on_internal_connect_safe(None, None, 0, 9)
            if self._LinkKit__linkkit_state == states.DESTRUCTING:
                self._LinkKit__handler_task.stop()
                self._LinkKit__linkkit_state = states.DESTRUCTED
                break
            self._LinkKit__reconnect_wait()
            continue

        # Pump network traffic until the client reports an error, expiring
        # stale bookkeeping entries after every pass.
        rc = mqtt.MQTT_ERR_SUCCESS
        while rc == mqtt.MQTT_ERR_SUCCESS:
            rc = self._LinkKit__mqtt_client.loop(self._LinkKit__mqtt_request_timeout, 1)
            self._LinkKit__clean_timeout_message()
            self._LinkKit__clean_thing_timeout_request_id()

        if self._LinkKit__linkkit_state == states.CONNECTED:
            self._LinkKit__on_internal_disconnect(None, None, 1)
        log.info("loop return:%r" % rc)

        if self._LinkKit__worker_loop_exit_req:
            if self._LinkKit__linkkit_state == states.DESTRUCTING:
                self._LinkKit__handler_task.stop()
                self._LinkKit__linkkit_state = states.DESTRUCTED
            break
        self._LinkKit__reconnect_wait()
|
|
|
|
def __clean_timeout_message(self):
    """
    Expire pending message ids older than the MQTT request timeout.

    The cut-off is the current timestamp minus the request timeout times
    1000.  Each lock-guarded table is purged while holding its lock.
    """
    expire_timestamp = self._LinkKit__timestamp() - self._LinkKit__mqtt_request_timeout * 1000
    purge = self._LinkKit__clean_timeout_message_item
    guarded_tables = (
        (self._LinkKit__thing_prop_post_mid_lock, self._LinkKit__thing_prop_post_mid),
        (self._LinkKit__thing_event_post_mid_lock, self._LinkKit__thing_event_post_mid),
        (self._LinkKit__thing_answer_service_mid_lock, self._LinkKit__thing_answer_service_mid),
        (self._LinkKit__thing_raw_up_mid_lock, self._LinkKit__thing_raw_up_mid),
        (self._LinkKit__thing_raw_down_reply_mid_lock, self._LinkKit__thing_raw_down_reply_mid),
        (self._LinkKit__thing_prop_set_reply_mid_lock, self._LinkKit__thing_prop_set_reply_mid),
    )
    for table_lock, pending in guarded_tables:
        with table_lock:
            purge(pending, expire_timestamp)
    # NOTE(review): these two tables are purged without taking a lock,
    # although their writers appear to hold dedicated locks -- confirm
    # whether that is intentional.
    purge(self._LinkKit__thing_subscribe_sys_request_mid, expire_timestamp)
    purge(self._LinkKit__device_info_mid, expire_timestamp)
|
|
|
|
def __clean_timeout_message_item(self, mids, expire_time):
    """Remove every entry of mids stamped before expire_time and log it."""
    # Iterate over a snapshot because entries are removed while scanning.
    for mid, stamped_at in list(mids.items()):
        if stamped_at < expire_time:
            timestamp = mids.pop(mid)
            self._LinkKit__link_log.error("__clean_timeout_message_item pop:%r,timestamp:%r", mid, timestamp)
|
|
|
|
def __reconnect_wait(self):
    """
    Sleep before the next reconnect attempt, growing the delay each time.

    The first wait uses the configured minimum.  Later waits double the
    previous delay, cap it at the configured maximum and then add a random
    1..delay seconds.
    """
    if self._LinkKit__mqtt_auto_reconnect_sec == 0:
        self._LinkKit__mqtt_auto_reconnect_sec = self._LinkKit__mqtt_auto_reconnect_min_sec
    else:
        capped = min(self._LinkKit__mqtt_auto_reconnect_sec * 2,
                     self._LinkKit__mqtt_auto_reconnect_max_sec)
        self._LinkKit__mqtt_auto_reconnect_sec = capped
        jitter = random.randint(1, capped)
        self._LinkKit__mqtt_auto_reconnect_sec = capped + jitter
    time.sleep(self._LinkKit__mqtt_auto_reconnect_sec)
|
|
|
|
def __reset_reconnect_wait(self):
    """Reset the reconnect delay so the next wait starts from the minimum."""
    initial_delay = 0
    self._LinkKit__mqtt_auto_reconnect_sec = initial_delay
|
|
|
|
def start_worker_loop(self):
    """Do nothing; the method has an empty body."""
    return None
|
|
|
|
def thing_setup(self, file=None):
    """
    Load the thing model (TSL) file, or switch to raw-only mode.

    Parameters
    ----------
    file : str, optional
        path of a UTF-8 JSON TSL file; None selects raw-only mode

    Returns
    ----------
    int
        0 on success, 1 when setup was already done, 2 when reading or
        parsing the file failed

    Exceptions
    ----------
    LinkKit.StateError
        the object is not in the INITIALIZED state
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.INITIALIZED:
        raise LinkKit.StateError("not in INITIALIZED state")
    if self._LinkKit__thing_setup_state:
        return 1
    if file is None:
        self._LinkKit__thing_raw_only = True
        self._LinkKit__thing_setup_state = True
        return 0

    product_key = self._LinkKit__product_key
    device_name = self._LinkKit__device_name
    try:
        with open(file, encoding="utf-8") as f:
            tsl = json.load(f)

            # Events: "post" lists the postable properties, every other
            # identifier is a user event.
            for event_def in tsl["events"]:
                identifier = event_def["identifier"]
                if identifier == "post":
                    for output_item in event_def["outputData"]:
                        self._LinkKit__thing_properties_post.add(output_item["identifier"])
                else:
                    self._LinkKit__thing_events.add(identifier)

            # Services: "set"/"get" list settable/gettable properties, every
            # other identifier is a user service with its own reply topic.
            for service_def in tsl["services"]:
                identifier = service_def["identifier"]
                if identifier == "set":
                    for input_item in service_def["inputData"]:
                        self._LinkKit__thing_properties_set.add(input_item["identifier"])
                elif identifier == "get":
                    for output_item in service_def["outputData"]:
                        self._LinkKit__thing_properties_get.add(output_item["identifier"])
                else:
                    self._LinkKit__thing_services.add(identifier)
                    service_reply_topic = self._LinkKit__thing_topic_service_pattern % (
                        product_key, device_name, identifier + "_reply")
                    self._LinkKit__thing_topic_services_reply.add(service_reply_topic)

            for event in self._LinkKit__thing_events:
                post_topic = self._LinkKit__thing_topic_event_post_pattern % (
                    product_key, device_name, event)
                self._LinkKit__thing_topic_event_post[event] = post_topic
                self._LinkKit__thing_topic_event_post_reply.add(post_topic + "_reply")

            for service in self._LinkKit__thing_services:
                self._LinkKit__thing_topic_services.add(
                    self._LinkKit__thing_topic_service_pattern % (product_key, device_name, service))
    except Exception as e:
        self._LinkKit__link_log.info("file open error:" + str(e))
        return 2

    self._LinkKit__thing_setup_state = True
    return 0
|
|
|
|
def __subscribe_sys_topic(self):
    """
    Subscribe (QoS 0) to the system topics this device needs.

    The device-info reply topic is always included.  After thing_setup, the
    raw topics (raw-only mode) or the full property/service/event topic set
    are added.  Returns 0 when the subscribe request was accepted by the
    MQTT client, 1 otherwise.
    """
    topic_names = [self._LinkKit__device_info_topic_reply]
    if self._LinkKit__thing_setup_state:
        if self._LinkKit__thing_raw_only:
            topic_names += [
                self._LinkKit__thing_topic_raw_down,
                self._LinkKit__thing_topic_raw_up_reply,
            ]
        else:
            topic_names += [
                self._LinkKit__thing_topic_prop_set,
                self._LinkKit__thing_topic_prop_get,
                self._LinkKit__thing_topic_raw_down,
                self._LinkKit__thing_topic_prop_post_reply,
                self._LinkKit__thing_topic_raw_up_reply,
                self._LinkKit__thing_topic_update_device_info_reply,
                self._LinkKit__thing_topic_delete_device_info_reply,
                self._LinkKit__thing_topic_shadow_get,
            ]
            topic_names.extend(self._LinkKit__thing_topic_services)
            topic_names.extend(self._LinkKit__thing_topic_event_post_reply)
    subscribe_sys_topics = [(name, 0) for name in topic_names]
    with self._LinkKit__thing_subscribe_sys_request_lock:
        rc, mid = self._LinkKit__mqtt_client.subscribe(subscribe_sys_topics)
        if rc != mqtt.MQTT_ERR_SUCCESS:
            return 1
        self._LinkKit__thing_subscribe_sys_request = True
        self._LinkKit__thing_subscribe_sys_request_mid[mid] = self._LinkKit__timestamp()
        return 0
|
|
|
|
def thing_raw_post_data(self, payload):
    """
    Publish a raw payload on the raw-up topic (QoS 0).

    Returns 0 when the MQTT client accepted the message, 1 otherwise.
    Raises LinkKit.StateError when not in the CONNECTED state.
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
        raise LinkKit.StateError("not in CONNECTED state")
    with self._LinkKit__thing_raw_up_mid_lock:
        rc, mid = self._LinkKit__mqtt_client.publish(self._LinkKit__thing_topic_raw_up, payload, 0)
        if rc != mqtt.MQTT_ERR_SUCCESS:
            return 1
        self._LinkKit__thing_raw_up_mid[mid] = self._LinkKit__timestamp()
        return 0
|
|
|
|
def thing_raw_data_reply(self, payload):
    """
    Publish a raw payload on the raw-down reply topic (QoS 0).

    Returns 0 when the MQTT client accepted the message, 1 otherwise.
    Raises LinkKit.StateError when not in the CONNECTED state.
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
        raise LinkKit.StateError("not in CONNECTED state")
    with self._LinkKit__thing_raw_down_reply_mid_lock:
        rc, mid = self._LinkKit__mqtt_client.publish(self._LinkKit__thing_topic_raw_down_reply, payload, 0)
        if rc != mqtt.MQTT_ERR_SUCCESS:
            return 1
        self._LinkKit__thing_raw_down_reply_mid[mid] = self._LinkKit__timestamp()
        return 0
|
|
|
|
def thing_update_device_info(self, payload):
    """
    Publish a "thing.deviceinfo.update" request with the given params.

    Returns
    ----------
    tuple
        (rc, request_id) when the MQTT client accepted the message,
        (1, None) when no request id is available or the publish failed

    Exceptions
    ----------
    LinkKit.StateError
        not in the CONNECTED state, or thing setup/enable not completed
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
        raise LinkKit.StateError("not in CONNECTED state")
    # The decompiled guard was "raise <bool> or StateError(...)", which
    # raised unconditionally (a TypeError once setup and enable were true).
    if not (self._LinkKit__thing_setup_state and self._LinkKit__thing_enable_state):
        raise LinkKit.StateError("not in SETUP & ENABLE state")
    request_id = self._LinkKit__get_thing_request_id()
    if request_id is None:
        # Same handling as thing_post_property when no id can be allocated.
        return (1, None)
    with self._LinkKit__thing_update_device_info_up_mid_lock:
        rc, mid = self._LinkKit__mqtt_client.publish(
            self._LinkKit__thing_topic_update_device_info_up,
            self._LinkKit__pack_alink_request(request_id, "thing.deviceinfo.update", payload),
            0)
        if rc == mqtt.MQTT_ERR_SUCCESS:
            self._LinkKit__thing_update_device_info_up_mid[mid] = self._LinkKit__timestamp()
            return (rc, request_id)
        return (1, None)
|
|
|
|
def thing_delete_device_info(self, payload):
    """
    Publish a "thing.deviceinfo.delete" request with the given params.

    Returns
    ----------
    tuple
        (rc, request_id) when the MQTT client accepted the message,
        (1, None) when thing setup/enable is incomplete, no request id is
        available, or the publish failed

    Exceptions
    ----------
    LinkKit.StateError
        not in the CONNECTED state
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
        raise LinkKit.StateError("not in CONNECTED state")
    # The decompiled guard was "return setup and enable or 1", which
    # returned unconditionally so nothing was ever published.  The failure
    # value is a tuple like every other return of this method.
    if not (self._LinkKit__thing_setup_state and self._LinkKit__thing_enable_state):
        return (1, None)
    request_id = self._LinkKit__get_thing_request_id()
    if request_id is None:
        return (1, None)
    with self._LinkKit__thing_delete_device_info_up_mid_lock:
        rc, mid = self._LinkKit__mqtt_client.publish(
            self._LinkKit__thing_topic_delete_device_info_up,
            self._LinkKit__pack_alink_request(request_id, "thing.deviceinfo.delete", payload),
            0)
        if rc == mqtt.MQTT_ERR_SUCCESS:
            self._LinkKit__thing_delete_device_info_up_mid[mid] = self._LinkKit__timestamp()
            return (rc, request_id)
        return (1, None)
|
|
|
|
def thing_update_tags(self, tagMap):
    """
    Update device tags from a {key: value} dictionary.

    Each pair becomes one attrKey/attrValue entry passed to
    thing_update_device_info, whose result is returned.

    Exceptions
    ----------
    ValueError
        tagMap is not a dict
    """
    if not isinstance(tagMap, dict):
        raise ValueError("tagMap must be a dictionary")
    payload = [{LinkKit.TAG_KEY: key, LinkKit.TAG_VALUE: value}
               for key, value in tagMap.items()]
    return self.thing_update_device_info(payload)
|
|
|
|
def thing_remove_tags(self, tagKeys):
    """
    Remove device tags by key.

    Each key becomes one attrKey entry passed to thing_delete_device_info,
    whose result is returned.

    Exceptions
    ----------
    ValueError
        tagKeys is neither a list nor a tuple
    """
    if not isinstance(tagKeys, (list, tuple)):
        raise ValueError("tagKeys must be a list or tuple")
    payload = [{LinkKit.TAG_KEY: tag_key} for tag_key in tagKeys]
    return self.thing_delete_device_info(payload)
|
|
|
|
def __pack_alink_request(self, request_id, method, params):
    """
    Serialize an Alink request envelope to a JSON string.

    The envelope has the keys id, version ("1.0"), params and method.
    """
    request = {
        "id": request_id,
        # Plain "1.0": the decompiled '"1.0"' carried literal quote
        # characters, unlike the other requests built in this file.
        "version": "1.0",
        "params": params,
        "method": method,
    }
    return json.dumps(request)
|
|
|
|
def thing_answer_service(self, identifier, request_id, code, data=None):
    """
    Publish the reply to a service call.

    Parameters
    ----------
    identifier : str
        service identifier, used to build the reply topic when the request
        did not arrive over RRPC
    request_id
        id of the request being answered
    code
        result code placed in the reply
    data : optional
        reply data; an empty dict is sent when None

    Returns
    ----------
    int
        0 when the MQTT client accepted the reply, 1 when thing setup/enable
        is incomplete or the publish failed

    Exceptions
    ----------
    LinkKit.StateError
        not in the CONNECTED state
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
        raise LinkKit.StateError("not in CONNECTED state")
    # The decompiled guard only rejected "setup done but not enabled" and
    # let a thing that was never set up through.
    if not (self._LinkKit__thing_setup_state and self._LinkKit__thing_enable_state):
        return 1
    if data is None:
        data = {}
    response = {"id": request_id,
                "code": code,
                "data": data}
    # A request that arrived over RRPC is answered on its recorded topic.
    item = self._LinkKit__pop_rrpc_service("alink_" + str(request_id))
    if item:
        service_reply_topic = item["topic"]
    else:
        service_reply_topic = self._LinkKit__thing_topic_service_pattern % (
            self._LinkKit__product_key, self._LinkKit__device_name, identifier + "_reply")
    with self._LinkKit__thing_answer_service_mid_lock:
        rc, mid = self._LinkKit__mqtt_client.publish(service_reply_topic, json.dumps(response), 0)
        if rc == mqtt.MQTT_ERR_SUCCESS:
            self._LinkKit__thing_answer_service_mid[mid] = self._LinkKit__timestamp()
            return 0
        return 1
|
|
|
|
def __get_thing_request_id(self):
    """
    Allocate the next request id and record it with a timestamp.

    The counter is advanced (wrapping to 0 past the maximum) even when no
    id is handed out.  Returns the id as a string, or None when too many
    ids are pending or the next value is still pending.
    """
    with self._LinkKit__thing_request_id_lock:
        self._LinkKit__thing_request_value += 1
        if self._LinkKit__thing_request_value > self._LinkKit__thing_request_id_max:
            self._LinkKit__thing_request_value = 0
        pending = self._LinkKit__thing_request_id
        if len(pending) > self._LinkKit__mqtt_max_queued_message:
            return None
        candidate = self._LinkKit__thing_request_value
        if candidate in pending:
            return None
        pending[candidate] = self._LinkKit__timestamp()
        self._LinkKit__link_log.debug("__get_thing_request_id pop:%r" % candidate)
        return str(candidate)
|
|
|
|
def __back_thing_request_id(self, post_id):
    """
    Release a pending request id.

    Failures (unknown id, non-numeric id) are logged and not raised.
    """
    with self._LinkKit__thing_request_id_lock:
        try:
            numeric_id = int(post_id)
            self._LinkKit__thing_request_id.pop(numeric_id)
        except Exception as e:
            self._LinkKit__link_log.error("__back_thing_request_id pop:%r,%r" % (post_id, e))
|
|
|
|
def __reset_thing_request_id(self):
    """Reset the request-id counter to 0 and drop all pending ids."""
    with self._LinkKit__thing_request_id_lock:
        pending = self._LinkKit__thing_request_id
        self._LinkKit__thing_request_value = 0
        pending.clear()
|
|
|
|
def __clean_thing_timeout_request_id(self):
    """Drop pending request ids older than the MQTT request timeout."""
    with self._LinkKit__thing_request_id_lock:
        expire_timestamp = self._LinkKit__timestamp() - self._LinkKit__mqtt_request_timeout * 1000
        pending = self._LinkKit__thing_request_id
        # Iterate over a snapshot because entries are removed while scanning.
        for request_id, stamped_at in list(pending.items()):
            if stamped_at < expire_timestamp:
                timestamp = pending.pop(request_id)
                self._LinkKit__link_log.error("__clean_thing_timeout_request_id pop:%r,timestamp:%r", request_id, timestamp)
|
|
|
|
def thing_trigger_event(self, event_tuple):
    """
    Post an event defined in the thing model.

    Parameters
    ----------
    event_tuple : tuple
        (event identifier, params); params is sent as {"value": params}

    Returns
    ----------
    tuple
        (0, request_id) when the MQTT client accepted the message;
        (1, None) when thing setup/enable is incomplete, the argument is not
        a tuple, the event is unknown, no request id is available, or the
        publish failed

    Exceptions
    ----------
    LinkKit.StateError
        not in the CONNECTED state
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
        raise LinkKit.StateError("not in CONNECTED state")
    # The decompiled guard was "return setup and enable or (1, None)",
    # which returned unconditionally so no event was ever published.
    if not (self._LinkKit__thing_setup_state and self._LinkKit__thing_enable_state):
        return (1, None)
    if not isinstance(event_tuple, tuple):
        return (1, None)
    event, params = event_tuple
    if event not in self._LinkKit__thing_topic_event_post:
        return (1, None)
    request_id = self._LinkKit__get_thing_request_id()
    if request_id is None:
        # Was a bare 1; every other path of this method returns a tuple.
        return (1, None)
    request = {"id": request_id,
               "version": "1.0",
               "params": {"value": params},
               "method": "thing.event.%s.post" % event}
    with self._LinkKit__thing_event_post_mid_lock:
        event_topic = self._LinkKit__thing_topic_event_post[event]
        self._LinkKit__link_log.debug("thing_trigger_event publish topic")
        rc, mid = self._LinkKit__mqtt_client.publish(event_topic, json.dumps(request), 0)
        self._LinkKit__link_log.debug("thing_trigger_event publish done")
        if rc == mqtt.MQTT_ERR_SUCCESS:
            self._LinkKit__thing_event_post_mid[mid] = self._LinkKit__timestamp()
            return (0, request_id)
        return (1, None)
|
|
|
|
def thing_post_property(self, property_data):
    """
    Post property values to the cloud (QoS 1).

    Parameters
    ----------
    property_data
        value sent as the "params" member of the request

    Returns
    ----------
    tuple
        (0, request_id) when the MQTT client accepted the message;
        (1, None) when thing setup/enable is incomplete, no request id is
        available, or the publish failed

    Exceptions
    ----------
    LinkKit.StateError
        not in the CONNECTED state
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
        raise LinkKit.StateError("not in CONNECTED state")
    # The decompiled guard was "return setup and enable or (1, None)",
    # which returned unconditionally so nothing was ever published.
    if not (self._LinkKit__thing_setup_state and self._LinkKit__thing_enable_state):
        return (1, None)
    request_id = self._LinkKit__get_thing_request_id()
    if request_id is None:
        return (1, None)
    request = {
        "id": request_id,
        # Plain values: the decompiled '"1.0"' and
        # '"thing.event.property.post"' carried literal quote characters.
        "version": "1.0",
        "params": property_data,
        "method": "thing.event.property.post",
    }
    with self._LinkKit__thing_prop_post_mid_lock:
        rc, mid = self._LinkKit__mqtt_client.publish(self._LinkKit__thing_topic_prop_post, json.dumps(request), 1)
        if rc == mqtt.MQTT_ERR_SUCCESS:
            self._LinkKit__thing_prop_post_mid[mid] = self._LinkKit__timestamp()
            return (0, request_id)
        return (1, None)
|
|
|
|
# NOTE: uncompyle6 could not decompile __on_internal_async_message ("Parse error at or near `COME_FROM' instruction at offset 1624_2"); the method body is missing from this listing.
|
|
|
|
def __parse_raw_topic(self, topic):
    """
    Return the part of an RRPC topic after the "/ext/rrpc/<id>" prefix.

    Raises AttributeError when the topic does not match the RRPC pattern.
    """
    found = re.search("/ext/rrpc/.*?(/.*)", topic)
    return found.group(1)
|
|
|
|
def __tidy_topic(self, topic):
    """Normalize a topic string.

    Surrounding whitespace is stripped and a leading "/" is added when it is
    missing.

    :param topic: topic string, or ``None``.
    :return: the normalized topic, or ``None`` when *topic* is ``None`` or
        contains only whitespace.
    """
    if topic is None:
        return None
    topic = topic.strip()
    if not topic:
        return None
    if topic[0] != "/":
        topic = "/" + topic
    return topic
|
|
|
|
def __push_rrpc_service(self, item):
    """Remember a pending RRPC request so that it can be answered later.

    When the pending list already holds more entries than the configured
    maximum, the oldest entry is evicted before *item* is appended.

    :param item: dict describing the request; must contain an ``"id"`` key.
    """
    with self._LinkKit__user_rrpc_request_ids_lock:
        pending = self._LinkKit__user_rrpc_request_ids
        index_map = self._LinkKit__user_rrpc_request_id_index_map
        if len(pending) > self._LinkKit__user_rrpc_request_max_len:
            evicted = pending.pop(0)
            # Two queued items can share an id, in which case the map entry
            # is already gone when the second one is evicted.  A plain
            # ``del`` would raise KeyError here.
            index_map.pop(evicted["id"], None)
        pending.append(item)
        index_map[item["id"]] = 0
|
|
|
|
def __pop_rrpc_service(self, id):
    """Remove and return the pending RRPC request stored under *id*.

    Returns ``None`` when *id* is not in the id index map, or when the map
    knows the id but no list entry carries it (the map entry is removed in
    that case as well).
    """
    with self._LinkKit__user_rrpc_request_ids_lock:
        known_ids = self._LinkKit__user_rrpc_request_id_index_map
        if id not in known_ids:
            return None
        del known_ids[id]
        pending = self._LinkKit__user_rrpc_request_ids
        for position, entry in enumerate(pending):
            if entry["id"] == id:
                del pending[position]
                return entry
    return None
|
|
|
|
def thing_answer_rrpc(self, id, response):
    """Publish *response* for the pending RRPC request identified by *id*.

    The request is looked up (and removed) under the key ``"rrpc_" + id``
    and the response is published on the topic stored with it.

    :return: ``(1, None)`` when no such pending request exists, otherwise
        the ``(rc, mid)`` pair unpacked from the MQTT client's ``publish``.
    """
    pending = self._LinkKit__pop_rrpc_service("rrpc_" + id)
    if pending is None:
        self._LinkKit__link_log.error("answer_rrpc_topic, the id does not exist: %s" % id)
        return (1, None)
    publish_result = self._LinkKit__mqtt_client.publish(pending["topic"], response, 0)
    rc, mid = publish_result
    self._LinkKit__link_log.debug("reply topic:%s" % pending["topic"])
    return (rc, mid)
|
|
|
|
def __try_parse_rrpc_topic(self, message):
    """Dispatch an incoming RRPC message to the matching user callback.

    Two kinds of message are handled:

    * the raw topic starts with ``/sys`` and is one of the thing service
      topics: the JSON payload's ``id`` and ``params`` are passed to the
      service-call callback;
    * otherwise the raw topic must be in the user RRPC topic table; the
      RRPC id is taken from the fourth "/"-separated field of the full
      topic and the message is passed to the topic RRPC callback, if set.

    In both cases the request is recorded via ``__push_rrpc_service``
    before the callback runs.  Exceptions raised while building the request
    or inside the callback are logged and not propagated.

    :param message: object with ``topic``, ``payload`` and ``qos`` attributes.
    """
    log = self._LinkKit__link_log
    log.debug("receive a rrpc topic:%s" % message.topic)
    raw_topic = self._LinkKit__parse_raw_topic(message.topic)

    # A "/sys" topic that is not a known service topic falls through to the
    # user RRPC lookup below instead of being dropped.
    if raw_topic.startswith("/sys") and raw_topic in self._LinkKit__thing_topic_services:
        identifier = raw_topic.split("/", 6)[6]
        payload = self._LinkKit__load_json(self._LinkKit__to_str(message.payload))
        try:
            request_id = payload["id"]
            params = payload["params"]
            item = {
                "id": "alink_" + request_id,
                "request_id": request_id,
                "payload": payload,
                "identifier": identifier,
                "topic": message.topic,
            }
            self._LinkKit__push_rrpc_service(item)
            self._LinkKit__on_thing_call_service(identifier, request_id, params, self._LinkKit__user_data)
        except Exception as e:
            log.error("on_thing_call_service raise exception: %s" % e)
        return

    with self._LinkKit__user_rrpc_topics_subscribe_request_lock:
        with self._LinkKit__user_rrpc_topics_lock:
            if raw_topic not in self._LinkKit__user_rrpc_topics:
                log.error("%s is not in the rrpc-subscribed list" % raw_topic)
                return

    # The decompiled code returned unconditionally here, so the callback
    # below was never reached.  Only return when no callback is registered.
    if not self._LinkKit__on_topic_rrpc_message:
        return
    try:
        rrpc_id = message.topic.split("/", 4)[3]
        item = {
            "id": "rrpc_" + rrpc_id,
            "payload": message.payload,
            "topic": message.topic,
        }
        self._LinkKit__push_rrpc_service(item)
        self._LinkKit__on_topic_rrpc_message(
            rrpc_id, message.topic, message.payload, message.qos, self._LinkKit__user_data)
    except Exception as e:
        log.error("on_topic_rrpc_message process raise exception:%r" % e)
|
|
|
|
def __try_parse_try_shadow(self, payload):
    """Best-effort update of the cached shadow from a received payload.

    The receive time and the raw payload are always recorded first.  The
    payload is then parsed with ``__load_json`` and, independently of each
    other:

    * ``version`` is taken from the top level, else from ``payload``;
    * ``timestamp`` is taken from the top level, else from ``payload``;
    * when ``payload.status`` equals ``"success"``, ``state`` and
      ``metadata`` are copied from ``payload`` if present.

    Any exception (for example a payload that is not a JSON object) is
    logged at debug level and otherwise ignored.

    :param payload: raw shadow message text.
    """
    try:
        shadow = self._LinkKit__latest_shadow
        shadow.set_latest_recevied_time(self._LinkKit__timestamp())
        shadow.set_latest_recevied_payload(payload)
        msg = self._LinkKit__load_json(payload)
        has_inner = "payload" in msg

        # The decompiled else/elif chain tied the timestamp lookup to the
        # outcome of the version lookup; the two fields are independent.
        if "version" in msg:
            shadow.set_version(msg["version"])
        elif has_inner and "version" in msg["payload"]:
            shadow.set_version(msg["payload"]["version"])

        if "timestamp" in msg:
            shadow.set_timestamp(msg["timestamp"])
        elif has_inner and "timestamp" in msg["payload"]:
            shadow.set_timestamp(msg["payload"]["timestamp"])

        if has_inner and msg["payload"]["status"] == "success":
            if "state" in msg["payload"]:
                shadow.set_state(msg["payload"]["state"])
            if "metadata" in msg["payload"]:
                shadow.set_metadata(msg["payload"]["metadata"])
    except Exception as e:
        # Deliberately best-effort: keep going, but leave a trace.
        self._LinkKit__link_log.debug("shadow payload not parsed: %r" % e)
|
|
|
|
def thing_update_shadow(self, reported, version):
    """Send a shadow ``update`` request carrying *reported* and *version*.

    Builds the request and returns whatever ``__thing_update_shadow``
    returns for it.
    """
    update_request = dict(
        state={"reported": reported},
        method="update",
        version=version,
    )
    return self._LinkKit__thing_update_shadow(update_request)
|
|
|
|
def thing_get_shadow(self):
    """Send a shadow ``get`` request.

    Returns whatever ``__thing_update_shadow`` returns for the request.
    """
    return self._LinkKit__thing_update_shadow({"method": "get"})
|
|
|
|
def local_get_latest_shadow(self):
    """Return the locally cached shadow object (no request is sent)."""
    cached_shadow = self._LinkKit__latest_shadow
    return cached_shadow
|
|
|
|
def __thing_update_shadow(self, request):
    """Publish a shadow request on the shadow-update topic.

    The request is serialized with ``json.dumps`` and handed to the MQTT
    client's ``publish`` with ``1`` as the third argument.  On success the
    message id is stored with a millisecond timestamp in the pending
    shadow table.

    :param request: JSON-serializable request dict.
    :return: ``(0, mid)`` when the publish call succeeded, otherwise
        ``(1, None)`` (thing not set up / not enabled, or publish error).
    :raises LinkKit.StateError: if the client is not in the CONNECTED state.
    """
    if self._LinkKit__linkkit_state is not LinkKit.LinkKitState.CONNECTED:
        raise LinkKit.StateError("not in CONNECTED state")
    # The decompiled code returned unconditionally at this point, which left
    # the publish below unreachable.  Only bail out when the thing is not ready.
    if not (self._LinkKit__thing_setup_state and self._LinkKit__thing_enable_state):
        return (1, None)
    with self._LinkKit__thing_shadow_mid_lock:
        rc, mid = self._LinkKit__mqtt_client.publish(
            self._LinkKit__thing_topic_shadow_update, json.dumps(request), 1)
        if rc == mqtt.MQTT_ERR_SUCCESS:
            self._LinkKit__thing_shadow_mid[mid] = self._LinkKit__timestamp()
            return (0, mid)
        return (1, None)
|
|
|
|
def __on_internal_message(self, client, user_data, message):
    """Queue a received message for the handler task.

    Logs the call and posts ``(client, user_data, message)`` to the handler
    task under the on-message command; no processing happens here.
    """
    self._LinkKit__link_log.info("__on_internal_message")
    handler_task = self._LinkKit__handler_task
    handler_task.post_message(
        self._LinkKit__handler_task_cmd_on_message,
        (client, user_data, message),
    )
|
|
|
|
def __handler_task_on_message_callback(self, value):
    """Handler-task side of message delivery.

    *value* is the ``(client, user_data, message)`` triple; only the message
    is forwarded, to ``__on_internal_async_message``.
    """
    _client, _user_data, received = value
    self._LinkKit__on_internal_async_message(received)
|
|
|
|
def __on_internal_connect(self, client, user_data, session_flag, rc):
    """Handle a connect result and queue it for the handler task.

    When *rc* is 0 the reconnect wait is reset and the system topics are
    subscribed.  The ``(client, user_data, session_flag, rc)`` tuple is
    posted to the handler task under the on-connect command whatever *rc* is.
    """
    self._LinkKit__link_log.info("__on_internal_connect")
    succeeded = rc == 0
    if succeeded:
        self._LinkKit__reset_reconnect_wait()
        self._LinkKit__subscribe_sys_topic()
    handler_task = self._LinkKit__handler_task
    handler_task.post_message(
        self._LinkKit__handler_task_cmd_on_connect,
        (client, user_data, session_flag, rc),
    )
|
|
|
|
def __handler_task_on_connect_callback(self, value):
    """Handler-task side of a connect result.

    *value* is ``(client, user_data, session_flag, rc)``.  When *rc* is 0
    the state becomes CONNECTED.  The user's on-connect callback, if set, is
    then called with ``session_flag["session present"]``, *rc* and the user
    data; an exception raised by it is logged and not propagated.
    """
    _client, _user_data, session_flag, rc = value
    log = self._LinkKit__link_log
    log.info("__on_internal_connect enter")
    log.debug("session:%d, return code:%d" % (session_flag["session present"], rc))
    if rc == 0:
        self._LinkKit__linkkit_state = LinkKit.LinkKitState.CONNECTED
    if self._LinkKit__on_connect is None:
        return
    try:
        self._LinkKit__on_connect(session_flag["session present"], rc, self._LinkKit__user_data)
    except Exception as exc:
        self._LinkKit__link_log.error("on_connect process raise exception:%r" % exc)
|
|
|
|
def __on_internal_disconnect(self, client, user_data, rc):
    """Handle a disconnect: advance the state, drop bookkeeping, notify.

    State transitions: DESTRUCTING -> DESTRUCTED; DISCONNECTING or
    CONNECTED -> DISCONNECTED.  From any other state an error is logged and
    nothing else happens.  After a valid transition all topic tables and
    pending message-id tables are cleared, ``(client, user_data, rc)`` is
    posted to the handler task under the on-disconnect command, and the
    handler task is stopped when the new state is DESTRUCTED.
    """
    log = self._LinkKit__link_log
    log.info("__on_internal_disconnect enter")
    states = LinkKit.LinkKitState

    if self._LinkKit__linkkit_state == states.DESTRUCTING:
        self._LinkKit__linkkit_state = states.DESTRUCTED
    elif self._LinkKit__linkkit_state == states.DISCONNECTING:
        self._LinkKit__linkkit_state = states.DISCONNECTED
    elif self._LinkKit__linkkit_state == states.CONNECTED:
        self._LinkKit__linkkit_state = states.DISCONNECTED
    else:
        # Already DISCONNECTED and every remaining state are treated alike.
        log.error("__on_internal_disconnect enter from wrong state:%r" % self._LinkKit__linkkit_state)
        return

    # Cleared in the same order as before; nothing is kept across sessions.
    self._LinkKit__user_topics.clear()
    self._LinkKit__user_topics_subscribe_request.clear()
    self._LinkKit__user_topics_unsubscribe_request.clear()

    self._LinkKit__user_rrpc_topics.clear()
    self._LinkKit__user_rrpc_topics_subscribe_request.clear()
    self._LinkKit__user_rrpc_topics_unsubscribe_request.clear()

    self._LinkKit__thing_prop_post_mid.clear()
    self._LinkKit__thing_event_post_mid.clear()
    self._LinkKit__thing_answer_service_mid.clear()
    self._LinkKit__thing_raw_down_reply_mid.clear()
    self._LinkKit__thing_raw_up_mid.clear()
    self._LinkKit__thing_shadow_mid.clear()
    self._LinkKit__device_info_mid.clear()
    self._LinkKit__thing_update_device_info_up_mid.clear()
    self._LinkKit__thing_delete_device_info_up_mid.clear()

    handler_task = self._LinkKit__handler_task
    handler_task.post_message(
        self._LinkKit__handler_task_cmd_on_disconnect, (client, user_data, rc))
    if self._LinkKit__linkkit_state == states.DESTRUCTED:
        self._LinkKit__handler_task.stop()
|
|
|
|
def __handler_task_on_disconnect_callback(self, value):
    """Handler-task side of a disconnect.

    *value* is ``(client, user_data, rc)``.  If the thing was set up and
    enabled, it is marked disabled and the thing-disable callback (if set)
    is called with the user data.  The on-disconnect callback (if set) is
    then called with *rc* and the user data.  Exceptions raised by either
    callback are logged and not propagated.
    """
    self._LinkKit__link_log.info("__handler_task_on_disconnect_callback enter")
    _client, _user_data, rc = value

    if self._LinkKit__thing_setup_state and self._LinkKit__thing_enable_state:
        self._LinkKit__thing_enable_state = False
        if self._LinkKit__on_thing_disable is not None:
            try:
                self._LinkKit__on_thing_disable(self._LinkKit__user_data)
            except Exception as exc:
                self._LinkKit__link_log.error("on_thing_disable process raise exception:%r" % exc)

    if self._LinkKit__on_disconnect is None:
        return
    try:
        self._LinkKit__on_disconnect(rc, self._LinkKit__user_data)
    except Exception as exc:
        self._LinkKit__link_log.error("on_disconnect process raise exception:%r" % exc)
|
|
|
|
def __on_internal_publish(self, client, user_data, mid):
    """Queue a publish notification for the handler task.

    Posts ``(client, user_data, mid)`` under the on-publish command.
    """
    handler_task = self._LinkKit__handler_task
    handler_task.post_message(
        self._LinkKit__handler_task_cmd_on_publish,
        (client, user_data, mid),
    )
|
|
|
|
def __handler_task_on_publish_callback(self, value):
    """Handler-task side of a publish notification.

    *value* is ``(client, user_data, mid)``.  The pending message-id tables
    are checked one after another, each under its own lock.  The first table
    that contains *mid* has the entry removed, a debug line is logged and
    the method returns.  If no table knows *mid*, the user's on-publish
    callback (if set) is called with *mid* and the user data.
    """
    _client, _user_data, mid = value
    log = self._LinkKit__link_log
    log.debug("__on_internal_publish message:%d" % mid)

    # (lock, pending table, debug message) in the order they are checked.
    internal_tables = (
        (self._LinkKit__thing_event_post_mid_lock,
         self._LinkKit__thing_event_post_mid,
         "__on_internal_publish event post mid removed"),
        (self._LinkKit__thing_prop_post_mid_lock,
         self._LinkKit__thing_prop_post_mid,
         "__on_internal_publish prop post mid removed"),
        (self._LinkKit__thing_prop_set_reply_mid_lock,
         self._LinkKit__thing_prop_set_reply_mid,
         "__on_internal_publish prop set reply mid removed"),
        (self._LinkKit__thing_answer_service_mid_lock,
         self._LinkKit__thing_answer_service_mid,
         "__thing_answer_service_mid mid removed"),
        (self._LinkKit__thing_raw_up_mid_lock,
         self._LinkKit__thing_raw_up_mid,
         "__thing_raw_up_mid mid removed"),
        (self._LinkKit__thing_raw_down_reply_mid_lock,
         self._LinkKit__thing_raw_down_reply_mid,
         "__thing_raw_down_reply_mid mid removed"),
        (self._LinkKit__device_info_mid_lock,
         self._LinkKit__device_info_mid,
         "__device_info_mid mid removed"),
        (self._LinkKit__thing_shadow_mid_lock,
         self._LinkKit__thing_shadow_mid,
         "__thing_shadow_mid mid removed"),
        (self._LinkKit__thing_update_device_info_up_mid_lock,
         self._LinkKit__thing_update_device_info_up_mid,
         "__thing_update_device_info_up_mid mid removed"),
        (self._LinkKit__thing_delete_device_info_up_mid_lock,
         self._LinkKit__thing_delete_device_info_up_mid,
         "__thing_delete_device_info_up_mid mid removed"),
    )
    for guard, pending, removed_note in internal_tables:
        with guard:
            if mid in pending:
                pending.pop(mid)
                log.debug(removed_note)
                return

    # Not an internal message: report it to the user.
    if self._LinkKit__on_publish_topic is not None:
        self._LinkKit__on_publish_topic(mid, self._LinkKit__user_data)
|
|
|
|
def __on_internal_subscribe(self, client, user_data, mid, granted_qos):
    """Queue a subscribe acknowledgement for the handler task.

    Posts ``(client, user_data, mid, granted_qos)`` under the on-subscribe
    command.
    """
    handler_task = self._LinkKit__handler_task
    handler_task.post_message(
        self._LinkKit__handler_task_cmd_on_subscribe,
        (client, user_data, mid, granted_qos),
    )
|
|
|
|
def __handler_task_on_subscribe_callback(self, value):
    """Handler-task side of a subscribe acknowledgement.

    *value* is ``(client, user_data, mid, granted_qos)``.  The message id is
    matched against three kinds of pending request, in this order:

    1. the system-topic subscription: the thing is marked enabled (when it
       is set up) and the thing-enable callback is called;
    2. a user RRPC subscription: the RRPC subscribe callback is called;
    3. a user topic subscription: every topic granted QoS 0 or 1 is stored
       in the user topic table, other grants are logged as errors.

    Cases 1 and 2 return right after handling.  Otherwise the user's
    on-subscribe callback (if set) is called with *mid*, *granted_qos* and
    the user data, whether or not case 3 matched.  Exceptions raised by the
    RRPC and on-subscribe callbacks are logged and not propagated.
    """
    _client, _user_data, mid, granted_qos = value
    log = self._LinkKit__link_log
    log.debug("__on_internal_subscribe mid:%d granted_qos:%s" % (
        mid, ",".join("%s" % it for it in granted_qos)))

    if (self._LinkKit__thing_subscribe_sys_request
            and mid in self._LinkKit__thing_subscribe_sys_request_mid):
        self._LinkKit__thing_subscribe_sys_request_mid.pop(mid)
        self._LinkKit__thing_subscribe_sys_request = False
        if self._LinkKit__thing_setup_state:
            self._LinkKit__thing_enable_state = True
            if self._LinkKit__on_thing_enable:
                self._LinkKit__on_thing_enable(self._LinkKit__user_data)
        return

    with self._LinkKit__user_rrpc_topics_subscribe_request_lock:
        if mid in self._LinkKit__user_rrpc_topics_subscribe_request:
            self._LinkKit__user_rrpc_topics_subscribe_request.pop(mid)
            if self._LinkKit__on_subscribe_rrpc_topic:
                try:
                    self._LinkKit__on_subscribe_rrpc_topic(mid, granted_qos, self._LinkKit__user_data)
                except Exception as err:
                    log.error("Caught exception in on_subscribe_rrpc_topic: %s", err)
            return

    # ``with`` guarantees the lock is released even if the lookup raises.
    with self._LinkKit__user_topics_request_lock:
        topics_requests = self._LinkKit__user_topics_subscribe_request.pop(mid, None)

    if topics_requests is not None:
        # zip() stops at the shorter sequence, so a short granted_qos list
        # no longer raises IndexError.
        for request, qos in zip(topics_requests, granted_qos):
            if qos < 0 or qos > 1:
                log.error("topics:%s, granted wrong:%d" % (request, qos))
            else:
                # request[0] is the key stored in the user topic table
                # (presumably the topic string -- confirm against subscribe).
                self._LinkKit__user_topics[request[0]] = qos

    if self._LinkKit__on_subscribe_topic is not None:
        try:
            self._LinkKit__on_subscribe_topic(mid, granted_qos, self._LinkKit__user_data)
        except Exception as err:
            log.error("Caught exception in on_subscribe_topic: %s", err)
|
|
|
|
def __on_internal_unsubscribe(self, client, user_data, mid):
    """Queue an unsubscribe acknowledgement for the handler task.

    Posts ``(client, user_data, mid)`` under the on-unsubscribe command.
    """
    handler_task = self._LinkKit__handler_task
    handler_task.post_message(
        self._LinkKit__handler_task_cmd_on_unsubscribe,
        (client, user_data, mid),
    )
|
|
|
|
def __handler_task_on_unsubscribe_callback(self, value):
    """Handler-task side of an unsubscribe acknowledgement.

    *value* is ``(client, user_data, mid)``.  If *mid* belongs to a pending
    RRPC unsubscribe, the request is dropped, the RRPC unsubscribe callback
    (if set) is called and the method returns.  Otherwise, if *mid* belongs
    to a pending user unsubscribe, each of its topics is removed from the
    user topic table; a failure to remove one is logged and ends the method
    without calling any callback.  Finally the user's on-unsubscribe
    callback (if set) is called with *mid* and the user data.  Exceptions
    raised by the callbacks are logged and not propagated.
    """
    _client, _user_data, mid = value
    self._LinkKit__link_log.debug("__on_internal_unsubscribe mid:%d" % mid)
    topics_to_drop = None

    with self._LinkKit__user_rrpc_topics_unsubscribe_request_lock:
        if mid in self._LinkKit__user_rrpc_topics_unsubscribe_request:
            self._LinkKit__user_rrpc_topics_unsubscribe_request.pop(mid)
            if self._LinkKit__on_unsubscribe_rrpc_topic:
                try:
                    self._LinkKit__on_unsubscribe_rrpc_topic(mid, self._LinkKit__user_data)
                except Exception as err:
                    self._LinkKit__link_log.error("Caught exception in on_unsubscribe_rrpc_topic: %s", err)
            return

    with self._LinkKit__user_topics_unsubscribe_request_lock:
        if mid in self._LinkKit__user_topics_unsubscribe_request:
            topics_to_drop = self._LinkKit__user_topics_unsubscribe_request.pop(mid)

    for topic in (topics_to_drop if topics_to_drop is not None else ()):
        self._LinkKit__link_log.debug("__user_topics:%s" % str(self._LinkKit__user_topics))
        try:
            self._LinkKit__user_topics.pop(topic)
        except Exception as exc:
            self._LinkKit__link_log.error("__on_internal_unsubscribe e:" + str(exc))
            return

    if self._LinkKit__on_unsubscribe_topic is None:
        return
    try:
        self._LinkKit__on_unsubscribe_topic(mid, self._LinkKit__user_data)
    except Exception as err:
        self._LinkKit__link_log.error("Caught exception in on_unsubscribe_topic: %s", err)
|
|
|
|
def dump_user_topics(self):
    """Return the user topic table itself (the live object, not a copy)."""
    user_topics = self._LinkKit__user_topics
    return user_topics
|
|
|
|
@staticmethod
def to_user_topic(topic):
    """Return what follows the third "/" of *topic*.

    Raises ``IndexError`` when *topic* contains fewer than three "/".
    """
    return topic.split("/", 3)[3]
|
|
|
|
def to_full_topic(self, topic):
    """Build a full topic from *topic* using the user-topic prefix template.

    The template is filled, in this order, with the product key, the device
    name and *topic*.
    """
    template_args = (self._LinkKit__product_key, self._LinkKit__device_name, topic)
    return self._LinkKit__USER_TOPIC_PREFIX % template_args
|
|
|
|
@staticmethod
def __timestamp():
    """Return the current ``time.time()`` in milliseconds, truncated to int."""
    milliseconds = time.time() * 1000
    return int(milliseconds)