import json
import time
from datetime import datetime as dt

from quickfaas.measure import recall, write
from quickfaas.remotebus import publish
from common.Logger import logger


def chunk_payload(payload, chunk_size=20):
    """Yield pieces of *payload* carrying at most *chunk_size* values each.

    Args:
        payload: Mapping with a "ts" entry and a "values" dict.
        chunk_size: Maximum number of entries of "values" per yielded piece.

    Yields:
        A dict with the same "ts" as *payload* and a "values" dict holding
        the next slice of entries, in the original insertion order. Nothing
        is yielded when "values" is empty.
    """
    items = list(payload["values"].items())
    for start in range(0, len(items), chunk_size):
        yield {
            "ts": payload["ts"],
            "values": dict(items[start:start + chunk_size]),
        }


def sync():
    """Read the current measures via ``recall()`` and publish them.

    Every measure of every controller returned by ``recall()`` is collected
    into a single name -> value mapping, which is then published in chunks
    on the telemetry topic, sleeping 2 seconds after each chunk.

    If ``recall()`` raises, the error is logged and nothing is published.
    """
    topic = "v1/devices/me/telemetry"
    # Timestamp in milliseconds, rounded to the nearest whole second.
    payload = {"ts": round(dt.timestamp(dt.now())) * 1000, "values": {}}

    try:
        data = recall()
    except Exception as e:
        logger.error(e)
        # Without data there is nothing to send; continuing would fail on
        # the unbound ``data`` name below.
        return

    logger.debug(data)
    for controller in data:
        for measure in controller["measures"]:
            # A later measure with the same name overwrites an earlier one.
            payload["values"][measure["name"]] = measure["value"]

    logger.debug("Sending on topic: {}".format(topic))
    logger.debug("Sending value: {}".format(payload))
    for chunk in chunk_payload(payload=payload):
        # NOTE(review): third argument is presumably the QoS level - confirm
        # against the quickfaas.remotebus API.
        publish(topic, json.dumps(chunk), 1)
        time.sleep(2)


def writeplctag(value):
    """Write a single measure through ``write()``.

    Args:
        value: Mapping of the form {"measurement": <name>, "value": <value>}.

    Returns:
        True when ``write()`` returned without raising, False otherwise
        (the exception is logged, not propagated).
    """
    try:
        logger.debug(value)
        # Message format expected by write(), e.g.:
        # [{"name": "advvfdipp", "measures": [{"name": "manualfrequencysetpoint", "value": 49}]}]
        message = [
            {
                "name": "advvfdipp",
                "measures": [
                    {"name": value["measurement"], "value": value["value"]},
                ],
            }
        ]
        resp = write(message)
        logger.debug("RETURN FROM WRITE: {}".format(resp))
        return True
    except Exception as e:
        logger.error(e)
        return False


def receiveCommand(topic, payload):
    """Handle an incoming command message.

    The JSON *payload* must contain a "method" entry. When the method is
    "setPLCTag", its "params" entry is passed to ``writeplctag``. Whatever
    the method, the message is then acknowledged using the last path segment
    of *topic* as the message id, and after a 5 second pause ``sync()`` is
    called to publish the current values.

    Args:
        topic: Topic the command arrived on; its last "/"-separated segment
            is used as the message id for the acknowledgement.
        payload: JSON-encoded command.

    Any exception is logged and swallowed so the caller is never interrupted.
    """
    try:
        logger.debug(topic)
        p = json.loads(payload)
        logger.debug(p)
        command = p["method"]

        if command == "setPLCTag":
            try:
                result = writeplctag(p["params"])
                logger.debug(result)
            except Exception as e:
                logger.error(e)

        ack(topic.split("/")[-1])
        # NOTE(review): presumably gives the write time to take effect before
        # the values are read back - confirm.
        time.sleep(5)
        sync()
    except Exception as e:
        logger.error(e)


def ack(msgid):
    """Publish a response for the command identified by *msgid*.

    Args:
        msgid: Message id appended to the response topic (converted with
            ``str``).
    """
    publish(
        "v1/devices/me/rpc/response/" + str(msgid),
        json.dumps({"msg": {"time": time.time()}, "metadata": "", "msgType": ""}),
        1,
    )