From 0ba8e12263725fbac56cbb99dbc0b46158bdfee0 Mon Sep 17 00:00:00 2001 From: Nico Melone Date: Thu, 29 May 2025 09:05:27 -0500 Subject: [PATCH] updated sendData --- .../thingsboard/pub/sendData.py | 266 +++--------------- Pub_Sub/gateway/thingsboard/pub/sendData.py | 3 +- 2 files changed, 38 insertions(+), 231 deletions(-) diff --git a/Pub_Sub/fk_plcpond_gateway/thingsboard/pub/sendData.py b/Pub_Sub/fk_plcpond_gateway/thingsboard/pub/sendData.py index 946ec71..b6ba5d7 100644 --- a/Pub_Sub/fk_plcpond_gateway/thingsboard/pub/sendData.py +++ b/Pub_Sub/fk_plcpond_gateway/thingsboard/pub/sendData.py @@ -1,172 +1,12 @@ +# Enter your python code. import json -import os import time from datetime import datetime as dt from common.Logger import logger from quickfaas.remotebus import publish -from quickfaas.global_dict import get as get_params -from quickfaas.global_dict import _set_global_args -from mobiuspi_lib.gps import GPS -def reboot(): - # basic = Basic() - logger.info("!" * 10 + "REBOOTING DEVICE" + "!"*10) - r = os.popen("kill -s SIGHUP `cat /var/run/python/supervisord.pid`").read() - logger.info(f"REBOOT : {r}") - - -def checkFileExist(filename): - path = "/var/user/files" - if not os.path.exists(path): - logger.info("no folder making files folder in var/user") - os.makedirs(path) - with open(path + "/" + filename, "a") as f: - json.dump({}, f) - if not os.path.exists(path + "/" + filename): - logger.info("no creds file making creds file") - with open(path + "/" + filename, "a") as f: - json.dump({}, f) - - -def convertDStoJSON(ds): - j = dict() - for x in ds: - j[x["key"]] = x["value"] - return j - - -def convertJSONtoDS(j): - d = [] - for key in j.keys(): - d.append({"key": key, "value": j[key]}) - return d - - -def checkCredentialConfig(): - logger.info("CHECKING CONFIG") - cfgpath = "/var/user/cfg/device_supervisor/device_supervisor.cfg" - credspath = "/var/user/files/creds.json" - cfg = dict() - with open(cfgpath, "r") as f: - cfg = json.load(f) - clouds = cfg.get("clouds") - logger.info(clouds) - # if not configured then try to configure from stored values - if clouds[0]["args"]["clientId"] == "unknown" or clouds[0]["args"]["username"] == "unknown" or not clouds[0]["args"]["passwd"] or clouds[0]["args"]["passwd"] == "unknown": - checkFileExist("creds.json") - with open(credspath, "r") as c: - creds = json.load(c) - if creds: - logger.info("updating config with stored data") - clouds[0]["args"]["clientId"] = creds["clientId"] - clouds[0]["args"]["username"] = creds["userName"] - clouds[0]["args"]["passwd"] = creds["password"] - cfg["clouds"] = clouds - cfg = checkParameterConfig(cfg) - with open(cfgpath, "w", encoding='utf-8') as n: - json.dump(cfg, n, indent=1, ensure_ascii=False) - reboot() - else: - # assuming clouds is filled out, if data is different then assume someone typed in something new and store it, if creds is empty fill with clouds' data - checkFileExist("creds.json") - with open(credspath, "r") as c: - logger.info("updating stored file with new data") - cfg = checkParameterConfig(cfg) - with open(cfgpath, "w", encoding='utf-8') as n: - json.dump(cfg, n, indent=1, ensure_ascii=False) - creds = json.load(c) - if creds: - if creds["clientId"] != clouds[0]["args"]["clientId"]: - creds["clientId"] = clouds[0]["args"]["clientId"] - if creds["userName"] != clouds[0]["args"]["username"]: - creds["userName"] = clouds[0]["args"]["username"] - if creds["password"] != clouds[0]["args"]["passwd"]: - creds["password"] = clouds[0]["args"]["passwd"] - else: - creds["clientId"] = clouds[0]["args"]["clientId"] - creds["userName"] = clouds[0]["args"]["username"] - creds["password"] = clouds[0]["args"]["passwd"] - with open(credspath, "w") as cw: - json.dump(creds, cw) - - -def checkParameterConfig(cfg): - logger.info("Checking Parameters!!!!") - paramspath = "/var/user/files/params.json" - cfgparams = convertDStoJSON(cfg.get("labels")) - # check stored values - checkFileExist("params.json") - with open(paramspath, "r") as f: - logger.info("Opened param storage file") - params = json.load(f) - if params: - if cfgparams != params: - # go through each param - # if not "unknown" and cfg and params aren't the same take from cfg likely updated manually - # if key in cfg but not in params copy to params - logger.info("equalizing params between cfg and stored") - for key in cfgparams.keys(): - try: - if cfgparams[key] != params[key] and cfgparams[key] != "unknown": - params[key] = cfgparams[key] - except: - params[key] = cfgparams[key] - cfg["labels"] = convertJSONtoDS(params) - _set_global_args(convertJSONtoDS(params)) - with open(paramspath, "w") as p: - json.dump(params, p) - else: - with open(paramspath, "w") as p: - logger.info("initializing param file with params in memory") - json.dump(convertDStoJSON(get_params()), p) - cfg["labels"] = get_params() - - return cfg - - -def getGPS(): - # Create a gps instance - gps = GPS() - - # Retrieve GPS information - position_status = gps.get_position_status() - logger.debug("position_status: ") - logger.debug(position_status) - latitude = position_status["latitude"].split(" ") - longitude = position_status["longitude"].split(" ") - lat_dec = int(latitude[0][:-1]) + (float(latitude[1][:-1])/60) - lon_dec = int(longitude[0][:-1]) + (float(longitude[1][:-1])/60) - if latitude[2] == "S": - lat_dec = lat_dec * -1 - if longitude[2] == "W": - lon_dec = lon_dec * -1 - # lat_dec = round(lat_dec, 7) - # lon_dec = round(lon_dec, 7) - logger.info("HERE IS THE GPS COORDS") - logger.info(f"LATITUDE: {lat_dec}, LONGITUDE: {lon_dec}") - speedKnots = position_status["speed"].split(" ") - speedMPH = float(speedKnots[0]) * 1.151 - return (f"{lat_dec:.8f}", f"{lon_dec:.8f}", f"{speedMPH:.2f}") - - -def chunk_payload(payload, chunk_size=20): - if "values" in payload: - # Original format: {"ts": ..., "values": {...}} - chunked_values = list(payload["values"].items()) - for i in range(0, len(chunked_values), chunk_size): - yield { - "ts": payload["ts"], - "values": dict(chunked_values[i:i+chunk_size]) - } - else: - # New format: {"key1": "value1", "key2": "value2"} - chunked_keys = list(payload.keys()) - for i in range(0, len(chunked_keys), chunk_size): - yield {k: payload[k] for k in chunked_keys[i:i+chunk_size]} - - -def chunk_payload_devices(payload, chunk_size=20, is_attributes_payload=False): +def chunk_payload(payload, chunk_size=20, is_attributes_payload=False): if is_attributes_payload: # For attributes payload, chunk the controllers controllers = list(payload.items()) @@ -188,85 +28,51 @@ def chunk_payload_devices(payload, chunk_size=20, is_attributes_payload=False): } -def controlName(name): - logger.debug(name) - params = convertDStoJSON(get_params()) - logger.debug(params) - nameMap = { - "overflow_pump": f"{params['overflow_pump']}" - } - return nameMap.get(name, "Gateway") - - -def sendData(message): - # logger.info(message) +def sendData(message, wizard_api, cloudName): + # logger.debug(message) + # Extract measures and group by ctrlName grouped_data = {} grouped_attributes = {} now = (round(dt.timestamp(dt.now())/600)*600)*1000 - payload = {"ts": now, "values": {}} - attributes_payload = {} - for measure in message["measures"]: - try: - logger.debug(measure) - ctrlName = controlName(measure["ctrlName"]) - logger.debug(ctrlName) - if ctrlName == "Gateway": - # send to gateway with v1/devices/me/telemetry - if measure["health"] == 1: - if "_spt" in measure["name"]: - attributes_payload[measure["name"]] = measure["value"] - else: - payload["values"][measure["name"]] = measure["value"] - else: - name = measure['name'] - value = measure['value'] - health = measure['health'] - # Add controller for telemetry if it doesn't exist - if ctrlName not in grouped_data: - grouped_data[ctrlName] = {} - # Add controller for attributes if it doesn't exist - if ctrlName not in grouped_attributes: - grouped_attributes[ctrlName] = {} - grouped_attributes[ctrlName]["latestReportTime"] = now - # Add data to temp payload if datapoint health is good - if health: - if "_spt" in name: - grouped_attributes[ctrlName][name] = value - else: - grouped_data[ctrlName][name] = value - except Exception as e: - logger.error(e) + # logger.info(message) + for controller, measures in message["values"].items(): + for measure_name, measure_data in measures.items(): + ctrlName = " ".join(controller.split("_")) + name = measure_name + value = measure_data["raw_data"] + health = measure_data["status"] + # Add controller for telemetry if it doesn't exist + if ctrlName not in grouped_data: + grouped_data[ctrlName] = {} + # Add controller for attributes if it doesn't exist + if ctrlName not in grouped_attributes: + grouped_attributes[ctrlName] = {} + # Add data to temp payload if datapoint health is good + if health: + grouped_data[ctrlName][name] = value + grouped_attributes[ctrlName]["latestReportTime"] = now + # print(grouped_data) # Transform the grouped data to desired structure - payload_devices = {} + payload = {} for key, value in grouped_data.items(): if value: - payload_devices[key] = [{"ts": now, "values": value}] - - attributes_payload_devices = {} + payload[key] = [{"ts": now, "values": value}] + attributes_payload = {} for key, value in grouped_attributes.items(): if value: - attributes_payload_devices[key] = value - logger.debug(payload_devices) - logger.debug(attributes_payload_devices) - # Send data belonging to Gateway + attributes_payload[key] = value + + # logger.debug(payload) for chunk in chunk_payload(payload=payload): - publish(__topic__, json.dumps(chunk), __qos__) + # logger.info(chunk) + # logger.info(__topic__) + # logger.info(cloudName) + publish(__topic__, json.dumps(chunk), __qos__, cloud_name=cloudName) time.sleep(2) - attributes_payload["latestReportTime"] = ( - round(dt.timestamp(dt.now())/600)*600)*1000 - for chunk in chunk_payload(payload=attributes_payload): - publish("v1/devices/me/attributes", json.dumps(chunk), __qos__) - time.sleep(2) - - # Send gateway devices data - for chunk in chunk_payload_devices(payload=payload_devices): - publish("v1/gateway/telemetry", json.dumps(chunk), __qos__) - time.sleep(2) - - for chunk in chunk_payload_devices(payload=attributes_payload_devices, is_attributes_payload=True): - publish("v1/gateway/attributes", - json.dumps(attributes_payload_devices), __qos__) + for chunk in chunk_payload(payload=attributes_payload, is_attributes_payload=True): + publish("v1/gateway/attributes", json.dumps(attributes_payload), + __qos__, cloud_name=cloudName) time.sleep(2) diff --git a/Pub_Sub/gateway/thingsboard/pub/sendData.py b/Pub_Sub/gateway/thingsboard/pub/sendData.py index 5e0742b..ddfdbc3 100644 --- a/Pub_Sub/gateway/thingsboard/pub/sendData.py +++ b/Pub_Sub/gateway/thingsboard/pub/sendData.py @@ -19,9 +19,10 @@ def chunk_payload(payload, chunk_size=20, is_attributes_payload=False): chunked_values = list(values.items()) for i in range(0, len(chunked_values), chunk_size): yield { - "controller": controller, + controller:[{ "ts": ts, "values": dict(chunked_values[i:i + chunk_size]) + }] } def sendData(message):