From 6ca57ed8a2390acd9909d13a087679fa2bab8607 Mon Sep 17 00:00:00 2001 From: Nico Melone Date: Wed, 22 Jan 2025 08:35:01 -0600 Subject: [PATCH] added receive attributes to rr_facility --- .../thingsboard/sub/receiveAttribute.py | 189 ++++++++++++++++++ 1 file changed, 189 insertions(+) create mode 100644 Pub_Sub/rr_facility/thingsboard/sub/receiveAttribute.py diff --git a/Pub_Sub/rr_facility/thingsboard/sub/receiveAttribute.py b/Pub_Sub/rr_facility/thingsboard/sub/receiveAttribute.py new file mode 100644 index 0000000..502251b --- /dev/null +++ b/Pub_Sub/rr_facility/thingsboard/sub/receiveAttribute.py @@ -0,0 +1,189 @@ +import json, time +from datetime import datetime as dt +from quickfaas.measure import recall, write +from quickfaas.remotebus import publish +from common.Logger import logger +from quickfaas.global_dict import get as get_params + +def convertDStoJSON(ds): + j = dict() + for x in ds: + j[x["key"]] = x["value"] + return j + +def chunk_payload(payload, chunk_size=20): + if "values" in payload: + # Original format: {"ts": ..., "values": {...}} + chunked_values = list(payload["values"].items()) + for i in range(0, len(chunked_values), chunk_size): + yield { + "ts": payload["ts"], + "values": dict(chunked_values[i:i+chunk_size]) + } + else: + # New format: {"key1": "value1", "key2": "value2"} + chunked_keys = list(payload.keys()) + for i in range(0, len(chunked_keys), chunk_size): + yield {k: payload[k] for k in chunked_keys[i:i+chunk_size]} + +def chunk_payload_gateway(payload, chunk_size=20, is_attributes_payload=False): + if is_attributes_payload: + # For attributes payload, chunk the controllers + controllers = list(payload.items()) + for i in range(0, len(controllers), chunk_size): + yield dict(controllers[i:i + chunk_size]) + else: + # For data payload, chunk the values within each controller + for controller, data in payload.items(): + for entry in data: + ts = entry['ts'] + values = entry['values'] + chunked_values = list(values.items()) + for i in range(0, len(chunked_values), chunk_size): + yield { + controller: [{ + "ts": ts, + "values": dict(chunked_values[i:i + chunk_size]) + }] + } + +def controlName(name): + params = convertDStoJSON(get_params()) + nameMap = { + "tp_1": f"{params['facilityName']} Transfer Pump #1", + "tp_2": f"{params['facilityName']} Transfer Pump #2", + "ww_1": f"{params['facilityName']} Water Well #1", + "ww_2": f"{params['facilityName']} Water Well #2", + "ww_3": f"{params['facilityName']} Water Well #3", + "ww_4": f"{params['facilityName']} Water Well #4", + "ww_5": f"{params['facilityName']} Water Well #5", + "ww_6": f"{params['facilityName']} Water Well #6" + } + parts = "_".join(name.split("_")[:2]) + return nameMap.get(parts, "Gateway") + + +# Filter payloads based on device_filter +def filter_payload(payload, device_filter): + if not device_filter: # If filter is empty, include all devices + return payload + return {key: value for key, value in payload.items() if key in device_filter} + +def sync(device_filter=[]): + #get new values and send + now = round(dt.timestamp(dt.now()))*1000 + topic = "v1/gateway/telemetry" + try: + data = recall()#json.loads(recall().decode("utf-8")) + except Exception as e: + logger.error(e) + logger.debug(data) + logger.info("SYNCING") + grouped_data = {} + grouped_attributes = {} + payload = {"ts": now, "values":{}} + attributes_payload = {} + try: + for controller in data: + for measure in controller["measures"]: + ctrlName = controlName(measure["name"]) + if ctrlName == "Gateway": + #send to gateway with v1/devices/me/telemetry + if measure["health"] == 1: + if "_spt" in measure["name"]: + attributes_payload[measure["name"]] = measure["value"] + else: + payload["values"][measure["name"]] = measure["value"] + else: + name = "_".join(measure['name'].split("_")[2:]) + value = measure['value'] + health = measure['health'] + #Add controller for telemetry if it doesn't exist + if ctrlName not in grouped_data: + grouped_data[ctrlName] = {} + #Add controller for attributes if it doesn't exist + if ctrlName not in grouped_attributes: + grouped_attributes[ctrlName] = {} + grouped_attributes[ctrlName]["latestReportTime"] = now + #Add data to temp payload if datapoint health is good + if health: + if "_spt" in name: + grouped_attributes[ctrlName][name] = value + else: + grouped_data[ctrlName][name] = value + except Exception as e: + logger.error(e) + try: + # Transform the grouped data to desired structure + payload_gateway = {} + + for key, value in grouped_data.items(): + if value: + payload_gateway[key] = [{"ts": now ,"values": value}] + + attributes_payload_gateway = {} + for key, value in grouped_attributes.items(): + if value: + attributes_payload_gateway[key] = value + + # Apply the filter + filtered_payload_gateway = filter_payload(payload_gateway, device_filter) + filtered_attributes_payload_gateway = filter_payload(attributes_payload_gateway, device_filter) + + #Send data belonging to Gateway + for chunk in chunk_payload(payload=payload): + publish("v1/devices/me/telemetry", json.dumps(chunk), qos=1, cloud_name="default") + time.sleep(2) + + attributes_payload["latestReportTime"] = (round(dt.timestamp(dt.now())/600)*600)*1000 + for chunk in chunk_payload(payload=attributes_payload): + publish("v1/devices/me/attributes", json.dumps(chunk), qos=1, cloud_name="default") + time.sleep(2) + + #Send gateway devices data + for chunk in chunk_payload_gateway(payload=filtered_payload_gateway): + publish("v1/gateway/telemetry", json.dumps(chunk), qos=1, cloud_name="default") + time.sleep(2) + + for chunk in chunk_payload_gateway(payload=filtered_attributes_payload_gateway, is_attributes_payload=True): + publish("v1/gateway/attributes", json.dumps(chunk), qos=1, cloud_name="default") + time.sleep(2) + + except Exception as e: + logger.error(e) + +def writeplctag(value): + #value in the form {"measurement": , "value": } + try: + #value = json.loads(value.replace("'",'"')) + logger.info(value) + #payload format: [{"name": "advvfdipp", "measures": [{"name": "manualfrequencysetpoint", "value": 49}]}] + message = [{"name": "rr_facility", "measures":[{"name":value["measurement"], "value": value["value"]}]}] + resp = write(message) + logger.info("RETURN FROM WRITE: {}".format(resp)) + return True + except Exception as e: + logger.error(e) + return False + +def receiveAttribute(topic, payload): + try: + logger.debug(topic) + logger.info(json.loads(payload)) + p = json.loads(payload) + + for key, value in p.items(): + try: + result = writeplctag({"measurement": key, "value": value}) + logger.debug(result) + except Exception as e: + logger.error(e) + #logger.debug(command) + time.sleep(5) + try: + sync(p["device"]) + except Exception as e: + logger.error(f"Could not sync: {e}") + + except Exception as e: + logger.debug(e)