import json, os, time, shutil, re
from datetime import datetime as dt
from quickfaas.measure import recall, write
from quickfaas.remotebus import publish
from common.Logger import logger


def chunk_payload(payload, chunk_size=20):
    """Split a telemetry payload into chunks of at most *chunk_size* values.

    :param payload: dict with "ts" and "values" keys; "values" is a flat dict.
    :param chunk_size: maximum number of entries per emitted chunk.
    :yield: dicts with the same "ts" and a slice of "values".
    """
    items = list(payload["values"].items())
    for start in range(0, len(items), chunk_size):
        yield {"ts": payload["ts"], "values": dict(items[start:start + chunk_size])}


def sync():
    """Read current measurements, derive totalizer and pond-level values, publish telemetry.

    Publishes chunked telemetry to the MQTT telemetry topic, a "latestReportTime"
    attribute, and (when any period rolled over) a reset payload timestamped just
    after the 10-minute boundary.
    """
    payload = {"ts": round(dt.timestamp(dt.now())) * 1000, "values": {}}
    topic = "v1/devices/me/telemetry"
    qos = 1
    try:
        data = recall()
    except Exception as e:
        # Without fresh data there is nothing to publish; the original code fell
        # through here and crashed on an unbound `data` (NameError).
        logger.error(e)
        return
    logger.debug(data)

    resetPayload = {"ts": "", "values": {}}
    regexPattern = r"fm_\d{2}_t\d"  # flow-meter totalizer tag names, e.g. fm_01_t1
    pondLevels = {}

    for controller in data:
        for measure in controller["measures"]:
            try:
                if measure["health"] == 1:
                    name = measure["name"]
                    value = measure["value"]
                    if re.search(regexPattern, name):
                        # Per-meter totalizer state file.
                        file_name = f"/var/user/files/totalizer_{name}.json"
                        payload["values"][name + "_day_volume"], dayReset = totalizeDay(value, file_path=file_name)
                        payload["values"][name + "_week_volume"], weekReset = totalizeWeek(value, file_path=file_name)
                        payload["values"][name + "_month_volume"], monthReset = totalizeMonth(value, file_path=file_name)
                        payload["values"][name + "_year_volume"], yearReset = totalizeYear(value, file_path=file_name)
                        # On a period rollover, report the closing total under a
                        # "_last/_yesterday" key and zero the live counter.
                        if dayReset:
                            resetPayload["values"][name + "_yesterday_volume"] = payload["values"][name + "_day_volume"]
                            resetPayload["values"][name + "_day_volume"] = 0
                        if weekReset:
                            resetPayload["values"][name + "_last_week_volume"] = payload["values"][name + "_week_volume"]
                            resetPayload["values"][name + "_week_volume"] = 0
                        if monthReset:
                            resetPayload["values"][name + "_last_month_volume"] = payload["values"][name + "_month_volume"]
                            resetPayload["values"][name + "_month_volume"] = 0
                        if yearReset:
                            resetPayload["values"][name + "_last_year_volume"] = payload["values"][name + "_year_volume"]
                            resetPayload["values"][name + "_year_volume"] = 0
                    if "pond_level" in name:
                        pondLevels[name] = value
                    payload["values"][name] = value
            except Exception as e:
                logger.error(e)

    # Pond-level deviation with error checking; pod IDs assumed to range 01-03.
    for i in range(1, 4):
        pod_id = f"pod_{i:02d}"
        levels = [pondLevels[k] for k in pondLevels if k.startswith(pod_id)]
        if len(levels) == 2:
            # Percent spread between the two level sensors relative to their mean.
            avg_level = sum(levels) / len(levels) if sum(levels) > 0 else 1
            deviation_percent = ((max(levels) - min(levels)) / avg_level) * 100 if avg_level != 0 else 0
            payload["values"][f"{pod_id}_pond_level_deviation"] = deviation_percent
            payload["values"][f"{pod_id}_pond_level_alm"] = 0  # clear alarm
        else:
            # One or both level readings missing/unhealthy: raise alarm.
            payload["values"][f"{pod_id}_pond_level_alm"] = 1

    for chunk in chunk_payload(payload=payload):
        publish(topic, json.dumps(chunk), qos)
        time.sleep(2)  # throttle between chunks

    # Report time snapped to the 10-minute boundary, in milliseconds.
    publish("v1/devices/me/attributes",
            json.dumps({"latestReportTime": (round(dt.timestamp(dt.now()) / 600) * 600) * 1000}),
            qos)

    if resetPayload["values"]:
        # +1 ms so the reset sample sorts after the regular sample at the boundary.
        resetPayload["ts"] = 1 + (round(dt.timestamp(dt.now()) / 600) * 600) * 1000
        for chunk in chunk_payload(payload=resetPayload):
            publish(topic, json.dumps(chunk), qos)
            time.sleep(2)


