Updating for adding offset

This commit is contained in:
Patrick McDonagh
2018-08-10 15:08:29 -05:00
parent e87e405580
commit 630f5960dc
15 changed files with 52 additions and 10 deletions

BIN
python-driver/cal_data.db Normal file

Binary file not shown.

View File

@@ -0,0 +1,79 @@
"""Test database functions for storing configuration data."""
import sqlite3
SQLITE_DB = "cal_data.db"
SQLITE_CONN = sqlite3.connect(SQLITE_DB, check_same_thread=False)
def setup_calibration_table(drop_first=False):
"""Setup the calibration table."""
cursor = SQLITE_CONN.cursor()
if drop_first:
cursor.execute("DROP TABLE IF EXISTS cal_data")
create_query = ("CREATE TABLE IF NOT EXISTS cal_data ("
"id integer PRIMARY KEY, "
"height float, "
"volume float)")
cursor.execute(create_query)
def insert_calibration_data(height, volume):
"""Insert an entry into the calibration table."""
cursor = SQLITE_CONN.cursor()
insert_query = "INSERT INTO cal_data (height, volume) VALUES (?, ?)"
cursor.execute(insert_query, [height, volume])
return cursor.lastrowid
def delete_calibration_data(id):
"""Delete an entry from the calibration table."""
cursor = SQLITE_CONN.cursor()
delete_query = "DELETE FROM cal_data WHERE id = ?"
cursor.execute(delete_query, [id])
return cursor.rowcount
def get_calibration_data(single_entry=False):
"""Retrieve either a single entry or all calibration data."""
single_entry = int(single_entry)
cursor = SQLITE_CONN.cursor()
get_single_query = "SELECT * FROM cal_data WHERE id = ?"
get_all_query = "SELECT * FROM cal_data ORDER BY height ASC"
if(single_entry > 0):
cursor.execute(get_single_query, [single_entry])
return cursor.fetchone()
else:
return [list(row) for row in cursor.execute(get_all_query)]
def get_volume_for_height(height):
"""Interpret a volume for a given height."""
calibration_data = get_calibration_data()
print("Calibration Data\n===")
print(calibration_data)
cal_min_height = calibration_data[0][1]
cal_max_height = calibration_data[-1][1]
lower_cal = [0.0, 0.0]
upper_cal = [0.0, 0.0]
if (height <= cal_max_height and height >= cal_min_height):
for i in range(0, len(calibration_data) - 1):
if (height >= calibration_data[i][1] and height <= calibration_data[i+1][1]):
lower_cal = calibration_data[i][1:3]
upper_cal = calibration_data[i+1][1:3]
elif height > cal_max_height:
lower_cal = calibration_data[-2][1:3]
upper_cal = calibration_data[-1][1:3]
elif height < cal_min_height:
lower_cal = calibration_data[0][1:3]
upper_cal = calibration_data[1][1:3]
line_m = (upper_cal[1] - lower_cal[1]) / (upper_cal[0] - lower_cal[0])
line_b = upper_cal[1] - line_m * upper_cal[0]
return line_m * height + line_b

View File

