import json
import time
from datetime import datetime as dt

from quickfaas.measure import recall, write
from quickfaas.remotebus import publish
from common.Logger import logger
from quickfaas.global_dict import get as get_params

# Tag prefix -> device name suffix. The facility name (read from the global
# parameters at call time) is prepended to form the full device name.
_DEVICE_SUFFIXES = {
    "tp_1": "Transfer Pump #1",
    "tp_2": "Transfer Pump #2",
    "ww_1": "Water Well #1",
    "ww_2": "Water Well #2",
    "ww_3": "Water Well #3",
    "ww_4": "Water Well #4",
    "ww_5": "Water Well #5",
    "ww_6": "Water Well #6",
}

# Seconds to wait after every publish performed by sync().
_PUBLISH_PAUSE_S = 2


def convertDStoJSON(ds):
    """Flatten a list of ``{"key": ..., "value": ...}`` records into one dict.

    When the same key appears more than once, the last record wins.
    """
    return {item["key"]: item["value"] for item in ds}


def _device_names():
    """Return ``{tag_prefix: full device name}`` for the configured facility.

    Raises:
        KeyError: if the global parameters contain no ``facilityName``.
    """
    facility = convertDStoJSON(get_params())["facilityName"]
    return {
        prefix: f"{facility} {suffix}"
        for prefix, suffix in _DEVICE_SUFFIXES.items()
    }


def _controller_for(name, device_names):
    """Look up the device owning measure *name* in a prebuilt name map."""
    prefix = "_".join(name.split("_")[:2])
    return device_names.get(prefix, "Gateway")


def formatPLCPayload(payload):
    """Translate an RPC payload into a ``{"measurement", "value"}`` write request.

    The measurement name is prefixed with the tag prefix of ``payload["device"]``
    (e.g. ``"ww_1_"``); unknown devices get no prefix.
    """
    prefix_by_device = {
        full_name: f"{prefix}_" for prefix, full_name in _device_names().items()
    }
    params = payload["data"]["params"]
    measure = prefix_by_device.get(payload["device"], "") + params["measurement"]
    return {"measurement": measure, "value": params["value"]}


def chunk_payload(payload, chunk_size=20):
    """Yield *payload* split into pieces of at most *chunk_size* entries.

    Two shapes are supported:

    * ``{"ts": ..., "values": {...}}`` - the values are chunked and every
      piece repeats the original ``ts``.
    * a flat ``{key: value}`` dict - the keys themselves are chunked.
    """
    if "values" in payload:
        # Original format: {"ts": ..., "values": {...}}
        items = list(payload["values"].items())
        for start in range(0, len(items), chunk_size):
            yield {
                "ts": payload["ts"],
                "values": dict(items[start:start + chunk_size]),
            }
    else:
        # New format: {"key1": "value1", "key2": "value2"}
        keys = list(payload.keys())
        for start in range(0, len(keys), chunk_size):
            yield {k: payload[k] for k in keys[start:start + chunk_size]}


def chunk_payload_gateway(payload, chunk_size=20, is_attributes_payload=False):
    """Yield a per-device payload split into pieces of at most *chunk_size*.

    For an attributes payload (``{device: {...}}``) the devices are chunked.
    For a data payload (``{device: [{"ts": ..., "values": {...}}]}``) the
    values of every entry are chunked, one device per yielded piece.
    """
    if is_attributes_payload:
        # For attributes payload, chunk the controllers
        controllers = list(payload.items())
        for start in range(0, len(controllers), chunk_size):
            yield dict(controllers[start:start + chunk_size])
    else:
        # For data payload, chunk the values within each controller
        for controller, entries in payload.items():
            for entry in entries:
                ts = entry["ts"]
                items = list(entry["values"].items())
                for start in range(0, len(items), chunk_size):
                    yield {
                        controller: [{
                            "ts": ts,
                            "values": dict(items[start:start + chunk_size]),
                        }]
                    }


def controlName(name):
    """Return the full device name owning measure *name*, or ``"Gateway"``.

    The owner is derived from the first two ``_``-separated parts of the
    measure name (e.g. ``"ww_3_flow"`` -> ``"<facility> Water Well #3"``).
    """
    return _controller_for(name, _device_names())


