added new tags and totalizer breakouts
This commit is contained in:
363
Pub_Sub/fk_plcpond/sendData.py
Normal file
363
Pub_Sub/fk_plcpond/sendData.py
Normal file
@@ -0,0 +1,363 @@
|
||||
import json, os, time, shutil
|
||||
from datetime import datetime as dt
|
||||
from common.Logger import logger
|
||||
from quickfaas.remotebus import publish
|
||||
from mobiuspi_lib.gps import GPS
|
||||
from quickfaas.global_dict import get as get_params
|
||||
from quickfaas.global_dict import _set_global_args
|
||||
|
||||
def reboot():
    """Restart the QuickFaaS runtime by sending SIGHUP to supervisord.

    The process is not killed directly; supervisord reloads and restarts the
    managed Python apps, which picks up any config written beforehand.
    """
    logger.info(f"{'!' * 10}REBOOTING DEVICE{'!' * 10}")
    result = os.popen("kill -s SIGHUP `cat /var/run/python/supervisord.pid`").read()
    logger.info(f"REBOOT : {result}")
|
||||
|
||||
def checkFileExist(filename):
    """Ensure /var/user/files/<filename> exists, seeding it with empty JSON.

    Creates the /var/user/files folder if missing, then creates the file with
    an empty JSON object ({}) if it does not exist yet.

    :param filename: Name of the file inside /var/user/files.
    """
    path = "/var/user/files"
    if not os.path.exists(path):
        logger.info("no folder making files folder in var/user")
        # exist_ok guards against a race with another process creating it.
        os.makedirs(path, exist_ok=True)
    filepath = path + "/" + filename
    # The original wrote the seed twice (once per branch); a single check is
    # enough — a freshly created folder cannot already contain the file.
    if not os.path.exists(filepath):
        logger.info("no creds file making creds file")
        # Append mode keeps any content a concurrent writer slipped in first.
        with open(filepath, "a") as f:
            json.dump({}, f)
|
||||
|
||||
def convertDStoJSON(ds):
    """Collapse a device-supervisor label list into a plain dict.

    :param ds: Iterable of {"key": ..., "value": ...} entries.
    :return: {key: value} mapping (later duplicates win).
    """
    return {entry["key"]: entry["value"] for entry in ds}
|
||||
|
||||
def convertJSONtoDS(j):
    """Expand a plain dict into the device-supervisor label-list format.

    :param j: {key: value} mapping.
    :return: List of {"key": ..., "value": ...} entries in insertion order.
    """
    return [{"key": name, "value": val} for name, val in j.items()]
|
||||
|
||||
def checkCredentialConfig():
    """Keep MQTT credentials in device_supervisor.cfg and the local backup in sync.

    If the cfg still holds placeholder credentials ("unknown" / empty passwd)
    they are restored from /var/user/files/creds.json and the device is
    rebooted so the supervisor reloads the repaired config.  Otherwise the
    backup file is refreshed from the cfg.  Parameter labels are synchronised
    via checkParameterConfig() on both paths.
    """
    logger.info("CHECKING CONFIG")
    cfgpath = "/var/user/cfg/device_supervisor/device_supervisor.cfg"
    credspath = "/var/user/files/creds.json"
    cfg = dict()
    with open(cfgpath, "r") as f:
        cfg = json.load(f)
    # NOTE(review): clouds is assumed non-empty below (clouds[0]); an empty
    # "clouds" list in the cfg would raise here — confirm the supervisor
    # always writes at least one cloud entry.
    clouds = cfg.get("clouds")
    logger.info(clouds)
    #if not configured then try to configure from stored values
    if clouds[0]["args"]["clientId"] == "unknown" or clouds[0]["args"]["username"] == "unknown" or not clouds[0]["args"]["passwd"] or clouds[0]["args"]["passwd"] == "unknown":
        checkFileExist("creds.json")
        with open(credspath, "r") as c:
            creds = json.load(c)
            # Only restore when the backup actually holds credentials; an
            # empty {} backup leaves the cfg untouched (no reboot).
            if creds:
                logger.info("updating config with stored data")
                clouds[0]["args"]["clientId"] = creds["clientId"]
                clouds[0]["args"]["username"] = creds["userName"]
                clouds[0]["args"]["passwd"] = creds["password"]
                cfg["clouds"] = clouds
                cfg = checkParameterConfig(cfg)
                with open(cfgpath, "w", encoding='utf-8') as n:
                    json.dump(cfg, n, indent=1, ensure_ascii=False)
                # Restart so the supervisor picks up the repaired credentials.
                reboot()
    else:
        #assuming clouds is filled out, if data is different then assume someone typed in something new and store it, if creds is empty fill with clouds' data
        checkFileExist("creds.json")
        with open(credspath, "r") as c:
            logger.info("updating stored file with new data")
            cfg = checkParameterConfig(cfg)
            with open(cfgpath, "w", encoding='utf-8') as n:
                json.dump(cfg, n, indent=1, ensure_ascii=False)
            creds = json.load(c)
            if creds:
                # Mirror any manually-edited cfg value into the backup.
                if creds["clientId"] != clouds[0]["args"]["clientId"]:
                    creds["clientId"] = clouds[0]["args"]["clientId"]
                if creds["userName"] != clouds[0]["args"]["username"]:
                    creds["userName"] = clouds[0]["args"]["username"]
                if creds["password"] != clouds[0]["args"]["passwd"]:
                    creds["password"] = clouds[0]["args"]["passwd"]
            else:
                # Backup was empty — seed it from the cfg.
                creds["clientId"] = clouds[0]["args"]["clientId"]
                creds["userName"] = clouds[0]["args"]["username"]
                creds["password"] = clouds[0]["args"]["passwd"]
        with open(credspath, "w") as cw:
            json.dump(creds,cw)
