Files
HP_InHand_IG502/Pub_Sub/valvecontroller/thingsboard/sub/receiveCommand.py
2023-12-14 13:16:36 -06:00

117 lines
3.7 KiB
Python

import json, time
from datetime import datetime as dt
from quickfaas.measure import recall, write
from quickfaas.remotebus import publish
from common.Logger import logger
# Helper function to split the payload into chunks
def chunk_payload(payload, chunk_size=20):
chunked_values = list(payload["values"].items())
for i in range(0, len(chunked_values), chunk_size):
yield {
"ts": payload["ts"],
"values": dict(chunked_values[i:i+chunk_size])
}
def sync():
#get new values and send
payload = {"ts": round(dt.timestamp(dt.now()))*1000, "values": {}}
topic = "v1/devices/me/telemetry"
try:
data = recall()#json.loads(recall().decode("utf-8"))
except Exception as e:
logger.error(e)
logger.debug(data)
for controller in data:
valve_open = 0
valve_close = 0
for measure in controller["measures"]:
if measure["name"] in ["valve_failure"]:
value = convert_int(measure["name"], measure["value"])
payload["values"][measure["name"]] = value
elif measure["name"] == "valve_open_status":
valve_open = measure["value"]
elif measure["name"] == "valve_close_status":
valve_close = measure["value"]
elif "_cmd" in measure["name"]:
pass
else:
payload["values"][measure["name"]] = measure["value"]
if valve_open:
payload["values"]["valve_status"] = "Open"
elif valve_close:
payload["values"]["valve_status"] = "Closed"
else:
payload["values"]["valve_status"] = "Unknown"
logger.debug("Sending on topic: {}".format(topic))
logger.debug("Sending value: {}".format(payload))
for chunk in chunk_payload(payload=payload):
publish(topic, json.dumps(chunk), 1)
time.sleep(2)
def writeplctag(value):
try:
#value = json.loads(value.replace("'",'"'))
logger.debug(value)
message = [{"name": "valvecontroller", "measures":[{"name":value["measurement"], "value": value["value"]}]}]
resp = write(message)
logger.debug("RETURN FROM WRITE: {}".format(resp))
return True
except Exception as e:
logger.debug(e)
return False
def receiveCommand(topic, payload):
try:
logger.debug(topic)
logger.debug(json.loads(payload))
p = json.loads(payload)
command = p["method"]
commands = {
"sync": sync,
"writeplctag": writeplctag,
}
if command == "setPLCTag":
try:
result = commands["writeplctag"](p["params"])
logger.debug(result)
except Exception as e:
logger.error(e)
#logger.debug(command)
ack(topic.split("/")[-1])
time.sleep(5)
sync()
except Exception as e:
logger.debug(e)
def ack(msgid):
#logger.debug(msgid)
#logger.debug(mac)
#logger.debug(name)
#logger.debug(value)
publish("v1/devices/me/rpc/response/" + str(msgid), json.dumps({"msg": {"time": time.time()}, "metadata": "", "msgType": ""}))
def getMode():
try:
data = recall()
for controller in data:
for measure in controller["measures"]:
if measure["name"] == "pidcontrolmode":
return measure["value"]
except:
return None
def convert_int(name, value):
valve_failure = {
0: "OK",
1: "Failure"
}
names = {
"valve_failure": valve_failure.get(value, "Invalid Code")
}
return names.get(name, "Invalid Name")