From 69ddd7cd6021f936d78603f941a317579472925e Mon Sep 17 00:00:00 2001 From: Nico Melone Date: Wed, 2 Aug 2023 14:30:57 -0500 Subject: [PATCH] added flowmetercc --- .DS_Store | Bin 14340 -> 14340 bytes meshifyDrivers/.DS_Store | Bin 8196 -> 8196 bytes meshifyDrivers/flowmeterskid/config.txt | 12 +- meshifyDrivers/flowmeterskid/flowmeterskid.py | 20 +- meshifyDrivers/flowmeterskidcc/Channel.py | 362 ++++++++++++++++++ meshifyDrivers/flowmeterskidcc/Tags.py | 6 + meshifyDrivers/flowmeterskidcc/config.txt | 14 + meshifyDrivers/flowmeterskidcc/file_logger.py | 18 + .../flowmeterskidcc/flowmeterskid.py | 231 +++++++++++ meshifyDrivers/flowmeterskidcc/persistence.py | 21 + meshifyDrivers/flowmeterskidcc/utilities.py | 62 +++ meshifyDrivers/mainHP/config.txt | 2 +- meshifyDrivers/mainHP/main.py | 6 +- meshifyDrivers/tankalarms/tankalarms.py | 1 - 14 files changed, 734 insertions(+), 21 deletions(-) create mode 100644 meshifyDrivers/flowmeterskidcc/Channel.py create mode 100644 meshifyDrivers/flowmeterskidcc/Tags.py create mode 100644 meshifyDrivers/flowmeterskidcc/config.txt create mode 100644 meshifyDrivers/flowmeterskidcc/file_logger.py create mode 100644 meshifyDrivers/flowmeterskidcc/flowmeterskid.py create mode 100644 meshifyDrivers/flowmeterskidcc/persistence.py create mode 100644 meshifyDrivers/flowmeterskidcc/utilities.py diff --git a/.DS_Store b/.DS_Store index bff430ca4795a705c6914ee9a721434a00026959..b5688da2945cfa96088158a3751b82286f91e113 100644 GIT binary patch delta 226 zcmZoEXepTBFKWZUz`)GFAi%(o&QQdV$WX$N$&f$sqVi+|6PAhHT$4++1vyiSlXH^t z^K%#`-&Yphc+re~;sf5z>>Ml{jEa*Bw8b_z=tMJ5RuL0MQm7~xqpRo1r-iqFo;&CBndd_h2-wGXHf rY?>g*w3OoHoTU8x97dQZ3rKYHSHTM`o7pA4u}q#LB(wRt@M|Uj&{!)U delta 46 zcmV+}0MY-1K!iY$PXQOPP`eKS7nAT19 self.guarantee_sec: + send_needed = True + send_reason = "guarantee sec" + elif force_send: + send_needed = True + send_reason = "forced" + else: + if self.last_send_time == 0: + send_needed = True + send_reason = "no send time" + elif self.value is None: + send_needed = True + send_reason = "no value" + elif abs(self.value - new_value) > self.chg_threshold: + send_needed = True + send_reason = "change threshold" + elif (time.time() - self.last_send_time) > self.guarantee_sec: + send_needed = True + send_reason = "guarantee sec" + elif force_send: + send_needed = True + send_reason = "forced" + if send_needed: + self.last_value = self.value + if self.map_: + try: + self.value = self.map_[new_value] + except KeyError: + logger.error("Cannot find a map value for {} in {} for {}".format(new_value, self.map_, self.mesh_name)) + self.value = new_value + else: + self.value = new_value + self.last_send_time = time.time() + logger.info("Sending {} for {} - {}".format(self.value, self.mesh_name, send_reason)) + return send_needed + + def read(self): + """Read the value.""" + pass + +def identity(sent): + """Return exactly what was sent to it.""" + return sent + + +class ModbusChannel(Channel): + """Modbus channel object.""" + + def __init__(self, mesh_name, register_number, data_type, chg_threshold, guarantee_sec, channel_size=1, map_=False, write_enabled=False, transform_fn=identity, unit_number=1, scaling=0): + """Initialize the channel.""" + super(ModbusChannel, self).__init__(mesh_name, data_type, chg_threshold, guarantee_sec, map_, write_enabled) + self.mesh_name = mesh_name + self.register_number = register_number + self.channel_size = channel_size + self.data_type = data_type + self.last_value = None + self.value = None + self.last_send_time = 0 + self.chg_threshold = chg_threshold + self.guarantee_sec = guarantee_sec + self.map_ = map_ + self.write_enabled = write_enabled + self.transform_fn = transform_fn + self.unit_number = unit_number + self.scaling= scaling + + def read(self): + """Return the transformed read value.""" + with lock: + print("ATTEMPTING TO READ ON {}".format(self.mesh_name)) + if self.data_type == "FLOAT": + try: + read_value = instrument.read_float(self.register_number,4,self.channel_size) + except Exception as e: + logger.info("Error in read value: {}\nTrying one more time".format(e)) + try: + read_value = instrument.read_float(self.register_number,4,self.channel_size) + except: + return None + elif self.data_type == "FLOATBS": + try: + read_value = byteSwap32(instrument.read_registers(self.register_number,2, 4)) + except Exception as e: + logger.info("Error in read value: {}\nTrying one more time".format(e)) + try: + read_value = byteSwap32(instrument.read_registers(self.register_number,2,4)) + except: + return None + elif self.data_type == "INTEGER" or self.data_type == "STRING": + try: + read_value = instrument.read_register(self.register_number, self.scaling, 4) + except Exception as e: + logger.info("Error in read value: {}\nTrying one more time".format(e)) + try: + read_value = instrument.read_register(self.register_number,self.scaling,4) + except: + return None + read_value = self.transform_fn(read_value) + return read_value + + + def write(self, value): + """Write a value to a register""" + if self.data_type == "FLOAT": + value = float(value) + elif self.data_type == "INTEGER": + value = int(value) + else: + value = str(value) + try: + instrument.write_register(self.register_number,value, self.scaling, 16 if self.channel_size > 1 else 6 ) + return True + except Exception as e: + logger.info("Failed to write value: {}".format(e)) + return False + +class PLCChannel(Channel): + """PLC Channel Object.""" + + def __init__(self, ip, mesh_name, plc_tag, data_type, chg_threshold, guarantee_sec, map_=False, write_enabled=False, plc_type='CLX'): + """Initialize the channel.""" + super(PLCChannel, self).__init__(mesh_name, data_type, chg_threshold, guarantee_sec, map_, write_enabled) + self.plc_ip = ip + self.mesh_name = mesh_name + self.plc_tag = plc_tag + self.data_type = data_type + self.last_value = None + self.value = None + self.last_send_time = 0 + self.chg_threshold = chg_threshold + self.guarantee_sec = guarantee_sec + self.map_ = map_ + self.write_enabled = write_enabled + self.plc_type = plc_type + + def read(self): + """Read the value.""" + plc_value = None + if self.plc_tag and self.plc_ip: + read_value = read_tag(self.plc_ip, self.plc_tag, plc_type=self.plc_type) + if read_value: + plc_value = read_value[0] + + return plc_value + + +class BoolArrayChannels(Channel): + """Hold the configuration for a set of boolean array channels.""" + + def __init__(self, ip, mesh_name, plc_tag, data_type, chg_threshold, guarantee_sec, map_=False, write_enabled=False): + """Initialize the channel.""" + super(BoolArrayChannels, self).__init__(mesh_name, data_type, chg_threshold, guarantee_sec, map_, write_enabled) + self.plc_ip = ip + self.mesh_name = mesh_name + self.plc_tag = plc_tag + self.data_type = data_type + self.last_value = None + self.value = None + self.last_send_time = 0 + self.chg_threshold = chg_threshold + self.guarantee_sec = guarantee_sec + self.map_ = map_ + self.write_enabled = write_enabled + + def compare_values(self, new_val_dict): + """Compare new values to old values to see if the values need storing.""" + send = False + for idx in new_val_dict: + try: + if new_val_dict[idx] != self.last_value[idx]: + send = True + except KeyError: + logger.error("Key Error in self.compare_values for index {}".format(idx)) + send = True + return send + + def read(self, force_send=False): + """Read the value and check to see if needs to be stored.""" + send_needed = False + send_reason = "" + if self.plc_tag: + val = read_tag(self.plc_ip, self.plc_tag) + if val: + bool_arr = binarray(val[0]) + new_val = {} + for idx in self.map_: + try: + new_val[self.map_[idx]] = bool_arr[idx] + except KeyError: + logger.error("Not able to get value for index {}".format(idx)) + + if self.last_send_time == 0: + send_needed = True + send_reason = "no send time" + elif self.value is None: + send_needed = True + send_reason = "no value" + elif self.compare_values(new_val): + send_needed = True + send_reason = "value change" + elif (time.time() - self.last_send_time) > self.guarantee_sec: + send_needed = True + send_reason = "guarantee sec" + elif force_send: + send_needed = True + send_reason = "forced" + + if send_needed: + self.value = new_val + self.last_value = self.value + self.last_send_time = time.time() + logger.info("Sending {} for {} - {}".format(self.value, self.mesh_name, send_reason)) + return send_needed diff --git a/meshifyDrivers/flowmeterskidcc/Tags.py b/meshifyDrivers/flowmeterskidcc/Tags.py new file mode 100644 index 0000000..be8b1f2 --- /dev/null +++ b/meshifyDrivers/flowmeterskidcc/Tags.py @@ -0,0 +1,6 @@ +from Channel import PLCChannel, ModbusChannel + +tags = [ + ModbusChannel('flowrate', 3873, 'FLOATBS', 10, 3600,channel_size=2, unit_number=2), + ModbusChannel('totalizer_1', 2609, 'FLOATBS', 100, 3600,channel_size=2, unit_number=2) +] \ No newline at end of file diff --git a/meshifyDrivers/flowmeterskidcc/config.txt b/meshifyDrivers/flowmeterskidcc/config.txt new file mode 100644 index 0000000..fa57f25 --- /dev/null +++ b/meshifyDrivers/flowmeterskidcc/config.txt @@ -0,0 +1,14 @@ +{ + "files": { + "file6": "Tags.py", + "file5": "persistence.py", + "file4": "file_logger.py", + "file3": "Channel.py", + "file2": "utilities.py", + "file1": "flowmeterskid.py" + }, + "deviceName": "flowmeterskid", + "driverId": "0199", + "releaseVersion": "2", + "driverFileName": "flowmeterskid.py" +} diff --git a/meshifyDrivers/flowmeterskidcc/file_logger.py b/meshifyDrivers/flowmeterskidcc/file_logger.py new file mode 100644 index 0000000..fd8c432 --- /dev/null +++ b/meshifyDrivers/flowmeterskidcc/file_logger.py @@ -0,0 +1,18 @@ +"""Logging setup for PiFlow""" +import logging +from logging.handlers import RotatingFileHandler +import sys + +log_formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s(%(lineno)d) %(message)s') +log_file = './PiFlow.log' +my_handler = RotatingFileHandler(log_file, mode='a', maxBytes=500*1024, + backupCount=2, encoding=None, delay=0) +my_handler.setFormatter(log_formatter) +my_handler.setLevel(logging.INFO) +filelogger = logging.getLogger('PiFlow') +filelogger.setLevel(logging.INFO) +filelogger.addHandler(my_handler) + +console_out = logging.StreamHandler(sys.stdout) +console_out.setFormatter(log_formatter) +filelogger.addHandler(console_out) diff --git a/meshifyDrivers/flowmeterskidcc/flowmeterskid.py b/meshifyDrivers/flowmeterskidcc/flowmeterskid.py new file mode 100644 index 0000000..ba5a51e --- /dev/null +++ b/meshifyDrivers/flowmeterskidcc/flowmeterskid.py @@ -0,0 +1,231 @@ +"""Driver for flowmeterskid.""" +import threading +import json +import time +from random import randint +import os +import minimalmodbusM1 +from device_base import deviceBase +from Channel import PLCChannel, ModbusChannel,read_tag, write_tag, TAG_DATAERROR_SLEEPTIME +import persistence +from utilities import get_public_ip_address, get_private_ip_address +from file_logger import filelogger as logger + + +from datetime import datetime as dt + +logger.info("flowmeterskid startup") + +# GLOBAL VARIABLES +WAIT_FOR_CONNECTION_SECONDS = 5 +IP_CHECK_PERIOD = 60 + +_ = None + + + +class start(threading.Thread, deviceBase): + """Start class required by Meshify.""" + + def __init__(self, name=None, number=None, mac=None, Q=None, mcu=None, companyId=None, offset=None, mqtt=None, Nodes=None): + """Initialize the driver.""" + threading.Thread.__init__(self) + deviceBase.__init__(self, name=name, number=number, mac=mac, Q=Q, mcu=mcu, companyId=companyId, offset=offset, mqtt=mqtt, Nodes=Nodes) + self.daemon = True + self.version = "2" + self.finished = threading.Event() + self.force_send = False + self.public_ip_address = "" + self.public_ip_address_last_checked = 0 + self.private_ip_address = "" + self.ping_counter = 0 + threading.Thread.start(self) + + # this is a required function for all drivers, its goal is to upload some piece of data + # about your device so it can be seen on the web + def register(self): + """Register the driver.""" + # self.sendtodb("log", "BOOM! Booted.", 0) + pass + + def run(self): + """Actually run the driver.""" + self.instrument = self.startRS485() + global CHANNELS, INSTRUMENT, lock + INSTRUMENT = self.instrument + lock = self.lock + #from Channel import PLCChannel, ModbusChannel,read_tag, write_tag, TAG_DATAERROR_SLEEPTIME + from Tags import tags + CHANNELS = tags + for i in range(0, WAIT_FOR_CONNECTION_SECONDS): + print("flowmeterskid driver will start in {} seconds".format(WAIT_FOR_CONNECTION_SECONDS - i)) + time.sleep(1) + logger.info("BOOM! Starting flowmeterskid driver...") + + #self._check_ip_address() + + self.nodes["flowmeterskid_0199"] = self + + send_loops = 0 + while True: + if self.force_send: + logger.warning("FORCE SEND: TRUE") + if int(time.time()) % 600 == 0 or self.force_send: + payload = {"ts": round(time.time()/600)*600*1000, "values": {}} + resetPayload = {"ts": "", "values": {}} + dayReset, weekReset, monthReset, yearReset = False, False, False, False + for chan in CHANNELS: + val = chan.read() + try: + if chan.mesh_name in ["totalizer_1"]: + payload["values"]["day_volume"], dayReset = self.totalizeDay(val) + payload["values"]["week_volume"], weekReset = self.totalizeWeek(val) + payload["values"]["month_volume"], monthReset = self.totalizeMonth(val) + payload["values"]["year_volume"], yearReset = self.totalizeYear(val) + payload["values"][chan.mesh_name] = val + except Exception as e: + logger.error(e) + + self.sendToTB(json.dumps(payload)) + + if dayReset: + resetPayload["values"]["yesterday_volume"] = payload["values"]["day_volume"] + resetPayload["values"]["day_volume"] = 0 + if weekReset: + resetPayload["values"]["last_week_volume"] = payload["values"]["week_volume"] + resetPayload["values"]["week_volume"] = 0 + if monthReset: + resetPayload["values"]["last_month_volume"] = payload["values"]["month_volume"] + resetPayload["values"]["month_volume"] = 0 + if yearReset: + resetPayload["values"]["last_year_volume"] = payload["values"]["year_volume"] + resetPayload["values"]["year_volume"] = 0 + + if resetPayload["values"]: + resetPayload["ts"] = 1 + round(time.time()/600)*600*1000 + self.sendToTB(json.dumps(resetPayload)) + + if self.force_send: + self.force_send = False + + def flowmeterskid_sync(self, name, value): + """Sync all data from the driver.""" + self.force_send = True + self.sendtodb("log", "synced", 0) + return True + + def saveTotalizers(self, totalizers): + try: + with open("/root/python_firmware/totalizers.json", "w") as t: + json.dump(totalizers,t) + except Exception as e: + logger.error(e) + + def get_totalizers(self): + saveFile = "/root/python_firmware/totalizers.json" + # Check if the state file exists. + if not os.path.exists(saveFile): + return { + "day": 0, + "week": 0, + "month": 0, + "year": 0, + "lifetime": 0, + "dayHolding": 0, + "weekHolding": 0, + "monthHolding": 0, + "yearHolding": 0 + } + try: + with open("/root/python_firmware/totalizers.json", "r") as t: + totalizers = json.load(t) + if not totalizers: + logger.info("-----INITIALIZING TOTALIZERS-----") + totalizers = { + "day": 0, + "week": 0, + "month": 0, + "year": 0, + "lifetime": 0, + "dayHolding": 0, + "weekHolding": 0, + "monthHolding": 0, + "yearHolding": 0 + } + except: + totalizers = { + "day": 0, + "week": 0, + "month": 0, + "year": 0, + "lifetime": 0, + "dayHolding": 0, + "weekHolding": 0, + "monthHolding": 0, + "yearHolding": 0 + } + return totalizers + + def totalizeDay(self,lifetime): + totalizers = self.get_totalizers() + now = dt.fromtimestamp(round(time.time()/600)*600) + reset = False + value = lifetime - totalizers["dayHolding"] + if not int(now.strftime("%d")) == int(totalizers["day"]): + totalizers["dayHolding"] = lifetime + totalizers["day"] = int(now.strftime("%d")) + self.saveTotalizers(totalizers) + reset = True + return (value,reset) + + def totalizeWeek(self,lifetime): + totalizers = self.get_totalizers() + now = dt.fromtimestamp(round(time.time()/600)*600) + reset = False + value = lifetime - totalizers["weekHolding"] + if (not now.strftime("%U") == totalizers["week"] and now.strftime("%a") == "Sun") or totalizers["week"] == 0: + totalizers["weekHolding"] = lifetime + totalizers["week"] = now.strftime("%U") + self.saveTotalizers(totalizers) + reset = True + return (value, reset) + + def totalizeMonth(self,lifetime): + totalizers = self.get_totalizers() + now = dt.fromtimestamp(round(time.time()/600)*600) + reset = False + value = lifetime - totalizers["monthHolding"] + if not int(now.strftime("%m")) == int(totalizers["month"]): + totalizers["monthHolding"] = lifetime + totalizers["month"] = now.strftime("%m") + self.saveTotalizers(totalizers) + reset = True + return (value,reset) + + def totalizeYear(self,lifetime): + totalizers = self.get_totalizers() + now = dt.fromtimestamp(round(time.time()/600)*600) + reset = False + value = lifetime - totalizers["yearHolding"] + if not int(now.strftime("%Y")) == int(totalizers["year"]): + totalizers["yearHolding"] = lifetime + totalizers["year"] = now.strftime("%Y") + self.saveTotalizers(totalizers) + reset = True + return (value, reset) + + def startRS485(self): + instrument = "" + with self.lock: + #minimalmodbus.BAUDRATE = 9600 + #minimalmodbus.STOPBITS = 1 + connected = False + while connected == False: + logger.info("Attempting to setup RS485") + connected = self.mcu.set485Baud(9600)#switch to configurable + time.sleep(1) + logger.info("RS485 SETUP SUCCESSFUL!!!!!") + serial = self.mcu.rs485 + instrument = minimalmodbusM1.Instrument(1,serial) + instrument.address = 2 #switch to configurable + return instrument \ No newline at end of file diff --git a/meshifyDrivers/flowmeterskidcc/persistence.py b/meshifyDrivers/flowmeterskidcc/persistence.py new file mode 100644 index 0000000..8c8703f --- /dev/null +++ b/meshifyDrivers/flowmeterskidcc/persistence.py @@ -0,0 +1,21 @@ +"""Data persistance functions.""" +# if more advanced persistence is needed, use a sqlite database +import json + + +def load(filename="persist.json"): + """Load persisted settings from the specified file.""" + try: + with open(filename, 'r') as persist_file: + return json.load(persist_file) + except Exception: + return False + + +def store(persist_obj, filename="persist.json"): + """Store the persisting settings into the specified file.""" + try: + with open(filename, 'w') as persist_file: + return json.dump(persist_obj, persist_file, indent=4) + except Exception: + return False diff --git a/meshifyDrivers/flowmeterskidcc/utilities.py b/meshifyDrivers/flowmeterskidcc/utilities.py new file mode 100644 index 0000000..fd69e47 --- /dev/null +++ b/meshifyDrivers/flowmeterskidcc/utilities.py @@ -0,0 +1,62 @@ +"""Utility functions for the driver.""" +import socket +import struct +import urllib +import contextlib +def get_private_ip_address(): + """Find the private IP Address of the host device.""" + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.connect(("8.8.8.8", 80)) + ip_address = sock.getsockname()[0] + sock.close() + except Exception as e: + return e + + return ip_address + +def get_public_ip_address(): + ip_address = "0.0.0.0" + try: + with contextlib.closing(urllib.urlopen("http://checkip.amazonaws.com")) as url: + ip_address = url.read() + except Exception as e: + print("Could not resolve address: {}".format(e)) + return ip_address + return ip_address[:-1] + +def int_to_float16(int_to_convert): + """Convert integer into float16 representation.""" + bin_rep = ('0' * 16 + '{0:b}'.format(int_to_convert))[-16:] + sign = 1.0 + if int(bin_rep[0]) == 1: + sign = -1.0 + exponent = float(int(bin_rep[1:6], 2)) + fraction = float(int(bin_rep[6:17], 2)) + + if exponent == float(0b00000): + return sign * 2 ** -14 * fraction / (2.0 ** 10.0) + elif exponent == float(0b11111): + if fraction == 0: + return sign * float("inf") + return float("NaN") + frac_part = 1.0 + fraction / (2.0 ** 10.0) + return sign * (2 ** (exponent - 15)) * frac_part + + +def ints_to_float(int1, int2): + """Convert 2 registers into a floating point number.""" + mypack = struct.pack('>HH', int1, int2) + f_unpacked = struct.unpack('>f', mypack) + print("[{}, {}] >> {}".format(int1, int2, f_unpacked[0])) + return f_unpacked[0] + + +def degf_to_degc(temp_f): + """Convert deg F to deg C.""" + return (temp_f - 32.0) * (5.0/9.0) + + +def degc_to_degf(temp_c): + """Convert deg C to deg F.""" + return temp_c * 1.8 + 32.0 diff --git a/meshifyDrivers/mainHP/config.txt b/meshifyDrivers/mainHP/config.txt index f76878a..43c2279 100644 --- a/meshifyDrivers/mainHP/config.txt +++ b/meshifyDrivers/mainHP/config.txt @@ -3,7 +3,7 @@ "driverFileName":"mainMeshify.py", "deviceName":"mainMeshify", "driverId":"0000", -"releaseVersion":"17", +"releaseVersion":"16", "files": { "file1":"mainMeshify.py", "file2":"main.py", diff --git a/meshifyDrivers/mainHP/main.py b/meshifyDrivers/mainHP/main.py index 1fe2e9a..1e236a4 100644 --- a/meshifyDrivers/mainHP/main.py +++ b/meshifyDrivers/mainHP/main.py @@ -82,7 +82,7 @@ class main(): self.dst = "" # queue for sets to the mesh network will handeled through a queue in this main driver self.meshQ = Queue.Queue() - version = "13" # 6 - mistification # 5 - updated for SAT data and generic sets. 4 - devices changed to drivers for dia + version = "16" # 6 - mistification # 5 - updated for SAT data and generic sets. 4 - devices changed to drivers for dia # self.sendtodb("version", version, 0) thread.start_new_thread(self.registerThread, ()) @@ -339,8 +339,8 @@ class meshifyMain(): clientData = json.load(creds) except: clientData = {"clientId": mac, "username": "admin", "password": "columbus"} - with open("mqtt.json", "w+") as creds: - json.dump(clientData, creds) + #with open("mqtt.json", "w+") as creds: + #json.dump(clientData, creds) self.mqtt = paho.Client(client_id=clientData["clientId"], clean_session=True) # change to false for mqtt.meshify.com diff --git a/meshifyDrivers/tankalarms/tankalarms.py b/meshifyDrivers/tankalarms/tankalarms.py index fe5f5f3..0ea5086 100644 --- a/meshifyDrivers/tankalarms/tankalarms.py +++ b/meshifyDrivers/tankalarms/tankalarms.py @@ -1,5 +1,4 @@ """Driver for tankalarms""" - import threading import json import time