adds driver for new Flask-based datalogger

This commit is contained in:
Patrick McDonagh
2016-11-22 19:11:39 -06:00
parent 4caaf23c73
commit fe4f07846c

POCloud/poc.py Normal file

@@ -0,0 +1,540 @@
#!/usr/bin/python
import traceback
import threading
import time
import os
import pickle
import calendar
from device_base import deviceBase
from datetime import datetime
import requests
import json
API_HTTP_TYPE = "https"
API_DEVICE_ADDRESS = "192.168.1.30"
API_DEVICE_PORT = 5000
API_BASE_URL = "{}://{}:{}".format(API_HTTP_TYPE, API_DEVICE_ADDRESS, API_DEVICE_PORT)
go_channels = {
'spm_average': {'channel': 'go_average_spm', 'last_value_sent': None},
'downhole_gross_stroke_average': {'channel': 'go_downhole_gross_stroke', 'last_value_sent': None}, # TODO: ADD
'downhole_net_stroke_average': {'channel': 'go_downhole_net_stroke', 'last_value_sent': None}, # TODO: ADD
'electricity_cost_total': {'channel': 'go_electricity_cost', 'last_value_sent': None},
'fluid_level_average': {'channel': 'go_fluid_above_pump', 'last_value_sent': None},
'inflow_rate_average': {'channel': 'go_inflow_rate', 'last_value_sent': None},
'kWh_used_total': {'channel': 'go_kwh', 'last_value_sent': None},
'kWh_regen_total': {'channel': 'go_kwh_regen', 'last_value_sent': None},
'lifting_cost_average': {'channel': 'go_lifting_cost', 'last_value_sent': None},
'peak_pr_load': {'channel': 'go_peak_load', 'last_value_sent': None},
'min_pr_load': {'channel': 'go_min_load', 'last_value_sent': None},
'percent_run': {'channel': 'go_percent_run', 'last_value_sent': None},
'polished_rod_hp_average': {'channel': 'go_polished_rod_hp', 'last_value_sent': None},
'pump_hp_average': {'channel': 'go_pump_hp', 'last_value_sent': None}, # TODO: ADD
'production_total': {'channel': 'go_production_calculated', 'last_value_sent': None},
'pump_intake_pressure_average': {'channel': 'go_pump_intake_pressure', 'last_value_sent': None},
'surface_stroke_length_average': {'channel': 'go_surface_stroke_length', 'last_value_sent': None}, #TODO: ADD
'tubing_movement_average': {'channel': "go_tubing_movement", 'last_value_sent': None}, #TODO: ADD
}
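# Latest instantaneous tag values, read from the datalogger's /api/latest endpoint (see checkStoredValues).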
tag_channels = {
    'Polished Rod HP': {'channel': 'polished_rod_hp', 'last_value_sent': None},
    'Peak Downhole Load': {'channel': 'downhole_peak_load', 'last_value_sent': None},  # TODO: ADD
    'Gross Stroke Length': {'channel': 'downhole_gross_stroke', 'last_value_sent': None},
    'Stroke Speed': {'channel': 'SPM', 'last_value_sent': None},
    'Tubing Head Pressure': {'channel': 'tubing_head_pressure', 'last_value_sent': None},
    'Minimum Polished Rod Load': {'channel': 'surface_min_load', 'last_value_sent': None},
    'Fluid Load': {'channel': 'downhole_fluid_load', 'last_value_sent': None},
    'Downhole Max. Position': {'channel': 'downhole_max_position', 'last_value_sent': None},
    'Downhole Net Stroke': {'channel': 'downhole_net_stroke', 'last_value_sent': None},
    'Pump Fill Percent': {'channel': 'fillage_percent', 'last_value_sent': None},
    'Downhole Pump HP': {'channel': 'pump_hp', 'last_value_sent': None},
    'Surface Min. Position': {'channel': 'surface_min_position', 'last_value_sent': None},  # TODO: ADD
    'Pump Intake Pressure': {'channel': 'pump_intake_pressure', 'last_value_sent': None},
    'Surface Max. Position': {'channel': 'surface_max_position', 'last_value_sent': None},  # TODO: ADD
    'Tubing Movement': {'channel': 'tubing_movement', 'last_value_sent': None},
    'Downhole Min. Position': {'channel': 'downhole_min_position', 'last_value_sent': None},
    'Peak Polished Rod Load': {'channel': 'surface_max_load', 'last_value_sent': None},
    'Minimum Downhole Load': {'channel': 'downhole_min_load', 'last_value_sent': None},  # TODO: ADD
    'Surface Stroke Length': {'channel': 'surface_stroke_length', 'last_value_sent': None},
    'Downhole Adjusted Gross Stroke': {'channel': 'downhole_adjusted_gross_stroke', 'last_value_sent': None},
    'Fluid Level': {'channel': 'fluid_above_pump', 'last_value_sent': None},
    'Stroke Production': {'channel': 'stroke_production', 'last_value_sent': None},
}
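# Daily totals are re-sent when a value moves by more than its 'change_threshold',
# or when more than an hour has passed since the last send (see checkDailyTotals).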
dt_channels = { # Current Daily Totals
'Electricity Cost': {'channel': 'dt_electricity_cost', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Inflow Rate': {'channel': 'dt_inflow_rate', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Energy Regen': {'channel': 'dt_kWh_regen', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Min Load': {'channel': 'dt_min_load', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Polished Rod HP': {'channel': 'dt_polished_rod_hp', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Calculated Production': {'channel': 'dt_calculated_production', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Projected Production': {'channel': 'dt_projected_production', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Pump HP': {'channel': 'dt_pump_hp', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
    'Pump Intake Pressure': {'channel': 'dt_pump_intake_pressure', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Surface Stroke Length': {'channel': 'dt_surface_stroke_length', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Tubing Movement': {'channel': 'dt_tubing_movement', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Downhole Net Stroke': {'channel': 'dt_downhole_net_stroke', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Average SPM': {'channel': 'dt_average_spm', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Peak Load': {'channel': 'dt_peak_load', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'kWh': {'channel': 'dt_kWh', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Percent Run': {'channel': 'dt_percent_run', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Fluid Level': {'channel': 'dt_fluid_level', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Lifting Cost': {'channel': 'dt_lifting_cost', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
'Full Card Production': {'channel': 'dt_full_card_production', 'last_value_sent': None, 'change_threshold': 1.5, 'last_send_ts': 0},
}
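# Driver thread: polls the datalogger's Flask REST API every 10 seconds and forwards new
# run statuses, events, notes, well tests, fluid shots, daily totals, gauge-off summaries,
# stored tag values, and dyno cards to the cloud backend.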
class start(threading.Thread, deviceBase):
def __init__(self, name=None, number=None, mac=None, Q=None, mcu=None, companyId=None, offset=None, mqtt=None, Nodes=None):
threading.Thread.__init__(self)
deviceBase.__init__(self, name=name, number=number, mac=mac, Q=Q, mcu=mcu, companyId=companyId, offset=offset, mqtt=mqtt, Nodes=Nodes)
self.daemon = True
self.forceSend = True
self.version = "3"
self.device_address = "http://192.168.1.30"
# self.device_address = "http://localhost"
        self.cardLoopTimer = 600  # seconds between full dyno card uploads (10 minutes)
self.finished = threading.Event()
threading.Thread.start(self)
self.status_changed = False
self.last_status = ""
self.last_card_send_time = 0
self.al_status_last = False
self.dl_status_last = False
        # Restore persisted de-duplication state so records already forwarded are not
        # re-sent after a restart. Each attribute holds the most recently seen IDs.
        for attr, fname, label in (
                ('runstatusIds', 'runstatusIds.p', 'Run Status'),
                ('eventIds', 'eventIds.p', 'event'),
                ('welltestIDs', 'welltestIDs.p', 'well test'),
                ('gaugeoffIds', 'gaugeoffIds.p', 'gauge off'),
                ('fluidshotIDs', 'fluidshotIDs.p', 'fluid shot'),
                ('noteIDs', 'noteIDs.p', 'note')):
            try:
                with open(fname, 'rb') as handle:
                    setattr(self, attr, pickle.load(handle))
                print "found pickled {0} ID list: {1}".format(label, getattr(self, attr))
            except Exception:
                print "couldn't load {0} IDs from pickle".format(label)
                setattr(self, attr, [])
        # load stored last_card_id
        try:
            with open('last_card_id.p', 'rb') as handle:
                self.last_card_id = pickle.load(handle)
            print "found pickled last_card_id: {0}".format(self.last_card_id)
        except Exception:
            print "couldn't load last_card_id from pickle"
            self.last_card_id = 0
    # This is a required function for all drivers; its goal is to upload some piece of
    # data about your device so it can be seen on the web.
    def register(self):
        self.last_status = ""  # reset the cached run status so it is re-sent on the next pass
def run(self):
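        # runLoopStatus records which stage the loop is in so the error handler can
        # report where a failure occurred.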
runLoopStatus = "Startup"
while True:
try:
if self.forceSend:
print("!!!!!!!!!!!!!!! FORCE SEND !!!!!!!!!!!!!!!")
runLoopStatus = "checkStatus"
chk_status = self.checkStatus(self.last_status)
if chk_status:
self.last_status = chk_status
self.status_changed = True
runLoopStatus = "checkEvents"
self.checkEvents()
runLoopStatus = "checkNotes"
self.checkNotes()
runLoopStatus = "checkWellTests"
self.checkWellTests()
runLoopStatus = "checkFluidShots"
self.checkFluidShots()
runLoopStatus = "checkDailyTotals"
self.checkDailyTotals()
runLoopStatus = "checkGaugeOffData"
self.checkGaugeOffData()
runLoopStatus = "checkStoredValues"
self.checkStoredValues()
# runLoopStatus = "getDataLoggerStatus()"
# self.getDataLoggerStatus()
                if self.status_changed:
                    runLoopStatus = "getLatestXCards"
                    self.forceSend = True
                    self.checkLatestCard(numCards=5)
                else:
                    runLoopStatus = "checkLatestCard"
                    self.checkLatestCard()
                self.status_changed = False  # clear the flag so full cards are not re-sent every pass
# if self.forceSend or (checkBackupSkipped > checkBackupEvery):
# runLoopStatus = "checkBackup"
# self.checkBackup()
# checkBackupSkipped = 0
# checkBackupSkipped = checkBackupSkipped + 1
runLoopStatus = "Complete"
time.sleep(10)
self.forceSend = False
            except Exception as e:
sleep_timer = 20
print "Error during {0} of run loop: {1}\nWill try again in {2} seconds...".format(runLoopStatus, e, sleep_timer)
traceback.print_exc()
time.sleep(sleep_timer)
def checkStatus(self, last_status):
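        # Fetch the run-status log (newest first), forward entries whose IDs we have not
        # seen yet, and persist the rolling ID list so restarts do not re-send them.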
global API_BASE_URL
try:
            # Braces in the JSON query string are doubled so str.format leaves them intact.
            api_req = requests.get('{0}/api/run_status_log/?q={{"order_by":[{{"field":"created_on","direction":"desc"}}]}}'.format(API_BASE_URL))
if api_req.status_code == 200:
req_data = json.loads(api_req.text)
req_data['objects'].reverse()
for i in range(0,len(req_data['objects'])):
if int(req_data['objects'][i]['_id']) not in self.runstatusIds:
new_status = req_data['objects'][i]["run_status"]
timestamp = req_data['objects'][i]["created_on"]
self.sendtodb('status', new_status, timestamp)
self.runstatusIds.append(int(req_data['objects'][i]['_id']))
if len(self.runstatusIds) > 20:
del self.runstatusIds[0]
with open('runstatusIds.p', 'wb') as handle:
pickle.dump(self.runstatusIds, handle)
                if req_data['objects'] and req_data['objects'][-1]['run_status'] != last_status:
                    print "Status has changed from {0} to {1} @ {2}".format(last_status, req_data['objects'][-1]['run_status'], req_data['objects'][-1]['created_on'])
                    return req_data['objects'][-1]['run_status']
except Exception as e:
print "Error during checkStatus..."
print("++++++== TRACEBACK ==++++++")
traceback.print_exc()
return False
def checkEvents(self):
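        # Same unseen-ID pattern as checkStatus, applied to the /api/events endpoint.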
global API_BASE_URL
try:
            api_req = requests.get('{0}/api/events/?q={{"order_by":[{{"field":"created_on","direction":"desc"}}]}}'.format(API_BASE_URL))
if api_req.status_code == 200:
req_data = json.loads(api_req.text)
req_data['objects'].reverse()
for i in range(0,len(req_data['objects'])):
if int(req_data['objects'][i]['_id']) not in self.eventIds:
timestamp = req_data['objects'][i]["created_on"]
self.sendtodbJSON('events', json.dumps(req_data['objects'][i]), timestamp)
self.eventIds.append(int(req_data['objects'][i]['_id']))
if len(self.eventIds) > 20:
del self.eventIds[0]
with open('eventIds.p', 'wb') as handle:
pickle.dump(self.eventIds, handle)
return True
except Exception as e:
print "Error during checkEvents..."
print("++++++== TRACEBACK ==++++++")
traceback.print_exc()
return False
def checkNotes(self):
global API_BASE_URL
try:
            api_req = requests.get('{0}/api/notes/?q={{"order_by":[{{"field":"created_on","direction":"desc"}}]}}'.format(API_BASE_URL))
if api_req.status_code == 200:
req_data = json.loads(api_req.text)
req_data['objects'].reverse()
for i in range(0,len(req_data['objects'])):
if int(req_data['objects'][i]['_id']) not in self.noteIDs:
timestamp = req_data['objects'][i]["created_on"]
self.sendtodbJSON('notes', json.dumps(req_data['objects'][i]), timestamp)
self.noteIDs.append(int(req_data['objects'][i]['_id']))
if len(self.noteIDs) > 20:
del self.noteIDs[0]
with open('noteIDs.p', 'wb') as handle:
pickle.dump(self.noteIDs, handle)
return True
except Exception as e:
print "Error during checkNotes..."
print("++++++== TRACEBACK ==++++++")
traceback.print_exc()
return False
def checkFluidShots(self):
global API_BASE_URL
try:
            api_req = requests.get('{0}/api/fluid_shots/?q={{"order_by":[{{"field":"created_on","direction":"desc"}}]}}'.format(API_BASE_URL))
if api_req.status_code == 200:
req_data = json.loads(api_req.text)
req_data['objects'].reverse()
for i in range(0,len(req_data['objects'])):
                    if int(req_data['objects'][i]['_id']) not in self.fluidshotIDs:
                        timestamp = req_data['objects'][i]["created_on"]
                        # 'fluidshots' channel name is assumed; the original sent these records to 'notes'.
                        self.sendtodbJSON('fluidshots', json.dumps(req_data['objects'][i]), timestamp)
                        self.fluidshotIDs.append(int(req_data['objects'][i]['_id']))
                        if len(self.fluidshotIDs) > 20:
                            del self.fluidshotIDs[0]
                        with open('fluidshotIDs.p', 'wb') as handle:
                            pickle.dump(self.fluidshotIDs, handle)
return True
except Exception as e:
print "Error during checkFluidShots..."
print("++++++== TRACEBACK ==++++++")
traceback.print_exc()
return False
def checkWellTests(self):
global API_BASE_URL
try:
            api_req = requests.get('{0}/api/well_test/?q={{"order_by":[{{"field":"created_on","direction":"desc"}}]}}'.format(API_BASE_URL))
if api_req.status_code == 200:
req_data = json.loads(api_req.text)
req_data['objects'].reverse()
for i in range(0,len(req_data['objects'])):
if int(req_data['objects'][i]['_id']) not in self.welltestIDs:
timestamp = req_data['objects'][i]["created_on"]
self.sendtodbJSON('welltests', json.dumps(req_data['objects'][i]), timestamp)
self.welltestIDs.append(int(req_data['objects'][i]['_id']))
if len(self.welltestIDs) > 20:
del self.welltestIDs[0]
with open('welltestIDs.p', 'wb') as handle:
pickle.dump(self.welltestIDs, handle)
return True
except Exception as e:
print "Error during checkEvents..."
print("++++++== TRACEBACK ==++++++")
traceback.print_exc()
return False
def checkDailyTotals(self):
global API_BASE_URL, dt_channels
try:
api_req = requests.get("{}/api/today_totals".format(API_BASE_URL)).text
if api_req.status_code == 200:
req_data = json.loads(api_req.text)
                for entry in req_data:
                    if entry['name'] not in dt_channels:
                        continue
                    chan = dt_channels[entry['name']]
                    # Send on first sight, on a change beyond the threshold, or hourly as a heartbeat.
                    if (chan['last_value_sent'] is None
                            or abs(chan['last_value_sent'] - entry['value']) > chan['change_threshold']
                            or time.time() - chan['last_send_ts'] > 3600):
                        self.sendtodb(chan['channel'], entry['value'], 0)
                        chan['last_value_sent'] = entry['value']
                        chan['last_send_ts'] = time.time()
except Exception as e:
print "Error during checkDailyTotals..."
print("++++++== TRACEBACK ==++++++")
traceback.print_exc()
def checkGaugeOffData(self):
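        # Each gauge-off record fans out into several channels: any column whose name
        # appears in go_channels is sent individually with the record's timestamp.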
global API_BASE_URL, go_channels
try:
            api_req = requests.get('{0}/api/gauge_off/?q={{"order_by":[{{"field":"created_on","direction":"desc"}}]}}'.format(API_BASE_URL))
if api_req.status_code == 200:
req_data = json.loads(api_req.text)
req_data['objects'].reverse()
for i in range(0,len(req_data['objects'])):
if int(req_data['objects'][i]['_id']) not in self.gaugeoffIds:
timestamp = req_data['objects'][i]['created_on']
for col_name in req_data['objects'][i]:
if col_name in go_channels:
self.sendtodb(go_channels[col_name]['channel'], req_data['objects'][i][col_name], timestamp)
self.gaugeoffIds.append(int(req_data['objects'][i]['_id']))
if len(self.gaugeoffIds) > 20:
del self.gaugeoffIds[0]
with open('gaugeoffIds.p', 'wb') as handle:
pickle.dump(self.gaugeoffIds, handle)
return True
except Exception as e:
print "Error during checkGaugeOffData..."
print("++++++== TRACEBACK ==++++++")
traceback.print_exc()
return False
def checkStoredValues(self):
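        # Forward the newest value of each known tag whenever it differs from the last value sent.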
global API_BASE_URL, tag_channels
try:
api_req = requests.get( '{}/api/latest'.format(API_BASE_URL))
if api_req.status_code == 200:
req_data = json.loads(api_req.text)
                for i in range(0, len(req_data)):
                    if req_data[i]['tag_name'] in tag_channels:
                        chan = tag_channels[req_data[i]['tag_name']]
                        if chan['last_value_sent'] is None or req_data[i]['value'] != chan['last_value_sent']:
                            self.sendtodb(chan['channel'], req_data[i]['value'], req_data[i]['created_on'])
                            chan['last_value_sent'] = req_data[i]['value']
except Exception as e:
print "Error during checkStoredValues..."
print("++++++== TRACEBACK ==++++++")
traceback.print_exc()
def checkLatestCard(self, numCards = 1):
global API_BASE_URL
try:
            api_req = requests.get('{0}/api/cards?q={{"order_by":[{{"field":"created_on","direction":"desc"}}], "limit":{1}}}'.format(API_BASE_URL, numCards))
req_data = json.loads(api_req.text)['objects']
# check the card to see if its new
# 1. if its new send the folder/file_name to the card_history channel
# 2. if its new and its been 10 minutes since you last sent an entire card, then send up all of the data
for i in range(0, len(req_data)):
current_card = req_data[i]
if current_card['_id'] > self.last_card_id:
#2016-11-23T00:37:02.806026
                    dt = datetime.strptime(current_card['created_on'], '%Y-%m-%dT%H:%M:%S.%f')
timestamp = calendar.timegm(dt.timetuple())
card_timestamp = int(time.mktime(dt.timetuple()))
print "New card detected @ {0}".format(datetime.strftime(datetime.fromtimestamp(timestamp), "%Y-%m-%d %H:%M:%S.%f"))
# set the last value = to current value and upload your data
self.sendtodb("card_history", current_card['_id'], timestamp)
self.last_card_id = current_card['_id']
with open('last_card_id.p', 'wb') as handle:
pickle.dump(self.last_card_id, handle)
# check the last time the card was updated
if (time.time() - self.last_card_send_time) > self.cardLoopTimer or self.status_changed or self.forceSend:
# its been 10 minutes, send the full upload
print "Either status has changed or last stored card is too old."
self.sendtodb("cardtype", current_card['stroke_type'], int(data_timestamp))
s_p = data["surf_pos"]
s_l = data["surf_lod"]
d_p = data["down_pos"]
d_l = data["down_lod"]
newSc = "["
newDc = "["
for i in range(len(s_p)):
try:
if s_p[i] is None:
continue
if s_p[i] != 0.0 and s_l[i] != 0.0:
newSc += "[" + str(round(s_p[i],3)) + ", " + str(round(s_l[i],3)) + "], "
except:
pass
newSc += "]"
for i in range(len(d_p)):
try:
if d_p[i] is None:
continue
if d_p[i] != 0.0 and d_l[i] != 0.0:
newDc += "[" + str(round(d_p[i], 3)) + ", " + str(round(d_l[i], 3)) + "], "
except:
pass
newDc += "]"
self.sendtodb("sc", newSc, card_timestamp)
self.sendtodb("dc", newDc, card_timestamp)
except Exception as e:
print "Error during checkLatestCard..."
print("++++++== TRACEBACK ==++++++")
traceback.print_exc()
# def getDataLoggerStatus(self):
# try:
# data = json.loads(requests.get(self.device_address + "/json/pythonstatus/").text)
# al_status = "Not OK"
# if data['status']['alarmLogger']:
# al_status = "OK"
#
# if al_status != self.al_status_last:
# self.sendtodb("alarmlogger_status", al_status, 0)
# self.al_status_last = al_status
#
# dl_status = "Not OK"
# if data['status']['dataLogger']:
# dl_status = "OK"
    # if dl_status != self.dl_status_last:
# self.sendtodb("datalogger_status", dl_status, 0)
# self.dl_status_last = dl_status
    # except Exception as e:
# print("getDataLoggerStatus Error: {}".format(e))
def poc_get_card(self, name, value):
self.getcard(value)
def poc_sync(self, name, value):
self.sendtodb("connected", "true", 0)
return True
def poc_set_address(self, name, value):
self.device_address = value
return True
def poc_refresh_data(self, name, value):
self.forceSend = True
return True