Reorganize and add calibration database

This commit is contained in:
Patrick McDonagh
2017-11-16 17:55:49 -06:00
parent caed9a07fb
commit 7d39dd5e63
15 changed files with 231 additions and 10 deletions

1
.gitignore vendored
View File

@@ -99,3 +99,4 @@ ENV/
# mypy
.mypy_cache/
/python_driver/cal_data.db

View File

@@ -1,10 +0,0 @@
{
"driverFileName":"pondlevel.py",
"deviceName":"pondlevel",
"driverId":"0130",
"releaseVersion":"1",
"files": {
"file1":"pondlevel.py"}
}

View File

@@ -0,0 +1,77 @@
"""Test database functions for storing configuration data."""
import sqlite3
SQLITE_DB = "cal_data.db"
SQLITE_CONN = sqlite3.connect(SQLITE_DB)
def setup_calibration_table(drop_first=False):
"""Setup the calibration table."""
cursor = SQLITE_CONN.cursor()
if drop_first:
cursor.execute("DROP TABLE IF EXISTS cal_data")
create_query = ("CREATE TABLE IF NOT EXISTS cal_data ("
"id integer PRIMARY KEY, "
"height float, "
"volume float)")
cursor.execute(create_query)
def insert_calibration_data(height, volume):
"""Insert an entry into the calibration table."""
cursor = SQLITE_CONN.cursor()
insert_query = "INSERT INTO cal_data (height, volume) VALUES (?, ?)"
cursor.execute(insert_query, [height, volume])
return cursor.lastrowid
def delete_calibration_data(id):
"""Delete an entry from the calibration table."""
cursor = SQLITE_CONN.cursor()
delete_query = "DELETE FROM cal_data WHERE id = ?"
cursor.execute(delete_query, [id])
return cursor.rowcount
def get_calibration_data(single_entry=False):
"""Retrieve either a single entry or all calibration data."""
single_entry = int(single_entry)
cursor = SQLITE_CONN.cursor()
get_single_query = "SELECT * FROM cal_data WHERE id = ?"
get_all_query = "SELECT * FROM cal_data ORDER BY height ASC"
if(single_entry > 0):
cursor.execute(get_single_query, [single_entry])
return cursor.fetchone()
else:
return [list(row) for row in cursor.execute(get_all_query)]
def get_volume_for_height(height):
"""Interpret a volume for a given height."""
calibration_data = get_calibration_data()
cal_min_height = calibration_data[0][1]
cal_max_height = calibration_data[-1][1]
lower_cal = [0.0, 0.0]
upper_cal = [0.0, 0.0]
if (height <= cal_max_height and height >= cal_min_height):
for i in range(0, len(calibration_data) - 1):
if (height >= calibration_data[i][1] and height <= calibration_data[i+1][1]):
lower_cal = calibration_data[i][1:3]
upper_cal = calibration_data[i+1][1:3]
elif height > cal_max_height:
lower_cal = calibration_data[-2][1:3]
upper_cal = calibration_data[-1][1:3]
elif height < cal_min_height:
lower_cal = calibration_data[0][1:3]
upper_cal = calibration_data[1][1:3]
line_m = (upper_cal[1] - lower_cal[1]) / (upper_cal[0] - lower_cal[0])
line_b = upper_cal[1] - line_m * upper_cal[0]
return line_m * height + line_b

10
python_driver/config.txt Normal file
View File

@@ -0,0 +1,10 @@
{
"files": {
"file2": "calibration_db.py",
"file1": "pondlevel.py"
},
"deviceName": "pondlevel",
"driverId": "0130",
"releaseVersion": "1",
"driverFileName": "pondlevel.py"
}

View File

@@ -0,0 +1,10 @@
{
"name": "pondlevel",
"driverFilename": "pondlevel.py",
"driverId": "0130",
"additionalDriverFiles": [
"calibration_db.py"
],
"version": 1,
"s3BucketName": "pondlevel"
}

View File

@@ -0,0 +1,19 @@
eq = "-7.5553x^5 + 274.72x^4 - 3764.6x^3 + 24542x^2 - 27573x + 8080.8"
coeffs = [-7.5553, 274.72, -3764.6, 24542.0, -27573.0, 8080.8]
def calc_polynomial(coefficient_list, x):
"""Calculate a polynomial value given the coefficients and x-value."""
poly_val = 0.0
new_eq = ""
for i in range(0, len(coefficient_list)):
pow_level = len(coefficient_list) - (i + 1)
poly_val += coefficient_list[i] * pow(x, pow_level)
new_eq += " + {}x^{}".format(coefficient_list[i], pow_level)
print(new_eq[2:])
return poly_val
test_x = 10.0
print(calc_polynomial(coeffs, test_x))
print(-7.5553 * test_x ** 5 + 274.72 * test_x ** 4 - 3764.6 * test_x ** 3 + 24542.0 * test_x ** 2 - 27573.0 * test_x + 8080.8)

