Files
HP_InHand_IG502/Pub_Sub/rr_pipeline/thingsboard/sub/receiveCommandGateway.py
2025-01-14 14:34:21 -06:00

217 lines
8.2 KiB
Python

import json, time
from datetime import datetime as dt
from quickfaas.measure import recall, write
from quickfaas.remotebus import publish
from common.Logger import logger
from quickfaas.global_dict import get as get_params
def convertDStoJSON(ds):
    """Flatten a sequence of ``{"key": ..., "value": ...}`` records into a dict.

    Args:
        ds: Iterable of mappings, each providing a ``"key"`` and a ``"value"`` entry.

    Returns:
        A new dict mapping every record's ``"key"`` to its ``"value"``. When a
        key occurs more than once, the last record wins.
    """
    return {record["key"]: record["value"] for record in ds}
def invertJSON(payload):
    """Build the reverse lookup of ``payload`` (values become keys).

    Args:
        payload: Mapping whose values are hashable.

    Returns:
        A new dict mapping each value of ``payload`` to the key it was stored
        under. If several keys share a value, the key iterated last wins.
    """
    return {mapped: original for original, mapped in payload.items()}
def convertName(name):
    """Translate ``name`` through the inverse of the configured ``name_map``.

    The ``name_map`` parameter is read from the global parameters as a JSON
    string, decoded, and inverted so that its values become the lookup keys.

    Args:
        name: The name to translate.

    Returns:
        The key of ``name_map`` whose value equals ``name``. ``name`` itself is
        returned when there is no such entry or when the map cannot be loaded
        (the failure is logged at error level).
    """
    try:
        settings = convertDStoJSON(get_params())
        reverse_lookup = invertJSON(json.loads(settings.get("name_map")))
        if name in reverse_lookup:
            return reverse_lookup[name]
    except Exception as e:
        logger.error(f"Error in convertName: {e}")
    # No mapping found, or the map could not be read: keep the name as given.
    return name
def chunk_payload(payload, chunk_size=20):
    """Yield ``payload`` split into pieces of at most ``chunk_size`` entries.

    Two payload shapes are handled:

    * ``{"ts": ..., "values": {...}}`` -- the ``values`` mapping is split and
      every piece is re-wrapped with the same ``ts``.
    * a flat ``{"key1": "value1", ...}`` mapping -- the mapping itself is split.

    Args:
        payload: The mapping to split.
        chunk_size: Maximum number of entries per yielded piece.

    Yields:
        Dicts in the same shape as ``payload``. Nothing is yielded when there
        are no entries to send.
    """
    if "values" not in payload:
        # Flat format: slice the key list and rebuild a dict per slice.
        names = list(payload.keys())
        for start in range(0, len(names), chunk_size):
            yield {name: payload[name] for name in names[start:start + chunk_size]}
        return

    # Timestamped format: only the "values" mapping is split.
    pairs = list(payload["values"].items())
    for start in range(0, len(pairs), chunk_size):
        piece = pairs[start:start + chunk_size]
        yield {"ts": payload["ts"], "values": dict(piece)}
def chunk_payload_gateway(payload, chunk_size=20, is_attributes_payload=False):
    """Yield a gateway payload split into pieces of at most ``chunk_size`` items.

    Args:
        payload: For attributes, a mapping of controller name to attributes.
            For data, a mapping of controller name to a list of
            ``{"ts": ..., "values": {...}}`` entries.
        chunk_size: Maximum number of controllers (attributes) or values
            (data) per yielded piece.
        is_attributes_payload: Selects which of the two shapes ``payload`` has.

    Yields:
        For attributes: dicts holding up to ``chunk_size`` controllers each.
        For data: one-controller dicts whose single entry carries the original
        ``ts`` and up to ``chunk_size`` of that entry's values.
    """
    if not is_attributes_payload:
        # Data payload: split the values inside every entry of every controller.
        for device, entries in payload.items():
            for entry in entries:
                stamp = entry['ts']
                pairs = list(entry['values'].items())
                for start in range(0, len(pairs), chunk_size):
                    piece = dict(pairs[start:start + chunk_size])
                    yield {device: [{"ts": stamp, "values": piece}]}
        return

    # Attributes payload: split the controllers themselves.
    devices = list(payload.items())
    for start in range(0, len(devices), chunk_size):
        yield dict(devices[start:start + chunk_size])