def writeplctag(value):
    """Write one tag to the PLC.

    :param value: dict of the form {"measurement": <tag name>, "value": <value>}.
    :return: True when the write response reports error_code == 0, else False.
    """
    try:
        logger.debug(value)
        # Expected write format: [{"name": <controller>, "measures": [{"name": ..., "value": ...}]}]
        message = [{"name": "sp_pond",
                    "measures": [{"name": value["measurement"], "value": value["value"]}]}]
        resp = write(message)
        logger.debug("RETURN FROM WRITE: {}".format(resp))
        return resp[0]["measures"][0]["error_code"] == 0
    except Exception as e:
        logger.debug(e)
        return False


def receiveCommand(topic, payload):
    """Handle an inbound RPC message.

    :param topic: MQTT topic the request arrived on (last segment is the request id).
    :param payload: JSON string with "method" and "params".
    """
    try:
        logger.debug(topic)
        p = json.loads(payload)  # parse once (was parsed twice just for logging)
        logger.debug(p)
        command = p["method"]
        commands = {
            "sync": sync,
            "writeplctag": writeplctag,
        }
        if command == "setPLCTag":
            try:
                result = commands["writeplctag"](p["params"])
                logger.debug(result)
                if result:
                    ack(topic.split("/")[-1])
                    if "cmd" in p["params"]["measurement"]:
                        # Momentary command tags are pulsed: write, wait, write 0.
                        time.sleep(0.5)
                        result = commands["writeplctag"]({"measurement": p["params"]["measurement"], "value": 0})
                        logger.debug(result)
                        if result and "start" in p["params"]["measurement"]:
                            # After a start command, resync for ~5 minutes.
                            for _ in range(10):
                                sync()
                                time.sleep(30)
            except Exception as e:
                logger.error(e)
        elif command == "changeSetpoint":
            try:
                logger.debug("attempting controlpoint write")
                params_type = {"measurement": "pidcontrolmode", "value": p["params"]["setpointType"]}
                # NOTE(review): a falsy setpointType (0) is skipped here — confirm
                # mode 0 is never meant to be written via this path.
                if params_type["value"]:
                    commands["writeplctag"](params_type)
                    time.sleep(2)
            except Exception as e:
                logger.error("DID NOT WRITE CONTROL MODE")
                logger.error(e)
            try:
                logger.debug("attempting setpoint write")
                modes = {0: "flowsetpoint", 1: "fluidlevelsetpoint",
                         2: "tubingpressuresetpoint", 3: "manualfrequencysetpoint"}
                params_value = {"value": p["params"]["setpointValue"]}
                # NOTE(review): a falsy setpointValue (0) is skipped as well.
                if params_value["value"]:
                    params_value["measurement"] = modes[getMode()]
                    result = commands["writeplctag"](params_value)
                    logger.debug(result)
            except Exception as e:
                logger.error("DID NOT WRITE SETPOINT")
                logger.error(e)
        time.sleep(5)
        sync()
    except Exception as e:
        logger.debug(e)


def ack(msgid, metadata="", msgType="ACK"):
    """Publish an RPC acknowledgement for request *msgid*."""
    publish("v1/devices/me/rpc/response/" + str(msgid),
            json.dumps({"msg": {"time": time.time()}, "metadata": metadata, "msgType": msgType}),
            1)


def getMode():
    """Return the current "pidcontrolmode" measure value, or None if unavailable."""
    try:
        for controller in recall():
            for measure in controller["measures"]:
                if measure["name"] == "pidcontrolmode":
                    return measure["value"]
    except Exception:
        return None
    return None  # tag not present in recalled data


def initialize_totalizers():
    """Return a fresh totalizer state dict with all counters and holdings zeroed."""
    return {
        "day": 0, "week": 0, "month": 0, "year": 0, "lifetime": 0,
        "dayHolding": 0, "weekHolding": 0, "monthHolding": 0, "yearHolding": 0,
    }


def getTotalizers(file_path="/var/user/files/totalizers.json"):
    """
    Retrieves totalizer data from a JSON file.

    :param file_path: Path to the JSON file storing totalizer data.
    :return: Dictionary containing totalizer values.
    """
    try:
        with open(file_path, "r") as t:
            totalizers = json.load(t)
        if not totalizers or not isinstance(totalizers, dict):
            logger.info("Invalid data format in the file. Initializing totalizers.")
            totalizers = initialize_totalizers()
    except FileNotFoundError:
        logger.info("File not found. Initializing totalizers.")
        totalizers = initialize_totalizers()
    except json.JSONDecodeError:
        # Preserve the corrupt file for diagnosis before starting over.
        timestamp = dt.now().strftime("%Y%m%d_%H%M%S")
        file_name, file_extension = os.path.splitext(file_path)
        backup_file_path = f"{file_name}_{timestamp}{file_extension}"
        shutil.copyfile(file_path, backup_file_path)
        logger.error(f"Error decoding JSON. A backup of the file is created at {backup_file_path}. Initializing totalizers.")
        totalizers = initialize_totalizers()
    return totalizers