View File

@@ -2,8 +2,10 @@
import threading
from device_base import deviceBase
import calibration_db as db
import time
import random
import json
_ = None
@@ -81,8 +83,23 @@ class start(threading.Thread, deviceBase):
send_loops += 1
time.sleep(10)
def send_calibration_points(self):
"""Send calibration data from database to Meshify."""
cal_data = db.get_calibration_data()
self.sendtodb('pondlevel', json.dumps(cal_data), 0)
def pondlevel_sync(self, name, value):
"""Sync all data from the driver."""
self.forceSend = True
self.sendtodb("log", "synced", 0)
return True
def pondlevel_addcalibrationpoint(self, name, value):
"""Add a JSON calibration point to the database."""
try:
new_point = json.loads(value)
db.insert_calibration_data(new_point['height'], new_point['volume'])
self.send_calibration_points()
return True
except KeyError:
return "Misformed JSON"

97
python_driver/test.py Normal file
View File

@@ -0,0 +1,97 @@
"""Test the functions."""
import unittest
import calibration_db as db
import sqlite3
TEST_HEIGHT = 1.0
TEST_VOLUME = 1000.0
class TestDatabaseMethods(unittest.TestCase):
"""Class for testing database methods."""
def test_01_setuptable(self):
"""Test the setup_calibration_table function."""
try:
db.setup_calibration_table(drop_first=True)
cursor = db.SQLITE_CONN.cursor()
cursor.execute("SELECT * FROM cal_data")
cursor.fetchone()
self.assertTrue(True, msg="Table is accessible.")
except sqlite3.OperationalError:
self.assertTrue(False, msg="Table is not accessible.")
def test_02_getemptytable(self):
"""Test that an empty table is returned if no entries."""
self.assertListEqual(db.get_calibration_data(), [], msg="Calibration table is empty.")
def test_03_insertcalibrationdata(self):
"""Test the insert_calibration_data function."""
db.insert_calibration_data(TEST_HEIGHT, TEST_VOLUME)
cursor = db.SQLITE_CONN.cursor()
cursor.execute("SELECT * FROM cal_data")
result = cursor.fetchone()
self.assertEqual(result[1], TEST_HEIGHT, msg="Height fetched OK")
self.assertEqual(result[2], TEST_VOLUME, msg="Volume fetched OK")
def test_04_getcaldatasingle(self):
"""Test the get_calibration_data function for a single row."""
inserted_row = db.insert_calibration_data(TEST_HEIGHT, TEST_VOLUME)
fetched_row = db.get_calibration_data(single_entry=inserted_row)
self.assertEqual(fetched_row[1], TEST_HEIGHT, msg="Height fetched OK")
self.assertEqual(fetched_row[2], TEST_VOLUME, msg="Volume fetched OK")
def test_05_get_caldataall(self):
"""Test the get_calibration_data function for all rows."""
db.insert_calibration_data(TEST_HEIGHT, TEST_VOLUME)
fetched_data = db.get_calibration_data()
self.assertEqual(len(fetched_data), 3, msg="Should have 3 rows of data.")
def test_06_deletecaldata(self):
"""Test the delete_calibration_data function."""
cal_data = db.get_calibration_data()
start_len = len(cal_data)
db.delete_calibration_data(cal_data[-1][0])
end_len = len(db.get_calibration_data())
self.assertEqual(start_len - 1, end_len, msg="Length of data array is good after delete. {} = {}".format(start_len - 1, end_len))
def test_07_checkscaledata(self):
"""Test the scaling function."""
expected = [
[0.5, 0.5],
[1.5, 1.5],
[2.5, 3.0],
[3.5, 6.0],
[4.5, 12.0],
[5.5, 24.0],
[6.5, 48.0],
[7.5, 96.0],
[8.5, 192.0],
[9.5, 384.0],
[10.5, 640.0],
]
db.setup_calibration_table(drop_first=True)
db.insert_calibration_data(1.0, 1.0)
db.insert_calibration_data(2.0, 2.0)
db.insert_calibration_data(3.0, 4.0)
db.insert_calibration_data(4.0, 8.0)
db.insert_calibration_data(5.0, 16.0)
db.insert_calibration_data(6.0, 32.0)
db.insert_calibration_data(7.0, 64.0)
db.insert_calibration_data(8.0, 128.0)
db.insert_calibration_data(9.0, 256.0)
db.insert_calibration_data(10.0, 512.0)
for i in range(0, len(expected)):
self.assertEqual(db.get_volume_for_height(expected[i][0]), expected[i][1], msg="Volume equal. {} == {}".format(db.get_volume_for_height(expected[i][0]), expected[i][1]))
@classmethod
def tearDownClass(cls):
"""Tear down function."""
db.setup_calibration_table(drop_first=True)
if __name__ == '__main__':
unittest.main()