Files
MaxWaterSystem-Lite/flow-monitor.py

404 lines
17 KiB
Python

"""Driver for connecting Flow Monitor to Meshify."""
import threading
import time
from datetime import datetime
import sqlite3
from device_base import deviceBase
from utilities import get_public_ip_address
CREATE_FLOWDATA_TABLE = """CREATE TABLE flow_data (
id integer PRIMARY KEY,
gal_totalizer_value float,
bbl_totalizer_value float,
gal_monthly_totalizer float,
bbl_monthly_totalizer float,
last_measured_timestamp integer
);"""
INSERT_BLANK_FLOWDATA = """INSERT INTO flow_data (
id,
gal_totalizer_value,
bbl_totalizer_value,
gal_monthly_totalizer,
bbl_monthly_totalizer,
last_measured_timestamp)
VALUES (1, 0.0, 0.0, 0.0, 0.0, 0);"""
CLEAR_FLOWDATA = """UPDATE flow_data SET
gal_totalizer_value=0.0,
bbl_totalizer_value=0.0,
gal_monthly_totalizer=0.0,
bbl_monthly_totalizer=0.0,
last_measured_timestamp=0 WHERE id=1;"""
UPDATE_FLOWDATA = """UPDATE flow_data SET
gal_totalizer_value=?,
bbl_totalizer_value=?,
gal_monthly_totalizer=?,
bbl_monthly_totalizer=?,
last_measured_timestamp=? WHERE id=1"""
CREATE_SCALINGDATA_TABLE = """CREATE TABLE scaling_data (
id integer PRIMARY KEY,
raw_min float,
raw_max float,
gpm_min float,
gpm_max float
);"""
INSERT_SCALINGDATA = """INSERT INTO scaling_data (
id,
raw_min,
raw_max,
gpm_min,
gpm_max) VALUES (1, ?, ?, ?, ?);"""
UPDATE_SCALINGDATA = """UPDATE scaling_data SET
raw_min=?,
raw_max=?,
gpm_min=?,
gpm_max=? WHERE id=1;"""
class Channel(object):
"""Meshify channel structure."""
def __init__(self, meshify_name, senddelta_value, senddelta_time):
"""Initialize the channel with variables."""
self.meshify_name = meshify_name
self.senddelta_time = senddelta_time
self.senddelta_value = senddelta_value
self.last_sent_value = None
self.last_sent_timestamp = 0
def check_if_send_needed(self, value, timestamp):
"""Check to see if the value needs to be pushed."""
if self.last_sent_value is None or self.last_sent_timestamp == 0:
return True
if abs(value - self.last_sent_value) > self.senddelta_value:
return True
if (timestamp - self.last_sent_timestamp) > self.senddelta_time:
return True
return False
def update(self, last_sent_value, last_sent_timestamp):
"""Update values after a push."""
self.last_sent_value = last_sent_value
self.last_sent_timestamp = last_sent_timestamp
def scale(raw_val, raw_min, raw_max, eu_min, eu_max):
"""Scale a raw value."""
slope = (eu_max - eu_min) / (raw_max - raw_min)
intercept = eu_max - (slope * raw_max)
return slope * raw_val + intercept
def is_today(tstamp):
"""Check if a given timestamp belongs to the current date."""
midnight_today = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
midnight_ts = (midnight_today - datetime(1970, 1, 1)).total_seconds()
return tstamp >= midnight_ts
def is_thismonth(tstamp):
"""Check if a given timestamp belongs to the current month."""
today = datetime.today()
tstamp_date = datetime.fromtimestamp(tstamp)
return today.month == tstamp_date.month
class start(threading.Thread, deviceBase):
"""Start class required for driver."""
def __init__(self, name=None, number=None, mac=None, Q=None, mcu=None,
companyId=None, offset=None, mqtt=None, Nodes=None):
"""Initalize the driver."""
threading.Thread.__init__(self)
deviceBase.__init__(self, name=name, number=number, mac=mac, Q=Q,
mcu=mcu, companyId=companyId, offset=offset,
mqtt=mqtt, Nodes=Nodes)
# Default Scaling Values for Flowmeter
self.RAW_MIN = 3.89
self.RAW_MAX = 19.54
self.GPM_MIN = 0.0
self.GPM_MAX = 100.0
self.daemon = True
self.version = "7"
self.finished = threading.Event()
threading.Thread.start(self)
# this is a required function for all drivers
# its goal is to upload some piece of data
# about your device so it can be seen on the web
def register(self):
"""Register the driver."""
self.channels["status"]["last_value"] = ""
def run(self):
"""Run the driver."""
# Configuration Parameters
total_time_store_delta = 600 # seconds
flow_time_store_delta = 600 # seconds
startup_wait_seconds = 30
gpm_val = 0.0
gal_per_bbl = 42.0
galtotal_ch = Channel('gal_total', 100.0, total_time_store_delta)
bbltotal_ch = Channel('bbl_total', galtotal_ch.senddelta_value/gal_per_bbl, total_time_store_delta)
galtotalthismonth_ch = Channel('gal_total_thismonth', galtotal_ch.senddelta_value, total_time_store_delta)
bbltotalthismonth_ch = Channel('bbl_total_thismonth', galtotalthismonth_ch.senddelta_value/gal_per_bbl, total_time_store_delta)
gpmflow_ch = Channel('gpm_flow', 10.0, flow_time_store_delta)
bpdflow_ch = Channel('bpd_flow', gpmflow_ch.senddelta_value * 34.2857, flow_time_store_delta)
runstatus_ch = Channel('run_status', 0.5, 600)
date_reset = False
month_reset = False
gal_totalizer_value = 0.0
bbl_totalizer_value = 0.0
gal_monthly_totalizer = 0.0
bbl_monthly_totalizer = 0.0
wait_loops = 0
while wait_loops < startup_wait_seconds:
print("Waiting to start driver in {} seconds".format(startup_wait_seconds - wait_loops))
wait_loops += 1
time.sleep(1)
public_ip_address = get_public_ip_address()
self.sendtodb('public_ip_address', public_ip_address, 0)
ip_checked_time = time.time()
ip_check_after = 3600 # send after an hour
last_measured_timestamp = time.time()
conn = sqlite3.connect('/root/python_firmware/drivers/flow-monitor.db')
cursor = conn.cursor()
try:
cursor.execute('SELECT * FROM flow_data WHERE id = 1')
stored_data = cursor.fetchone()
gal_totalizer_value = stored_data[1]
bbl_totalizer_value = stored_data[2]
gal_monthly_totalizer = stored_data[3]
bbl_monthly_totalizer = stored_data[4]
last_measured_timestamp = stored_data[5]
except sqlite3.OperationalError:
print("No table flow_data in the database. I'll create it now.")
cursor.execute(CREATE_FLOWDATA_TABLE)
cursor.execute(INSERT_BLANK_FLOWDATA)
conn.commit()
try:
cursor.execute('SELECT * FROM scaling_data WHERE id = 1')
stored_data = cursor.fetchone()
self.RAW_MIN = stored_data[1]
self.RAW_MAX = stored_data[2]
self.GPM_MIN = stored_data[3]
self.GPM_MAX = stored_data[4]
self.sendtodb("setrawmin", self.RAW_MIN, 0)
self.sendtodb("setrawmax", self.RAW_MAX, 0)
self.sendtodb("setgpmmin", self.GPM_MIN, 0)
self.sendtodb("setgpmmax", self.GPM_MAX, 0)
except (sqlite3.OperationalError, TypeError):
print("No table scaling_data in the database. I'll create it now.")
cursor.execute(CREATE_SCALINGDATA_TABLE)
cursor.execute(INSERT_SCALINGDATA, (self.RAW_MIN, self.RAW_MAX, self.GPM_MIN, self.GPM_MAX))
self.sendtodb("setrawmin", self.RAW_MIN, 0)
self.sendtodb("setrawmax", self.RAW_MAX, 0)
self.sendtodb("setgpmmin", self.GPM_MIN, 0)
self.sendtodb("setgpmmax", self.GPM_MAX, 0)
conn.commit()
if not is_today(last_measured_timestamp):
gal_totalizer_value = 0.0
bbl_totalizer_value = 0.0
last_measured_timestamp = time.time()
while True:
try:
mcu_status = self.mcu.getDict()
print(mcu_status)
cloop_val = float(mcu_status['cloop'])
din1_val = 1 if mcu_status['din1'] == 'On' else 0
if din1_val == 1:
gpm_val = scale(cloop_val, self.RAW_MIN, self.RAW_MAX, self.GPM_MIN, self.GPM_MAX)
if gpm_val < 0:
gpm_val = 0
else:
gpm_val = 4.0
bpd_val = (gpm_val / gal_per_bbl) * 60.0 * 24.0
now = time.time()
time_diff = now - last_measured_timestamp
if time_diff > 0:
gal_flow_delta = (time_diff / 60.0) * gpm_val
bbl_flow_delta = (time_diff / 60.0) * (1.0 / 60.0) * (1.0 / 24.0) * bpd_val
gal_totalizer_value += gal_flow_delta
bbl_totalizer_value += bbl_flow_delta
gal_monthly_totalizer += gal_flow_delta
bbl_monthly_totalizer += bbl_flow_delta
last_measured_timestamp = now
cursor.execute(UPDATE_FLOWDATA, (gal_totalizer_value, bbl_totalizer_value, gal_monthly_totalizer, bbl_monthly_totalizer, last_measured_timestamp))
conn.commit()
print('gpm: {}, bpd: {}, gal: {}, bbl:{}, month_gal:{}, month_bbl:{}'.format(
gpm_val, bpd_val, gal_totalizer_value, bbl_totalizer_value, gal_monthly_totalizer, bbl_monthly_totalizer))
if galtotal_ch.check_if_send_needed(gal_totalizer_value, now):
self.sendtodb(galtotal_ch.meshify_name, gal_totalizer_value, 0)
galtotal_ch.update(gal_totalizer_value, now)
if bbltotal_ch.check_if_send_needed(bbl_totalizer_value, now):
self.sendtodb(bbltotal_ch.meshify_name, bbl_totalizer_value, 0)
bbltotal_ch.update(bbl_totalizer_value, now)
if galtotalthismonth_ch.check_if_send_needed(gal_monthly_totalizer, now):
self.sendtodb(galtotalthismonth_ch.meshify_name, gal_monthly_totalizer, 0)
galtotalthismonth_ch.update(gal_monthly_totalizer, now)
if bbltotalthismonth_ch.check_if_send_needed(bbl_monthly_totalizer, now):
self.sendtodb(bbltotalthismonth_ch.meshify_name, bbl_monthly_totalizer, 0)
bbltotalthismonth_ch.update(bbl_monthly_totalizer, now)
if gpmflow_ch.check_if_send_needed(gpm_val, now):
self.sendtodb(gpmflow_ch.meshify_name, gpm_val, 0)
gpmflow_ch.update(gpm_val, now)
if bpdflow_ch.check_if_send_needed(bpd_val, now):
self.sendtodb(bpdflow_ch.meshify_name, bpd_val, 0)
bpdflow_ch.update(bpd_val, now)
if runstatus_ch.check_if_send_needed(din1_val, now):
self.sendtodb(runstatus_ch.meshify_name, din1_val, 0)
runstatus_ch.update(din1_val, now)
if time.localtime(now)[3] == 0 and not date_reset:
self.sendtodb('gal_total_yesterday', gal_totalizer_value, 0)
self.sendtodb('bbl_total_yesterday', bbl_totalizer_value, 0)
gal_totalizer_value = 0.0
bbl_totalizer_value = 0.0
cursor.execute(UPDATE_FLOWDATA, (gal_totalizer_value, bbl_totalizer_value, gal_monthly_totalizer, bbl_monthly_totalizer, last_measured_timestamp))
conn.commit()
date_reset = True
if time.localtime(now)[3] != 0 and date_reset:
date_reset = False
if time.localtime(now)[2] == 1 and not month_reset:
self.sendtodb('gal_total_lastmonth', gal_monthly_totalizer, 0)
self.sendtodb('bbl_total_lastmonth', bbl_monthly_totalizer, 0)
gal_monthly_totalizer = 0.0
bbl_monthly_totalizer = 0.0
cursor.execute(UPDATE_FLOWDATA, (gal_totalizer_value, bbl_totalizer_value, gal_monthly_totalizer, bbl_monthly_totalizer, last_measured_timestamp))
conn.commit()
month_reset = True
if time.localtime(now)[2] != 1 and month_reset:
month_reset = False
if (now - ip_checked_time) > ip_check_after:
test_public_ip = get_public_ip_address()
if not test_public_ip == public_ip_address:
self.sendtodb('public_ip_address', test_public_ip, 0)
public_ip_address = test_public_ip
ip_checked_time = now
except Exception as e:
print("problem in the driver: {}".format(e))
time.sleep(5)
def flowmonitor_resetdatabase(self, name, value):
conn = sqlite3.connect('/root/python_firmware/drivers/flow-monitor.db')
cursor = conn.cursor()
try:
cursor.execute('SELECT * FROM flow_data WHERE id = 1')
cursor.execute(CLEAR_FLOWDATA)
conn.commit()
print("DATABASE HAS BEEN RESET!")
except sqlite3.OperationalError:
print("No table flow_data in the database. I'll create it now.")
cursor.execute(CREATE_FLOWDATA_TABLE)
cursor.execute(INSERT_BLANK_FLOWDATA)
conn.commit()
return(True)
def flowmonitor_setrawmin(self, name, value):
conn = sqlite3.connect('/root/python_firmware/drivers/flow-monitor.db')
cursor = conn.cursor()
try:
self.RAW_MIN = float(value)
cursor.execute(UPDATE_SCALINGDATA, (self.RAW_MIN, self.RAW_MAX, self.GPM_MIN, self.GPM_MAX))
self.sendtodb("setrawmin", self.RAW_MIN, 0)
conn.commit()
except sqlite3.OperationalError:
print("No table flow_data in the database. I'll create it now.")
cursor.execute(CREATE_SCALINGDATA_TABLE)
cursor.execute(INSERT_SCALINGDATA, (self.RAW_MIN, self.RAW_MAX, self.GPM_MIN, self.GPM_MAX))
conn.commit()
return(True)
def flowmonitor_setrawmax(self, name, value):
conn = sqlite3.connect('/root/python_firmware/drivers/flow-monitor.db')
cursor = conn.cursor()
try:
self.RAW_MAX = float(value)
self.sendtodb("setrawmax", self.RAW_MAX, 0)
cursor.execute(UPDATE_SCALINGDATA, (self.RAW_MIN, self.RAW_MAX, self.GPM_MIN, self.GPM_MAX))
conn.commit()
except sqlite3.OperationalError:
print("No table flow_data in the database. I'll create it now.")
cursor.execute(CREATE_SCALINGDATA_TABLE)
cursor.execute(INSERT_SCALINGDATA, (self.RAW_MIN, self.RAW_MAX, self.GPM_MIN, self.GPM_MAX))
conn.commit()
return(True)
def flowmonitor_setgpmmin(self, name, value):
conn = sqlite3.connect('/root/python_firmware/drivers/flow-monitor.db')
cursor = conn.cursor()
try:
self.GPM_MIN = float(value)
self.sendtodb("setgpmmin", self.GPM_MIN, 0)
cursor.execute(UPDATE_SCALINGDATA, (self.RAW_MIN, self.RAW_MAX, self.GPM_MIN, self.GPM_MAX))
conn.commit()
except sqlite3.OperationalError:
print("No table flow_data in the database. I'll create it now.")
cursor.execute(CREATE_SCALINGDATA_TABLE)
cursor.execute(INSERT_SCALINGDATA, (self.RAW_MIN, self.RAW_MAX, self.GPM_MIN, self.GPM_MAX))
conn.commit()
return(True)
def flowmonitor_setgpmmax(self, name, value):
conn = sqlite3.connect('/root/python_firmware/drivers/flow-monitor.db')
cursor = conn.cursor()
try:
self.GPM_MAX = float(value)
self.sendtodb("setgpmmax", self.GPM_MAX, 0)
cursor.execute(UPDATE_SCALINGDATA, (self.RAW_MIN, self.RAW_MAX, self.GPM_MIN, self.GPM_MAX))
conn.commit()
except sqlite3.OperationalError:
print("No table flow_data in the database. I'll create it now.")
cursor.execute(CREATE_SCALINGDATA_TABLE)
cursor.execute(INSERT_SCALINGDATA, (self.RAW_MIN, self.RAW_MAX, self.GPM_MIN, self.GPM_MAX))
conn.commit()
return(True)