|
||||
|
||||
def checkParameterConfig(cfg):
    """Synchronise parameter labels between the supervisor cfg and disk.

    Stored values in /var/user/files/params.json win unless the cfg holds a
    different non-"unknown" value (assumed to be a manual edit); keys present
    only in the cfg are copied into the store.  The merged set is written back
    to disk, into cfg["labels"], and into the QuickFaaS global dict.

    :param cfg: Parsed device_supervisor.cfg dictionary.
    :return: The (possibly updated) cfg dictionary.
    """
    logger.info("Checking Parameters!!!!")
    paramspath = "/var/user/files/params.json"
    cfgparams = convertDStoJSON(cfg.get("labels"))
    #check stored values
    checkFileExist("params.json")
    with open(paramspath, "r") as f:
        logger.info("Opened param storage file")
        params = json.load(f)
        if params:
            if cfgparams != params:
                #go through each param
                #if not "unknown" and cfg and params aren't the same take from cfg likely updated manually
                #if key in cfg but not in params copy to params
                logger.info("equalizing params between cfg and stored")
                for key in cfgparams.keys():
                    try:
                        if cfgparams[key] != params[key] and cfgparams[key] != "unknown":
                            params[key] = cfgparams[key]
                    # BUG FIX: was a bare "except:" which also swallowed
                    # unrelated errors (even KeyboardInterrupt).  Only a
                    # missing key in the stored params is expected here.
                    except KeyError:
                        # Key exists only in the cfg -> copy it into the store.
                        params[key] = cfgparams[key]
                cfg["labels"] = convertJSONtoDS(params)
                _set_global_args(convertJSONtoDS(params))
                with open(paramspath, "w") as p:
                    json.dump(params, p)
        else:
            with open(paramspath, "w") as p:
                logger.info("initializing param file with params in memory")
                json.dump(convertDStoJSON(get_params()), p)
            cfg["labels"] = get_params()

    return cfg
|
||||
|
||||
payload = {}
|
||||
|
||||
def initialize_totalizers():
    """Return a fresh totalizer record with every field zeroed.

    "day"/"week"/"month"/"year" hold the last-seen period markers, the
    "*Holding" entries hold the lifetime reading captured at the start of each
    period, and "lifetime"/"rolloverCounter" are defined here but not updated
    elsewhere in this module.
    """
    fields = (
        "day", "week", "month", "year", "lifetime",
        "dayHolding", "weekHolding", "monthHolding", "yearHolding",
        "rolloverCounter",
    )
    return dict.fromkeys(fields, 0)
