Files
HP_InHand_IG502/Pub_Sub/sp_pond/thingsboard/sub/receiveCommand.py
2024-07-31 13:56:21 -05:00

358 lines
16 KiB
Python

import json, os, time, shutil, re
from datetime import datetime as dt
from quickfaas.measure import recall, write
from quickfaas.remotebus import publish
from common.Logger import logger
# Helper function to split the payload into chunks
def chunk_payload(payload, chunk_size=20):
    """
    Split one telemetry payload into several smaller payloads.

    Every chunk carries the same "ts" as the input and at most ``chunk_size``
    entries of its "values" mapping, in the original insertion order.

    :param payload: Dictionary with a "ts" entry and a "values" dictionary.
    :param chunk_size: Maximum number of values per chunk; must be at least 1.
    :return: Generator yielding ``{"ts": ..., "values": {...}}`` dictionaries.
    :raises ValueError: If chunk_size is smaller than 1 (raised on first iteration).
    """
    # A negative step makes range() empty, which would silently drop every
    # value; reject it explicitly instead.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")
    items = list(payload["values"].items())
    for start in range(0, len(items), chunk_size):
        yield {
            "ts": payload["ts"],
            "values": dict(items[start:start + chunk_size]),
        }
def sync():
    """
    Read the current measures and publish them as telemetry.

    For every measure whose "health" is 1 the raw value is published under the
    measure name. Measures whose name matches ``fm_\\d{2}_t\\d`` additionally
    get day/week/month/year volumes from the totalize* helpers, each backed by
    a per-measure file under /var/user/files. When a helper reports a period
    rollover, a second "reset" payload is published afterwards carrying the
    finished period's volume and a zeroed current-period volume.

    Measures with "pond_level" in their name are collected per pod (pod_01 to
    pod_03) to publish a percent deviation, or an alarm flag when a pod does
    not have exactly two levels.

    :return: None. Nothing is published if the measures cannot be read.
    """
    topic = "v1/devices/me/telemetry"
    qos = 1
    payload = {"ts": round(dt.timestamp(dt.now())) * 1000, "values": {}}
    try:
        data = recall()
    except Exception as e:
        # Without a snapshot there is nothing to publish. Previously execution
        # continued and failed with NameError on the unbound `data`.
        logger.error(e)
        return
    logger.debug(data)

    resetPayload = {"ts": "", "values": {}}
    regexPattern = r"fm_\d{2}_t\d"
    # (current-period key, finished-period key, totalizer function)
    periods = (
        ("day", "yesterday", totalizeDay),
        ("week", "last_week", totalizeWeek),
        ("month", "last_month", totalizeMonth),
        ("year", "last_year", totalizeYear),
    )
    pondLevels = {}
    for controller in data:
        for measure in controller["measures"]:
            try:
                if measure["health"] != 1:
                    continue
                name = measure["name"]
                value = measure["value"]
                if re.search(regexPattern, name):
                    file_name = f"/var/user/files/totalizer_{name}.json"
                    for period, previous, totalize in periods:
                        volume, was_reset = totalize(value, file_path=file_name)
                        payload["values"][f"{name}_{period}_volume"] = volume
                        # Record the rollover immediately: the helper has already
                        # persisted it, so it must not be lost if a later
                        # totalizer raises.
                        if was_reset:
                            resetPayload["values"][f"{name}_{previous}_volume"] = volume
                            resetPayload["values"][f"{name}_{period}_volume"] = 0
                # NOTE(review): the copy of this file under review had lost its
                # indentation; the pond-level capture and the raw value are
                # assumed to apply to healthy measures only -- confirm.
                if "pond_level" in name:
                    pondLevels[name] = value
                payload["values"][name] = value
            except Exception as e:
                logger.error(e)

    # Loop through the pondLevels dictionary to calculate deviations with error checking
    for i in range(1, 4):  # Assuming pod IDs range from 01 to 03
        pod_id = f"pod_{i:02d}"
        levels = [level for key, level in pondLevels.items() if key.startswith(pod_id)]
        if len(levels) == 2:
            # Percent deviation of the two levels relative to their average;
            # an average of 1 is substituted when the sum is not positive.
            total = sum(levels)
            avg_level = total / len(levels) if total > 0 else 1
            deviation_percent = ((max(levels) - min(levels)) / avg_level) * 100 if avg_level != 0 else 0
            payload["values"][f"{pod_id}_pond_level_deviation"] = deviation_percent
            payload["values"][f"{pod_id}_pond_level_alm"] = 0  # Reset alarm to 0
        else:
            # Set alarm value if one or both levels are missing
            payload["values"][f"{pod_id}_pond_level_alm"] = 1

    for chunk in chunk_payload(payload=payload):
        publish(topic, json.dumps(chunk), qos)
        time.sleep(2)
    publish("v1/devices/me/attributes", json.dumps({"latestReportTime": (round(dt.timestamp(dt.now())/600)*600)*1000}), qos)

    if resetPayload["values"]:
        # Stamp the reset 1 ms after the rounded 10-minute mark, but never
        # before the snapshot it supersedes: the rounded time can lie up to
        # five minutes in the past.
        rounded_ts = 1 + (round(dt.timestamp(dt.now())/600)*600)*1000
        resetPayload["ts"] = max(rounded_ts, payload["ts"] + 1)
        for chunk in chunk_payload(payload=resetPayload):
            publish(topic, json.dumps(chunk), qos)
            time.sleep(2)