def saveTotalizers(totalizers, file_path="/var/user/files/totalizers.json"):
    """
    Saves totalizer data to a JSON file.

    :param totalizers: Dictionary containing totalizer values to be saved.
    :param file_path: Path to the JSON file where totalizer data will be saved.
    """
    try:
        with open(file_path, "w") as t:
            json.dump(totalizers, t)
    # json.dump raises TypeError/ValueError on unserializable data; the original
    # referenced the nonexistent json.JSONEncodeError, which would itself raise.
    except (IOError, OSError, TypeError, ValueError) as e:
        logger.error(f"Error saving totalizers to {file_path}: {e}")
        raise  # let callers decide how to handle a failed save


def _rounded_now():
    """Return the current datetime snapped to the nearest 10-minute boundary."""
    return dt.fromtimestamp(round(dt.timestamp(dt.now()) / 600) * 600)


def _save_with_retry(totalizers, file_path, max_retries, retry_delay):
    """Persist *totalizers*, retrying on failure.

    :return: True when a save succeeded, False when all attempts failed.
    """
    for attempt in range(max_retries):
        try:
            saveTotalizers(totalizers, file_path=file_path)
            return True
        except Exception as e:
            logger.error(f"Attempt {attempt + 1} failed to save totalizers: {e}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
    logger.error("All attempts to save totalizers failed.")
    return False


def totalizeDay(lifetime, file_path="/var/user/files/totalizers.json", max_retries=3, retry_delay=2):
    """
    Update and save daily totalizers based on the lifetime value.

    :param lifetime: The current lifetime total.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: A tuple (value, reset) — (None, False) if the rollover save fails.
    """
    totalizers = getTotalizers(file_path=file_path)
    now = _rounded_now()
    value = lifetime - totalizers["dayHolding"]
    if int(now.strftime("%d")) != int(totalizers["day"]):
        # Day changed: latch the new baseline and persist it.
        totalizers["dayHolding"] = lifetime
        totalizers["day"] = int(now.strftime("%d"))
        if _save_with_retry(totalizers, file_path, max_retries, retry_delay):
            return (value, True)
        return (None, False)
    return (value, False)


def totalizeWeek(lifetime, file_path="/var/user/files/totalizers.json", max_retries=3, retry_delay=2):
    """
    Update and save weekly totalizers based on the lifetime value.

    :param lifetime: The current lifetime total.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: A tuple (value, reset) — (None, False) if the rollover save fails.
    """
    totalizers = getTotalizers(file_path=file_path)
    now = _rounded_now()
    value = lifetime - totalizers["weekHolding"]
    # Roll over on Sunday when the week number changed, or on first-ever run.
    if (now.strftime("%U") != totalizers["week"] and now.strftime("%a") == "Sun") or totalizers["week"] == 0:
        totalizers["weekHolding"] = lifetime
        totalizers["week"] = now.strftime("%U")
        if _save_with_retry(totalizers, file_path, max_retries, retry_delay):
            return (value, True)
        return (None, False)
    return (value, False)


def totalizeMonth(lifetime, file_path="/var/user/files/totalizers.json", max_retries=3, retry_delay=2):
    """
    Update and save monthly totalizers based on the lifetime value.

    :param lifetime: The current lifetime total.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: A tuple (value, reset) — (None, False) if the rollover save fails.
    """
    totalizers = getTotalizers(file_path=file_path)
    now = _rounded_now()
    value = lifetime - totalizers["monthHolding"]
    if int(now.strftime("%m")) != int(totalizers["month"]):
        totalizers["monthHolding"] = lifetime
        totalizers["month"] = now.strftime("%m")
        if _save_with_retry(totalizers, file_path, max_retries, retry_delay):
            return (value, True)
        return (None, False)
    return (value, False)


def totalizeYear(lifetime, file_path="/var/user/files/totalizers.json", max_retries=3, retry_delay=2):
    """
    Update and save yearly totalizers based on the lifetime value.

    :param lifetime: The current lifetime total.
    :param max_retries: Maximum number of save attempts.
    :param retry_delay: Delay in seconds between retries.
    :return: A tuple (value, reset) — (None, False) if the rollover save fails.
    """
    totalizers = getTotalizers(file_path=file_path)
    now = _rounded_now()
    value = lifetime - totalizers["yearHolding"]
    if int(now.strftime("%Y")) != int(totalizers["year"]):
        totalizers["yearHolding"] = lifetime
        totalizers["year"] = now.strftime("%Y")
        if _save_with_retry(totalizers, file_path, max_retries, retry_delay):
            return (value, True)
        return (None, False)
    return (value, False)