Added new device types.

This commit is contained in:
Nico Melone
2024-10-04 18:56:11 -05:00
parent 79b2f149df
commit 7c3f3519a4
24 changed files with 80728 additions and 1424 deletions

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,292 @@
# Enter your python code.
import json, os, time
from datetime import datetime as dt
from common.Logger import logger
from quickfaas.remotebus import publish
from quickfaas.global_dict import get as get_params
from quickfaas.global_dict import _set_global_args
def chunk_payload(payload, chunk_size=20, is_attributes_payload=False):
    """Split a payload into pieces of at most ``chunk_size`` items.

    For an attributes payload the top-level controllers themselves are
    chunked; for a telemetry payload the ``values`` mapping inside each
    controller entry is chunked, with the original timestamp repeated on
    every piece so each chunk is independently publishable.
    """
    if is_attributes_payload:
        # Attributes: yield dicts of up to chunk_size controllers each.
        items = list(payload.items())
        for start in range(0, len(items), chunk_size):
            yield dict(items[start:start + chunk_size])
        return
    # Telemetry: yield single-controller dicts whose values are chunked.
    for ctrl, entries in payload.items():
        for entry in entries:
            stamp = entry['ts']
            pairs = list(entry['values'].items())
            for start in range(0, len(pairs), chunk_size):
                yield {
                    ctrl: [{"ts": stamp,
                            "values": dict(pairs[start:start + chunk_size])}]
                }
def sendData(message):
    """Group incoming measures by controller and publish them to the cloud.

    Healthy measures are split into telemetry (``grouped_data``) and
    attributes (names containing ``_spt``); ``totalizer_1`` readings drive
    the day/week/month/year rollover totalizers. Payloads are published in
    chunks on the platform-provided ``__topic__``/``__qos__``.

    :param message: dict with a ``measures`` list; each measure carries
        ``ctrlName``, ``name``, ``value`` and ``health``.
    """
    grouped_data = {}
    grouped_attributes = {}
    # Timestamp snapped to the nearest 10 minutes, in milliseconds.
    now = (round(dt.timestamp(dt.now()) / 600) * 600) * 1000
    resetPayload = {"ts": "", "values": {}}
    dayReset, weekReset, monthReset, yearReset = False, False, False, False
    # Bug fix: gatewayData was unbound (NameError at publish) whenever no
    # alarm_enable_cmd measure was present in the message.
    gatewayData = None
    for measure in message['measures']:
        ctrlName = " ".join(measure['ctrlName'].split("_"))
        name = measure['name']
        value = measure['value']
        # Ensure both payload buckets exist for this controller.
        grouped_data.setdefault(ctrlName, {})
        grouped_attributes.setdefault(ctrlName, {})
        # Only forward datapoints whose health flag is good.
        if measure['health']:
            if "_spt" in name:
                grouped_attributes[ctrlName][name] = value
            elif "totalizer_1" in name:
                grouped_data[ctrlName]["day_volume"], dayReset = totalizeDay(value)
                grouped_data[ctrlName]["week_volume"], weekReset = totalizeWeek(value)
                grouped_data[ctrlName]["month_volume"], monthReset = totalizeMonth(value)
                grouped_data[ctrlName]["year_volume"], yearReset = totalizeYear(value)
            elif name == "alarm_enable_cmd":
                gatewayData = {"ts": now, "values": {name: value, "connected": True}}
                grouped_data[ctrlName][name] = value
        grouped_attributes[ctrlName]["latestReportTime"] = now
    # Reshape telemetry into {controller: [{"ts", "values"}]}, dropping
    # controllers that contributed nothing.
    payload = {}
    for key, values in grouped_data.items():
        if values:
            payload[key] = [{"ts": now, "values": values}]
    attributes_payload = {k: v for k, v in grouped_attributes.items() if v}
    # Bug fix: the rollover totals live under the "AA Transfer" controller in
    # `payload`; the old lookup payload["values"][...] raised KeyError on
    # every day/week/month/year rollover.
    transfer_values = payload.get("AA Transfer", [{"values": {}}])[0]["values"]
    if dayReset:
        logger.info("MADE IT TO DAY RESET")
        resetPayload["values"]["yesterday_volume"] = transfer_values.get("day_volume")
        resetPayload["values"]["day_volume"] = 0
    if weekReset:
        resetPayload["values"]["last_week_volume"] = transfer_values.get("week_volume")
        resetPayload["values"]["week_volume"] = 0
    if monthReset:
        resetPayload["values"]["last_month_volume"] = transfer_values.get("month_volume")
        resetPayload["values"]["month_volume"] = 0
    if yearReset:
        resetPayload["values"]["last_year_volume"] = transfer_values.get("year_volume")
        resetPayload["values"]["year_volume"] = 0
    if resetPayload["values"] and "AA Transfer" in payload:
        logger.info("MADE IT TO APPEND RESET")
        # +1000 ms so the reset sample sorts after the regular sample.
        payload["AA Transfer"].append({"ts": now + 1000, "values": resetPayload["values"]})
    logger.info(json.dumps(payload, indent=4))
    publish("v1/gateway/attributes", json.dumps(attributes_payload), __qos__, cloud_name="default")
    if gatewayData is not None:
        publish("v1/devices/me/telemetry", json.dumps(gatewayData), __qos__, cloud_name="default")
    for chunk in chunk_payload(payload=payload):
        publish(__topic__, json.dumps(chunk), __qos__, cloud_name="default")
        # Throttle so the broker is not flooded with chunk messages.
        time.sleep(2)