def controlName(name):
    """Translate ``name`` through the configured ``name_map``.

    The ``name_map`` parameter is read from the global parameters as a JSON
    string and decoded before the lookup.

    Args:
        name: The name to translate.

    Returns:
        The value stored under ``name`` in ``name_map``. ``name`` itself is
        returned when there is no such entry or when the map cannot be loaded
        (the failure is logged at error level).
    """
    try:
        settings = convertDStoJSON(get_params())
        lookup = json.loads(settings.get("name_map"))
        if name in lookup:
            return lookup[name]
    except Exception as e:
        logger.error(f"Error in controlName: {e}")
    # No mapping found, or the map could not be read: keep the name as given.
    return name
# Filter payloads based on device_filter
def filter_payload(payload, device_filter):
    """Keep only the entries of ``payload`` whose key is named in ``device_filter``.

    Args:
        payload: Mapping keyed by device name.
        device_filter: Collection of device names to keep, or a single device
            name given as a string. An empty or ``None`` filter keeps everything.

    Returns:
        ``payload`` itself when the filter is empty, otherwise a new dict
        holding only the selected devices.
    """
    if not device_filter:  # If filter is empty, include all devices
        return payload
    if isinstance(device_filter, str):
        # A bare name must match exactly. Without this, `key in "pump1"` is a
        # substring test and would also let a device called "pump" through.
        device_filter = (device_filter,)
    return {key: value for key, value in payload.items() if key in device_filter}
def _valve_status(closed, opened):
    """Combine a valve's closed/open readings into a single status code.

    0 = In Progress | 1 = Open | 2 = Closed | 3 = Error | 4 = a reading is
    missing or has an unexpected value.
    """
    if closed == 0 and opened == 0:  # Not closed and not open thus in progress
        return 0
    if closed == 0 and opened == 1:  # Not closed but open thus open
        return 1
    if closed == 1 and opened == 0:  # Closed but not open thus closed
        return 2
    if closed == 1 and opened == 1:  # Closed and open thus errored, not possible
        return 3
    return 4  # Something didn't report


def sync(device_filter=None):
    """Read the current measures and publish them through the gateway topics.

    Measures returned by ``recall()`` are grouped per device, where the device
    name is ``controlName(measure["name"])``. Healthy measures become telemetry
    values. Every device seen also gets a ``latestReportTime`` attribute.
    Healthy measures whose name contains ``open`` or ``closed`` are additionally
    combined into a ``<valve>_status`` telemetry value, where ``<valve>`` is the
    measure name without its last ``_``-separated part.

    Telemetry is published to ``v1/gateway/telemetry`` and attributes to
    ``v1/gateway/attributes``, in chunks, with a 2 second pause after each
    publish.

    Args:
        device_filter: Device names to publish, as a collection or as a single
            name string. ``None`` or an empty collection publishes every device.

    Returns:
        None. Failures are logged rather than raised.
    """
    if device_filter is None:
        device_filter = []
    elif isinstance(device_filter, str):
        # A bare name would turn the filter's `in` check into a substring test.
        device_filter = [device_filter]

    # get new values and send
    now = round(dt.timestamp(dt.now())) * 1000
    try:
        data = recall()
    except Exception as e:
        logger.error(f"Error in trying to get data in sync: {e}")
        # Nothing to publish; carrying on would only fail on the unbound `data`.
        return
    logger.debug(data)
    logger.info("SYNCING")

    valves = {}  # (device, valve) -> [closed reading, open reading]
    grouped_data = {}
    grouped_attributes = {}
    try:
        for controller in data:
            for measure in controller["measures"]:
                name = measure["name"]
                ctrlName = controlName(name)
                value = measure["value"]
                health = measure["health"]
                # Add controller for telemetry if it doesn't exist
                grouped_data.setdefault(ctrlName, {})
                # Add controller for attributes if it doesn't exist
                grouped_attributes.setdefault(ctrlName, {})["latestReportTime"] = now
                # Add data to temp payload only if datapoint health is good
                if not health:
                    continue
                if "open" in name or "closed" in name:
                    valve = "_".join(name.split("_")[:-1])
                    # Keyed by device as well, so the status ends up on the
                    # device that reported the valve, not on whichever device
                    # happened to be processed last.
                    readings = valves.setdefault((ctrlName, valve), [None, None])
                    if "open" in name:
                        readings[1] = value
                    else:
                        readings[0] = value
                grouped_data[ctrlName][name] = value
    except Exception as e:
        logger.error(f"Error in sync trying to group data: {e}")

    try:
        for (device, valve), (closed, opened) in valves.items():
            grouped_data[device][valve + "_status"] = _valve_status(closed, opened)

        # Transform the grouped data to the gateway structure, skipping devices
        # that ended up with nothing to report.
        payload_gateway = {
            key: [{"ts": now, "values": value}]
            for key, value in grouped_data.items()
            if value
        }
        attributes_payload_gateway = {
            key: value for key, value in grouped_attributes.items() if value
        }

        # Apply the filter
        filtered_payload_gateway = filter_payload(payload_gateway, device_filter)
        filtered_attributes_payload_gateway = filter_payload(attributes_payload_gateway, device_filter)

        # Send gateway devices data
        for chunk in chunk_payload_gateway(payload=filtered_payload_gateway):
            publish("v1/gateway/telemetry", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)
        for chunk in chunk_payload_gateway(payload=filtered_attributes_payload_gateway, is_attributes_payload=True):
            publish("v1/gateway/attributes", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)
    except Exception as e:
        logger.error(f"Error in sync sending data: {e}")