@@ -0,0 +1,21 @@
id,name,deviceTypeId,fromMe,io,subTitle,helpExplanation,channelType,dataType,defaultValue,regex,regexErrMsg,units,min,max,change,guaranteedReportPeriod,minReportTime
12764,pond_level,412,FALSE,readonly,Pond Level,Level in Ft.,device,float,,,,,,,,,
12769,high_level_limit,412,FALSE,readwrite,High Level Limit,Limit in ft.,user input,float,25,,,,,,,,
12770,low_level_limit,412,FALSE,readwrite,Low Level Limit, Limit in ft.,user input,float,1,,,,,,,,
13352,setrawmax,412,FALSE,readwrite,Raw Max Setpoint,in mA,device,float,,,,,,,,,
13353,setrawmin,412,FALSE,readwrite,Raw Min Setpoint,in mA,device,float,,,,,,,,,
13354,seteumax,412,FALSE,readwrite,EU Max Setpoint,in Ft.,device,float,,,,,,,,,
13355,seteumin,412,FALSE,readwrite,EU Min Setpoint,in Ft.,device,float,,,,,,,,,
13356,log,412,FALSE,readwrite,Log,Log,device,string,,,,,,,,,
13357,sync,412,FALSE,readwrite,Sync,Sycn,device,string,,,,,,,,,
13358,calibration_data,412,FALSE,readonly,Calibration Data,All calibration data,device,string,,,,,,,,,
13359,addcalibrationpoint,412,FALSE,readwrite,Add Calibration Point,Add a single calibration point,device,string,,,,,,,,,
13360,deletecalibrationpoint,412,FALSE,readwrite,Delete Calibration Point,Delete a Calibration Point,device,integer,,,,,,,,,
13361,pond_volume,412,FALSE,readonly,Pond Volume,in BBL,device,float,0,,,,,,,,
13642,pressure_psi,412,FALSE,readonly,Pressure,in PSI,device,float,,,,,,,,,
13643,setpsirawmin,412,FALSE,readwrite,Pressure Raw Min Setpoint,in PSI,device,float,,,,,,,,,
13644,setpsirawmax,412,FALSE,readwrite,Pressure Raw Max Setpoint,in PSI,device,float,,,,,,,,,
13645,setpsieumax,412,FALSE,readwrite,Pressure EU Max Setpoint,in PSI,device,float,,,,,,,,,
13646,setpsieumin,412,FALSE,readwrite,Pressure EU Min Setpoint,in PSI,device,float,,,,,,,,,
13647,high_pressure_limit,412,FALSE,readwrite,High Pressure Input,Send an alert when pressure > high_pressure_input,user input,float,30,,,,,,,,
,public_ip_address,412,FALSE,readonly,Public IP Address,Network Address,device,string,,,,,,,,,
1 id name deviceTypeId fromMe io subTitle helpExplanation channelType dataType defaultValue regex regexErrMsg units min max change guaranteedReportPeriod minReportTime
2 12764 pond_level 412 FALSE readonly Pond Level Level in Ft. device float
3 12769 high_level_limit 412 FALSE readwrite High Level Limit Limit in ft. user input float 25
4 12770 low_level_limit 412 FALSE readwrite Low Level Limit Limit in ft. user input float 1
5 13352 setrawmax 412 FALSE readwrite Raw Max Setpoint in mA device float
6 13353 setrawmin 412 FALSE readwrite Raw Min Setpoint in mA device float
7 13354 seteumax 412 FALSE readwrite EU Max Setpoint in Ft. device float
8 13355 seteumin 412 FALSE readwrite EU Min Setpoint in Ft. device float
9 13356 log 412 FALSE readwrite Log Log device string
10 13357 sync 412 FALSE readwrite Sync Sycn device string
11 13358 calibration_data 412 FALSE readonly Calibration Data All calibration data device string
12 13359 addcalibrationpoint 412 FALSE readwrite Add Calibration Point Add a single calibration point device string
13 13360 deletecalibrationpoint 412 FALSE readwrite Delete Calibration Point Delete a Calibration Point device integer
14 13361 pond_volume 412 FALSE readonly Pond Volume in BBL device float 0
15 13642 pressure_psi 412 FALSE readonly Pressure in PSI device float
16 13643 setpsirawmin 412 FALSE readwrite Pressure Raw Min Setpoint in PSI device float
17 13644 setpsirawmax 412 FALSE readwrite Pressure Raw Max Setpoint in PSI device float
18 13645 setpsieumax 412 FALSE readwrite Pressure EU Max Setpoint in PSI device float
19 13646 setpsieumin 412 FALSE readwrite Pressure EU Min Setpoint in PSI device float
20 13647 high_pressure_limit 412 FALSE readwrite High Pressure Input Send an alert when pressure > high_pressure_input user input float 30
21 public_ip_address 412 FALSE readonly Public IP Address Network Address device string

13
python-driver/config.txt Normal file
View File

@@ -0,0 +1,13 @@
{
"driverFileName": "pondlevel.py",
"deviceName": "pondlevel",
"driverId": "0130",
"releaseVersion": "5",
"files": {
"file1": "pondlevel.py",
"file2": "calibration_db.py",
"file3": "persistence.py",
"file4": "utilities.py",
"file5": "file_logger.py"
}
}

View File

@@ -0,0 +1,2 @@
class deviceBase(object):
pass