def initialize_totalizers():
    """Return a fresh totalizer state with every counter zeroed."""
    fields = ("day", "week", "month", "year", "lifetime",
              "dayHolding", "weekHolding", "monthHolding", "yearHolding")
    return {field: 0 for field in fields}
def getTotalizers(file_path="/var/user/files/totalizers.json"):
    """
    Retrieve totalizer data from a JSON file.

    Falls back to a zeroed totalizer dict when the file is missing, holds a
    non-dict/empty value, or contains invalid JSON (in which case the corrupt
    file is backed up first).

    :param file_path: Path to the JSON file storing totalizer data.
    :return: Dictionary containing totalizer values.
    """
    try:
        with open(file_path, "r") as t:
            totalizers = json.load(t)
        if not totalizers or not isinstance(totalizers, dict):
            logger.info("Invalid data format in the file. Initializing totalizers.")
            totalizers = initialize_totalizers()
    except FileNotFoundError:
        logger.info("File not found. Initializing totalizers.")
        totalizers = initialize_totalizers()
    except json.JSONDecodeError:
        # Bug fix: shutil was used below without ever being imported, which
        # turned every corrupt-file recovery into a NameError.
        import shutil
        timestamp = dt.now().strftime("%Y%m%d_%H%M%S")
        # Insert the timestamp before the extension so each backup is unique.
        file_name, file_extension = os.path.splitext(file_path)
        backup_file_path = f"{file_name}_{timestamp}{file_extension}"
        shutil.copyfile(file_path, backup_file_path)
        logger.error(f"Error decoding JSON. A backup of the file is created at {backup_file_path}. Initializing totalizers.")
        totalizers = initialize_totalizers()
    return totalizers
def saveTotalizers(totalizers, file_path="/var/user/files/totalizers.json"):
    """
    Save totalizer data to a JSON file.

    :param totalizers: Dictionary containing totalizer values to be saved.
    :param file_path: Path to the JSON file where totalizer data will be saved.
    :raises OSError, TypeError, ValueError: re-raised after logging so the
        caller's retry loop can react.
    """
    try:
        with open(file_path, "w") as t:
            json.dump(totalizers, t)
    except (OSError, TypeError, ValueError) as e:
        # Bug fix: json.JSONEncodeError does not exist (the json module only
        # defines JSONDecodeError), so the old except clause itself raised
        # AttributeError and masked the real failure. Serialization problems
        # surface as TypeError/ValueError; I/O problems as OSError (IOError
        # is an alias of OSError).
        logger.error(f"Error saving totalizers to {file_path}: {e}")
        raise