def writeplctag(value):
    """
    Write a single measurement on the "sp_pond" controller.

    :param value: Dictionary of the form
        {"measurement": <measurement_name>, "value": <value to write>}.
    :return: True if the write response reports error_code 0, otherwise False
        (including when the request is malformed or the write raises).
    """
    try:
        logger.debug(value)
        # payload format: [{"name": "advvfdipp", "measures": [{"name": "manualfrequencysetpoint", "value": 49}]}]
        message = [{"name": "sp_pond", "measures": [{"name": value["measurement"], "value": value["value"]}]}]
        resp = write(message)
        logger.debug("RETURN FROM WRITE: {}".format(resp))
        return resp[0]["measures"][0]["error_code"] == 0
    except Exception as e:
        # A failed write is an error, not debug noise: log it at the same
        # level the other handlers in this file use.
        logger.error(e)
        return False
def receiveCommand(topic, payload):
    """
    Handle an RPC command and then publish a fresh telemetry snapshot.

    Supported methods in the JSON payload:

    * "setPLCTag" -- write params["measurement"] = params["value"]. On success
      the request is acknowledged using the last segment of the topic as the
      message id. Measurements containing "cmd" are written back to 0 after
      0.5 s, and measurements containing "start" trigger ten syncs spaced 30 s
      apart.
    * "changeSetpoint" -- optionally write "pidcontrolmode" from
      params["setpointType"], then write params["setpointValue"] to the
      setpoint that belongs to the currently active mode.

    Any method, including unknown ones, is followed by a 5 s wait and sync().

    :param topic: Topic the command arrived on.
    :param payload: JSON text with "method" and, for the methods above, "params".
    :return: None. All errors are logged, none are raised.
    """
    try:
        logger.debug(topic)
        p = json.loads(payload)  # parse once and reuse for logging and dispatch
        logger.debug(p)
        command = p["method"]
        if command == "setPLCTag":
            try:
                result = writeplctag(p["params"])
                logger.debug(result)
                # NOTE(review): the copy of this file under review had lost its
                # indentation; the two follow-up checks are assumed to sit
                # inside `if result:` -- confirm against the repository.
                if result:
                    ack(topic.split("/")[-1])
                    if "cmd" in p["params"]["measurement"]:
                        # Command tags are pulsed: release the tag again.
                        time.sleep(0.5)
                        result = writeplctag({"measurement": p["params"]["measurement"], "value": 0})
                        logger.debug(result)
                    if result and "start" in p["params"]["measurement"]:
                        for _ in range(10):
                            sync()
                            time.sleep(30)
            except Exception as e:
                logger.error(e)
        elif command == "changeSetpoint":
            try:
                logger.debug("attempting controlpoint write")
                params_type = {"measurement": "pidcontrolmode", "value": p["params"]["setpointType"]}
                # NOTE(review): a setpointType of 0 is falsy and therefore never
                # written, although 0 is a key of `modes` below -- confirm
                # whether that is intended.
                if params_type["value"]:
                    writeplctag(params_type)
                    time.sleep(2)
            except Exception as e:
                logger.error("DID NOT WRITE CONTROL MODE")
                logger.error(e)
            try:
                logger.debug("attempting setpoint write")
                modes = {0: "flowsetpoint", 1: "fluidlevelsetpoint", 2: "tubingpressuresetpoint", 3: "manualfrequencysetpoint"}
                params_value = {"value": p["params"]["setpointValue"]}
                # A setpointValue of 0 is likewise skipped by this truthiness test.
                if params_value["value"]:
                    # getMode() may return None; the resulting KeyError is
                    # reported by the handler below.
                    params_value["measurement"] = modes[getMode()]
                    result = writeplctag(params_value)
                    logger.debug(result)
            except Exception as e:
                logger.error("DID NOT WRITE SETPOINT")
                logger.error(e)
        time.sleep(5)
        sync()
    except Exception as e:
        # Malformed payloads and sync failures end up here; log them as errors
        # so they are visible outside debug logging.
        logger.error(e)
