260 lines
12 KiB
Python
260 lines
12 KiB
Python
#!/usr/bin/python
|
|
|
|
# import types
|
|
import traceback
|
|
# import binascii
|
|
import threading
|
|
import time
|
|
# import thread
|
|
# import os
|
|
# import struct
|
|
# import sys
|
|
# import serial
|
|
# import minimalmodbus
|
|
import pickle
|
|
from device_base import deviceBase
|
|
from datetime import datetime
|
|
import logging
|
|
from collections import deque
|
|
|
|
import requests
|
|
try:
|
|
import json
|
|
except:
|
|
import simplejson as json
|
|
import calendar
|
|
|
|
class Channel(object):
    """Bookkeeping used to decide whether a channel value should be re-sent.

    A value is worth sending when it moved by more than ``chg_threshold``
    since the stored ``value``, when more than ``max_age`` seconds passed
    since ``last_sent``, or when the caller forces it.  This class never
    updates ``value`` or ``last_sent`` itself; whoever performs the send is
    expected to maintain them.
    """

    def __init__(self, ch_name, chg_threshold, max_age):
        """Create the tracker.

        ch_name       -- channel name, stored as ``self.channel``
        chg_threshold -- minimum absolute change that triggers a send
        max_age       -- seconds after ``last_sent`` at which a send is due
        """
        self.channel = ch_name
        # The original assignment ended with a stray comma, which stored a
        # one-element tuple and made the comparison in check() meaningless.
        self.chg_threshold = chg_threshold
        self.max_age = max_age
        self.last_sent = 0
        # Infinity guarantees that the first finite reading counts as changed.
        self.value = float('inf')

    @property
    def chg_treshold(self):
        """Legacy (misspelled) alias of ``chg_threshold``."""
        return self.chg_threshold

    @chg_treshold.setter
    def chg_treshold(self, new_threshold):
        self.chg_threshold = new_threshold

    def check(self, val, force=False):
        """Return True when ``val`` should be sent, False otherwise."""
        moved = abs(self.value - val) > self.chg_threshold
        stale = (time.time() - self.last_sent) > self.max_age
        return bool(moved or stale or force)
