Files
www-POC/tests/dbtests.py
2016-11-30 11:04:31 -06:00

228 lines
8.8 KiB
Python

import unittest
import json
import requests
# Scheme, host and port of the API server that the tests send requests to.
REQ_METHOD, REQ_TARGET, REQ_PORT = "https", "localhost", 5000

# Root URL that the request helpers append a table name (and object id) to.
REQ_URL_BASE = "{}://{}:{}/api".format(REQ_METHOD, REQ_TARGET, REQ_PORT)
def insert_data(db_table, test_data, timeout=10):
    """POST ``test_data`` as JSON to the API and return the created object's id.

    Args:
        db_table: Path segment appended to ``REQ_URL_BASE`` to form the URL.
        test_data: JSON-serializable object sent as the request body.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot hang the caller forever.

    Returns:
        The value stored under the ``"_id"`` key of the JSON response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not answer within ``timeout``.
        KeyError: If the JSON response has no ``"_id"`` key.
    """
    url = "{}/{}".format(REQ_URL_BASE, db_table)
    response = requests.post(
        url,
        data=json.dumps(test_data),
        headers={'Content-Type': 'application/json'},
        # Certificate verification is switched off for this request
        # (presumably the local server uses a self-signed cert -- confirm).
        verify=False,
        timeout=timeout,
    )
    # Surface HTTP errors directly instead of failing later with an
    # unrelated KeyError/ValueError while decoding an error page.
    response.raise_for_status()
    return json.loads(response.text)["_id"]