def totalizeDay(lifetime, max_retries=3, retry_delay=2):
    """
    Update and save daily totalizers based on the lifetime value.
    :param lifetime: The current lifetime total.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: A tuple containing the calculated value and a boolean indicating if a reset occurred, or (None, False) if save fails.
    """
    totalizers = getTotalizers()
    # Current time snapped to the nearest 10-minute boundary.
    now = dt.fromtimestamp(round(dt.timestamp(dt.now())/600)*600)
    reset = False
    # Volume accumulated since the daily baseline; computed BEFORE any
    # rollover, so on a reset this is the just-finished day's total.
    value = lifetime - totalizers["dayHolding"]
    if not int(now.strftime("%d")) == int(totalizers["day"]):
        # Day-of-month changed: rebase the daily window on the current
        # lifetime reading and persist, retrying on transient save errors.
        totalizers["dayHolding"] = lifetime
        totalizers["day"] = int(now.strftime("%d"))
        for attempt in range(max_retries):
            try:
                saveTotalizers(totalizers)
                reset = True
                return (value, reset)
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed to save totalizers: {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                else:
                    # Persisting failed entirely: signal the caller with
                    # (None, False) rather than reporting a phantom reset.
                    logger.error("All attempts to save totalizers failed.")
                    return (None, False)
    return (value, reset)
def totalizeWeek(lifetime, max_retries=3, retry_delay=2):
    """
    Update and save weekly totalizers based on the lifetime value.
    :param lifetime: The current lifetime total.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: A tuple containing the calculated value and a boolean indicating if a reset occurred, or (None, False) if save fails.
    """
    totalizers = getTotalizers()
    # Current time snapped to the nearest 10-minute boundary.
    now = dt.fromtimestamp(round(dt.timestamp(dt.now()) / 600) * 600)
    # Volume since the weekly baseline; computed before any rollover so a
    # reset reports the just-finished week's total.
    value = lifetime - totalizers["weekHolding"]
    # Roll over on the first samples of a new week (Sunday with a changed
    # week number), or immediately when the week was never initialized.
    week_changed = now.strftime("%U") != totalizers["week"] and now.strftime("%a") == "Sun"
    if not (week_changed or totalizers["week"] == 0):
        return (value, False)
    totalizers["weekHolding"] = lifetime
    totalizers["week"] = now.strftime("%U")
    for attempt in range(1, max_retries + 1):
        try:
            saveTotalizers(totalizers)
            return (value, True)
        except Exception as exc:
            logger.error(f"Attempt {attempt} failed to save totalizers: {exc}")
            if attempt == max_retries:
                # Persisting failed entirely: report no usable value.
                logger.error("All attempts to save totalizers failed.")
                return (None, False)
            time.sleep(retry_delay)
    return (value, False)
def totalizeMonth(lifetime, max_retries=3, retry_delay=2):
    """
    Update and save monthly totalizers based on the lifetime value.
    :param lifetime: The current lifetime total.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: A tuple containing the calculated value and a boolean indicating if a reset occurred, or (None, False) if save fails.
    """
    totalizers = getTotalizers()
    # Current time snapped to the nearest 10-minute boundary.
    now = dt.fromtimestamp(round(dt.timestamp(dt.now())/600)*600)
    reset = False
    # Volume since the monthly baseline; computed before any rollover so a
    # reset reports the just-finished month's total.
    value = lifetime - totalizers["monthHolding"]
    if not int(now.strftime("%m")) == int(totalizers["month"]):
        totalizers["monthHolding"] = lifetime
        # Consistency fix: persist the month as an int, matching totalizeDay.
        # The comparison above coerces with int(), so files that still hold
        # the old string form continue to load and compare correctly.
        totalizers["month"] = int(now.strftime("%m"))
        for attempt in range(max_retries):
            try:
                saveTotalizers(totalizers)
                reset = True
                return (value, reset)
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed to save totalizers: {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                else:
                    # Persisting failed entirely: report no usable value.
                    logger.error("All attempts to save totalizers failed.")
                    return (None, False)
    return (value, reset)
def totalizeYear(lifetime, max_retries=3, retry_delay=2):
    """
    Update and save yearly totalizers based on the lifetime value.
    :param lifetime: The current lifetime total.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: A tuple containing the calculated value and a boolean indicating if a reset occurred, or (None, False) if save fails.
    """
    totalizers = getTotalizers()
    # Current time snapped to the nearest 10-minute boundary.
    now = dt.fromtimestamp(round(dt.timestamp(dt.now())/600)*600)
    reset = False
    # Volume since the yearly baseline; computed before any rollover so a
    # reset reports the just-finished year's total.
    value = lifetime - totalizers["yearHolding"]
    if not int(now.strftime("%Y")) == int(totalizers["year"]):
        totalizers["yearHolding"] = lifetime
        # Consistency fix: persist the year as an int, matching totalizeDay.
        # The comparison above coerces with int(), so files that still hold
        # the old string form continue to load and compare correctly.
        totalizers["year"] = int(now.strftime("%Y"))
        for attempt in range(max_retries):
            try:
                saveTotalizers(totalizers)
                reset = True
                return (value, reset)
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed to save totalizers: {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                else:
                    # Persisting failed entirely: report no usable value.
                    logger.error("All attempts to save totalizers failed.")
                    return (None, False)
    return (value, reset)

View File

@@ -0,0 +1,97 @@
import json, time
from datetime import datetime as dt
from quickfaas.measure import recall, write
from quickfaas.remotebus import publish
from common.Logger import logger
def chunk_payload(payload, chunk_size=20, is_attributes_payload=False):
    """Yield pieces of *payload* no larger than *chunk_size* entries each.

    Attributes payloads are chunked at the controller level; telemetry
    payloads are chunked within each controller's ``values`` dict, and the
    entry's timestamp is carried on every chunk.
    """
    def _runs(pairs):
        # Successive chunk_size-sized runs of (key, value) pairs.
        for offset in range(0, len(pairs), chunk_size):
            yield pairs[offset:offset + chunk_size]

    if is_attributes_payload:
        for piece in _runs(list(payload.items())):
            yield dict(piece)
    else:
        for name, records in payload.items():
            for record in records:
                for piece in _runs(list(record['values'].items())):
                    yield {name: [{"ts": record['ts'], "values": dict(piece)}]}
def sync():
    """Publish the latest cached measurements to the cloud.

    Reads current values via ``recall()``, groups healthy measures by
    controller, then publishes the gateway's own alarm state (when present)
    followed by the grouped telemetry in chunks.
    """
    now = round(dt.timestamp(dt.now())) * 1000
    topic = "v1/gateway/telemetry"
    # Bug fix: `data` was unbound when recall() raised, so the
    # logger.debug(data) below crashed with an uncaught NameError.
    data = []
    try:
        data = recall()
    except Exception as e:
        logger.error(e)
    logger.debug(data)
    logger.info("SYNCING")
    grouped_data = {}
    # Bug fix: gatewayPayload was unbound when no alarm_enable_cmd measure
    # existed; the resulting NameError was swallowed by the broad except
    # below and silently aborted the chunked telemetry publish as well.
    gatewayPayload = None
    try:
        for controller in data:
            ctrlName = " ".join(controller['name'].split("_"))
            for measure in controller["measures"]:
                if ctrlName not in grouped_data:
                    grouped_data[ctrlName] = {}
                # Only forward datapoints whose health flag is good.
                if measure["health"]:
                    if measure["name"] == "alarm_enable_cmd":
                        gatewayPayload = {"ts": now, "values": {measure["name"]: measure["value"]}}
                    grouped_data[ctrlName][measure["name"]] = measure["value"]
    except Exception as e:
        logger.error(e)
    try:
        # Reshape into {controller: [{"ts", "values"}]}, dropping empties.
        payload = {}
        for key, value in grouped_data.items():
            if value:
                payload[key] = [{"ts": now, "values": value}]
        if gatewayPayload is not None:
            publish("v1/devices/me/telemetry", json.dumps(gatewayPayload), qos=1, cloud_name="default")
        for chunk in chunk_payload(payload=payload):
            publish(topic, json.dumps(chunk), qos=1, cloud_name="default")
            logger.info(json.dumps(chunk, indent=4))
            # Throttle so the broker is not flooded with chunk messages.
            time.sleep(2)
    except Exception as e:
        logger.error(e)
def writeplctag(value):
    """Write one measurement to the AA_Transfer controller on the PLC.

    :param value: dict of the form {"measurement": <name>, "value": <val>}.
    :return: True when the write call succeeded, False otherwise.
    """
    try:
        logger.debug(value)
        # write() payload format:
        # [{"name": <controller>, "measures": [{"name": ..., "value": ...}]}]
        message = [{"name": "AA_Transfer", "measures": [{"name": value["measurement"], "value": value["value"]}]}]
        resp = write(message)
        logger.debug("RETURN FROM WRITE: {}".format(resp))
        return True
    except Exception as e:
        # Consistency fix: failures were logged at debug level and so were
        # invisible at normal log levels; log them as errors, matching the
        # sibling handlers.
        logger.error(e)
        return False
def receiveAttributes(topic, payload):
    """Apply shared-attribute updates from the cloud to the PLC.

    Each key/value pair in the JSON *payload* is written to the PLC via
    writeplctag(); afterwards sync() pushes the refreshed values back to
    the cloud.

    :param topic: MQTT topic the attribute update arrived on (logged only).
    :param payload: JSON string mapping measurement names to new values.
    """
    try:
        logger.debug(topic)
        # Parse once and log the parsed form (the original parsed twice).
        p = json.loads(payload)
        logger.debug(p)
        for key, value in p.items():
            try:
                result = writeplctag({"measurement": key, "value": value})
                logger.debug(result)
            except Exception as e:
                logger.error(e)
        sync()
    except Exception as e:
        # Consistency fix: the outer handler logged at debug level, hiding
        # malformed-payload failures; log them as errors like the inner one.
        logger.error(e)

View File

@@ -0,0 +1,104 @@
import json, time
from datetime import datetime as dt
from quickfaas.measure import recall, write
from quickfaas.remotebus import publish
from common.Logger import logger
def chunk_payload(payload, chunk_size=20, is_attributes_payload=False):
    """Break a payload into publishable pieces of at most *chunk_size* items.

    Attributes payloads are split across controllers; telemetry payloads are
    split within each controller's ``values`` dict, repeating the entry's
    timestamp on every piece.
    """
    if is_attributes_payload:
        remaining = list(payload.items())
        while remaining:
            head, remaining = remaining[:chunk_size], remaining[chunk_size:]
            yield dict(head)
    else:
        for device, series in payload.items():
            for point in series:
                pending = list(point['values'].items())
                while pending:
                    head, pending = pending[:chunk_size], pending[chunk_size:]
                    yield {device: [{"ts": point['ts'], "values": dict(head)}]}
def sync():
    """Publish the latest cached measurements to the cloud.

    Reads current values via ``recall()``, groups healthy measures by
    controller, then publishes the gateway's own alarm state (when present)
    followed by the grouped telemetry in chunks.
    """
    now = round(dt.timestamp(dt.now())) * 1000
    topic = "v1/gateway/telemetry"
    # Bug fix: `data` was unbound when recall() raised, so the
    # logger.debug(data) below crashed with an uncaught NameError.
    data = []
    try:
        data = recall()
    except Exception as e:
        logger.error(e)
    logger.debug(data)
    logger.info("SYNCING")
    grouped_data = {}
    # Bug fix: gatewayPayload was unbound when no alarm_enable_cmd measure
    # existed; the resulting NameError was swallowed by the broad except
    # below and silently aborted the chunked telemetry publish as well.
    gatewayPayload = None
    try:
        for controller in data:
            ctrlName = " ".join(controller['name'].split("_"))
            for measure in controller["measures"]:
                if ctrlName not in grouped_data:
                    grouped_data[ctrlName] = {}
                # Only forward datapoints whose health flag is good.
                if measure["health"]:
                    if measure["name"] == "alarm_enable_cmd":
                        gatewayPayload = {"ts": now, "values": {measure["name"]: measure["value"]}}
                    grouped_data[ctrlName][measure["name"]] = measure["value"]
    except Exception as e:
        logger.error(e)
    try:
        # Reshape into {controller: [{"ts", "values"}]}, dropping empties.
        payload = {}
        for key, value in grouped_data.items():
            if value:
                payload[key] = [{"ts": now, "values": value}]
        if gatewayPayload is not None:
            publish("v1/devices/me/telemetry", json.dumps(gatewayPayload), qos=1, cloud_name="default")
        for chunk in chunk_payload(payload=payload):
            publish(topic, json.dumps(chunk), qos=1, cloud_name="default")
            logger.info(json.dumps(chunk, indent=4))
            # Throttle so the broker is not flooded with chunk messages.
            time.sleep(2)
    except Exception as e:
        logger.error(e)
def writeplctag(value):
    """Write a single measurement value to a PLC controller.

    :param value: dict of the form
        {"entityName": <controller>, "measurement": <name>, "value": <val>}.
    :return: True when the write call succeeded, False otherwise.
    """
    try:
        logger.debug(value)
        # write() payload format:
        # [{"name": <controller>, "measures": [{"name": ..., "value": ...}]}]
        request = [{
            "name": value["entityName"],
            "measures": [{"name": value["measurement"], "value": value["value"]}],
        }]
        resp = write(request)
        logger.debug("RETURN FROM WRITE: {}".format(resp))
        return True
    except Exception as exc:
        logger.error(exc)
        return False
def receiveCommand(topic, payload):
    """Handle an RPC command arriving from the cloud.

    Supports the ``setPLCTag`` method, which forwards its params to
    writeplctag(). Every request is acknowledged, then sync() pushes fresh
    values back to the cloud after a short settling delay.

    :param topic: MQTT request topic; the trailing segment is the RPC id.
    :param payload: JSON string with "method" and "params" keys.
    """
    try:
        logger.debug(topic)
        # Parse once and log the parsed form (the original parsed twice).
        p = json.loads(payload)
        logger.debug(p)
        # Dead code removed: a `commands` dispatch dict was built but only
        # ever accessed through a literal key, so it added nothing; call
        # writeplctag directly.
        if p["method"] == "setPLCTag":
            try:
                result = writeplctag(p["params"])
                logger.debug(result)
            except Exception as e:
                logger.error(e)
        # Acknowledge the RPC, give the PLC time to apply the write, then
        # publish refreshed values.
        ack(topic.split("/")[-1])
        time.sleep(5)
        sync()
    except Exception as e:
        logger.error(e)
def ack(msgid):
    """Send an empty RPC response so the platform marks request *msgid* handled."""
    response_topic = "v1/devices/me/rpc/response/" + str(msgid)
    body = {"msg": {"time": time.time()}, "metadata": "", "msgType": ""}
    publish(response_topic, json.dumps(body), 1, cloud_name="default")