diff --git a/.gitmodules b/.gitmodules
index 7206881..663a721 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,3 @@
 [submodule "tag"]
 	path = tag
-	url = http://patrickjmcd@bitbucket.poconsole.net/scm/poconsole/tag.git
+	url = ssh://git-codecommit.us-east-1.amazonaws.com/v1/repos/POCONSOLE-Tag
diff --git a/dataLogger.py b/dataLogger.py
new file mode 100644
index 0000000..265cbf8
--- /dev/null
+++ b/dataLogger.py
@@ -0,0 +1,407 @@
+#!/usr/bin/env python
+
+
+import time
+from pycomm.ab_comm.clx import Driver as ClxDriver
+from pycomm_helper.tag import Tag
+from pycomm_helper.alarm import AnalogAlarm, bitAlarm
+import traceback
+import json
+import requests
+
+# DEFAULTS
+API_METHOD = "https"
+API_ADDRESS = "localhost"
+API_PORT = 5000
+
+API_BASE_URL = "{}://{}:{}/api".format(API_METHOD, API_ADDRESS, API_PORT)
+
+scan_rate = 30  # seconds
+save_all = "test"  # use True, False, or any string
+plc_handshake_tags = {}
+last_handshake_time = 0
+
+# GLOBAL VARIABLES
+device_types = {}
+devices = []
+main_plc = {}
+
+# ---------- MAP FUNCTIONS ---------- #
+maps = {
+    'modeMap': {
+        0: "Error",
+        1: "Auto",
+        2: "POC",
+        3: "Timer",
+        4: "Manual",
+        5: "DH PID"
+    },
+    'card_type_map': {
+        0: "Normal",
+        1: "Shutdown",
+        2: "Alarm",
+        3: "Startup",
+        4: "Low Fillage"
+    },
+    'statusMap': {
+        0: 'Stopped',
+        1: 'Running',
+        2: 'Pumped Off',
+        3: 'Faulted',
+        4: 'Starting',
+        5: 'Recovering',
+        100: 'Read Error',
+        1000: 'PLC Error',
+        9999: 'No Response'
+    },
+    'conditionMap': {
+        20: "Low",
+        21: "High",
+        24: "LoLo",
+        25: "HiHi",
+        32: "Input Failure",
+        34: "Configuration Error",
+        16: "Failure to Stop",
+        17: "Failure to Start",
+        18: "Drive Fault"
+    },
+    None: None,
+    'null': None
+}
+
+# ---------- TAGS ---------- #
+tag_list = {}
+bit_tags = {}
+safety_tags = {}
+custom_tags = {}
+welltest_tags = {}  # read when a well test is submitted; not populated anywhere in this version
+
+
+class Status(Tag):
+    def sendToDB(self):
+        global API_BASE_URL
+        post_data = {'run_status': self.value}
+        r = requests.post('{}/run_status_log'.format(API_BASE_URL), data=json.dumps(post_data), headers={'Content-Type': 'application/json'}, verify=False)
+        resp = json.loads(r.text)
+        print("Stored {} for {} at {}".format(resp['run_status'], self.name, resp['created_on']))
+        self.last_send_time = time.time()
+
+
+def readConfig():
+    global API_BASE_URL, scan_rate, save_all
+    req = requests.get('{}/configs'.format(API_BASE_URL), verify=False)
+    res = json.loads(req.text)['objects']
+
+    if len(res) > 0:
+        for x in res:
+            if x['parameter'] == "scan_rate":
+                try:
+                    scan_rate = int(x['val'])
+                except Exception as e:
+                    print("Error setting scan_rate to {}".format(x['val']))
+
+            elif x['parameter'] == "save_all":
+                try:
+                    if x['val'].lower() == 'true':
+                        save_all = True
+                    elif x['val'].lower() == 'false':
+                        save_all = False
+                    else:
+                        print("Invalid save_all parameter: {}".format(x['val']))
+                except Exception as e:
+                    print("Error setting save_all to {}".format(x['val']))
+    else:
+        print("No configuration data found.")
+
+    return True
+
+
+def getDeviceTypes():
+    global API_BASE_URL
+    req = requests.get('{}/device_types'.format(API_BASE_URL), verify=False)
+    res = json.loads(req.text)['objects']
+    device_types = {}
+    if len(res) > 0:
+        for x in res:
+            device_types[x['_id']] = x['device_type']
+        return device_types
+    else:
+        print("No device type data found.")
+
+    return False
+
+
+def readTag(addr, tag):
+    c = ClxDriver()
+    if c.open(addr):
+        try:
+            v = c.read_tag(tag)
+            return v
+        except Exception:
+            print("ERROR RETRIEVING TAG: {}".format(tag))
+            c.close()
+            traceback.print_exc()
+        c.close()
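+
+
+# NOTE (assumption): pycomm's read_tag() appears to return a (value, type) tuple
+# and read_array() a list of tuples, which is why callers index readTag(...)[0]
+# and why readArray() below keeps only x[1] from each element. Both helpers fall
+# through and return None when the connection or the read fails, so callers are
+# expected to guard against a None result.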
+def readArray(addr, arr, length):
+    c = ClxDriver()
+    if c.open(addr):
+        try:
+            v = c.read_array(arr, length)
+            return [x[1] for x in v]
+        except Exception:
+            print("ERROR RETRIEVING ARRAY: {}".format(arr))
+            err = c.get_status()
+            c.close()
+            print(err)
+            traceback.print_exc()
+        c.close()
+
+
+def setupTags():
+    global device_types, API_BASE_URL, tag_list, safety_tags, bit_tags
+    # try:
+    # Get tags stored in database
+    get_tag_request = requests.get('{}/tags'.format(API_BASE_URL), verify=False)
+    tags = json.loads(get_tag_request.text)['objects']
+    for t in tags:
+        tag_list[t['name']] = Tag(t['name'], t['tag'], t['_id'], t['data_type'], t['change_threshold'], t['guarantee_sec'], mapFn=maps[t['map_function']], ip_address=t['device']['address'], device_type=device_types[t['device']['device_type_id']])
+
+    get_event_request = requests.get('{}/event_configs'.format(API_BASE_URL), verify=False)
+    events = json.loads(get_event_request.text)['objects']
+    for e in events:
+        if e['event_type'] == 'analog':
+            safety_tags[e['name']] = AnalogAlarm(e['name'], e['tag'], e['_id'], ip_address=e['device']['address'], device_type=device_types[e['device']['device_type_id']])
+        elif e['event_type'] == 'bit':
+            bit_tags[e['name']] = bitAlarm(e['name'], e['tag'], e['condition'], e['_id'], ip_address=e['device']['address'], device_type=device_types[e['device']['device_type_id']])
+    return True
+    # except Exception as e:
+    #     print("Error getting tags: {}".format(e))
+    #     return False
+
+
+def getMainPLC():
+    global API_BASE_URL
+    get_plc_request = requests.get('{}/devices'.format(API_BASE_URL), verify=False)
+    return json.loads(get_plc_request.text)['objects'][0]
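+
+
+# NOTE (assumption): the GAUGEOFF_* tags are taken to be per-gauge-off averages
+# and totals latched by the PLC when Gauge_Off_Command is set. readGaugeOffData()
+# reads them once per gauge-off and posts the snapshot to the API's /gauge_off
+# endpoint; the main loop guards this with the already_gauged_off flag so each
+# gauge-off is only stored once.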
+def readGaugeOffData():
+    global main_plc
+    try:
+        gaugeOffData = {
+            'spm_average': readTag(main_plc['address'], 'GAUGEOFF_Average_SPM')[0],
+            'downhole_gross_stroke_average': readTag(main_plc['address'], 'GAUGEOFF_Downhole_GrossStroke')[0],
+            'downhole_net_stroke_average': readTag(main_plc['address'], 'GAUGEOFF_Downhole_NetStroke')[0],
+            'electricity_cost_total': readTag(main_plc['address'], 'GAUGEOFF_Electricity_Cost')[0],
+            'fluid_level_average': readTag(main_plc['address'], 'GAUGEOFF_Fluid_Above_Pump')[0],
+            'full_card_production_total': readTag(main_plc['address'], 'GAUGEOFF_Full_Card_Production')[0],
+            'inflow_rate_average': readTag(main_plc['address'], 'GAUGEOFF_Inflow_Rate')[0],
+            'kWh_used_total': readTag(main_plc['address'], 'GAUGEOFF_kWh')[0],
+            'kWh_regen_total': readTag(main_plc['address'], 'GAUGEOFF_kWh_Regen')[0],
+            'lifting_cost_average': readTag(main_plc['address'], 'GAUGEOFF_Lifting_Cost')[0],
+            'peak_pr_load': readTag(main_plc['address'], 'GAUGEOFF_Max_Load')[0],
+            'min_pr_load': readTag(main_plc['address'], 'GAUGEOFF_Min_Load')[0],
+            'percent_run': readTag(main_plc['address'], 'GAUGEOFF_Percent_Run')[0],
+            'polished_rod_hp_average': readTag(main_plc['address'], 'GAUGEOFF_Polished_Rod_HP')[0],
+            'pump_hp_average': readTag(main_plc['address'], 'GAUGEOFF_Pump_HP')[0],
+            'production_total': readTag(main_plc['address'], 'GAUGEOFF_Production_Calculated')[0],
+            'pump_intake_pressure_average': readTag(main_plc['address'], 'GAUGEOFF_Pump_Intake_Pressure')[0],
+            'surface_stroke_length_average': readTag(main_plc['address'], 'GAUGEOFF_Surface_StrokeLength')[0],
+            'tubing_movement_average': readTag(main_plc['address'], 'GAUGEOFF_Tubing_Movement')[0]
+        }
+    except Exception as e:
+        print("Could not get all gauge off tags: {}".format(e))
+        return False
+
+    post_req = requests.post(API_BASE_URL + "/gauge_off", data=json.dumps(gaugeOffData), headers={'Content-Type': 'application/json'}, verify=False)
+    try:
+        post_res_id = json.loads(post_req.text)['_id']
+        return True
+    except Exception as e:
+        print("Did not get a valid JSON object back, got: {}".format(post_req.text))
+        return False
+
+
+def evalTapers():
+    return True
+# TODO: Read taper data
+#     global main_plc
+#     ts = time.time()
+#     numTapers = int(readTag(main_plc['address'], 'Card_Current.Params.Num_Tapers')[0])
+#     for t in range(1, numTapers + 1):
+#         taper_length = readTag(main_plc['address'], 'Taper.Taper[{}].Setup.Length'.format(t))[0]
+#         taper_diameter = readTag(main_plc['address'], 'Taper.Taper[{}].Setup.Diameter'.format(t))[0]
+#         taper_material = readTag(main_plc['address'], 'Taper.Taper[{}].Setup.Material'.format(t))[0]
+#         if (taper_material == 1):
+#             taper_material = "Steel"
+#         elif (taper_material == 2):
+#             taper_material = "Fiberglass"
+#
+#         tStr = "{{'taper':{}, 'length': {}, 'diameter': {}, 'material':'{}'}}".format(t, taper_length, taper_diameter, taper_material)
+#         tQuery = 'INSERT INTO well_config (tstamp, type, val) VALUES ({}, "taper", "{}")'.format(ts, tStr)
+#         print(tQuery)
+#         con.connect()
+#         cur = con.cursor()
+#         cur.execute(tQuery)
+#         con.commit()
+#
+#     pump_diameter = readTag(main_plc['address'], 'UnitConfig.Pump_Diameter')[0]
+#     cfgQuery = "INSERT INTO well_config (tstamp, type, val) VALUES ({}, 'pump_diameter', '{}')".format(ts, pump_diameter)
+#     con.connect()
+#     cur = con.cursor()
+#     cur.execute(cfgQuery)
+#     con.commit()
+#     print("TAPER DATA READ!")
+#     return True
+
+
+def readPoints():
+    global main_plc
+    num_points = readTag(main_plc['address'], "Card_Past[1].Num_Points")[0]
+    # round each series and repeat its first point at the end so the plotted card closes
+    surf_pos = readArray(main_plc['address'], "Card_Past[1].Surface_Position", num_points + 1)[1:]
+    if len(surf_pos) > 1:
+        surf_pos = [round(i, 2) for i in surf_pos]
+        surf_pos.append(surf_pos[0])
+    surf_lod = readArray(main_plc['address'], "Card_Past[1].Surface_Load", num_points + 1)[1:]
+    if len(surf_lod) > 1:
+        surf_lod = [round(i, 2) for i in surf_lod]
+        surf_lod.append(surf_lod[0])
+    down_pos = readArray(main_plc['address'], "Card_Past[1].Downhole_Position", num_points + 1)[1:]
+    if len(down_pos) > 1:
+        down_pos = [round(i, 2) for i in down_pos]
+        down_pos.append(down_pos[0])
+    down_lod = readArray(main_plc['address'], "Card_Past[1].Downhole_Load", num_points + 1)[1:]
+    if len(down_lod) > 1:
+        down_lod = [round(i, 2) for i in down_lod]
+        down_lod.append(down_lod[0])
+    return [surf_pos, surf_lod, down_pos, down_lod]
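+
+
+# NOTE (assumption): Card_Past[1] is treated as the most recently completed
+# stroke ("card") buffered by the PLC, and Card_Past[1].ID as a counter that
+# changes when a new stroke finishes. checkCardDataAndStore() compares that ID
+# against the last one stored and only posts a card when it has changed.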
+def checkCardDataAndStore(last_card_id):
+    '''
+    Check whether a new stroke has been made and, if so, store the stroke in
+    the database. Returns the current card ID.
+    '''
+    global maps, main_plc, API_BASE_URL
+
+    try:
+        current_card_id = readTag(main_plc['address'], 'Card_Past[1].ID')[0]
+        if not (last_card_id == current_card_id):
+            [surface_position, surface_load, downhole_position, downhole_load] = readPoints()
+            card_type = maps['card_type_map'][readTag(main_plc['address'], 'Card_Past[1].Card_Type')[0]]
+
+            card_data = {
+                'stroke_number': current_card_id,
+                'stroke_type': card_type,
+                'surf_pos': str(surface_position),
+                'surf_lod': str(surface_load),
+                'down_pos': str(downhole_position),
+                'down_lod': str(downhole_load)
+            }
+            r = requests.post('{}/cards'.format(API_BASE_URL), data=json.dumps(card_data), headers={'Content-Type': 'application/json'}, verify=False)
+            resp = json.loads(r.text)
+            print("CARD NUMBER {} READ AT {}!".format(resp["stroke_number"], resp['created_on']))
+            last_card_id = current_card_id
+            return current_card_id
+        else:
+            return last_card_id
+    except Exception as e:
+        print("Exception during checkCardDataAndStore: {}".format(e))
+        return last_card_id
+
+
+def main():
+    global main_plc, device_types
+    main_plc = getMainPLC()
+
+    rc_attempts = 0
+    rc = readConfig()
+    while not rc and rc_attempts < 10:
+        rc = readConfig()
+        rc_attempts += 1
+
+    device_type_attempts = 0
+    device_types = getDeviceTypes()
+    while not device_types and device_type_attempts < 10:
+        device_types = getDeviceTypes()
+        device_type_attempts += 1
+
+    if setupTags():
+        pass
+    else:
+        print("Unable to read tags... Restarting.")
+        main()
+
+    status = Status('run_status', 'Pump.Run_Status', 0, 'STRING', 0, 3600, mapFn=maps['statusMap'], ip_address=main_plc['address'], device_type=device_types[main_plc['device_type']['_id']])
+    read_tapers = False
+    already_gauged_off = False
+    already_entered_well_test = False
+    last_card_id = 0
+    last_status = ""
+    statusChanged = False
+
+    while True:
+        try:
+            current_status = status.read("test")
+            statusChanged = not (current_status == last_status)
+            if statusChanged:
+                last_status = current_status
+
+            last_card_id = checkCardDataAndStore(last_card_id)
+
+            # read tags in tag_list and store if values require saving
+            for t in tag_list:
+                tag = tag_list[t]
+                tag.read(save_all)
+
+            # check if taper data has changed and store taper parameters if it has
+            update_taper = readTag(main_plc['address'], "Write_Tapers")[0] > 0
+            if (update_taper == 0):
+                if read_tapers:
+                    read_tapers = False
+                    print("Update Tapers = False")
+
+            if (update_taper and (not read_tapers)):
+                print("reading taper file")
+                read_tapers = evalTapers()
+
+            # store gauge-off data once it is set
+            gauge_off = readTag(main_plc['address'], "Gauge_Off_Command")[0]
+            if (gauge_off == 0):
+                if already_gauged_off:
+                    already_gauged_off = False
+                    print("Already gauged off... Setting gauge_off to False")
+
+            if (gauge_off and (not already_gauged_off)):
+                print("Gauging off...")
+                already_gauged_off = readGaugeOffData()
+                print("Gauged off!")
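+
+            # NOTE (assumption): Write_Tapers, Gauge_Off_Command and
+            # Well_Test.Test_Submit are treated as PLC-side command bits: the
+            # logger acts on the rising edge (tracked with read_tapers,
+            # already_gauged_off and already_entered_well_test) and re-arms
+            # once the PLC clears the bit.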
Setting well_test_entered to False") + if (well_test_entered and (not already_entered_well_test)): + for wtest in welltest_tags: + w = welltest_tags[wtest] + w.read(True) + already_entered_well_test = True + print("Well Test Stored!") + + ################### + # ALARMS & EVENTS # + ################### + for t in safety_tags: + safety_tags[t].checkStatus(last_card_id) + + for b in bit_tags: + bit_tags[b].checkStatus(last_card_id) + + time.sleep(.20) + except Exception as e: + print("Error during loop: {}".format(e)) + traceback.print_exc() +if __name__ == '__main__': + main() diff --git a/dbMySQL/readConfig_MySQL.py b/dbMySQL/readConfig_MySQL.py index 311cc05..09b1c0c 100644 --- a/dbMySQL/readConfig_MySQL.py +++ b/dbMySQL/readConfig_MySQL.py @@ -7,6 +7,7 @@ with open(os.path.realpath('.') + '/mysql_cfg.pickle', 'rb') as cfgFile: mysql_cfg = pickle.load(cfgFile) con = mysqlcon.connect(**mysql_cfg) + def readConfig(): configProperties = {} configObj = {} diff --git a/init/loggers b/init/loggers new file mode 100644 index 0000000..1005ad3 --- /dev/null +++ b/init/loggers @@ -0,0 +1,37 @@ +#! /bin/sh +# /etc/init.d/loggers + +### BEGIN INIT INFO +# Provides: loggers +# Required-Start: $remote_fs $syslog +# Required-Stop: $remote_fs $syslog +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: Simple script to start a program at boot +# Description: A simple script from www.stuffaboutcode.com which will start / stop a program a boot / shutdown. +### END INIT INFO + +# If you want a command to always run, put it here + +# Carry out specific functions when asked to by the system +case "$1" in + start) + echo "Starting loggers" + kill -9 $(cat /root/dataLogger.pid) + # run application you want to start + /usr/bin/python /root/datalogger/dataLogger.py > /dev/null 2>&1 & echo $! > "/root/dataLogger.pid" + + ;; + stop) + echo "Stopping loggers" + # kill application you want to stop + kill -9 $(cat /root/dataLogger.pid) + +;; + *) + echo "Usage: /etc/init.d/loggers {start|stop}" + exit 1 + ;; +esac + +exit 0 diff --git a/tag b/tag index 10c044e..a89ed46 160000 --- a/tag +++ b/tag @@ -1 +1 @@ -Subproject commit 10c044e3d2985e27512b01617402c72d63d08861 +Subproject commit a89ed46bdf570044dec4bac4f17e668dd7c79ab0