updated sendData

This commit is contained in:
Nico Melone
2025-05-29 09:05:27 -05:00
parent d4d056ee7f
commit 0ba8e12263
2 changed files with 38 additions and 231 deletions

View File

@@ -1,172 +1,12 @@
# Enter your python code.
import json
import os
import time
from datetime import datetime as dt
from common.Logger import logger
from quickfaas.remotebus import publish
from quickfaas.global_dict import get as get_params
from quickfaas.global_dict import _set_global_args
from mobiuspi_lib.gps import GPS
def reboot():
# basic = Basic()
logger.info("!" * 10 + "REBOOTING DEVICE" + "!"*10)
r = os.popen("kill -s SIGHUP `cat /var/run/python/supervisord.pid`").read()
logger.info(f"REBOOT : {r}")
def checkFileExist(filename):
path = "/var/user/files"
if not os.path.exists(path):
logger.info("no folder making files folder in var/user")
os.makedirs(path)
with open(path + "/" + filename, "a") as f:
json.dump({}, f)
if not os.path.exists(path + "/" + filename):
logger.info("no creds file making creds file")
with open(path + "/" + filename, "a") as f:
json.dump({}, f)
def convertDStoJSON(ds):
j = dict()
for x in ds:
j[x["key"]] = x["value"]
return j
def convertJSONtoDS(j):
d = []
for key in j.keys():
d.append({"key": key, "value": j[key]})
return d
def checkCredentialConfig():
logger.info("CHECKING CONFIG")
cfgpath = "/var/user/cfg/device_supervisor/device_supervisor.cfg"
credspath = "/var/user/files/creds.json"
cfg = dict()
with open(cfgpath, "r") as f:
cfg = json.load(f)
clouds = cfg.get("clouds")
logger.info(clouds)
# if not configured then try to configure from stored values
if clouds[0]["args"]["clientId"] == "unknown" or clouds[0]["args"]["username"] == "unknown" or not clouds[0]["args"]["passwd"] or clouds[0]["args"]["passwd"] == "unknown":
checkFileExist("creds.json")
with open(credspath, "r") as c:
creds = json.load(c)
if creds:
logger.info("updating config with stored data")
clouds[0]["args"]["clientId"] = creds["clientId"]
clouds[0]["args"]["username"] = creds["userName"]
clouds[0]["args"]["passwd"] = creds["password"]
cfg["clouds"] = clouds
cfg = checkParameterConfig(cfg)
with open(cfgpath, "w", encoding='utf-8') as n:
json.dump(cfg, n, indent=1, ensure_ascii=False)
reboot()
else:
# assuming clouds is filled out, if data is different then assume someone typed in something new and store it, if creds is empty fill with clouds' data
checkFileExist("creds.json")
with open(credspath, "r") as c:
logger.info("updating stored file with new data")
cfg = checkParameterConfig(cfg)
with open(cfgpath, "w", encoding='utf-8') as n:
json.dump(cfg, n, indent=1, ensure_ascii=False)
creds = json.load(c)
if creds:
if creds["clientId"] != clouds[0]["args"]["clientId"]:
creds["clientId"] = clouds[0]["args"]["clientId"]
if creds["userName"] != clouds[0]["args"]["username"]:
creds["userName"] = clouds[0]["args"]["username"]
if creds["password"] != clouds[0]["args"]["passwd"]:
creds["password"] = clouds[0]["args"]["passwd"]
else:
creds["clientId"] = clouds[0]["args"]["clientId"]
creds["userName"] = clouds[0]["args"]["username"]
creds["password"] = clouds[0]["args"]["passwd"]
with open(credspath, "w") as cw:
json.dump(creds, cw)
def checkParameterConfig(cfg):
logger.info("Checking Parameters!!!!")
paramspath = "/var/user/files/params.json"
cfgparams = convertDStoJSON(cfg.get("labels"))
# check stored values
checkFileExist("params.json")
with open(paramspath, "r") as f:
logger.info("Opened param storage file")
params = json.load(f)
if params:
if cfgparams != params:
# go through each param
# if not "unknown" and cfg and params aren't the same take from cfg likely updated manually
# if key in cfg but not in params copy to params
logger.info("equalizing params between cfg and stored")
for key in cfgparams.keys():
try:
if cfgparams[key] != params[key] and cfgparams[key] != "unknown":
params[key] = cfgparams[key]
except:
params[key] = cfgparams[key]
cfg["labels"] = convertJSONtoDS(params)
_set_global_args(convertJSONtoDS(params))
with open(paramspath, "w") as p:
json.dump(params, p)
else:
with open(paramspath, "w") as p:
logger.info("initializing param file with params in memory")
json.dump(convertDStoJSON(get_params()), p)
cfg["labels"] = get_params()
return cfg
def getGPS():
# Create a gps instance
gps = GPS()
# Retrieve GPS information
position_status = gps.get_position_status()
logger.debug("position_status: ")
logger.debug(position_status)
latitude = position_status["latitude"].split(" ")
longitude = position_status["longitude"].split(" ")
lat_dec = int(latitude[0][:-1]) + (float(latitude[1][:-1])/60)
lon_dec = int(longitude[0][:-1]) + (float(longitude[1][:-1])/60)
if latitude[2] == "S":
lat_dec = lat_dec * -1
if longitude[2] == "W":
lon_dec = lon_dec * -1
# lat_dec = round(lat_dec, 7)
# lon_dec = round(lon_dec, 7)
logger.info("HERE IS THE GPS COORDS")
logger.info(f"LATITUDE: {lat_dec}, LONGITUDE: {lon_dec}")
speedKnots = position_status["speed"].split(" ")
speedMPH = float(speedKnots[0]) * 1.151
return (f"{lat_dec:.8f}", f"{lon_dec:.8f}", f"{speedMPH:.2f}")
def chunk_payload(payload, chunk_size=20):
if "values" in payload:
# Original format: {"ts": ..., "values": {...}}
chunked_values = list(payload["values"].items())
for i in range(0, len(chunked_values), chunk_size):
yield {
"ts": payload["ts"],
"values": dict(chunked_values[i:i+chunk_size])
}
else:
# New format: {"key1": "value1", "key2": "value2"}
chunked_keys = list(payload.keys())
for i in range(0, len(chunked_keys), chunk_size):
yield {k: payload[k] for k in chunked_keys[i:i+chunk_size]}
def chunk_payload_devices(payload, chunk_size=20, is_attributes_payload=False):
def chunk_payload(payload, chunk_size=20, is_attributes_payload=False):
if is_attributes_payload:
# For attributes payload, chunk the controllers
controllers = list(payload.items())
@@ -188,85 +28,51 @@ def chunk_payload_devices(payload, chunk_size=20, is_attributes_payload=False):
}
def controlName(name):
logger.debug(name)
params = convertDStoJSON(get_params())
logger.debug(params)
nameMap = {
"overflow_pump": f"{params['overflow_pump']}"
}
return nameMap.get(name, "Gateway")
def sendData(message):
# logger.info(message)
def sendData(message, wizard_api, cloudName):
# logger.debug(message)
# Extract measures and group by ctrlName
grouped_data = {}
grouped_attributes = {}
now = (round(dt.timestamp(dt.now())/600)*600)*1000
payload = {"ts": now, "values": {}}
attributes_payload = {}
for measure in message["measures"]:
try:
logger.debug(measure)
ctrlName = controlName(measure["ctrlName"])
logger.debug(ctrlName)
if ctrlName == "Gateway":
# send to gateway with v1/devices/me/telemetry
if measure["health"] == 1:
if "_spt" in measure["name"]:
attributes_payload[measure["name"]] = measure["value"]
else:
payload["values"][measure["name"]] = measure["value"]
else:
name = measure['name']
value = measure['value']
health = measure['health']
# Add controller for telemetry if it doesn't exist
if ctrlName not in grouped_data:
grouped_data[ctrlName] = {}
# Add controller for attributes if it doesn't exist
if ctrlName not in grouped_attributes:
grouped_attributes[ctrlName] = {}
grouped_attributes[ctrlName]["latestReportTime"] = now
# Add data to temp payload if datapoint health is good
if health:
if "_spt" in name:
grouped_attributes[ctrlName][name] = value
else:
grouped_data[ctrlName][name] = value
except Exception as e:
logger.error(e)
# logger.info(message)
for controller, measures in message["values"].items():
for measure_name, measure_data in measures.items():
ctrlName = " ".join(controller.split("_"))
name = measure_name
value = measure_data["raw_data"]
health = measure_data["status"]
# Add controller for telemetry if it doesn't exist
if ctrlName not in grouped_data:
grouped_data[ctrlName] = {}
# Add controller for attributes if it doesn't exist
if ctrlName not in grouped_attributes:
grouped_attributes[ctrlName] = {}
# Add data to temp payload if datapoint health is good
if health:
grouped_data[ctrlName][name] = value
grouped_attributes[ctrlName]["latestReportTime"] = now
# print(grouped_data)
# Transform the grouped data to desired structure
payload_devices = {}
payload = {}
for key, value in grouped_data.items():
if value:
payload_devices[key] = [{"ts": now, "values": value}]
attributes_payload_devices = {}
payload[key] = [{"ts": now, "values": value}]
attributes_payload = {}
for key, value in grouped_attributes.items():
if value:
attributes_payload_devices[key] = value
logger.debug(payload_devices)
logger.debug(attributes_payload_devices)
# Send data belonging to Gateway
attributes_payload[key] = value
# logger.debug(payload)
for chunk in chunk_payload(payload=payload):
publish(__topic__, json.dumps(chunk), __qos__)
# logger.info(chunk)
# logger.info(__topic__)
# logger.info(cloudName)
publish(__topic__, json.dumps(chunk), __qos__, cloud_name=cloudName)
time.sleep(2)
attributes_payload["latestReportTime"] = (
round(dt.timestamp(dt.now())/600)*600)*1000
for chunk in chunk_payload(payload=attributes_payload):
publish("v1/devices/me/attributes", json.dumps(chunk), __qos__)
time.sleep(2)
# Send gateway devices data
for chunk in chunk_payload_devices(payload=payload_devices):
publish("v1/gateway/telemetry", json.dumps(chunk), __qos__)
time.sleep(2)
for chunk in chunk_payload_devices(payload=attributes_payload_devices, is_attributes_payload=True):
publish("v1/gateway/attributes",
json.dumps(attributes_payload_devices), __qos__)
for chunk in chunk_payload(payload=attributes_payload, is_attributes_payload=True):
publish("v1/gateway/attributes", json.dumps(attributes_payload),
__qos__, cloud_name=cloudName)
time.sleep(2)

View File

@@ -19,9 +19,10 @@ def chunk_payload(payload, chunk_size=20, is_attributes_payload=False):
chunked_values = list(values.items())
for i in range(0, len(chunked_values), chunk_size):
yield {
"controller": controller,
controller:[{
"ts": ts,
"values": dict(chunked_values[i:i + chunk_size])
}]
}
def sendData(message):