Files
HP_InHand_IG502/Pub_Sub/flowmeterskid/mistaway/terri-pit-inlet-send.py
2024-02-28 14:25:02 -06:00

394 lines
17 KiB
Python

import json, os, time, uuid
from datetime import datetime as dt
from common.Logger import logger
from quickfaas.remotebus import publish
from mobiuspi_lib.gps import GPS
from paho.mqtt import client
from quickfaas.global_dict import get as get_params
from quickfaas.global_dict import _set_global_args
from quickfaas.measure import recall
def reboot(reason="Rebooting for config file update"):
#basic = Basic()
logger.info("!" * 10 + "REBOOTING DEVICE" + "!"*10)
logger.info(reason)
r = os.popen("kill -s SIGHUP `cat /var/run/python/supervisord.pid`").read()
logger.info(f"REBOOT : {r}")
def checkFileExist(filename):
path = "/var/user/files"
try:
if not os.path.exists(path):
logger.debug("no folder making files folder in var/user")
os.makedirs(path)
with open(path + "/" + filename, "a") as f:
json.dump({}, f)
except Exception as e:
logger.error(f"Something went wrong in checkFileExist while making folder: {e}")
try:
if not os.path.exists(path + "/" + filename):
logger.debug("no creds file making creds file")
with open(path + "/" + filename, "a") as f:
json.dump({}, f)
except Exception as e:
logger.error(f"Something went wrong in checkFileExist while making file: {e}")
def convertDStoJSON(ds):
j = dict()
try:
for x in ds:
j[x["key"]] = x["value"]
except Exception as e:
logger.error(f"Something went wrong in convertDStoJSON: {e}")
return j
def convertJSONtoDS(j):
d = []
try:
for key in j.keys():
d.append({"key": key, "value": j[key]})
except Exception as e:
logger.error(f"Something went wrong in convertJSONtoDS: {e}")
return d
def checkCredentialConfig():
logger.debug("CHECKING CONFIG")
cfgpath = "/var/user/cfg/device_supervisor/device_supervisor.cfg"
credspath = "/var/user/files/creds.json"
cfg = dict()
with open(cfgpath, "r") as f:
try:
cfg = json.load(f)
clouds = cfg.get("clouds")
logger.debug(clouds)
#if not configured then try to configure from stored values
if clouds[0]["args"]["clientId"] == "unknown" or clouds[0]["args"]["username"] == "unknown" or not clouds[0]["args"]["passwd"] or clouds[0]["args"]["passwd"] == "unknown":
try:
checkFileExist("creds.json")
except Exception as e:
logger.error(f"Error in checkFileExist: {e}")
with open(credspath, "r") as c:
try:
creds = json.load(c)
if creds:
logger.debug("updating config with stored data")
clouds[0]["args"]["clientId"] = creds["clientId"]
clouds[0]["args"]["username"] = creds["userName"]
clouds[0]["args"]["passwd"] = creds["password"]
cfg["clouds"] = clouds
cfg = checkParameterConfig(cfg)
with open(cfgpath, "w", encoding='utf-8') as n:
json.dump(cfg, n, indent=1, ensure_ascii=False)
reboot()
except Exception as e:
logger.error(f"Error trying to load credentials from file: {e}")
else:
#assuming clouds is filled out, if data is different then assume someone typed in something new and store it, if creds is empty fill with clouds' data
checkFileExist("creds.json")
with open(credspath, "r") as c:
logger.debug("updating stored file with new data")
cfg = checkParameterConfig(cfg)
with open(cfgpath, "w", encoding='utf-8') as n:
json.dump(cfg, n, indent=1, ensure_ascii=False)
creds = json.load(c)
if creds:
if creds["clientId"] != clouds[0]["args"]["clientId"]:
creds["clientId"] = clouds[0]["args"]["clientId"]
if creds["userName"] != clouds[0]["args"]["username"]:
creds["userName"] = clouds[0]["args"]["username"]
if creds["password"] != clouds[0]["args"]["passwd"]:
creds["password"] = clouds[0]["args"]["passwd"]
else:
creds["clientId"] = clouds[0]["args"]["clientId"]
creds["userName"] = clouds[0]["args"]["username"]
creds["password"] = clouds[0]["args"]["passwd"]
with open(credspath, "w") as cw:
json.dump(creds,cw)
except Exception as e:
logger.error(f"Somethign went wrong in checkCredentialConfig: {e}")
def checkParameterConfig(cfg):
try:
logger.debug("Checking Parameters!!!!")
paramspath = "/var/user/files/params.json"
cfgparams = convertDStoJSON(cfg.get("labels"))
#check stored values
checkFileExist("params.json")
with open(paramspath, "r") as f:
logger.debug("Opened param storage file")
params = json.load(f)
if params:
if cfgparams != params:
#go through each param
#if not "unknown" and cfg and params aren't the same take from cfg likely updated manually
#if key in cfg but not in params copy to params
logger.debug("equalizing params between cfg and stored")
for key in cfgparams.keys():
try:
if cfgparams[key] != params[key] and cfgparams[key] != "unknown":
params[key] = cfgparams[key]
except:
params[key] = cfgparams[key]
cfg["labels"] = convertJSONtoDS(params)
_set_global_args(convertJSONtoDS(params))
with open(paramspath, "w") as p:
json.dump(params, p)
else:
with open(paramspath, "w") as p:
logger.debug("initializing param file with params in memory")
json.dump(convertDStoJSON(get_params()), p)
cfg["labels"] = get_params()
return cfg
except Exception as e:
logger.error(f"Something went wrong in checkParameterConfig: {e}")
os.system(f'rm {paramspath}')
return cfg
payload = {}
def get_totalizers(path):
try:
with open(path, "r") as t:
logger.info(f"OPEN FILE FOR {path}")
totalizers = json.load(t)
logger.info(f"LOADED JSON FOR {path}")
if not totalizers:
logger.info("-----INITIALIZING TOTALIZERS-----")
totalizers = {
"day": 0,
"week": 0,
"month": 0,
"year": 0,
"lifetime": 0,
"dayHolding": 0,
"weekHolding": 0,
"monthHolding": 0,
"yearHolding": 0
}
except Exception as e:
logger.error(f"Something went wrong in get_totalizers {path}: {e}")
return False
return totalizers
def aggregate():
try:
data = recall()#json.loads(recall().decode("utf-8"))
except Exception as e:
logger.error(e)
return
logger.debug(data)
flowrate, totalday, totalweek, totalmonth, totalyear = 0, 0, 0, 0, 0
for controller in data:
if controller.get("name") == "flowmeter":
path = "/var/user/files/totalizers.json"
checkCredentialConfig()
elif controller.get("name") == "flowmeter2":
path = "/var/user/files/totalizers2.json"
elif controller.get("name") == "flowmeter3":
path = "/var/user/files/totalizers3.json"
for measure in controller["measures"]:
if measure.get("name") in ["flowrate", "flowrate_2", "flowrate_3"]:
flowrate += measure.get("value")
if measure.get("name") in ["totalizer_1", "totalizer_1_2", "totalizer_1_3"]:
totalizers = get_totalizers(path)
totalday += measure.get("value") - totalizers.get("dayHolding")
totalweek += measure.get("value") - totalizers.get("weekHolding", 0.0)
totalmonth += measure.get("value") - totalizers.get("monthHolding")
totalyear += measure.get("value") - totalizers.get("yearHolding", 0.0)
payload = {"ts": (round(dt.timestamp(dt.now())/600)*600)*1000, "values": {}}
payload["values"]["flowrateSum"] = flowrate
payload["values"]["day_volume_sum"] = totalday
payload["values"]["week_volume_sum"] = totalweek
payload["values"]["month_volume_sum"] = totalmonth
payload["values"]["year_volume_sum"] = totalyear
logger.debug(payload)
publish(__topic__, json.dumps(payload), __qos__)
lwtData = {
"init":False,
"client": client.Client(client_id=str(uuid.uuid4()), clean_session=True, userdata=None, protocol=client.MQTTv311, transport="tcp")
}
def lwt(mac):
try:
#if not lwtData["connected"]:
if not lwtData["init"]:
print("INITIALIZING LWT CLIENT")
lwtData["client"].username_pw_set(username="admin", password="columbus")
lwtData["client"].will_set("meshify/db/194/_/mainHP/" + mac + ":00:00/connected",json.dumps([{"value":False}]))
lwtData["client"].reconnect_delay_set(min_delay=10, max_delay=120)
lwtData["init"] = True
print("Connecting to MQTT Broker for LWT purposes!!!!!!!")
lwtData["client"].connect("mq194.imistaway.net",1883, 1200)
lwtData["client"].reconnect()
lwtData["client"].publish("meshify/db/194/_/mainHP/" + mac + ":00:00/connected", json.dumps([{"value":True}]))
except Exception as e:
print("LWT DID NOT DO THE THING")
print(e)
def mapMeasure(measure):
measuremap = {
"flowrate": "fm1_flowrate",
"totalizer_1": "fm1_lifetime",
"month_volume": "fm1_month",
"day_volume": "fm1_todays",
"yesterday_volume": "fm1_yesterdays",
"last_month_volume": "fm1_lastmonth",
"flowrate_2": "fm2_flowrate",
"totalizer_1_2": "fm2_lifetime",
"month_volume_2": "fm2_month",
"day_volume_2": "fm2_todays",
"yesterday_volume_2": "fm2_yesterdays",
"last_month_volume_2": "fm2_lastmonth"
}
return measuremap.get(measure, None)
def sendData(message,wizard_api):
logger.debug(message) #{'group': 'default', 'measures': [{'ctrlName': 'test2', 'name': 't', 'health': 1, 'timestamp': 1682609511, 'value': 0}]}
if message["group_name"] == "default":
path = "/var/user/files/totalizers.json"
unit = ""
suffix = ""
mac = __topic__.split("/")[-1] #':'.join(re.findall('..', '%012x' % uuid.getnode()))
lwt(mac)
checkCredentialConfig()
elif message["group_name"] == "default2":
#logger.debug("processing flow meter 2!!!!!!!!!!")
path = "/var/user/files/totalizers2.json"
unit = "2"
suffix = "_2"
time.sleep(5)
elif message["group_name"] == "default3":
path = "/var/user/files/totalizers3.json"
unit = "3"
suffix = "_3"
time.sleep(10)
payload = {"ts": (round(dt.timestamp(dt.now())/600)*600)*1000, "values": {}}
resetPayload = {"ts": "", "values": {}}
dayReset, weekReset, monthReset, yearReset = False, False, False, False
for measure in message["values"]["flowmeter"+ unit].keys():
if message["values"]["flowmeter"+ unit][measure]["status"]:
try:
if measure in ["totalizer_1", "totalizer_1_2", "totalizer_1_3"]:
totalizers = get_totalizers(path)
if totalizers:
payload["values"]["day_volume" + suffix], dayReset = totalizeDay(message["values"]["flowmeter" + unit][measure]["raw_data"], totalizers, path)
payload["values"]["week_volume" + suffix], weekReset = totalizeWeek(message["values"]["flowmeter"+ unit][measure]["raw_data"], totalizers, path)
payload["values"]["month_volume" + suffix], monthReset = totalizeMonth(message["values"]["flowmeter"+ unit][measure]["raw_data"], totalizers, path)
payload["values"]["year_volume" + suffix], yearReset = totalizeYear(message["values"]["flowmeter"+ unit][measure]["raw_data"], totalizers, path)
payload["values"][measure] = message["values"]["flowmeter" + unit][measure]["raw_data"]
except Exception as e:
logger.error(e)
#publish(__topic__, json.dumps(payload), __qos__)
for measure in payload["values"].keys():
ma_payload = {"value": payload["values"][measure]}
meshifyName = mapMeasure(measure)
#logger.debug(f"Publishing: {measure} | {meshifyName}")
if meshifyName:
logger.debug(__topic__ + ":01:99/" + meshifyName )
publish(__topic__ + ":01:99/" + meshifyName, json.dumps([ma_payload]),__qos__)
if dayReset:
resetPayload["values"]["yesterday_volume" + suffix] = payload["values"]["day_volume" + suffix]
resetPayload["values"]["day_volume" + suffix] = 0
if weekReset:
resetPayload["values"]["last_week_volume" + suffix] = payload["values"]["week_volume" + suffix]
resetPayload["values"]["week_volume" + suffix] = 0
if monthReset:
resetPayload["values"]["last_month_volume" + suffix] = payload["values"]["month_volume" + suffix]
resetPayload["values"]["month_volume" + suffix] = 0
if yearReset:
resetPayload["values"]["last_year_volume" + suffix] = payload["values"]["year_volume" + suffix]
resetPayload["values"]["year_volume" + suffix] = 0
if resetPayload["values"]:
resetPayload["ts"] = 1 + (round(dt.timestamp(dt.now())/600)*600)*1000
#publish(__topic__, json.dumps(resetPayload), __qos__)
for measure in resetPayload["values"].keys():
ma_payload = {"value": resetPayload["values"][measure]}
meshifyName = mapMeasure(measure)
if meshifyName:
publish(__topic__ + ":01:99/" + meshifyName, json.dumps([ma_payload]),__qos__)
if unit == "3":
aggregate()
def saveTotalizers(path, totalizers):
try:
with open(path, "w") as t:
json.dump(totalizers,t)
except Exception as e:
logger.error(e)
def getGPS():
# Create a gps instance
gps = GPS()
# Retrieve GPS information
position_status = gps.get_position_status()
logger.debug("position_status: ")
logger.debug(position_status)
latitude = position_status["latitude"].split(" ")
longitude = position_status["longitude"].split(" ")
lat_dec = int(latitude[0][:-1]) + (float(latitude[1][:-1])/60)
lon_dec = int(longitude[0][:-1]) + (float(longitude[1][:-1])/60)
if latitude[2] == "S":
lat_dec = lat_dec * -1
if longitude[2] == "W":
lon_dec = lon_dec * -1
#lat_dec = round(lat_dec, 7)
#lon_dec = round(lon_dec, 7)
logger.info("HERE IS THE GPS COORDS")
logger.info(f"LATITUDE: {lat_dec}, LONGITUDE: {lon_dec}")
speedKnots = position_status["speed"].split(" ")
speedMPH = float(speedKnots[0]) * 1.151
return (f"{lat_dec:.8f}",f"{lon_dec:.8f}",f"{speedMPH:.2f}")
def totalizeDay(lifetime, totalizers, path):
now = dt.fromtimestamp(round(dt.timestamp(dt.now())/600)*600)
reset = False
value = lifetime - totalizers["dayHolding"]
if not int(now.strftime("%d")) == int(totalizers["day"]):
totalizers["dayHolding"] = lifetime
totalizers["day"] = int(now.strftime("%d"))
saveTotalizers(path,totalizers)
reset = True
return (value,reset)
def totalizeWeek(lifetime, totalizers, path):
now = dt.fromtimestamp(round(dt.timestamp(dt.now())/600)*600)
reset = False
value = lifetime - totalizers["weekHolding"]
if not now.strftime("%U") == totalizers["week"] and now.strftime("%a") == "Sun":
totalizers["weekHolding"] = lifetime
totalizers["week"] = now.strftime("%U")
saveTotalizers(path, totalizers)
reset = True
return (value, reset)
def totalizeMonth(lifetime, totalizers, path):
now = dt.fromtimestamp(round(dt.timestamp(dt.now())/600)*600)
reset = False
value = lifetime - totalizers["monthHolding"]
if not int(now.strftime("%m")) == int(totalizers["month"]):
totalizers["monthHolding"] = lifetime
totalizers["month"] = now.strftime("%m")
saveTotalizers(path, totalizers)
reset = True
return (value,reset)
def totalizeYear(lifetime, totalizers, path):
now = dt.fromtimestamp(round(dt.timestamp(dt.now())/600)*600)
reset = False
value = lifetime - totalizers["yearHolding"]
if not int(now.strftime("%Y")) == int(totalizers["year"]):
totalizers["yearHolding"] = lifetime
totalizers["year"] = now.strftime("%Y")
saveTotalizers(path, totalizers)
reset = True
return (value, reset)