# Filter payloads based on device_filter
def filter_payload(payload, device_filter):
    """Keep only the entries of *payload* whose key is in *device_filter*.

    An empty or ``None`` filter keeps everything. A single device name given
    as a plain string is treated as a one-element filter; without that, ``in``
    would perform a substring test against the string instead of an exact
    name match.
    """
    if not device_filter:
        # If filter is empty, include all devices
        return payload
    if isinstance(device_filter, str):
        device_filter = [device_filter]
    return {key: value for key, value in payload.items() if key in device_filter}


def _publish_chunks(topic, chunks):
    """Publish every chunk as JSON on *topic*, pausing after each message."""
    for chunk in chunks:
        publish(topic, json.dumps(chunk), qos=1, cloud_name="default")
        time.sleep(_PUBLISH_PAUSE_S)


def sync(device_filter=None):
    """Read the current measures and publish them as telemetry/attributes.

    Measures whose name maps to a known device are grouped per device and
    published on the ``v1/gateway/*`` topics; everything else is published
    for the gateway itself on ``v1/devices/me/*``. Measures containing
    ``_spt`` are sent as attributes, the rest as telemetry. Only measures
    with good health are included.

    Args:
        device_filter: optional device name, or list of device names, limiting
            which devices are published on the gateway topics. Empty/``None``
            publishes all devices. Gateway-level data is always published.

    Errors are logged rather than raised. If the measures cannot be read at
    all, nothing is published.
    """
    # get new values and send
    now = round(dt.timestamp(dt.now())) * 1000
    try:
        data = recall()  # json.loads(recall().decode("utf-8"))
    except Exception as e:
        logger.error(e)
        # Without data there is nothing to publish; continuing would only
        # fail on the unbound ``data`` name below.
        return
    logger.debug(data)
    logger.info("SYNCING")

    grouped_data = {}
    grouped_attributes = {}
    payload = {"ts": now, "values": {}}
    attributes_payload = {}
    try:
        # Build the name map once instead of once per measure.
        device_names = _device_names()
        for controller in data:
            for measure in controller["measures"]:
                ctrlName = _controller_for(measure["name"], device_names)
                if ctrlName == "Gateway":
                    # send to gateway with v1/devices/me/telemetry
                    if measure["health"] == 1:
                        if "_spt" in measure["name"]:
                            attributes_payload[measure["name"]] = measure["value"]
                        else:
                            payload["values"][measure["name"]] = measure["value"]
                else:
                    # Drop the device prefix (first two parts) from the name.
                    name = "_".join(measure["name"].split("_")[2:])
                    value = measure["value"]
                    health = measure["health"]
                    # Add controller for telemetry if it doesn't exist
                    grouped_data.setdefault(ctrlName, {})
                    # Add controller for attributes if it doesn't exist
                    grouped_attributes.setdefault(ctrlName, {})
                    grouped_attributes[ctrlName]["latestReportTime"] = now
                    # Add data to temp payload if datapoint health is good
                    if health:
                        if "_spt" in name:
                            grouped_attributes[ctrlName][name] = value
                        else:
                            grouped_data[ctrlName][name] = value
    except Exception as e:
        # Best effort: whatever was grouped before the failure is still sent.
        logger.error(e)

    try:
        # Transform the grouped data to desired structure
        payload_gateway = {
            device: [{"ts": now, "values": values}]
            for device, values in grouped_data.items()
            if values
        }
        attributes_payload_gateway = {
            device: attrs for device, attrs in grouped_attributes.items() if attrs
        }

        # Apply the filter
        filtered_payload_gateway = filter_payload(payload_gateway, device_filter)
        filtered_attributes_payload_gateway = filter_payload(
            attributes_payload_gateway, device_filter
        )

        # Send data belonging to Gateway
        _publish_chunks("v1/devices/me/telemetry", chunk_payload(payload=payload))

        # Report time in ms, rounded to the nearest 600 seconds.
        attributes_payload["latestReportTime"] = (
            round(dt.timestamp(dt.now()) / 600) * 600
        ) * 1000
        _publish_chunks(
            "v1/devices/me/attributes", chunk_payload(payload=attributes_payload)
        )

        # Send gateway devices data
        _publish_chunks(
            "v1/gateway/telemetry",
            chunk_payload_gateway(payload=filtered_payload_gateway),
        )
        _publish_chunks(
            "v1/gateway/attributes",
            chunk_payload_gateway(
                payload=filtered_attributes_payload_gateway,
                is_attributes_payload=True,
            ),
        )
    except Exception as e:
        logger.error(e)