def get_data(db_table, obj_id, timeout=10):
    """GET a single object from the API and return its decoded JSON body.

    Args:
        db_table: Path segment appended to ``REQ_URL_BASE``.
        obj_id: Identifier appended after ``db_table`` in the URL.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot hang the caller forever.

    Returns:
        The response body parsed with ``json.loads``.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    url = "{}/{}/{}".format(REQ_URL_BASE, db_table, obj_id)
    # Certificate verification is switched off for this request
    # (presumably the local server uses a self-signed cert -- confirm).
    response = requests.get(url, verify=False, timeout=timeout)
    # Surface HTTP errors directly instead of failing later while trying
    # to decode an error page as JSON.
    response.raise_for_status()
    return json.loads(response.text)
# class TestDevice(unittest.TestCase):
# def test_insert(self):
# # _id = db.Column(db.Integer, primary_key=True)
# # device_type_id = db.Column(db.Integer, db.ForeignKey('device_types._id'))
# # device_type = db.relationship(Device_type, primaryjoin=device_type_id==Device_type._id)
# # address = db.Column(db.String(256))
# # created_on = db.Column(db.DateTime(), default=datetime.utcnow)
# # updated_on = db.Column(db.DateTime(), default=datetime.utcnow, onupdate=datetime.utcnow)
# test_device = {
# 'device_type_id': 1,
# 'address': '192.168.1.100',
# }
# id_added = insert_data("devices", test_device)
# print("Added device at _id {}".format(id_added))
#
# if id_added > 0:
# device_in_db = get_data("devices", id_added)
# for x in test_device:
# self.assertTrue(test_device[x] == device_in_db[x])
# class TestTag(unittest.TestCase):
# def test_insert(self):
# # _id = db.Column(db.Integer, primary_key=True)
# # name = db.Column(db.String(64))
# # tag = db.Column(db.String(128))
# # tag_category = db.Column(db.String(128))
# # device_id = db.Column(db.Integer, db.ForeignKey('devices._id'))
# # device = db.relationship(Device)
# # description = db.Column(db.String(64))
# # data_type = db.Column(db.String(64))
# # change_threshold = db.Column(db.Float)
# # guarantee_sec = db.Column(db.Integer)
# # map_function = db.Column(db.String(64))
# # units = db.Column(db.String(10))
# # min_expected = db.Column(db.Float)
# # max_expected = db.Column(db.Float)
# # created_on = db.Column(db.DateTime(), default=datetime.utcnow)
# # updated_on = db.Column(db.DateTime(), default=datetime.utcnow, onupdate=datetime.utcnow)
#
# test_tag = {
# 'name': "Test Tag",
# 'tag_category': 'ts',
# 'device_id': 1,
# 'description': 'This is a test',
# 'data_type': 'Floating Point',
# 'change_threshold': 0.5,
# 'guarantee_sec': 3600,
# 'units': 'in.',
# 'min_expected': 0.0,
# 'max_expected': 1000.0
# }
# id_added = insert_data("tags", test_tag)
# print("Added tag at _id {}".format(id_added))
#
# if id_added > 0:
# tag_in_db = get_data("tags", id_added)
# for x in test_tag:
# self.assertTrue(test_tag[x] == tag_in_db[x])
class TestStroke(unittest.TestCase):
    """Round-trip test against the ``cards`` API endpoint."""

    def test_insert(self):
        """Insert a card, read it back, and compare every submitted field."""
        test_card = {
            'stroke_type': 'Normal',
            'stroke_number': 123,
            'surf_pos': '[0.0, 90.0, 100.0, 25.0, 0.0]',
            'surf_lod': '[15000.0, 20000.0, 20000.0, 15000.0, 15000.0]',
            'down_pos': '[0.0, 0.0, 100.0, 100.0, 0.0]',
            'down_lod': '[-10000.0, -5000.0, -5000.0, -10000.0, -10000.0]'
        }
        id_added = insert_data("cards", test_card)
        print("Added card at _id {}".format(id_added))

        # Fail explicitly when no usable id came back; silently skipping the
        # comparison would let the test pass without checking anything.
        self.assertGreater(id_added, 0)

        stroke_in_db = get_data("cards", id_added)
        for field in test_card:
            # A missing field is a test failure, not a KeyError crash.
            self.assertIn(field, stroke_in_db)
            print("testing {}: {} == {}".format(field, test_card[field], stroke_in_db[field]))
            # assertEqual reports both values when they differ.
            self.assertEqual(
                test_card[field],
                stroke_in_db[field],
                "field {!r} was not stored as submitted".format(field),
            )
# class TestGaugeOff(unittest.TestCase):
# def test_insert(self):
# test_gauge_date = {
# 'spm_average': 7.5,
# 'downhole_gross_stroke_average': 98.7,
# 'downhole_net_stroke_average': 92.1,
# 'electricity_cost_total': 10.55,
# 'fluid_level_average': 1034.5,
# 'full_card_production_total': 750.5,
# 'inflow_rate_average': 100.2,
# 'kWh_used_total': 800.23,
# 'kWh_regen_total': 105.3,
# 'lifting_cost_average': 9.34,
# 'peak_pr_load':25098.2,
# 'min_pr_load': 12125.9,
# 'percent_run': 87.4,
# 'polished_rod_hp_average': 2.345,
# 'pump_hp_average': 3.315,
# 'production_total': 517.0,
# 'pump_intake_pressure_average': 500.433,
# 'surface_stroke_length_average': 101.1,
# 'tubing_movement_average': 0.764
# }
# id_added = insert_data("gauge_off_vals", test_gauge_date)
# print("Added gauge off at _id {}".format(id_added))
#
# if id_added > 0:
# go_in_db = get_data("gauge_off_vals", id_added)
# for x in test_gauge_date:
# print("testing {}: {} == {}".format(x, test_gauge_date[x], go_in_db[x]))
# self.assertTrue(test_gauge_date[x] == go_in_db[x])
# class TestWellTest(unittest.TestCase):
# def test_insert(self):
# # duration_hours = db.Column(db.Float)
# # volume_h2o_actual = db.Column(db.Float)
# # volume_oil_actual = db.Column(db.Float)
# # volume_gas_actual = db.Column(db.Float)
# # volume_h2o_projected = db.Column(db.Float)
# # volume_oil_projected = db.Column(db.Float)
# # volume_gas_projected = db.Column(db.Float)
# # api_gravity_oil = db.Column(db.Float)
# # spc_gravity_h2o = db.Column(db.Float)
# test_well_test = {
# 'duration_hours': 24,
# 'volume_h2o_actual': 1000.0,
# 'volume_oil_actual': 900.0,
# 'volume_gas_actual': 3500.0,
# 'volume_h2o_projected': 1200.0,
# 'volume_oil_projected': 700.0,
# 'volume_gas_projected': 3495.5,
# 'api_gravity_oil': 1.4,
# 'spc_gravity_h2o': 1.001
# }
#
# id_added = insert_data("well_test_vals", test_well_test)
# print("Added well test at _id {}".format(id_added))
#
# if id_added > 0:
# data_in_db = get_data("well_test_vals", id_added)
# for x in test_well_test:
# print("testing {}: {} == {}".format(x, test_well_test[x], data_in_db[x]))
# self.assertTrue(test_well_test[x] == data_in_db[x])
# class TestEvents(unittest.TestCase):
# def test_insert(self):
# # event_id = db.Column(db.Integer, db.ForeignKey('event_configs._id'))
# # event_type = db.Column(db.String(64))
# # event_condition = db.Column(db.String(64))
# test_analog_event = {
# 'event_id': 1,
# 'event_type': 'Alarm',
# 'event_condition': 'High Alarm'
# }
#
# id_added = insert_data("events", test_analog_event)
# print("Added event at _id {}".format(id_added))
#
# if id_added > 0:
# data_in_db = get_data("events", id_added)
# for x in test_analog_event:
# print("testing {}: {} == {}".format(x, test_analog_event[x], data_in_db[x]))
# self.assertTrue(test_analog_event[x] == data_in_db[x])
#
# test_bit_event = {
# 'event_id': 16,
# 'event_type': 'Info',
# 'event_condition': 'Unit Start'
# }
#
# id_added = insert_data("events", test_bit_event)
# print("Added event at _id {}".format(id_added))
#
# if id_added > 0:
# data_in_db = get_data("events", id_added)
# for x in test_bit_event:
# print("testing {}: {} == {}".format(x, test_bit_event[x], data_in_db[x]))
# self.assertTrue(test_bit_event[x] == data_in_db[x])
# class TestNotes(unittest.TestCase):
# def test_insert(self):
# # note_text = db.Column(db.String(256))
# # author = db.String(128)
#
# test_note = {
# 'note_text': "THIS IS A TEST OF THE NOTE SYSTEM.",
# 'author': "Patrick F-ing McDonagh"
# }
#
# id_added = insert_data("notes", test_note)
# print("Added note at _id {}".format(id_added))
#
# if id_added > 0:
# data_in_db = get_data("notes", id_added)
# for x in test_note:
# print("testing {}: {} == {}".format(x, test_note[x], data_in_db[x]))
# self.assertTrue(test_note[x] == data_in_db[x])
# Run the test cases in this module when the file is executed as a script;
# importing the module does not start a test run.
if __name__ == "__main__":
    unittest.main()