|
|
|
|
# Upload settings for the per-card measurements, keyed by a display-style
# label.  Each entry holds the channel name to upload to, the minimum change
# worth reporting and the maximum age (seconds) before a resend is due.
# The inner keys were originally written as bare names, which raised
# NameError at import time; they are plain string keys now.
# NOTE(review): the labels presumably match field names in the device's card
# data, and the entries look meant to feed Channel(...) -- confirm both.
card_data_channels = {
    'Downhole Adjusted Gross Stroke': {'channel': 'downhole_adjusted_gross_stroke', 'chg_threshold': 5.0, 'max_age': 3600},
    'Downhole Fluid Load': {'channel': 'downhole_fluid_load', 'chg_threshold': 100.0, 'max_age': 3600},
    'Downhole Gross Stroke': {'channel': 'downhole_gross_stroke', 'chg_threshold': 1.0, 'max_age': 3600},
    'Downhole Max Position': {'channel': 'downhole_max_position', 'chg_threshold': 1.0, 'max_age': 3600},
    'Downhole Min Position': {'channel': 'downhole_min_position', 'chg_threshold': 1.0, 'max_age': 3600},
    'Downhole Net Stroke': {'channel': 'downhole_net_stroke', 'chg_threshold': 1.0, 'max_age': 3600},
    'Fill Percentage': {'channel': 'fillage_percent', 'chg_threshold': 1.0, 'max_age': 3600},
    'Fluid Level': {'channel': 'fluid_above_pump', 'chg_threshold': 10.0, 'max_age': 3600},
    'Fluid Gradient': {'channel': 'fluid_gradient', 'chg_threshold': 1.0, 'max_age': 3600},
    'Polished Rod HP': {'channel': 'polished_rod_hp', 'chg_threshold': 0.5, 'max_age': 3600},
    'Pump HP': {'channel': 'pump_hp', 'chg_threshold': 0.5, 'max_age': 3600},
    'Pump Intake Pressure': {'channel': 'pump_intake_pressure', 'chg_threshold': 10.0, 'max_age': 3600},
    'Stroke Production': {'channel': 'stroke_production', 'chg_threshold': 0.005, 'max_age': 3600},
    'Surface Max Load': {'channel': 'surface_max_load', 'chg_threshold': 100.0, 'max_age': 3600},
    'Surface Min Load': {'channel': 'surface_min_load', 'chg_threshold': 100.0, 'max_age': 3600},
    'Surface Stroke Length': {'channel': 'surface_stroke_length', 'chg_threshold': 1.0, 'max_age': 3600},
    'Tubing Movement': {'channel': 'tubing_movement', 'chg_threshold': 1.0, 'max_age': 3600},
    'SPM': {'channel': 'SPM', 'chg_threshold': 0.5, 'max_age': 3600},
    'dt': {'channel': 'dt', 'chg_threshold': 0.01, 'max_age': 3600},
    'Stuffing Box Friction': {'channel': 'stuffing_box_friction', 'chg_threshold': 1.0, 'max_age': 3600},
    'Tubing Head Pressure': {'channel': 'tubing_head_pressure', 'chg_threshold': 5.0, 'max_age': 3600},
}
|
|
|
|
|
|
# go_channels = {
|
|
# "percent_run": Channel('go_percent_run', 'GAUGEOFF_Percent_Run', 'float', 0, 0),
|
|
# "kWh": Channel('go_kwh', 'GAUGEOFF_kWh', 'float', 0, 0),
|
|
# 'kWh_regen': Channel('go_kwh_regen', 'GAUGEOFF_kWh_regen', 'float', 0, 0),
|
|
# "electricity_cost": Channel('go_electricity_cost', 'GAUGEOFF_Electricity_Cost', 'float', 0, 0),
|
|
# 'peak_load': Channel('go_peak_load', 'GAUGEOFF_Max_Load', 'float', 0, 0),
|
|
# 'min_load': Channel('go_min_load', 'GAUGEOFF_Min_Load', 'float', 0, 0),
|
|
# 'polished_rod_HP': Channel('go_polished_rod_hp', 'GAUGEOFF_Polished_Rod_HP', 'float', 0, 0),
|
|
# 'average_SPM': Channel('go_average_spm', "GAUGEOFF_Average_SPM", 'float', 0, 0),
|
|
# 'lifting_cost': Channel('go_lifting_cost', "GAUGEOFF_Lifting_Cost", 'float', 0, 0),
|
|
# 'full_card_production': Channel('go_full_card_production', "GAUGEOFF_Full_Card_Production", 'float', 0, 0),
|
|
# 'fluid_above_pump': Channel('go_fluid_above_pump', 'GAUGEOFF_Fluid_Above_Pump', 'float', 0, 0),
|
|
# 'production_calculated': Channel('go_production_calculated', 'GAUGEOFF_Production_Calculated', 'float', 0, 0),
|
|
# 'inflow_rate': Channel('go_inflow_rate', 'GAUGEOFF_Inflow_Rate', 'float', 0, 0),
|
|
# 'pump_intake_pressure': Channel('go_pump_intake_pressure', 'GAUGEOFF_pump_intake_pressure', 'float', 0, 0)
|
|
# }
|
|
#
|
|
#
|
|
# dt_channels = { # Current Daily Totals
|
|
# 'Average_SPM': Channel('dt_average_spm', 'TODAY_Average_SPM', 'float', 0.5, 3600),
|
|
# 'Calculated_Production': Channel('dt_calculated_production', 'TODAY_Production_Calculated', 'float', 10.0, 3600),
|
|
# 'Downhole_Net_Stroke': Channel('dt_downhole_net_stroke', 'TODAY_Downhole_NetStroke', 'float', 1.0, 3600),
|
|
# 'Electricity_Cost': Channel('dt_electricity_cost', 'TODAY_Electricity_Cost', 'float', 0.1, 3600),
|
|
# 'Fluid_Level': Channel('dt_fluid_level', 'TODAY_Fluid_Above_Pump', 'float', 10.0, 3600),
|
|
# 'Full_Card_Production': Channel('dt_full_card_production', 'TODAY_Full_Card_Production', 'float', 10.0, 3600),
|
|
# 'Inflow_Rate': Channel('dt_inflow_rate', 'TODAY_Inflow_Rate', 'float', 10.0, 3600),
|
|
# 'Lifting_Cost': Channel('dt_lifting_cost', 'TODAY_Lifting_Cost', 'float', 0.01, 3600),
|
|
# 'Min_Load': Channel('dt_min_load', 'TODAY_Min_Load', 'float', 100.0, 3600),
|
|
# 'Peak_Load': Channel('dt_peak_load', 'TODAY_Max_Load', 'float', 100.0, 3600),
|
|
# 'Percent_Run': Channel('dt_percent_run', 'TODAY_Percent_Run', 'float', 1.0, 3600),
|
|
# 'Polished_Rod_HP': Channel('dt_polished_rod_hp', 'TODAY_Polished_Rod_HP', 'float', 1.0, 3600),
|
|
# 'Projected_Production': Channel('dt_projected_production', 'TODAY_Production_Projected', 'float', 5.0, 3600),
|
|
# 'Pump_HP': Channel('dt_pump_hp', 'TODAY_Pump_HP', 'float', 1.0, 3600),
|
|
# 'Pump_Intake_Presure': Channel('dt_pump_intake_pressure', 'TODAY_Pump_Intake_Pressure', 'float', 10.0, 3600),
|
|
# 'Surface_Stroke_Length': Channel('dt_surface_stroke_length', 'TODAY_Surface_StrokeLength', 'float', 1.0, 3600),
|
|
# 'Tubing_Movement': Channel('dt_tubing_movement', 'TODAY_Tubing_Movement', 'float', 1.00, 3600),
|
|
# 'kWh': Channel('dt_kWh', 'TODAY_kWh', 'float', 10.0, 3600),
|
|
# 'kWh_Regen': Channel('dt_kWh_regen', 'TODAY_kWh_Regen', 'float', 1.0, 3600)
|
|
# }
|
|
|
|
|
|
|
|
|
|
class start(threading.Thread, deviceBase):
    """Driver thread that polls a device's HTTP API and uploads the results.

    Roughly every 3 seconds the worker reads ``/run_status/current`` and
    ``/card/latest`` from ``self.device_address`` and pushes the status and
    card data through ``sendtodb`` (provided by ``deviceBase``).  The
    ``poc_*`` methods change the driver's state from outside the thread.
    """

    # Layout of the UTC timestamps returned by the device (trailing "Z").
    TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
    # Seconds to wait for any single HTTP request, so that an unresponsive
    # device cannot block the worker thread forever.
    REQUEST_TIMEOUT = 30
    # Number of already-reported event ids that are remembered.
    MAX_EVENT_IDS = 50

    def __init__(self, name=None, number=None, mac=None, Q=None, mcu=None, companyId=None, offset=None, mqtt=None, Nodes=None):
        """Initialise driver state, restore pickled data and start the thread.

        All arguments are forwarded unchanged to ``deviceBase.__init__``.
        """
        threading.Thread.__init__(self)
        deviceBase.__init__(self, name=name, number=number, mac=mac, Q=Q, mcu=mcu, companyId=companyId, offset=offset, mqtt=mqtt, Nodes=Nodes)

        self.daemon = True
        self.forceSend = True
        self.version = "3"
        self.device_address = "https://192.168.1.30:3000"
        self.cardLoopTimer = 60
        self.finished = threading.Event()
        self.runLoopStatus = ""
        self.last_status = ""
        self.statusChanged = False
        self.last_stored_card_date = 0

        # NOTE(review): pickle.load can execute code embedded in the file;
        # these files must only ever be written by this driver.

        # load stored event IDs
        try:
            with open('eventIds.p', 'rb') as handle:
                self.eventIds = pickle.load(handle)
            print("found pickled eventID dictionary: {0}".format(self.eventIds))
        except Exception:
            print("couldn't load enent ID's from pickle")
            self.eventIds = []

        # load stored well setup
        try:
            with open('wellSetup.p', 'rb') as handle:
                self.wellSetup = pickle.load(handle)
            print("Found pickled Well Setup (but it's going to be too long to print)")
        except Exception:
            print("couldn't load Well Setup from pickle")
            self.wellSetup = []

        # Start the worker only now: run() reads the attributes assigned
        # above, so starting earlier let the thread race the constructor.
        threading.Thread.start(self)

        self.sendtodbJSON("device_address", self.device_address, 0)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    @classmethod
    def _to_epoch(cls, stamp):
        """Convert a device UTC timestamp string to integer epoch seconds."""
        parsed = datetime.strptime(stamp, cls.TIMESTAMP_FORMAT)
        # timegm treats the fields as UTC, unlike strftime('%s'), which is a
        # platform extension and applies the local timezone.
        return calendar.timegm(parsed.utctimetuple())

    def _fetch_json(self, path):
        """GET ``path`` from the device and return the decoded JSON body."""
        # NOTE(review): verify=False disables TLS certificate checking
        # (presumably the device uses a self-signed certificate) -- confirm
        # this is acceptable on the deployment network.
        response = requests.get('{}{}'.format(self.device_address, path), verify=False, timeout=self.REQUEST_TIMEOUT)
        return json.loads(response.text)

    @staticmethod
    def _first_record(records):
        """Return the first element of ``records``, or None when it is empty."""
        return records[0] if records else None

    @staticmethod
    def _series(text):
        """Parse a ", "-separated string of numbers into a list of floats."""
        return [float(token) for token in text.split(", ") if token.strip()]

    @staticmethod
    def _points(positions, loads):
        """Render paired values as "[[p,l],[p,l],...]" rounded to 3 places.

        Pairs stop at the shorter input; empty input yields "[]".
        """
        pairs = ["[{},{}]".format(round(p, 3), round(l, 3)) for p, l in zip(positions, loads)]
        return "[" + ",".join(pairs) + "]"

    # ------------------------------------------------------------------
    # driver interface
    # ------------------------------------------------------------------

    # this is a required function for all drivers, its goal is to upload some piece of data
    # about your device so it can be seen on the web
    def register(self):
        """Forget the last reported status so the next poll re-sends it."""
        # The original wrote to a module-level ``channels`` dict that is not
        # defined in this file; last_status is the value it stood for.
        self.last_status = ""

    def sendCard(self, card):
        """Upload one card record: its id plus surface and downhole traces.

        ``card`` is a dict with the keys 'createdAt', 'card_id',
        'surface_position', 'surface_load', 'downhole_position' and
        'downhole_load'; the last four are ", "-separated number strings.
        """
        card_date = self._to_epoch(card['createdAt'])
        self.sendtodb("card_history", card['card_id'], card_date)

        # Parse every trace before sending any of them, so that a malformed
        # field does not leave a half-uploaded card.
        surface_position = self._series(card['surface_position'])
        surface_load = self._series(card['surface_load'])
        downhole_position = self._series(card['downhole_position'])
        downhole_load = self._series(card['downhole_load'])

        self.sendtodb('sc', self._points(surface_position, surface_load), card_date)
        self.sendtodb('dc', self._points(downhole_position, downhole_load), card_date)

    def run(self):
        """Worker loop: poll status and the latest card until ``finished`` is set."""
        poll_delay = 3
        sleep_timer = 20
        while not self.finished.is_set():
            try:
                # Remember whether a forced send was pending when this pass
                # began; only that request is cleared below, so one arriving
                # while we work or wait is still honoured on the next pass.
                forced = self.forceSend
                self.statusChanged = False

                self.runLoopStatus = "checkStatus"
                if self.checkStatus():
                    self.statusChanged = True
                # TODO Add event logic here
                # (daily-total, gauge-off and stroke-parameter channel
                # checks are not implemented in this version)

                self.runLoopStatus = "Reading Cards"
                latest_card = self._first_record(self._fetch_json('/card/latest'))
                if latest_card is not None:
                    card_date = self._to_epoch(latest_card['createdAt'])
                    card_is_due = (card_date - self.last_stored_card_date) > self.cardLoopTimer
                    if card_is_due or self.statusChanged or self.forceSend:
                        self.sendCard(latest_card)
                        self.last_stored_card_date = card_date

                self.runLoopStatus = "Complete"
                if forced:
                    self.forceSend = False
                self.finished.wait(poll_delay)
            except Exception as e:
                print("Error during {0} of run loop: {1}\nWill try again in {2} seconds...".format(self.runLoopStatus, e, sleep_timer))
                traceback.print_exc()
                self.finished.wait(sleep_timer)

    def checkStatus(self):
        """Read the current run status and upload it when changed or forced.

        Returns the status string when it differs from the last reported
        one, otherwise False.  ``self.statusChanged`` is set to match.
        """
        status_result = self._first_record(self._fetch_json('/run_status/current'))
        status = status_result['status'] if status_result else None
        if not status:
            self.statusChanged = False
            return False

        date = time.time()
        changed = self.last_status != status
        self.statusChanged = changed
        if changed:
            print("Status has changed from {0} to {1} @ {2}".format(self.last_status, status, date))

        if changed or self.forceSend:
            self.status = status
            self.sendtodb("status", status, date)
            # Recorded only after the send, so a failed upload is retried.
            self.last_status = status

        return status if changed else False

    def checkEvents(self):
        """Upload events not reported before and remember their ids on disk."""
        events = self._fetch_json('/json/event_list')["events"]
        dirty = False
        try:
            for event in events:
                event_id = int(event["id"])
                if event_id in self.eventIds:
                    continue
                # we have a new event
                timestamp = self._to_epoch(event["datetime"])
                self.sendtodbJSON("events", json.dumps(event), timestamp)
                self.eventIds.append(event_id)
                # keep only the most recent ids
                del self.eventIds[:-self.MAX_EVENT_IDS]
                dirty = True
        finally:
            # Persist once per call, even if a later event failed to send.
            if dirty:
                with open('eventIds.p', 'wb') as handle:
                    pickle.dump(self.eventIds, handle)

    # NOTE(review): the poc_* methods below look like remote-command
    # handlers dispatched by deviceBase with (name, value) -- confirm.

    def poc_sync(self, name, value):
        """Report the driver as connected; both arguments are ignored."""
        self.sendtodb("connected", "true", 0)
        return True

    def poc_set_address(self, name, value):
        """Use ``value`` as the device base URL from now on."""
        self.device_address = value
        return True

    def poc_refresh_data(self, name, value):
        """Ask the worker loop to re-send status and card on its next pass."""
        self.forceSend = True
        return True
|