def ack(msgid, metadata="", msgType="ACK"):
    """
    Publish an RPC response for the given message id.

    :param msgid: Request id; appended to the RPC response topic.
    :param metadata: Value sent as the "metadata" field of the response.
    :param msgType: Value sent as the "msgType" field of the response.
    """
    response_topic = "v1/devices/me/rpc/response/" + str(msgid)
    response = {
        "msg": {"time": time.time()},
        "metadata": metadata,
        "msgType": msgType,
    }
    publish(response_topic, json.dumps(response), 1)
def getMode():
    """
    Look up the current value of the "pidcontrolmode" measure.

    :return: The value of the first measure named "pidcontrolmode", or None if
        no such measure exists or the measures cannot be read.
    """
    try:
        data = recall()
        for controller in data:
            for measure in controller["measures"]:
                if measure["name"] == "pidcontrolmode":
                    return measure["value"]
    except Exception as e:
        # Catch Exception rather than everything so KeyboardInterrupt and
        # SystemExit still propagate, and leave a trace of the failure.
        logger.error(e)
    return None
def initialize_totalizers():
    """
    Build a fresh totalizer record with every field set to 0.

    :return: New dictionary holding the period markers ("day", "week", "month",
        "year"), "lifetime", and the four "<period>Holding" fields.
    """
    field_names = (
        "day",
        "week",
        "month",
        "year",
        "lifetime",
        "dayHolding",
        "weekHolding",
        "monthHolding",
        "yearHolding",
    )
    return dict.fromkeys(field_names, 0)
def getTotalizers(file_path="/var/user/files/totalizers.json"):
    """
    Retrieves totalizer data from a JSON file.

    A missing file, an unparsable file, or content that is empty or not a JSON
    object yields a freshly initialized record. An unparsable file is first
    copied to a timestamped backup next to the original. Keys missing from a
    stored record are filled with their initial value, so callers can rely on
    every field from initialize_totalizers() being present.

    :param file_path: Path to the JSON file storing totalizer data.
    :return: Dictionary containing totalizer values.
    """
    defaults = initialize_totalizers()
    try:
        with open(file_path, "r") as t:
            totalizers = json.load(t)
    except FileNotFoundError:
        logger.info("File not found. Initializing totalizers.")
        return defaults
    except json.JSONDecodeError:
        timestamp = dt.now().strftime("%Y%m%d_%H%M%S")
        # Split the file path and insert the timestamp before the extension
        file_name, file_extension = os.path.splitext(file_path)
        backup_file_path = f"{file_name}_{timestamp}{file_extension}"
        shutil.copyfile(file_path, backup_file_path)
        logger.error(f"Error decoding JSON. A backup of the file is created at {backup_file_path}. Initializing totalizers.")
        return defaults

    if not totalizers or not isinstance(totalizers, dict):
        logger.info("Invalid data format in the file. Initializing totalizers.")
        return defaults

    # A partially written or older record would otherwise raise KeyError in
    # every totalize* call; stored values take precedence over the defaults.
    missing = [key for key in defaults if key not in totalizers]
    if missing:
        logger.info(f"Totalizer file {file_path} is missing {missing}. Using initial values for them.")
    return {**defaults, **totalizers}