|
||||
|
||||
def getTotalizers(file_path="/var/user/files/totalizers.json"):
    """
    Retrieve totalizer data from a JSON file.

    A missing file, an invalid structure, or undecodable JSON all fall back to
    a fresh record from initialize_totalizers(); a corrupt file is backed up
    under a timestamped name first so it can be inspected.  Records loaded
    from older files are back-filled with any counters introduced later
    (e.g. "rolloverCounter") so callers can index without KeyError.

    :param file_path: Path to the JSON file storing totalizer data.
    :return: Dictionary containing totalizer values.
    """
    try:
        with open(file_path, "r") as t:
            totalizers = json.load(t)
        if not totalizers or not isinstance(totalizers, dict):
            logger.info("Invalid data format in the file. Initializing totalizers.")
            totalizers = initialize_totalizers()
        else:
            # Back-fill keys added after the file was first written; stored
            # values take precedence over the zeroed defaults.
            totalizers = {**initialize_totalizers(), **totalizers}
    except FileNotFoundError:
        logger.info("File not found. Initializing totalizers.")
        totalizers = initialize_totalizers()
    except json.JSONDecodeError:
        # Preserve the corrupt file under a timestamped name before resetting.
        timestamp = dt.now().strftime("%Y%m%d_%H%M%S")
        file_name, file_extension = os.path.splitext(file_path)
        backup_file_path = f"{file_name}_{timestamp}{file_extension}"
        shutil.copyfile(file_path, backup_file_path)
        logger.error(f"Error decoding JSON. A backup of the file is created at {backup_file_path}. Initializing totalizers.")
        totalizers = initialize_totalizers()
    return totalizers
|
||||
|
||||
# Helper function to split the payload into chunks
|
||||
def chunk_payload(payload, chunk_size=20):
    """Yield telemetry payloads holding at most chunk_size values each.

    Every chunk carries the original timestamp so the pieces re-assemble into
    the same report on the broker side.

    :param payload: Dict with "ts" and a "values" mapping.
    :param chunk_size: Maximum number of values per emitted chunk.
    """
    ts = payload["ts"]
    remaining = list(payload["values"].items())
    while remaining:
        head, remaining = remaining[:chunk_size], remaining[chunk_size:]
        yield {"ts": ts, "values": dict(head)}
|
||||
|
||||
def sendData(message):
    """QuickFaaS entry point: publish measurement data to the MQTT broker.

    Builds a timestamped telemetry payload from message["measures"], derives
    day/week/month/year volumes from the fit_1_total reading, publishes the
    payload in chunks, updates the latestReportTime attribute, and finally
    emits zero-value "reset" points (plus a last-period value) one millisecond
    later whenever a totalizer period rolled over.

    :param message: Device-supervisor message with a "measures" list of
        {"name": ..., "value": ...} entries.
    """
    payload = {}
    # Timestamps are snapped to the nearest 10 minutes (600 s), in ms.
    payload["ts"] = (round(dt.timestamp(dt.now())/600)*600)*1000
    payload["values"] = {}
    resetPayload = {"ts": "", "values": {}}
    dayReset, weekReset, monthReset, yearReset = False, False, False, False
    totalizer_1 = None
    try:
        checkCredentialConfig()
    except Exception as e:
        # Best-effort: a config problem must not block telemetry.
        logger.error(e)
    for measure in message["measures"]:
        try:
            logger.debug(measure)
            if measure["name"] in ["fit_1_total"]:
                totalizer_1 = measure["value"]
            payload["values"][measure["name"]] = measure["value"]
        except Exception as e:
            logger.error(e)
    try:
        # BUG FIX (idiom): compare against None with "is not", not "!=".
        if totalizer_1 is not None:
            real_lifetime = totalizer_1
            payload["values"]["fit_1_year_volume"], yearReset = totalizeYear(real_lifetime)
            payload["values"]["fit_1_month_volume"], monthReset = totalizeMonth(real_lifetime)
            payload["values"]["fit_1_week_volume"], weekReset = totalizeWeek(real_lifetime)
            payload["values"]["fit_1_day_volume"], dayReset = totalizeDay(real_lifetime)
            payload["values"]["fit_1_real_lifetime"] = real_lifetime
    except Exception as e:
        logger.error(e)

    for chunk in chunk_payload(payload=payload):
        publish(__topic__, json.dumps(chunk), __qos__)
        time.sleep(2)  # pace the broker between chunks
    publish("v1/devices/me/attributes", json.dumps({"latestReportTime": (round(dt.timestamp(dt.now())/600)*600)*1000}), __qos__)

    # On a period rollover, report the just-closed period's total under its
    # "last"/"yesterday" tag and zero the live counter.
    if dayReset:
        resetPayload["values"]["fit_1_yesterday_volume"] = payload["values"]["fit_1_day_volume"]
        resetPayload["values"]["fit_1_day_volume"] = 0
    if weekReset:
        resetPayload["values"]["fit_1_last_week_volume"] = payload["values"]["fit_1_week_volume"]
        resetPayload["values"]["fit_1_week_volume"] = 0
    if monthReset:
        resetPayload["values"]["fit_1_last_month_volume"] = payload["values"]["fit_1_month_volume"]
        resetPayload["values"]["fit_1_month_volume"] = 0
    if yearReset:
        resetPayload["values"]["fit_1_last_year_volume"] = payload["values"]["fit_1_year_volume"]
        resetPayload["values"]["fit_1_year_volume"] = 0

    if resetPayload["values"]:
        # +1 ms so the reset point sorts strictly after the main payload.
        resetPayload["ts"] = 1 + (round(dt.timestamp(dt.now())/600)*600)*1000
        publish(__topic__, json.dumps(resetPayload), __qos__)