def writeplctag(value, controller):
    """Write a single measure of ``controller`` via ``write``.

    Args:
        value: Mapping in the form
            ``{"measurement": <measurement_name>, "value": <value to write>}``.
        controller: Name of the controller that owns the measure.

    Returns:
        True when the write call completed without raising, False when any
        exception occurred (the exception is logged at error level).
    """
    try:
        logger.info(f"Writing to {controller} with params {value}")
        # Request format:
        # [{"name": "advvfdipp", "measures": [{"name": "manualfrequencysetpoint", "value": 49}]}]
        target = {"name": value["measurement"], "value": value["value"]}
        request = [{"name": controller, "measures": [target]}]
        resp = write(request)
        logger.debug("RETURN FROM WRITE: {}".format(resp))
    except Exception as e:
        logger.error(f"Error in writeplctag: {e}")
        return False
    return True
def receiveCommand(topic, payload):
    """Handle an RPC message received from the cloud.

    The message is expected to be JSON of the form
    ``{"device": ..., "data": {"id": ..., "method": ..., "params": ...}}``.
    A ``setPLCTag`` method writes ``data.params`` to the device via
    ``writeplctag``. The request is then acknowledged on the RPC topic and,
    after a 5 second pause, the device's current values are re-published
    with ``sync``.

    Args:
        topic: Topic the message arrived on (only logged).
        payload: Raw JSON message body.

    Returns:
        None. Every failure is logged rather than raised.
    """
    try:
        logger.debug(topic)
        p = json.loads(payload)  # parse once and reuse for logging and handling
        logger.info(p)
        command = p["data"]["method"]
        device = convertName(p["device"])

        success = True
        if command == "setPLCTag":
            try:
                success = bool(writeplctag(p["data"]["params"], device))
                logger.debug(success)
            except Exception as e:
                logger.error(f"Error in receiveCommand setPLCTag: {e}")
                success = False

        # NOTE(review): the original indentation was not recoverable here; the
        # acknowledgement and re-sync are assumed to run for every method, not
        # only for setPLCTag -- confirm.
        # Report the real outcome instead of always claiming success.
        ackPayload = {"device": p["device"], "id": p["data"]["id"], "data": {"success": success}}
        ack(ackPayload)
        time.sleep(5)
        try:
            # Pass a list so the device is matched by exact name; a bare string
            # would be matched as a substring. An empty name keeps its former
            # meaning of "no filter".
            # NOTE(review): sync() keys devices by controlName(...) while this
            # filter uses convertName(...) -- verify both yield the same naming.
            sync([device] if device else [])
        except Exception as e:
            logger.error(f"Could not sync: {e}")
    except Exception as e:
        logger.error(f"Error in receiveCommand: {e}")
def ack(message):
    """Publish ``message``, serialized as JSON, on the ``v1/gateway/rpc`` topic.

    Args:
        message: JSON-serializable acknowledgement body.
    """
    body = json.dumps(message)
    publish("v1/gateway/rpc", body, 1, cloud_name="default")