View File

@@ -0,0 +1,13 @@
{
"name": "pondlevel",
"driverFilename": "pondlevel.py",
"driverId": "0130",
"additionalDriverFiles": [
"calibration_db.py",
"persistence.py",
"utilities.py",
"file_logger.py"
],
"version": 5,
"s3BucketName": "pondlevel"
}

View File

@@ -0,0 +1,18 @@
"""Logging setup for pondlevel"""
import logging
from logging.handlers import RotatingFileHandler
import sys
log_formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s(%(lineno)d) %(message)s')
log_file = './pondlevel.log'
my_handler = RotatingFileHandler(log_file, mode='a', maxBytes=500*1024,
backupCount=2, encoding=None, delay=0)
my_handler.setFormatter(log_formatter)
my_handler.setLevel(logging.INFO)
filelogger = logging.getLogger('pondlevel')
filelogger.setLevel(logging.INFO)
filelogger.addHandler(my_handler)
console_out = logging.StreamHandler(sys.stdout)
console_out.setFormatter(log_formatter)
filelogger.addHandler(console_out)

View File

@@ -0,0 +1,19 @@
eq = "-7.5553x^5 + 274.72x^4 - 3764.6x^3 + 24542x^2 - 27573x + 8080.8"
coeffs = [-7.5553, 274.72, -3764.6, 24542.0, -27573.0, 8080.8]
def calc_polynomial(coefficient_list, x):
"""Calculate a polynomial value given the coefficients and x-value."""
poly_val = 0.0
new_eq = ""
for i in range(0, len(coefficient_list)):
pow_level = len(coefficient_list) - (i + 1)
poly_val += coefficient_list[i] * pow(x, pow_level)
new_eq += " + {}x^{}".format(coefficient_list[i], pow_level)
print(new_eq[2:])
return poly_val
test_x = 10.0
print(calc_polynomial(coeffs, test_x))
print(-7.5553 * test_x ** 5 + 274.72 * test_x ** 4 - 3764.6 * test_x ** 3 + 24542.0 * test_x ** 2 - 27573.0 * test_x + 8080.8)

View File

@@ -0,0 +1,21 @@
"""Data persistance functions."""
# if more advanced persistence is needed, use a sqlite database
import json
def load(filename="persist.json"):
"""Load persisted settings from the specified file."""
try:
with open(filename, 'r') as persist_file:
return json.load(persist_file)
except Exception:
return False
def store(persist_obj, filename="persist.json"):
"""Store the persisting settings into the specified file."""
try:
with open(filename, 'w') as persist_file:
return json.dump(persist_obj, persist_file)
except Exception:
return False

372
python-driver/pondlevel.py Normal file
View File