def writeplctag(value):
    """Write one measure to the ``rr_facility`` controller.

    Args:
        value: dict of the form ``{"measurement": <name>, "value": <value>}``.

    Returns:
        ``True`` if the write call completed, ``False`` if it raised (the
        error is logged).
    """
    try:
        # payload format: [{"name": "advvfdipp", "measures": [{"name": "manualfrequencysetpoint", "value": 49}]}]
        message = [{
            "name": "rr_facility",
            "measures": [{"name": value["measurement"], "value": value["value"]}],
        }]
        resp = write(message)
        logger.debug("RETURN FROM WRITE: {}".format(resp))
        return True
    except Exception as e:
        logger.error(e)
        return False


def _write_device_tag(device, measurement, value):
    """Write *value* to *measurement* on *device*; return writeplctag's result."""
    request = {
        "device": device,
        "data": {"params": {"measurement": measurement, "value": value}},
    }
    return writeplctag(formatPLCPayload(request))


def receiveCommand(topic, payload):
    """Handle an incoming RPC command, acknowledge it, then re-sync the device.

    Supported ``data.method`` values:

    * ``setPLCTag`` - write ``data.params.measurement`` / ``value``.
    * ``startWW`` - clear ``auto_cmd``, wait 1 s, set ``manual_run_cmd``.
    * ``manualAutoSwitch`` - ``data.params.direction`` is ``manualToAuto``
      or ``autoToManual``.

    Any other method (including ``sync``) performs no write; the ack and the
    follow-up sync still happen. All errors are logged, never raised.

    Args:
        topic: topic the message arrived on (only logged).
        payload: JSON text with ``device`` and ``data`` (``id``, ``method``,
            optionally ``params``).
    """
    try:
        logger.debug(topic)
        p = json.loads(payload)
        logger.debug(p)
        command = p["data"]["method"]

        if command == "setPLCTag":
            try:
                result = writeplctag(formatPLCPayload(p))
                logger.debug(result)
            except Exception as e:
                logger.error(e)
        elif command == "startWW":
            try:
                _write_device_tag(p["device"], "auto_cmd", 0)
                time.sleep(1)
                _write_device_tag(p["device"], "manual_run_cmd", 1)
            except Exception as e:
                logger.error(f"Error in startWW: {e}")
        elif command == "manualAutoSwitch":
            try:
                direction = p["data"]["params"]["direction"]
                if direction == "manualToAuto":
                    _write_device_tag(p["device"], "manual_run_cmd", 0)
                    time.sleep(1)
                    _write_device_tag(p["device"], "auto_cmd", 1)
                elif direction == "autoToManual":
                    _write_device_tag(p["device"], "auto_cmd", 0)
                else:
                    logger.error(f"Invalid input in manualAutoSwitch: {direction}")
            except Exception as e:
                logger.error(f"Error in manualAutoSwitch: {e}")

        # NOTE(review): the ack always reports success, even when a write
        # above failed - confirm whether the consumer relies on this.
        ackPayload = {
            "device": p["device"],
            "id": p["data"]["id"],
            "data": {"success": True},
        }
        ack(ackPayload)
        time.sleep(5)
        try:
            # Pass a list so the filter matches the exact device name.
            sync([p["device"]])
        except Exception as e:
            logger.error(f"Could not sync: {e}")
    except Exception as e:
        logger.error(e)


def ack(message):
    """Publish *message* as JSON on the ``v1/gateway/rpc`` topic."""
    publish("v1/gateway/rpc", json.dumps(message), 1, cloud_name="default")