|
||||
|
||||
def saveTotalizers(totalizers, file_path="/var/user/files/totalizers.json"):
    """
    Save totalizer data to a JSON file.

    :param totalizers: Dictionary containing totalizer values to be saved.
    :param file_path: Path to the JSON file where totalizer data will be saved.
    :raises OSError: If the file cannot be opened or written.
    :raises TypeError: If the data is not JSON-serialisable.
    :raises ValueError: For other serialisation failures (e.g. circular refs).
    """
    try:
        with open(file_path, "w") as t:
            json.dump(totalizers, t)
    # BUG FIX: the json module has no "JSONEncodeError" attribute, so the old
    # clause raised AttributeError instead of catching the failure.  json.dump
    # signals bad data with TypeError/ValueError; IOError is an alias of OSError.
    except (OSError, TypeError, ValueError) as e:
        logger.error(f"Error saving totalizers to {file_path}: {e}")
        raise  # callers run their own retry loops around this
|
||||
|
||||
def totalizeDay(lifetime, max_retries=3, retry_delay=2):
    """
    Update and persist the daily totalizer from the lifetime reading.

    :param lifetime: The current lifetime total.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: (daily volume, reset flag); (None, False) if persisting fails.
    """
    totals = getTotalizers()
    # Work on the same 10-minute-aligned clock the publisher uses.
    now = dt.fromtimestamp(round(dt.timestamp(dt.now()) / 600) * 600)
    volume = lifetime - totals["dayHolding"]

    # Same day as the stored marker: nothing to roll over.
    if int(now.strftime("%d")) == int(totals["day"]):
        return (volume, False)

    # A new day started: re-anchor the holding value and persist it.
    totals["dayHolding"] = lifetime
    totals["day"] = int(now.strftime("%d"))
    for attempt in range(1, max_retries + 1):
        try:
            saveTotalizers(totals)
        except Exception as e:
            logger.error(f"Attempt {attempt} failed to save totalizers: {e}")
            if attempt == max_retries:
                logger.error("All attempts to save totalizers failed.")
                return (None, False)
            time.sleep(retry_delay)
        else:
            return (volume, True)