@@ -0,0 +1,372 @@
"""Driver for connecting Pond Level to Meshify."""
import threading
from device_base import deviceBase
import time
import json
import persistence
from utilities import get_public_ip_address
from file_logger import filelogger as log
_ = None
pond_level = {
'value': 15.0,
'timestamp': 0
}
psi_reading = {
'value': 0.0,
'timestamp': 0
}
send_delta = 2.0
send_time = 300
temp_pl = pond_level['value']
temp_psi = psi_reading['value']
scaling_persist = persistence.load(filename="scaling_persist.json")
strapping_persist = persistence.load(filename="strapping_persist.json")
def scale_to_eu(inp_measurement, raw_max, raw_min, eu_max, eu_min, offset=0.0):
"""Scale the inp_measurement to engineering units."""
m = (eu_max - eu_min) / (raw_max - raw_min)
b = eu_max - raw_max * m
scaled = inp_measurement * m + b
return scaled + offset
class start(threading.Thread, deviceBase):
"""Start class required by Meshify."""
def __init__(self, name=None, number=None, mac=None, Q=None, mcu=None, companyId=None, offset=None, mqtt=None, Nodes=None):
"""Initialize the driver."""
threading.Thread.__init__(self)
deviceBase.__init__(self, name=name, number=number, mac=mac, Q=Q, mcu=mcu, companyId=companyId, offset=offset, mqtt=mqtt, Nodes=Nodes)
self.daemon = True
self.version = "5"
self.finished = threading.Event()
self.forceSend = False
threading.Thread.start(self)
self.public_ip_address = ""
self.public_ip_address_last_checked = 0
self.PL_RAWMAX = 20.0
self.PL_RAWMIN = 4.0
self.PL_EUMAX = 23.068
self.PL_EUMIN = 0.0
self.PL_OFFSET = 0.0
self.PSI_RAWMAX = 10.0
self.PSI_RAWMIN = 0.0
self.PSI_EUMAX = 600.0
self.PSI_EUMIN = 0.0
# Strapping table format
# {
# height: volume,
# height: volume,
# height: volume,
# etc...
# }
self.strapping_table = {}
# this is a required function for all drivers, its goal is to upload some piece of data
# about your device so it can be seen on the web
def register(self):
"""Register the driver."""
self.sendtodb("log", "BOOM! Booted.", 0)
def run(self):
"""Actually run the driver."""
global pond_level, temp_pl, psi_reading, temp_psi, send_delta, send_time
try:
self.PL_RAWMAX = scaling_persist['raw_max']
self.PL_RAWMIN = scaling_persist['raw_min']
self.PL_EUMAX = scaling_persist['eu_max']
self.PL_EUMIN = scaling_persist['eu_min']
self.PL_OFFSET = scaling_persist['eu_offset']
except KeyError:
log.warning("No persistence data for pond level scaling parameters.")
except TypeError:
log.warning("No persistence data for pond level scaling parameters.")
try:
self.PSI_RAWMAX = scaling_persist['psi_raw_max']
self.PSI_RAWMIN = scaling_persist['psi_raw_min']
self.PSI_EUMAX = scaling_persist['psi_eu_max']
self.PSI_EUMIN = scaling_persist['psi_eu_min']
except KeyError:
log.warning("No persistence data for pressure scaling parameters.")
except TypeError:
log.warning("No persistence data for pressure scaling parameters.")
try:
if len(strapping_persist.keys()) > 0:
for k in strapping_persist.keys():
self.strapping_table[float(k)] = strapping_persist[k]
log.info("Using stored strapping table:\n{}".format(self.strapping_table))
else:
log.warning("No stored strapping table found.")
except Exception as e:
log.warning("No stored strapping table. Got Error: {}".format(e))
wait_sec = 60
for i in range(0, wait_sec):
log.info("pondlevel driver will start in {} seconds".format(wait_sec - i))
time.sleep(1)
log.info("BOOM! Starting pondlevel driver...")
# db.setup_calibration_table(drop_first=False)
self.send_calibration_points()
self._check_ip_address()
self.sendtodb("setrawmax", self.PL_RAWMAX, 0)
self.sendtodb("setrawmin", self.PL_RAWMIN, 0)
self.sendtodb("seteumax", self.PL_EUMAX, 0)
self.sendtodb("seteumin", self.PL_EUMIN, 0)
self.sendtodb("setleveloffset", self.PL_OFFSET, 0)
self.sendtodb("setpsirawmax", self.PSI_RAWMAX, 0)
self.sendtodb("setpsirawmin", self.PSI_RAWMIN, 0)
self.sendtodb("setpsieumax", self.PSI_EUMAX, 0)
self.sendtodb("setpsieumin", self.PSI_EUMIN, 0)
send_loops = 0
ip_check_after = 60
while True:
now = time.time()
pl_send_now = False
psi_send_now = False
if self.forceSend:
log.warning("FORCE SEND: TRUE")
pl_send_now = True
psi_send_now = True
try:
mcu_status = self.mcu.getDict() # Gets a dictionary of the IO states
cloop_val = float(mcu_status['cloop'])
analog1_val = float(mcu_status['analog1'])
temp_pl = scale_to_eu(cloop_val, self.PL_RAWMAX, self.PL_RAWMIN, self.PL_EUMAX, self.PL_EUMIN, offset=self.PL_OFFSET)
temp_psi = scale_to_eu(analog1_val, self.PSI_RAWMAX, self.PSI_RAWMIN, self.PSI_EUMAX, self.PSI_EUMIN)
except AttributeError as err:
log.error("Could not connect to MCU object, this device might not have an MCU: {}".format(err))
pass
except KeyError as err:
log.error("Connected to MCU, but {} does not exist.".format(err))
pass
if (now - pond_level['timestamp']) > send_time:
log.info("Sending pond level {} due to time limit".format(temp_pl))
pl_send_now = True
# Check if pressure needs to be sent
if abs(temp_psi - psi_reading['value']) > send_delta:
log.info("Sending pressure psi {} due to value change".format(temp_psi))
psi_send_now = True
if (now - psi_reading['timestamp']) > send_time:
log.info("Sending pressure psi {} due to time limit".format(temp_psi))
psi_send_now = True
if pl_send_now:
self.sendtodb('pond_level', temp_pl, 0)
pond_volume = 0.0
try:
pond_volume = self.get_volume_for_height(temp_pl)
except Exception:
pass
self.sendtodb('pond_volume', pond_volume, 0)
pond_level['value'] = temp_pl
pond_level['timestamp'] = now
if psi_send_now:
self.sendtodb('pressure_psi', temp_psi, 0)
psi_reading['value'] = temp_psi
psi_reading['timestamp'] = now
if (now - self.public_ip_address_last_checked) > ip_check_after or self.forceSend:
self._check_ip_address()
log.info("pondlevel driver still alive...")
if self.forceSend:
if send_loops > 2:
log.info("Turning off forceSend")
self.forceSend = False
send_loops = 0
else:
send_loops += 1
time.sleep(10)
def _check_ip_address(self):
"""Check the public IP address and send to Meshify if changed."""
self.public_ip_address_last_checked = time.time()
test_public_ip = get_public_ip_address()
if not test_public_ip == self.public_ip_address:
self.sendtodb('public_ip_address', test_public_ip, 0)
self.public_ip_address = test_public_ip
def send_calibration_points(self):
"""Send calibration data from database to Meshify."""
strapping_table_list = []
height_list = self.strapping_table.keys()
height_list.sort()
for height in height_list:
strapping_table_list.append([height, self.strapping_table[height]])
strapping_table_string = json.dumps(strapping_table_list)
self.sendtodb('calibration_data', strapping_table_string, 0)
def pondlevel_sync(self, name, value):
"""Sync all data from the driver."""
self.forceSend = True
self.sendtodb("log", "synced", 0)
return True
def pondlevel_addcalibrationpoint(self, name, value):
"""Add a JSON calibration point to the database."""
try:
new_point = json.loads(value.replace("'", '"'))
log.info("Trying to add Height: {}, volume: {}".format(new_point['height'], new_point['volume']))
self.strapping_table[float(new_point['height'])] = float(new_point['volume'])
self.store_strapping_table()
# db.insert_calibration_data(new_point['height'], new_point['volume'])
self.send_calibration_points()
return True
except Exception as e:
log.error("EXCEPTION: {}".format(e))
def pondlevel_deletecalibrationpoint(self, name, value):
"""Delete a calibration point from the database."""
try:
del self.strapping_table[float(value)]
self.store_strapping_table()
# db.delete_calibration_data(int(value))
self.send_calibration_points()
return True
except Exception as e:
log.error("Error deleting calibration point: {}".format(e))
return(e)
def store_scaling_data(self):
"""Store scaling data in the persist file."""
scale_obj = {
'raw_max': self.PL_RAWMAX,
'raw_min': self.PL_RAWMIN,
'eu_max': self.PL_EUMAX,
'eu_min': self.PL_EUMIN,
'psi_raw_max': self.PSI_RAWMAX,
'psi_raw_min': self.PSI_RAWMIN,
'psi_eu_max': self.PSI_EUMAX,
'psi_eu_min': self.PSI_EUMIN
}
persistence.store(scale_obj, filename="scaling_persist.json")
def store_strapping_table(self):
"""Store strapping table in the persist file."""
persistence.store(self.strapping_table, filename="strapping_persist.json")
def get_volume_for_height(self, height):
"""Interpret a volume for a given height."""
try:
height = float(height)
strapping_table_list = []
height_list = self.strapping_table.keys()
height_list.sort()
for height_i in height_list:
strapping_table_list.append([float(height_i), float(self.strapping_table[height_i])])
# print("Strapping Table Data\n===")
# print(strapping_table_list)
cal_min_height = strapping_table_list[0][0]
cal_max_height = strapping_table_list[-1][0]
lower_cal = [0.0, 0.0]
upper_cal = [0.0, 0.0]
if (height <= cal_max_height and height >= cal_min_height):
for i in range(0, len(strapping_table_list) - 1):
if (height >= strapping_table_list[i][0] and height <= strapping_table_list[i+1][0]):
lower_cal = strapping_table_list[i]
upper_cal = strapping_table_list[i+1]
elif height > cal_max_height:
lower_cal = strapping_table_list[-2]
upper_cal = strapping_table_list[-1]
elif height < cal_min_height:
lower_cal = strapping_table_list[0]
upper_cal = strapping_table_list[1]
line_m = (float(upper_cal[1]) - float(lower_cal[1])) / (float(upper_cal[0]) - float(lower_cal[0]))
line_b = float(upper_cal[1]) - line_m * float(upper_cal[0])
return line_m * height + line_b
except Exception as e:
log.error("Error in get_volume_for_height: {}".format(e))
return 0.0
def pondlevel_setrawmin(self, name, value):
"""Set the raw min scaling value."""
self.PL_RAWMIN = float(value)
self.sendtodb("setrawmin", self.PL_RAWMIN, 0)
self.store_scaling_data()
return(True)
def pondlevel_setrawmax(self, name, value):
"""Set the raw max scaling value."""
self.PL_RAWMAX = float(value)
self.sendtodb("setrawmax", self.PL_RAWMAX, 0)
self.store_scaling_data()
return(True)
def pondlevel_seteumin(self, name, value):
"""Set the gpm min scaling value."""
self.PL_EUMIN = float(value)
self.sendtodb("seteumin", self.PL_EUMIN, 0)
self.store_scaling_data()
return(True)
def pondlevel_seteumax(self, name, value):
"""Set the gpm max scaling value."""
self.PL_EUMAX = float(value)
self.sendtodb("seteumax", self.PL_EUMAX, 0)
self.store_scaling_data()
return(True)
def pondlevel_setleveloffset(self, name, value):
"""Set the Pond level offset scaling value."""
self.PL_OFFSET = float(value)
self.sendtodb("setleveloffset", self.PL_OFFSET, 0)
self.store_scaling_data()
return(True)
def pondlevel_setpsirawmin(self, name, value):
"""Set the raw min pressure scaling value."""
self.PSI_RAWMIN = float(value)
self.sendtodb("setpsirawmin", self.PSI_RAWMIN, 0)
self.store_scaling_data()
return(True)
def pondlevel_setpsirawmax(self, name, value):
"""Set the raw max pressure scaling value."""
self.PSI_RAWMAX = float(value)
self.sendtodb("setpsirawmax", self.PSI_RAWMAX, 0)
self.store_scaling_data()
return(True)
def pondlevel_setpsieumin(self, name, value):
"""Set the psi min pressure scaling value."""
self.PSI_EUMIN = float(value)
self.sendtodb("setpsieumin", self.PSI_EUMIN, 0)
self.store_scaling_data()
return(True)
def pondlevel_setpsieumax(self, name, value):
"""Set the psi max pressure scaling value."""
self.PSI_EUMAX = float(value)
self.sendtodb("setpsieumax", self.PSI_EUMAX, 0)
self.store_scaling_data()
return(True)

