From d8e3382a8dc2bdabd74b548068e608b554bae04b Mon Sep 17 00:00:00 2001 From: Patrick McDonagh Date: Thu, 17 Mar 2016 17:52:51 -0500 Subject: [PATCH] Added POC driver for reading directly from PLC --- POCloud/direct/poc.py | 367 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 367 insertions(+) create mode 100644 POCloud/direct/poc.py diff --git a/POCloud/direct/poc.py b/POCloud/direct/poc.py new file mode 100644 index 0000000..7013d38 --- /dev/null +++ b/POCloud/direct/poc.py @@ -0,0 +1,367 @@ +#!/usr/bin/python + +import types +import traceback +import binascii +import threading +import time +import thread +import os +import struct +import sys +import serial +import minimalmodbus +import pickle +from device_base import deviceBase +# from datetime import datetime +from pycomm.ab_comm.clx import Driver as ClxDriver +import logging +from collections import deque + +import requests +try: + import json +except: + import simplejson as json +import calendar + +data_source = "PLC" +plc_ip = '192.168.1.10' + + +def readTag(addr, tag): + logging.basicConfig( + filename="ClxDriver.log", + format="%(levelname)-10s %(asctime)s %(message)s", + level=logging.DEBUG + ) + c = ClxDriver() + + if c.open(addr): + try: + v = c.read_tag(tag) + # print(v) + return v + except Exception: + err = c.get_status() + c.close() + print err + pass + c.close() + + +def writeTag(addr, tag, val): + logging.basicConfig( + filename="ClxDriver.log", + format="%(levelname)-10s %(asctime)s %(message)s", + level=logging.DEBUG + ) + c = ClxDriver() + + if c.open(addr): + try: + # typ = getTagType(addr, tag) + cv = c.read_tag(tag) + wt = c.write_tag(tag, val, cv[1]) + # print(wt) + return wt + except Exception: + err = c.get_status() + c.close() + print err + pass + c.close() + + +class Channel(): + global plc_ip + + def __init__(self, mesh_name, plc_tag, data_type, chg_threshold, guarantee_sec): + self.mesh_name = mesh_name + self.plc_tag = plc_tag + self.data_type = data_type + self.last_value = None + self.value = None + self.last_send_time = 0 + self.chg_threshold = chg_threshold + self.guarantee_sec = guarantee_sec + + def read(self, forceSend): + if self.plc_tag: + v = readTag(plc_ip, self.plc_tag) + if v: + if self.data_type == 'boolean' or self.data_type == 'str': + if (self.last_send_time == 0) or (self.value is None) or not (self.value == v[0]) or ((time.time() - self.last_send_time) > self.guarantee_sec) or (forceSend): + self.last_value = self.value + self.value = v[0] + return True + else: + return False + else: + if (self.last_send_time == 0) or (self.value is None) or (abs(self.value - v[0]) > self.chg_threshold) or ((time.time() - self.last_send_time) > self.guarantee_sec) or (forceSend): + self.last_value = self.value + self.value = v[0] + return True + else: + return False + else: + return False + return False + +go_channels = { + "percent_run": Channel('go_percent_run', 'GAUGEOFF_Percent_Run', 'float', 0, 0), + "kWh": Channel('go_kwh', 'GAUGEOFF_kWh', 'float', 0, 0), + 'kWh_regen': Channel('go_kwh_regen', 'GAUGEOFF_kWh_regen', 'float', 0, 0), + "electricity_cost": Channel('go_electricity_cost', 'GAUGEOFF_Electricity_Cost', 'float', 0, 0), + 'peak_load': Channel('go_peak_load', 'GAUGEOFF_Max_Load', 'float', 0, 0), + 'min_load': Channel('go_min_load', 'GAUGEOFF_Min_Load', 'float', 0, 0), + 'polished_rod_HP': Channel('go_polished_rod_hp', 'GAUGEOFF_Polished_Rod_HP', 'float', 0, 0), + 'average_SPM': Channel('go_average_spm', "GAUGEOFF_Average_SPM", 'float', 0, 0), + 'lifting_cost': Channel('go_lifting_cost', "GAUGEOFF_Lifting_Cost", 'float', 0, 0), + 'full_card_production': Channel('go_full_card_production', "GAUGEOFF_Full_Card_Production", 'float', 0, 0), + 'fluid_above_pump': Channel('go_fluid_above_pump', 'GAUGEOFF_Fluid_Above_Pump', 'float', 0, 0), + 'production_calculated': Channel('go_production_calculated', 'GAUGEOFF_Production_Calculated', 'float', 0, 0), + 'inflow_rate': Channel('go_inflow_rate', 'GAUGEOFF_Inflow_Rate', 'float', 0, 0), + 'pump_intake_pressure': Channel('go_pump_intake_pressure', 'GAUGEOFF_pump_intake_pressure', 'float', 0, 0) +} + +statusCh = Channel('status', 'Pump.Run_Status', 'str', 0, 0) + +channels = { + 'downhole_adjusted_gross_stroke': Channel('downhole_adjusted_gross_stroke', 'Card_Past[1].Downhole_AdjustedGrossStroke', 'float', 5.0, 3600), + 'downhole_fluid_load': Channel('downhole_fluid_load', 'Card_Past[1].Downhole_FluidLoad', 'float', 100.0, 3600), + 'downhole_gross_stroke': Channel('downhole_gross_stroke', 'Card_Past[1].Downhole_GrossStroke', 'float', 1.0, 3600), + 'downhole_max_position': Channel('downhole_max_position', 'Card_Past[1].Downhole_Max_Position.Position', 'float', 1.0, 3600), + 'downhole_min_position': Channel('downhole_min_position', 'Card_Past[1].Downhole_Min_Position.Position', 'float', 1.0, 3600), + 'downhole_net_stroke': Channel('downhole_net_stroke', 'Card_Past[1].Downhole_NetStroke', 'float', 1.0, 3600), + 'fillage_percent': Channel('fillage_percent', 'Card_Past[1].Fillage_Percent', 'float', 1.0, 3600), + 'fluid_above_pump': Channel('fluid_above_pump', 'Card_Past[1].Fluid_Above_Pump', 'float', 10.0, 3600), + 'fluid_gradient': Channel('fluid_gradient', 'Card_Past[1].Params.Fluid_Gradient', 'float', 0, 3600), + 'polished_rod_hp': Channel('polished_rod_hp', 'Card_Past[1].Polished_Rod_HP', 'float', 0.5, 3600), + 'pump_hp': Channel('pump_hp', 'Card_Past[1].Pump_HP', 'float', 0.5, 3600), + 'pump_intake_pressure': Channel('pump_intake_pressure', 'Card_Past[1].Pump_Intake_Pressure', 'float', 10.0, 3600), + 'stroke_production': Channel('stroke_production', 'Stroke_Production', 'float', 0.0005, 3600), + 'surface_max_load': Channel('surface_max_load', 'Card_Past[1].Surface_Max.Load', 'float', 100.0, 3600), + 'surface_min_load': Channel('surface_min_load', 'Card_Past[1].Surface_Min.Load', 'float', 100.0, 3600), + 'surface_stroke_length': Channel('surface_stroke_length', 'Card_Past[1].Surface_StrokeLength', 'float', 1.0, 3600), + 'tubing_movement': Channel('tubing_movement', 'Card_Past[1].Tubing_Movement', 'float', 1.0, 3600), + 'SPM': Channel('SPM', 'Card_Past[1].SPM', 'float', 0.5, 3600), + 'drive_torque_mode': Channel('drive_torque_mode', 'DriveTorqueMode', 'boolean', 0, 3600), + 'dt': Channel('dt', 'Card_Past[1].Params.dt', 'float', 0.001, 3600), + 'speed_reference': Channel('speed_reference', 'Pump_PF755.PSet_SpeedRef', 'float', 5.0, 3600), + 'stuffing_box_friction': Channel('stuffing_box_friction', 'Card_Past[1].Params.Stuffing_Box_Friction', 'float', 1.0, 3600), + 'torque_reference': Channel('torque_reference', 'PF755_Drive:O.TrqRefAStpt', 'float', 1.0, 3600), + 'tubing_head_pressure': Channel('tubing_head_pressure', 'Card_Past[1].Params.Tubing_Head_Pressure', 'float', 5.0, 3600), +} + +dt_channels = { # Current Daily Totals + 'Average_SPM': Channel('dt_average_spm', 'TODAY_Average_SPM', 'float', 0.5, 3600), + 'Calculated_Production': Channel('dt_calculated_production', 'TODAY_Production_Calculated', 'float', 10.0, 3600), + 'Downhole_Net_Stroke': Channel('dt_downhole_net_stroke', 'TODAY_Downhole_NetStroke', 'float', 1.0, 3600), + 'Electricity_Cost': Channel('dt_electricity_cost', 'TODAY_Electricity_Cost', 'float', 0.1, 3600), + 'Fluid_Level': Channel('dt_fluid_level', 'TODAY_Fluid_Above_Pump', 'float', 10.0, 3600), + 'Full_Card_Production': Channel('dt_full_card_production', 'TODAY_Full_Card_Production', 'float', 10.0, 3600), + 'Inflow_Rate': Channel('dt_inflow_rate', 'TODAY_Inflow_Rate', 'float', 10.0, 3600), + 'Lifting_Cost': Channel('dt_lifting_cost', 'TODAY_Lifting_Cost', 'float', 0.01, 3600), + 'Min_Load': Channel('dt_min_load', 'TODAY_Min_Load', 'float', 100.0, 3600), + 'Peak_Load': Channel('dt_peak_load', 'TODAY_Max_Load', 'float', 100.0, 3600), + 'Percent_Run': Channel('dt_percent_run', 'TODAY_Percent_Run', 'float', 1.0, 3600), + 'Polished_Rod_HP': Channel('dt_polished_rod_hp', 'TODAY_Polished_Rod_HP', 'float', 1.0, 3600), + 'Projected_Production': Channel('dt_projected_production', 'TODAY_Production_Projected', 'float', 5.0, 3600), + 'Pump_HP': Channel('dt_pump_hp', 'TODAY_Pump_HP', 'float', 1.0, 3600), + 'Pump_Intake_Presure': Channel('dt_pump_intake_pressure', 'TODAY_Pump_Intake_Pressure', 'float', 10.0, 3600), + 'Surface_Stroke_Length': Channel('dt_surface_stroke_length', 'TODAY_Surface_StrokeLength', 'float', 1.0, 3600), + 'Tubing_Movement': Channel('dt_tubing_movement', 'TODAY_Tubing_Movement', 'float', 1.00, 3600), + 'kWh': Channel('dt_kWh', 'TODAY_kWh', 'float', 10.0, 3600), + 'kWh_Regen': Channel('dt_kWh_regen', 'TODAY_kWh_Regen', 'float', 1.0, 3600) +} + + +class Card(): + global plc_ip + + def __init__(self, unified_time): + self.sc = [] + self.dc = [] + self.sent = False + self.read_time = unified_time + self.readCard() + + def readCard(self): + self.card_id = readTag(plc_ip, "Card_Past[1].ID")[0] + self.num_points = int(readTag(plc_ip, "Card_Past[1].Num_Points")[0]) + for i in range(0, self.num_points): + self.sc.append([round(float(readTag(plc_ip, 'Card_Past[1].Surface_Position[{}]'.format(i))[0]), 3), round(float(readTag(plc_ip, 'Card_Past[1].Surface_Load[{}]'.format(i))[0]), 3)]) + self.dc.append([round(float(readTag(plc_ip, 'Card_Past[1].Downhole_Position[{}]'.format(i))[0]), 3), round(float(readTag(plc_ip, 'Card_Past[1].Downhole_Load[{}]'.format(i))[0]), 3)]) + + def stringify(self): + ''' returns a list of two strings [surface card, downhole card]''' + sc_str = "[" + dc_str = "[" + for i in range(0, self.num_points): + sc_str = sc_str + "[{},{}],".format(self.sc[i][0], self.sc[i][1]) + dc_str = dc_str + "[{},{}],".format(self.dc[i][0], self.dc[i][1]) + sc_str = sc_str[:-1] + "]" + dc_str = dc_str[:-1] + "]" + return[sc_str, dc_str] + + +class start(threading.Thread, deviceBase): + + def __init__(self, name=None, number=None, mac=None, Q=None, mcu=None, companyId=None, offset=None, mqtt=None, Nodes=None): + threading.Thread.__init__(self) + deviceBase.__init__(self, name=name, number=number, mac=mac, Q=Q, mcu=mcu, companyId=companyId, offset=offset, mqtt=mqtt, Nodes=Nodes) + + self.daemon = True + self.forceSend = True + self.version = "3" + self.device_address = "http://192.168.1.30/" + # self.device_address = "http://localhost/" + self.cardLoopTimer = 600 + self.finished = threading.Event() + threading.Thread.start(self) + self.statusChanged = False + self.al_status_last = False + self.dl_status_last = False + self.card_storage = deque([]) # array of the last x cards + self.card_storage_limit = 5 + self.last_card_sent_time = 0 + + # load stored event ID's + try: + with open('eventIds.p', 'rb') as handle: + self.eventIds = pickle.load(handle) + + print "found pickled eventID dictionary: {0}".format(self.eventIds) + except: + print "couldn't load enent ID's from pickle" + self.eventIds = [] + + # load stored wellconfig's + try: + with open('wellSetup.p', 'rb') as handle: + self.wellSetup = pickle.load(handle) + + print "Found pickled Well Setup (but it's going to be too long to print)" + # print self.wellConfig + except: + print "couldn't load Well Setup from pickle" + self.wellSetup = [] + + self.sendtodbJSON("device_address", self.device_address, 0) + + # this is a required function for all drivers, its goal is to upload some piece of data + # about your device so it can be seen on the web + def register(self): + channels["status"]["last_value"] = "" + + def channelCheck(self, c, force): + if c.read(force): + self.sendtodbJSON(c.mesh_name, c.value, 0) + + def run(self): + self.runLoopStatus = "" + while True: + try: + self.statusChanged = False + runLoopStatus = "checkStatus" + if self.checkStatus(): + self.statusChanged = True + # TODO Add event logic here + + runLoopStatus = "Daily Total Loop" + for dt in dt_channels: + self.channelCheck(dt_channels[dt], self.forceSend) + + runLoopStatus = "checkGaugeOffData" + # self.checkGaugeOffData() + + + runLoopStatus = "Stroke Parameter Data" + for ch in channels: + self.channelCheck(channels[ch], self.forceSend) + + runLoopStatus = "Reading Cards" + if len(self.card_storage) > 0: + if not readTag(plc_ip, "Card_Past[1].ID")[0] == self.card_storage[0].card_id: + current_time = time.time() + current_card = Card(current_time) + self.sendtodbJSON("card_history", current_card.card_id, current_time) + if (current_card.read_time - self.last_card_sent_time) > self.cardLoopTimer or self.forceSend: + cards = current_card.stringify() + self.sendtodbJSON("sc", cards[0], current_time) + self.sendtodbJSON("dc", cards[1], current_time) + current_card.sent = True + self.card_storage.appendleft(current_card) + while len(self.card_storage) > self.card_storage_limit: + self.card_storage.pop() + if self.statusChanged: + for c in self.card_storage: + if not c.sent: + cstr = c.stringify() + self.sendtodbJSON("sc", cstr[0], c.read_time) + self.sendtodbJSON("dc", cstr[1], c.read_time) + runLoopStatus = "Complete" + time.sleep(3) + self.forceSend = False + except Exception, e: + sleep_timer = 20 + print "Error during {0} of run loop: {1}\nWill try again in {2} seconds...".format(runLoopStatus, e, sleep_timer) + time.sleep(sleep_timer) + + def checkStatus(self): + print("!! Made it to checkStatus") + print(readTag) + statusMap = { + 0: 'Stopped', + 1: 'Running', + 2: 'Pumped Off', + 3: 'Faulted', + 4: 'Starting', + 5: 'Recovering', + 100: 'Read Error', + 1000: 'PLC Error', + 9999: 'No Response' + } + status = statusMap[int(readTag(plc_ip, "Pump.Run_Status")[0])] + + if status: + date = time.time() + if statusCh.last_value != status: + self.statusChanged = True + print "Status has changed from {0} to {1} @ {2}".format(statusCh.last_value, status, time.time()) + else: + self.statusChanged = False + return False + + if self.statusChanged or self.forceSend: + self.status = status + self.sendtodb("status", status, date) + statusCh.last_value = status + return status + + def checkEvents(self): + data = json.loads(requests.get(self.device_address + "/json/event_list").text) + events = data["events"] + for event in events: + if int(event["id"]) not in self.eventIds: + timestamp = calendar.timegm(time.strptime(event["datetime"], '%Y-%m-%dT%H:%M:%S.%fZ')) + # we have a new event + self.sendtodbJSON("events", json.dumps(event), timestamp) + self.eventIds.append(int(event["id"])) + if len(self.eventIds) > 50: + del self.eventIds[0] + with open('eventIds.p', 'wb') as handle: + pickle.dump(self.eventIds, handle) + + def poc_sync(self, name, value): + self.sendtodb("connected", "true", 0) + return True + + def poc_set_address(self, name, value): + self.device_address = value + return True + + def poc_refresh_data(self, name, value): + self.forceSend = True + return True