|
||||
|
||||
def totalizeWeek(lifetime, max_retries=3, retry_delay=2):
    """
    Update and save weekly totalizers based on the lifetime value.

    :param lifetime: The current lifetime total.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: A tuple containing the calculated value and a boolean indicating if a reset occurred, or (None, False) if save fails.
    """
    totalizers = getTotalizers()
    # Snap "now" to the publisher's 10-minute grid.
    now = dt.fromtimestamp(round(dt.timestamp(dt.now())/600)*600)
    reset = False
    value = lifetime - totalizers["weekHolding"]

    # Roll the week on Sunday once the %U week number changes.
    # NOTE(review): "week" is stored as a strftime("%U") string but
    # initialised to the int 0 by initialize_totalizers(); the "== 0" clause
    # therefore only matches a never-initialised record and forces the first
    # run to anchor weekHolding.  Unlike the day/month/year variants there is
    # no int() conversion here — the comparison is string-vs-string (or
    # string-vs-int on first run), which works but is fragile; confirm before
    # changing the stored type.
    if (not now.strftime("%U") == totalizers["week"] and now.strftime("%a") == "Sun") or totalizers["week"] == 0:
        totalizers["weekHolding"] = lifetime
        totalizers["week"] = now.strftime("%U")

        for attempt in range(max_retries):
            try:
                saveTotalizers(totalizers)
                # The reset is only reported once the new anchors are safely
                # on disk.
                reset = True
                return (value, reset)
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed to save totalizers: {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                else:
                    logger.error("All attempts to save totalizers failed.")
                    return (None, False)
    return (value, reset)
|
||||
|
||||
def totalizeMonth(lifetime, max_retries=3, retry_delay=2):
    """
    Update and persist the monthly totalizer from the lifetime reading.

    :param lifetime: The current lifetime total.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: (monthly volume, reset flag); (None, False) if persisting fails.
    """
    totals = getTotalizers()
    # Work on the same 10-minute-aligned clock the publisher uses.
    now = dt.fromtimestamp(round(dt.timestamp(dt.now()) / 600) * 600)
    volume = lifetime - totals["monthHolding"]

    # Same month as the stored marker: nothing to roll over.
    if int(now.strftime("%m")) == int(totals["month"]):
        return (volume, False)

    # A new month started: re-anchor the holding value and persist it.
    totals["monthHolding"] = lifetime
    totals["month"] = now.strftime("%m")
    for attempt in range(1, max_retries + 1):
        try:
            saveTotalizers(totals)
        except Exception as e:
            logger.error(f"Attempt {attempt} failed to save totalizers: {e}")
            if attempt == max_retries:
                logger.error("All attempts to save totalizers failed.")
                return (None, False)
            time.sleep(retry_delay)
        else:
            return (volume, True)
|
||||
|
||||
def totalizeYear(lifetime, max_retries=3, retry_delay=2):
    """
    Update and persist the yearly totalizer from the lifetime reading.

    :param lifetime: The current lifetime total.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: (yearly volume, reset flag); (None, False) if persisting fails.
    """
    totals = getTotalizers()
    # Work on the same 10-minute-aligned clock the publisher uses.
    now = dt.fromtimestamp(round(dt.timestamp(dt.now()) / 600) * 600)
    volume = lifetime - totals["yearHolding"]

    # Same year as the stored marker: nothing to roll over.
    if int(now.strftime("%Y")) == int(totals["year"]):
        return (volume, False)

    # A new year started: re-anchor the holding value and persist it.
    totals["yearHolding"] = lifetime
    totals["year"] = now.strftime("%Y")
    for attempt in range(1, max_retries + 1):
        try:
            saveTotalizers(totals)
        except Exception as e:
            logger.error(f"Attempt {attempt} failed to save totalizers: {e}")
            if attempt == max_retries:
                logger.error("All attempts to save totalizers failed.")
                return (None, False)
            time.sleep(retry_delay)
        else:
            return (volume, True)
|
||||
@@ -2,6 +2,8 @@ MeasuringPointName,ControllerName,GroupName,UploadType,DeadZonePercent,DataType,
|
||||
alarm_enable_cmd,plcpond,default,periodic,,INT,,0,,,Alarm_Enable,,,rw,,,none,,,,,,,,,,,,,,,0
|
||||
auto,plcpond,default,periodic,,BIT,,,,0,Auto,,,rw,,,none,,,,,,,,,,,,,0,,0
|
||||
discharge_out,plcpond,default,periodic,,FLOAT,,,,,DisPSI_Out,2,,ro,,,none,,,,,,,,,,,,,,,0
|
||||
fit_1_rate,plcpond,default,periodic,,FLOAT,,,,,FIT1_FLOWRATE,2,,ro,,,none,,,,,,,,,,,,,,,0
|
||||
fit_1_total,plcpond,default,periodic,,FLOAT,,,,,FIT1_TOTAL_FLOWRATE,2,,ro,,,none,,,,,,,,,,,,,,,0
|
||||
fit_rate,plcpond,default,periodic,,FLOAT,,,,,FIT_Rate,2,,ro,,,none,,,,,,,,,,,,,,,0
|
||||
hand,plcpond,default,periodic,,BIT,,,,0,Hand,,,ro,,,none,,,,,,,,,,,,,0,,0
|
||||
off,plcpond,default,periodic,,BIT,,,,0,OFF,,,ro,,,none,,,,,,,,,,,,,0,,0
|
||||
|
||||
|
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user