def saveTotalizers(totalizers, file_path="/var/user/files/totalizers.json"):
    """
    Saves totalizer data to a JSON file.

    The data is written to a temporary file next to the target and then moved
    over it, so a failed or interrupted save leaves the previous file intact.

    :param totalizers: Dictionary containing totalizer values to be saved.
    :param file_path: Path to the JSON file where totalizer data will be saved.
    :raises OSError: If the file cannot be written or replaced.
    :raises TypeError: If totalizers contains values that are not JSON serializable.
    :raises ValueError: If json.dump rejects the data (e.g. circular references).
    """
    tmp_path = f"{file_path}.tmp"
    try:
        with open(tmp_path, "w") as t:
            json.dump(totalizers, t)
            # Make sure the bytes are on disk before the rename publishes them.
            t.flush()
            os.fsync(t.fileno())
        os.replace(tmp_path, file_path)
    # The json module has no JSONEncodeError; serialization problems surface
    # as TypeError or ValueError. IOError is an alias of OSError.
    except (OSError, TypeError, ValueError) as e:
        logger.error(f"Error saving totalizers to {file_path}: {e}")
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # nothing to clean up, or cleanup itself is impossible
        raise  # the callers implement the retry policy
def totalizeDay(lifetime, file_path="/var/user/files/totalizers.json", max_retries=3, retry_delay=2):
    """
    Compute the volume accumulated in the current day and roll over on a new day.

    The volume is the lifetime total minus the stored "dayHolding". The current
    time is rounded to the nearest 600 seconds; when its day of month differs
    from the stored "day", the holding value is set to the lifetime total and
    the record is saved.

    :param lifetime: The current lifetime total.
    :param file_path: Path to the JSON file storing the totalizer record.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: (volume, True) after a saved rollover, (volume, False) when no
        rollover was due, or (None, False) if every save attempt failed.
    """
    state = getTotalizers(file_path=file_path)
    rounded_now = dt.fromtimestamp(round(dt.timestamp(dt.now()) / 600) * 600)
    volume = lifetime - state["dayHolding"]
    today = int(rounded_now.strftime("%d"))

    # Same day as stored: nothing to persist.
    if today == int(state["day"]):
        return (volume, False)

    state["dayHolding"] = lifetime
    state["day"] = today
    for attempt_no in range(1, max_retries + 1):
        try:
            saveTotalizers(state, file_path=file_path)
        except Exception as e:
            logger.error(f"Attempt {attempt_no} failed to save totalizers: {e}")
            if attempt_no < max_retries:
                time.sleep(retry_delay)
            else:
                logger.error("All attempts to save totalizers failed.")
                return (None, False)
        else:
            return (volume, True)
    return (volume, False)
