Added POC driver for reading directly from PLC
This commit is contained in:
367
POCloud/direct/poc.py
Normal file
367
POCloud/direct/poc.py
Normal file
@@ -0,0 +1,367 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
import types
|
||||
import traceback
|
||||
import binascii
|
||||
import threading
|
||||
import time
|
||||
import thread
|
||||
import os
|
||||
import struct
|
||||
import sys
|
||||
import serial
|
||||
import minimalmodbus
|
||||
import pickle
|
||||
from device_base import deviceBase
|
||||
# from datetime import datetime
|
||||
from pycomm.ab_comm.clx import Driver as ClxDriver
|
||||
import logging
|
||||
from collections import deque
|
||||
|
||||
import requests
|
||||
try:
|
||||
import json
|
||||
except:
|
||||
import simplejson as json
|
||||
import calendar
|
||||
|
||||
data_source = "PLC"
|
||||
plc_ip = '192.168.1.10'
|
||||
|
||||
|
||||
def readTag(addr, tag):
|
||||
logging.basicConfig(
|
||||
filename="ClxDriver.log",
|
||||
format="%(levelname)-10s %(asctime)s %(message)s",
|
||||
level=logging.DEBUG
|
||||
)
|
||||
c = ClxDriver()
|
||||
|
||||
if c.open(addr):
|
||||
try:
|
||||
v = c.read_tag(tag)
|
||||
# print(v)
|
||||
return v
|
||||
except Exception:
|
||||
err = c.get_status()
|
||||
c.close()
|
||||
print err
|
||||
pass
|
||||
c.close()
|
||||
|
||||
|
||||
def writeTag(addr, tag, val):
|
||||
logging.basicConfig(
|
||||
filename="ClxDriver.log",
|
||||
format="%(levelname)-10s %(asctime)s %(message)s",
|
||||
level=logging.DEBUG
|
||||
)
|
||||
c = ClxDriver()
|
||||
|
||||
if c.open(addr):
|
||||
try:
|
||||
# typ = getTagType(addr, tag)
|
||||
cv = c.read_tag(tag)
|
||||
wt = c.write_tag(tag, val, cv[1])
|
||||
# print(wt)
|
||||
return wt
|
||||
except Exception:
|
||||
err = c.get_status()
|
||||
c.close()
|
||||
print err
|
||||
pass
|
||||
c.close()
|
||||
|
||||
|
||||
class Channel():
|
||||
global plc_ip
|
||||
|
||||
def __init__(self, mesh_name, plc_tag, data_type, chg_threshold, guarantee_sec):
|
||||
self.mesh_name = mesh_name
|
||||
self.plc_tag = plc_tag
|
||||
self.data_type = data_type
|
||||
self.last_value = None
|
||||
self.value = None
|
||||
self.last_send_time = 0
|
||||
self.chg_threshold = chg_threshold
|
||||
self.guarantee_sec = guarantee_sec
|
||||
|
||||
def read(self, forceSend):
|
||||
if self.plc_tag:
|
||||
v = readTag(plc_ip, self.plc_tag)
|
||||
if v:
|
||||
if self.data_type == 'boolean' or self.data_type == 'str':
|
||||
if (self.last_send_time == 0) or (self.value is None) or not (self.value == v[0]) or ((time.time() - self.last_send_time) > self.guarantee_sec) or (forceSend):
|
||||
self.last_value = self.value
|
||||
self.value = v[0]
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
if (self.last_send_time == 0) or (self.value is None) or (abs(self.value - v[0]) > self.chg_threshold) or ((time.time() - self.last_send_time) > self.guarantee_sec) or (forceSend):
|
||||
self.last_value = self.value
|
||||
self.value = v[0]
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
return False
|
||||
|
||||
go_channels = {
|
||||
"percent_run": Channel('go_percent_run', 'GAUGEOFF_Percent_Run', 'float', 0, 0),
|
||||
"kWh": Channel('go_kwh', 'GAUGEOFF_kWh', 'float', 0, 0),
|
||||
'kWh_regen': Channel('go_kwh_regen', 'GAUGEOFF_kWh_regen', 'float', 0, 0),
|
||||
"electricity_cost": Channel('go_electricity_cost', 'GAUGEOFF_Electricity_Cost', 'float', 0, 0),
|
||||
'peak_load': Channel('go_peak_load', 'GAUGEOFF_Max_Load', 'float', 0, 0),
|
||||
'min_load': Channel('go_min_load', 'GAUGEOFF_Min_Load', 'float', 0, 0),
|
||||
'polished_rod_HP': Channel('go_polished_rod_hp', 'GAUGEOFF_Polished_Rod_HP', 'float', 0, 0),
|
||||
'average_SPM': Channel('go_average_spm', "GAUGEOFF_Average_SPM", 'float', 0, 0),
|
||||
'lifting_cost': Channel('go_lifting_cost', "GAUGEOFF_Lifting_Cost", 'float', 0, 0),
|
||||
'full_card_production': Channel('go_full_card_production', "GAUGEOFF_Full_Card_Production", 'float', 0, 0),
|
||||
'fluid_above_pump': Channel('go_fluid_above_pump', 'GAUGEOFF_Fluid_Above_Pump', 'float', 0, 0),
|
||||
'production_calculated': Channel('go_production_calculated', 'GAUGEOFF_Production_Calculated', 'float', 0, 0),
|
||||
'inflow_rate': Channel('go_inflow_rate', 'GAUGEOFF_Inflow_Rate', 'float', 0, 0),
|
||||
'pump_intake_pressure': Channel('go_pump_intake_pressure', 'GAUGEOFF_pump_intake_pressure', 'float', 0, 0)
|
||||
}
|
||||
|
||||
statusCh = Channel('status', 'Pump.Run_Status', 'str', 0, 0)
|
||||
|
||||
channels = {
|
||||
'downhole_adjusted_gross_stroke': Channel('downhole_adjusted_gross_stroke', 'Card_Past[1].Downhole_AdjustedGrossStroke', 'float', 5.0, 3600),
|
||||
'downhole_fluid_load': Channel('downhole_fluid_load', 'Card_Past[1].Downhole_FluidLoad', 'float', 100.0, 3600),
|
||||
'downhole_gross_stroke': Channel('downhole_gross_stroke', 'Card_Past[1].Downhole_GrossStroke', 'float', 1.0, 3600),
|
||||
'downhole_max_position': Channel('downhole_max_position', 'Card_Past[1].Downhole_Max_Position.Position', 'float', 1.0, 3600),
|
||||
'downhole_min_position': Channel('downhole_min_position', 'Card_Past[1].Downhole_Min_Position.Position', 'float', 1.0, 3600),
|
||||
'downhole_net_stroke': Channel('downhole_net_stroke', 'Card_Past[1].Downhole_NetStroke', 'float', 1.0, 3600),
|
||||
'fillage_percent': Channel('fillage_percent', 'Card_Past[1].Fillage_Percent', 'float', 1.0, 3600),
|
||||
'fluid_above_pump': Channel('fluid_above_pump', 'Card_Past[1].Fluid_Above_Pump', 'float', 10.0, 3600),
|
||||
'fluid_gradient': Channel('fluid_gradient', 'Card_Past[1].Params.Fluid_Gradient', 'float', 0, 3600),
|
||||
'polished_rod_hp': Channel('polished_rod_hp', 'Card_Past[1].Polished_Rod_HP', 'float', 0.5, 3600),
|
||||
'pump_hp': Channel('pump_hp', 'Card_Past[1].Pump_HP', 'float', 0.5, 3600),
|
||||
'pump_intake_pressure': Channel('pump_intake_pressure', 'Card_Past[1].Pump_Intake_Pressure', 'float', 10.0, 3600),
|
||||
'stroke_production': Channel('stroke_production', 'Stroke_Production', 'float', 0.0005, 3600),
|
||||
'surface_max_load': Channel('surface_max_load', 'Card_Past[1].Surface_Max.Load', 'float', 100.0, 3600),
|
||||
'surface_min_load': Channel('surface_min_load', 'Card_Past[1].Surface_Min.Load', 'float', 100.0, 3600),
|
||||
'surface_stroke_length': Channel('surface_stroke_length', 'Card_Past[1].Surface_StrokeLength', 'float', 1.0, 3600),
|
||||
'tubing_movement': Channel('tubing_movement', 'Card_Past[1].Tubing_Movement', 'float', 1.0, 3600),
|
||||
'SPM': Channel('SPM', 'Card_Past[1].SPM', 'float', 0.5, 3600),
|
||||
'drive_torque_mode': Channel('drive_torque_mode', 'DriveTorqueMode', 'boolean', 0, 3600),
|
||||
'dt': Channel('dt', 'Card_Past[1].Params.dt', 'float', 0.001, 3600),
|
||||
'speed_reference': Channel('speed_reference', 'Pump_PF755.PSet_SpeedRef', 'float', 5.0, 3600),
|
||||
'stuffing_box_friction': Channel('stuffing_box_friction', 'Card_Past[1].Params.Stuffing_Box_Friction', 'float', 1.0, 3600),
|
||||
'torque_reference': Channel('torque_reference', 'PF755_Drive:O.TrqRefAStpt', 'float', 1.0, 3600),
|
||||
'tubing_head_pressure': Channel('tubing_head_pressure', 'Card_Past[1].Params.Tubing_Head_Pressure', 'float', 5.0, 3600),
|
||||
}
|
||||
|
||||
dt_channels = { # Current Daily Totals
|
||||
'Average_SPM': Channel('dt_average_spm', 'TODAY_Average_SPM', 'float', 0.5, 3600),
|
||||
'Calculated_Production': Channel('dt_calculated_production', 'TODAY_Production_Calculated', 'float', 10.0, 3600),
|
||||
'Downhole_Net_Stroke': Channel('dt_downhole_net_stroke', 'TODAY_Downhole_NetStroke', 'float', 1.0, 3600),
|
||||
'Electricity_Cost': Channel('dt_electricity_cost', 'TODAY_Electricity_Cost', 'float', 0.1, 3600),
|
||||
'Fluid_Level': Channel('dt_fluid_level', 'TODAY_Fluid_Above_Pump', 'float', 10.0, 3600),
|
||||
'Full_Card_Production': Channel('dt_full_card_production', 'TODAY_Full_Card_Production', 'float', 10.0, 3600),
|
||||
'Inflow_Rate': Channel('dt_inflow_rate', 'TODAY_Inflow_Rate', 'float', 10.0, 3600),
|
||||
'Lifting_Cost': Channel('dt_lifting_cost', 'TODAY_Lifting_Cost', 'float', 0.01, 3600),
|
||||
'Min_Load': Channel('dt_min_load', 'TODAY_Min_Load', 'float', 100.0, 3600),
|
||||
'Peak_Load': Channel('dt_peak_load', 'TODAY_Max_Load', 'float', 100.0, 3600),
|
||||
'Percent_Run': Channel('dt_percent_run', 'TODAY_Percent_Run', 'float', 1.0, 3600),
|
||||
'Polished_Rod_HP': Channel('dt_polished_rod_hp', 'TODAY_Polished_Rod_HP', 'float', 1.0, 3600),
|
||||
'Projected_Production': Channel('dt_projected_production', 'TODAY_Production_Projected', 'float', 5.0, 3600),
|
||||
'Pump_HP': Channel('dt_pump_hp', 'TODAY_Pump_HP', 'float', 1.0, 3600),
|
||||
'Pump_Intake_Presure': Channel('dt_pump_intake_pressure', 'TODAY_Pump_Intake_Pressure', 'float', 10.0, 3600),
|
||||
'Surface_Stroke_Length': Channel('dt_surface_stroke_length', 'TODAY_Surface_StrokeLength', 'float', 1.0, 3600),
|
||||
'Tubing_Movement': Channel('dt_tubing_movement', 'TODAY_Tubing_Movement', 'float', 1.00, 3600),
|
||||
'kWh': Channel('dt_kWh', 'TODAY_kWh', 'float', 10.0, 3600),
|
||||
'kWh_Regen': Channel('dt_kWh_regen', 'TODAY_kWh_Regen', 'float', 1.0, 3600)
|
||||
}
|
||||
|
||||
|
||||
class Card():
|
||||
global plc_ip
|
||||
|
||||
def __init__(self, unified_time):
|
||||
self.sc = []
|
||||
self.dc = []
|
||||
self.sent = False
|
||||
self.read_time = unified_time
|
||||
self.readCard()
|
||||
|
||||
def readCard(self):
|
||||
self.card_id = readTag(plc_ip, "Card_Past[1].ID")[0]
|
||||
self.num_points = int(readTag(plc_ip, "Card_Past[1].Num_Points")[0])
|
||||
for i in range(0, self.num_points):
|
||||
self.sc.append([round(float(readTag(plc_ip, 'Card_Past[1].Surface_Position[{}]'.format(i))[0]), 3), round(float(readTag(plc_ip, 'Card_Past[1].Surface_Load[{}]'.format(i))[0]), 3)])
|
||||
self.dc.append([round(float(readTag(plc_ip, 'Card_Past[1].Downhole_Position[{}]'.format(i))[0]), 3), round(float(readTag(plc_ip, 'Card_Past[1].Downhole_Load[{}]'.format(i))[0]), 3)])
|
||||
|
||||
def stringify(self):
|
||||
''' returns a list of two strings [surface card, downhole card]'''
|
||||
sc_str = "["
|
||||
dc_str = "["
|
||||
for i in range(0, self.num_points):
|
||||
sc_str = sc_str + "[{},{}],".format(self.sc[i][0], self.sc[i][1])
|
||||
dc_str = dc_str + "[{},{}],".format(self.dc[i][0], self.dc[i][1])
|
||||
sc_str = sc_str[:-1] + "]"
|
||||
dc_str = dc_str[:-1] + "]"
|
||||
return[sc_str, dc_str]
|
||||
|
||||
|
||||
class start(threading.Thread, deviceBase):
|
||||
|
||||
def __init__(self, name=None, number=None, mac=None, Q=None, mcu=None, companyId=None, offset=None, mqtt=None, Nodes=None):
|
||||
threading.Thread.__init__(self)
|
||||
deviceBase.__init__(self, name=name, number=number, mac=mac, Q=Q, mcu=mcu, companyId=companyId, offset=offset, mqtt=mqtt, Nodes=Nodes)
|
||||
|
||||
self.daemon = True
|
||||
self.forceSend = True
|
||||
self.version = "3"
|
||||
self.device_address = "http://192.168.1.30/"
|
||||
# self.device_address = "http://localhost/"
|
||||
self.cardLoopTimer = 600
|
||||
self.finished = threading.Event()
|
||||
threading.Thread.start(self)
|
||||
self.statusChanged = False
|
||||
self.al_status_last = False
|
||||
self.dl_status_last = False
|
||||
self.card_storage = deque([]) # array of the last x cards
|
||||
self.card_storage_limit = 5
|
||||
self.last_card_sent_time = 0
|
||||
|
||||
# load stored event ID's
|
||||
try:
|
||||
with open('eventIds.p', 'rb') as handle:
|
||||
self.eventIds = pickle.load(handle)
|
||||
|
||||
print "found pickled eventID dictionary: {0}".format(self.eventIds)
|
||||
except:
|
||||
print "couldn't load enent ID's from pickle"
|
||||
self.eventIds = []
|
||||
|
||||
# load stored wellconfig's
|
||||
try:
|
||||
with open('wellSetup.p', 'rb') as handle:
|
||||
self.wellSetup = pickle.load(handle)
|
||||
|
||||
print "Found pickled Well Setup (but it's going to be too long to print)"
|
||||
# print self.wellConfig
|
||||
except:
|
||||
print "couldn't load Well Setup from pickle"
|
||||
self.wellSetup = []
|
||||
|
||||
self.sendtodbJSON("device_address", self.device_address, 0)
|
||||
|
||||
# this is a required function for all drivers, its goal is to upload some piece of data
|
||||
# about your device so it can be seen on the web
|
||||
def register(self):
|
||||
channels["status"]["last_value"] = ""
|
||||
|
||||
def channelCheck(self, c, force):
|
||||
if c.read(force):
|
||||
self.sendtodbJSON(c.mesh_name, c.value, 0)
|
||||
|
||||
def run(self):
|
||||
self.runLoopStatus = ""
|
||||
while True:
|
||||
try:
|
||||
self.statusChanged = False
|
||||
runLoopStatus = "checkStatus"
|
||||
if self.checkStatus():
|
||||
self.statusChanged = True
|
||||
# TODO Add event logic here
|
||||
|
||||
runLoopStatus = "Daily Total Loop"
|
||||
for dt in dt_channels:
|
||||
self.channelCheck(dt_channels[dt], self.forceSend)
|
||||
|
||||
runLoopStatus = "checkGaugeOffData"
|
||||
# self.checkGaugeOffData()
|
||||
|
||||
|
||||
runLoopStatus = "Stroke Parameter Data"
|
||||
for ch in channels:
|
||||
self.channelCheck(channels[ch], self.forceSend)
|
||||
|
||||
runLoopStatus = "Reading Cards"
|
||||
if len(self.card_storage) > 0:
|
||||
if not readTag(plc_ip, "Card_Past[1].ID")[0] == self.card_storage[0].card_id:
|
||||
current_time = time.time()
|
||||
current_card = Card(current_time)
|
||||
self.sendtodbJSON("card_history", current_card.card_id, current_time)
|
||||
if (current_card.read_time - self.last_card_sent_time) > self.cardLoopTimer or self.forceSend:
|
||||
cards = current_card.stringify()
|
||||
self.sendtodbJSON("sc", cards[0], current_time)
|
||||
self.sendtodbJSON("dc", cards[1], current_time)
|
||||
current_card.sent = True
|
||||
self.card_storage.appendleft(current_card)
|
||||
while len(self.card_storage) > self.card_storage_limit:
|
||||
self.card_storage.pop()
|
||||
if self.statusChanged:
|
||||
for c in self.card_storage:
|
||||
if not c.sent:
|
||||
cstr = c.stringify()
|
||||
self.sendtodbJSON("sc", cstr[0], c.read_time)
|
||||
self.sendtodbJSON("dc", cstr[1], c.read_time)
|
||||
runLoopStatus = "Complete"
|
||||
time.sleep(3)
|
||||
self.forceSend = False
|
||||
except Exception, e:
|
||||
sleep_timer = 20
|
||||
print "Error during {0} of run loop: {1}\nWill try again in {2} seconds...".format(runLoopStatus, e, sleep_timer)
|
||||
time.sleep(sleep_timer)
|
||||
|
||||
def checkStatus(self):
|
||||
print("!! Made it to checkStatus")
|
||||
print(readTag)
|
||||
statusMap = {
|
||||
0: 'Stopped',
|
||||
1: 'Running',
|
||||
2: 'Pumped Off',
|
||||
3: 'Faulted',
|
||||
4: 'Starting',
|
||||
5: 'Recovering',
|
||||
100: 'Read Error',
|
||||
1000: 'PLC Error',
|
||||
9999: 'No Response'
|
||||
}
|
||||
status = statusMap[int(readTag(plc_ip, "Pump.Run_Status")[0])]
|
||||
|
||||
if status:
|
||||
date = time.time()
|
||||
if statusCh.last_value != status:
|
||||
self.statusChanged = True
|
||||
print "Status has changed from {0} to {1} @ {2}".format(statusCh.last_value, status, time.time())
|
||||
else:
|
||||
self.statusChanged = False
|
||||
return False
|
||||
|
||||
if self.statusChanged or self.forceSend:
|
||||
self.status = status
|
||||
self.sendtodb("status", status, date)
|
||||
statusCh.last_value = status
|
||||
return status
|
||||
|
||||
def checkEvents(self):
|
||||
data = json.loads(requests.get(self.device_address + "/json/event_list").text)
|
||||
events = data["events"]
|
||||
for event in events:
|
||||
if int(event["id"]) not in self.eventIds:
|
||||
timestamp = calendar.timegm(time.strptime(event["datetime"], '%Y-%m-%dT%H:%M:%S.%fZ'))
|
||||
# we have a new event
|
||||
self.sendtodbJSON("events", json.dumps(event), timestamp)
|
||||
self.eventIds.append(int(event["id"]))
|
||||
if len(self.eventIds) > 50:
|
||||
del self.eventIds[0]
|
||||
with open('eventIds.p', 'wb') as handle:
|
||||
pickle.dump(self.eventIds, handle)
|
||||
|
||||
def poc_sync(self, name, value):
|
||||
self.sendtodb("connected", "true", 0)
|
||||
return True
|
||||
|
||||
def poc_set_address(self, name, value):
|
||||
self.device_address = value
|
||||
return True
|
||||
|
||||
def poc_refresh_data(self, name, value):
|
||||
self.forceSend = True
|
||||
return True
|
||||
Reference in New Issue
Block a user