111 lines
3.8 KiB
Python
111 lines
3.8 KiB
Python
import json, time
|
|
from datetime import datetime as dt
|
|
from quickfaas.measure import recall, write
|
|
from quickfaas.remotebus import publish
|
|
from common.Logger import logger
|
|
|
|
# Helper function to split the payload into chunks
|
|
def chunk_payload(payload, chunk_size=20):
    """Split a telemetry payload into smaller payloads sharing one timestamp.

    Args:
        payload: Mapping with a "ts" entry and a "values" dict.
        chunk_size: Maximum number of "values" entries per chunk; must be
            at least 1.

    Yields:
        Dicts of the form {"ts": ..., "values": {...}}, each holding up to
        ``chunk_size`` items of ``payload["values"]`` in their original
        order. Nothing is yielded when "values" is empty.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1. Because this is a
            generator, the error surfaces on the first iteration.
    """
    if chunk_size < 1:
        # A negative step makes range() empty, which would silently drop
        # every value; a zero step raises an unrelated-looking error.
        raise ValueError("chunk_size must be >= 1, got {}".format(chunk_size))

    items = list(payload["values"].items())
    for start in range(0, len(items), chunk_size):
        yield {
            "ts": payload["ts"],
            "values": dict(items[start:start + chunk_size]),
        }
|
|
|
|
def sync():
    """Read the current measures and publish them as ThingsBoard telemetry.

    Every measure returned by ``recall()`` is copied into one payload keyed
    by measure name. For the pump run-status tags the text label from
    ``convert_int`` is stored under the tag name and the raw value under
    ``<name>_int``. The payload is then published in chunks on the
    telemetry topic.

    If ``recall()`` raises, the error is logged and nothing is published.
    """
    topic = "v1/devices/me/telemetry"
    # Tags published both as a text label and as the raw integer.
    status_tags = (
        "pump_1_run_status",
        "pump_2_run_status",
        "charge_pump_run_status",
    )
    # Timestamp is rounded to whole seconds, then expressed in milliseconds.
    payload = {"ts": round(dt.timestamp(dt.now())) * 1000, "values": {}}

    try:
        data = recall()
    except Exception as e:
        logger.error(e)
        # Without data there is nothing to send; continuing would fail on
        # the unbound name below.
        return

    logger.debug(data)
    values = payload["values"]
    for controller in data:
        for measure in controller["measures"]:
            name = measure["name"]
            value = measure["value"]
            if name in status_tags:
                values[name] = convert_int(name, value)
                values[name + "_int"] = value
            else:
                values[name] = value

    logger.debug("Sending on topic: {}".format(topic))
    logger.debug("Sending value: {}".format(payload))
    for chunk in chunk_payload(payload=payload):
        publish(topic, json.dumps(chunk), 1, cloud_name="ThingsBoard")
        # NOTE(review): pause between chunks, presumably to avoid flooding
        # the broker -- confirm the intended delay.
        time.sleep(2)
|
|
|
|
def writeplctag(value):
    """Write one value to a measure of the "dual_flowmeter" controller.

    Args:
        value: Mapping of the form
            {"measurement": <measurement_name>, "value": <value to write>}.

    Returns:
        True when the write call completed without raising, False when any
        step raised. The response from ``write`` is only logged; it is not
        inspected for success.
    """
    try:
        logger.debug(value)
        # Request format:
        # [{"name": <controller>, "measures": [{"name": ..., "value": ...}]}]
        message = [
            {
                "name": "dual_flowmeter",
                "measures": [
                    {"name": value["measurement"], "value": value["value"]},
                ],
            }
        ]
        resp = write(message)
        logger.debug("RETURN FROM WRITE: {}".format(resp))
        return True
    except Exception as e:
        # A failed PLC write must be visible at the default log level.
        logger.error("writeplctag failed for {}: {}".format(value, e))
        return False
|
|
|
|
def receiveCommand(topic, payload, wizard_api=None):
    """Handle an RPC request delivered on a ThingsBoard RPC topic.

    The payload is a JSON object with a "method" entry. For the method
    "setPLCTag" its "params" entry is handed to ``writeplctag``. Regardless
    of the method, the request is acknowledged and a fresh ``sync()`` is
    triggered afterwards.

    Args:
        topic: Topic the request arrived on; its last "/"-separated segment
            is passed to ``ack`` as the message id.
        payload: JSON-encoded request.
        wizard_api: Unused. Accepted only so callers that still pass a
            third argument keep working.

    Any exception is logged rather than propagated to the caller.
    """
    try:
        logger.debug(topic)
        request = json.loads(payload)
        logger.debug(request)

        if request["method"] == "setPLCTag":
            try:
                result = writeplctag(request["params"])
                logger.debug(result)
            except Exception as e:
                logger.error(e)

        ack(topic.split("/")[-1])
        # NOTE(review): delay before re-reading, presumably so a written
        # value has taken effect before it is reported -- confirm.
        time.sleep(5)
        sync()
    except Exception as e:
        logger.error(e)
|
|
|
|
def ack(msgid):
    """Publish an RPC response for the request identified by ``msgid``.

    The response goes to "v1/devices/me/rpc/response/<msgid>" on the
    "ThingsBoard" cloud and carries the current ``time.time()`` value.
    """
    response_topic = "v1/devices/me/rpc/response/" + str(msgid)
    body = {"msg": {"time": time.time()}, "metadata": "", "msgType": ""}
    publish(response_topic, json.dumps(body), 1, cloud_name="ThingsBoard")
|
|
|
|
def convert_int(plc_tag, value):
    """Translate a pump run-status code into its text label.

    Args:
        plc_tag: Name of the tag the value belongs to.
        value: Status code; 0 maps to "Off" and 1 maps to "On".

    Returns:
        "Off" or "On" for a known tag with a known code, "Invalid Code" for
        a known tag with any other code, and "Invalid Tag" when ``plc_tag``
        is not one of the three run-status tags.
    """
    run_status_tags = (
        "pump_1_run_status",
        "pump_2_run_status",
        "charge_pump_run_status",
    )
    label = {0: "Off", 1: "On"}.get(value, "Invalid Code")
    # Every supported tag shares the same code-to-label mapping.
    return dict.fromkeys(run_status_tags, label).get(plc_tag, "Invalid Tag")
|
|
|
|
|