97
python-driver/test.py Normal file
View File

@@ -0,0 +1,97 @@
"""Test the functions."""
import unittest
import calibration_db as db
import sqlite3
TEST_HEIGHT = 1.0
TEST_VOLUME = 1000.0
class TestDatabaseMethods(unittest.TestCase):
"""Class for testing database methods."""
def test_01_setuptable(self):
"""Test the setup_calibration_table function."""
try:
db.setup_calibration_table(drop_first=True)
cursor = db.SQLITE_CONN.cursor()
cursor.execute("SELECT * FROM cal_data")
cursor.fetchone()
self.assertTrue(True, msg="Table is accessible.")
except sqlite3.OperationalError:
self.assertTrue(False, msg="Table is not accessible.")
def test_02_getemptytable(self):
"""Test that an empty table is returned if no entries."""
self.assertListEqual(db.get_calibration_data(), [], msg="Calibration table is empty.")
def test_03_insertcalibrationdata(self):
"""Test the insert_calibration_data function."""
db.insert_calibration_data(TEST_HEIGHT, TEST_VOLUME)
cursor = db.SQLITE_CONN.cursor()
cursor.execute("SELECT * FROM cal_data")
result = cursor.fetchone()
self.assertEqual(result[1], TEST_HEIGHT, msg="Height fetched OK")
self.assertEqual(result[2], TEST_VOLUME, msg="Volume fetched OK")
def test_04_getcaldatasingle(self):
"""Test the get_calibration_data function for a single row."""
inserted_row = db.insert_calibration_data(TEST_HEIGHT, TEST_VOLUME)
fetched_row = db.get_calibration_data(single_entry=inserted_row)
self.assertEqual(fetched_row[1], TEST_HEIGHT, msg="Height fetched OK")
self.assertEqual(fetched_row[2], TEST_VOLUME, msg="Volume fetched OK")
def test_05_get_caldataall(self):
"""Test the get_calibration_data function for all rows."""
db.insert_calibration_data(TEST_HEIGHT, TEST_VOLUME)
fetched_data = db.get_calibration_data()
self.assertEqual(len(fetched_data), 3, msg="Should have 3 rows of data.")
def test_06_deletecaldata(self):
"""Test the delete_calibration_data function."""
cal_data = db.get_calibration_data()
start_len = len(cal_data)
db.delete_calibration_data(cal_data[-1][0])
end_len = len(db.get_calibration_data())
self.assertEqual(start_len - 1, end_len, msg="Length of data array is good after delete. {} = {}".format(start_len - 1, end_len))
def test_07_checkscaledata(self):
"""Test the scaling function."""
expected = [
[0.5, 0.5],
[1.5, 1.5],
[2.5, 3.0],
[3.5, 6.0],
[4.5, 12.0],
[5.5, 24.0],
[6.5, 48.0],
[7.5, 96.0],
[8.5, 192.0],
[9.5, 384.0],
[10.5, 640.0],
]
db.setup_calibration_table(drop_first=True)
db.insert_calibration_data(1.0, 1.0)
db.insert_calibration_data(2.0, 2.0)
db.insert_calibration_data(3.0, 4.0)
db.insert_calibration_data(4.0, 8.0)
db.insert_calibration_data(5.0, 16.0)
db.insert_calibration_data(6.0, 32.0)
db.insert_calibration_data(7.0, 64.0)
db.insert_calibration_data(8.0, 128.0)
db.insert_calibration_data(9.0, 256.0)
db.insert_calibration_data(10.0, 512.0)
for i in range(0, len(expected)):
self.assertEqual(db.get_volume_for_height(expected[i][0]), expected[i][1], msg="Volume equal. {} == {}".format(db.get_volume_for_height(expected[i][0]), expected[i][1]))
@classmethod
def tearDownClass(cls):
"""Tear down function."""
db.setup_calibration_table(drop_first=True)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,51 @@
"""Utility functions for the driver."""
import socket
import struct
def get_public_ip_address():
"""Find the public IP Address of the host device."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
def int_to_float16(int_to_convert):
"""Convert integer into float16 representation."""
bin_rep = ('0' * 16 + '{0:b}'.format(int_to_convert))[-16:]
sign = 1.0
if int(bin_rep[0]) == 1:
sign = -1.0
exponent = float(int(bin_rep[1:6], 2))
fraction = float(int(bin_rep[6:17], 2))
if exponent == float(0b00000):
return sign * 2 ** -14 * fraction / (2.0 ** 10.0)
elif exponent == float(0b11111):
if fraction == 0:
return sign * float("inf")
else:
return float("NaN")
else:
frac_part = 1.0 + fraction / (2.0 ** 10.0)
return sign * (2 ** (exponent - 15)) * frac_part
def ints_to_float(int1, int2):
"""Convert 2 registers into a floating point number."""
mypack = struct.pack('>HH', int1, int2)
f = struct.unpack('>f', mypack)
print("[{}, {}] >> {}".format(int1, int2, f[0]))
return f[0]
def degf_to_degc(temp_f):
"""Convert deg F to deg C."""
return (temp_f - 32.0) * (5.0/9.0)
def degc_to_degf(temp_c):
"""Convert deg C to deg F."""
return temp_c * 1.8 + 32.0