def totalizeWeek(lifetime, file_path="/var/user/files/totalizers.json", max_retries=3, retry_delay=2):
    """
    Compute the volume accumulated in the current week and roll over on a new week.

    The volume is the lifetime total minus the stored "weekHolding". The current
    time is rounded to the nearest 600 seconds. A rollover happens when the
    stored "week" is still the initial 0, or when the "%U" week number differs
    from the stored one and the rounded time falls on a Sunday.

    :param lifetime: The current lifetime total.
    :param file_path: Path to the JSON file storing the totalizer record.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: (volume, True) after a saved rollover, (volume, False) when no
        rollover was due, or (None, False) if every save attempt failed.
    """
    state = getTotalizers(file_path=file_path)
    rounded_now = dt.fromtimestamp(round(dt.timestamp(dt.now()) / 600) * 600)
    volume = lifetime - state["weekHolding"]

    week_label = rounded_now.strftime("%U")
    new_week_on_sunday = week_label != state["week"] and rounded_now.strftime("%a") == "Sun"
    if not (new_week_on_sunday or state["week"] == 0):
        return (volume, False)

    state["weekHolding"] = lifetime
    state["week"] = rounded_now.strftime("%U")
    for attempt_no in range(1, max_retries + 1):
        try:
            saveTotalizers(state, file_path=file_path)
        except Exception as e:
            logger.error(f"Attempt {attempt_no} failed to save totalizers: {e}")
            if attempt_no < max_retries:
                time.sleep(retry_delay)
            else:
                logger.error("All attempts to save totalizers failed.")
                return (None, False)
        else:
            return (volume, True)
    return (volume, False)
def totalizeMonth(lifetime, file_path="/var/user/files/totalizers.json", max_retries=3, retry_delay=2):
    """
    Compute the volume accumulated in the current month and roll over on a new month.

    The volume is the lifetime total minus the stored "monthHolding". The
    current time is rounded to the nearest 600 seconds; when its month number
    differs from the stored "month", the holding value is set to the lifetime
    total and the record is saved.

    :param lifetime: The current lifetime total.
    :param file_path: Path to the JSON file storing the totalizer record.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: (volume, True) after a saved rollover, (volume, False) when no
        rollover was due, or (None, False) if every save attempt failed.
    """
    state = getTotalizers(file_path=file_path)
    rounded_now = dt.fromtimestamp(round(dt.timestamp(dt.now()) / 600) * 600)
    volume = lifetime - state["monthHolding"]
    month_label = rounded_now.strftime("%m")

    # Same month as stored: nothing to persist.
    if int(month_label) == int(state["month"]):
        return (volume, False)

    state["monthHolding"] = lifetime
    state["month"] = month_label
    for attempt_no in range(1, max_retries + 1):
        try:
            saveTotalizers(state, file_path=file_path)
        except Exception as e:
            logger.error(f"Attempt {attempt_no} failed to save totalizers: {e}")
            if attempt_no < max_retries:
                time.sleep(retry_delay)
            else:
                logger.error("All attempts to save totalizers failed.")
                return (None, False)
        else:
            return (volume, True)
    return (volume, False)
def totalizeYear(lifetime, file_path="/var/user/files/totalizers.json", max_retries=3, retry_delay=2):
    """
    Compute the volume accumulated in the current year and roll over on a new year.

    The volume is the lifetime total minus the stored "yearHolding". The
    current time is rounded to the nearest 600 seconds; when its year differs
    from the stored "year", the holding value is set to the lifetime total and
    the record is saved.

    :param lifetime: The current lifetime total.
    :param file_path: Path to the JSON file storing the totalizer record.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: (volume, True) after a saved rollover, (volume, False) when no
        rollover was due, or (None, False) if every save attempt failed.
    """
    state = getTotalizers(file_path=file_path)
    rounded_now = dt.fromtimestamp(round(dt.timestamp(dt.now()) / 600) * 600)
    volume = lifetime - state["yearHolding"]
    year_label = rounded_now.strftime("%Y")

    # Same year as stored: nothing to persist.
    if int(year_label) == int(state["year"]):
        return (volume, False)

    state["yearHolding"] = lifetime
    state["year"] = year_label
    for attempt_no in range(1, max_retries + 1):
        try:
            saveTotalizers(state, file_path=file_path)
        except Exception as e:
            logger.error(f"Attempt {attempt_no} failed to save totalizers: {e}")
            if attempt_no < max_retries:
                time.sleep(retry_delay)
            else:
                logger.error("All attempts to save totalizers failed.")
                return (None, False)
        else:
            return (volume, True)
    return (volume, False)