diff --git a/dataLogger.py b/dataLogger.py new file mode 100644 index 0000000..4233382 --- /dev/null +++ b/dataLogger.py @@ -0,0 +1,371 @@ +#!/usr/bin/env python + + +import time +from pycomm.ab_comm.clx import Driver as ClxDriver +from tag.tag import Tag +from tag.alarm import AnalogAlarm +from tag.alarm import bitAlarm +import traceback +import json +import requests + +# DEFAULTS +web_address = "http://localhost:3000" +scan_rate = 30 # seconds +save_all = "test" # use True, False, or any string +plc_handshake_tags = {} +last_handshake_time = 0 + +# GLOBAL VARIABLES +device_types = {} +devices = [] +main_plc = {} + +# ---------- MAP FUNCTIONS ---------- # +maps = { + 'modeMap': { + 0: "Error", + 1: "Auto", + 2: "POC", + 3: "Timer", + 4: "Manual", + 5: "DH PID" + }, + 'card_type_map': { + 0: "Normal", + 1: "Shutdown", + 2: "Alarm", + 3: "Startup", + 4: "Low Fillage" + }, + 'statusMap': { + 0: 'Stopped', + 1: 'Running', + 2: 'Pumped Off', + 3: 'Faulted', + 4: 'Starting', + 5: 'Recovering', + 100: 'Read Error', + 1000: 'PLC Error', + 9999: 'No Response' + }, + 'conditionMap': { + 20: "Low", + 21: "High", + 24: "LoLo", + 25: "HiHi", + 32: "Input Failure", + 34: "Configuration Error", + 16: "Failure to Stop", + 17: "Failure to Start", + 18: "Drive Fault" + } +} +# ---------- TAGS ---------- # +stroke_tags = {} # Tags stored for every single stroke +history_tags = {} # Tags stored on value change or age +gaugeoff_tags = {} # Tags stored at gauge off +welltest_tags = {} # Tags stored at well test submit +bit_tags = {} +safety_tags = {} +custom_tags = {} + + +class Status(Tag): + def sendToDB(self): + post_data = {'status': self.value} + r = requests.post('{}/run_status'.format(web_address), data=post_data) + resp = json.loads(r.text) + print("Stored {} for {} at {}".format(resp['val'], self.name, resp['createdAt'])) + self.last_send_time = time.time() + + +def getDeviceTypes(): + global web_address + device_types = {} + try: + get_device_type_request = requests.get('{}/device_type'.format(web_address)) + device_types_json = json.loads(get_device_type_request.text) + for t in device_types_json: + device_types[t['id']] = t['dType'] + return device_types + except Exception, e: + print("Error getting tags: {}".format(e)) + return False + + +def readConfig(): + global web_address, scan_rate, save_all + try: + sr_req_data = 'where={"parameter": "scan_rate"}' + sr_req = requests.get('{}/config?{}'.format(web_address, sr_req_data)) + sr_try = json.loads(sr_req.text) + if len(sr_try) > 0: + scan_rate = int(sr_try[0]['val']) + except Exception, e: + print("Error getting scan rate: {}".format(e)) + print("I'll just use {} seconds as the scan rate...".format(scan_rate)) + + try: + sa_req_data = {"where": {"parameter": "save_all"}} + sa_req = requests.get('{}/config'.format(web_address), params=sa_req_data) + sa_try = json.loads(sa_req.text) + if len(sa_try) > 0: + if sa_try[0]['val'].lower() == "true": + save_all = True + elif sa_try[0]['val'].lower() == "false": + save_all = False + except Exception, e: + print("Error getting save-all: {}".format(e)) + print("I'll just use {} as the save-all parameter...".format(save_all)) + + return True + + +def readTag(addr, tag): + c = ClxDriver() + if c.open(addr): + try: + v = c.read_tag(tag) + return v + except Exception: + print("ERROR RETRIEVING TAG: {}".format(tag)) + c.close() + traceback.print_exc() + c.close() + + +def readArray(addr, arr, length): + c = ClxDriver() + if c.open(addr): + try: + v = c.read_array(arr, length) + return map(lambda x: x[1], v) + except Exception: + print("ERROR RETRIEVING ARRAY: {}".format(arr)) + err = c.get_status() + c.close() + print err + traceback.print_exc() + c.close() + + +def setupTags(): + global device_types, web_address, stroke_tags, gaugeoff_tags, history_tags, welltest_tags, custom_tags + try: + # Get tags stored in database + get_tag_request = requests.get('{}/tag'.format(web_address)) + tags = json.loads(get_tag_request.text) + for t in tags: + if t['tag_class']['class_type'] == 'stroke': + stroke_tags[t['name']] = Tag(t['name'], t['tag'], t['id'], t['data_type'], t['change_threshold'], t['guarantee_sec'], mapFn=t['map_function'], ip_address=t['deviceID']['address'], device_type=device_types[t['deviceID']['device_type']]) + + elif t['tag_class']['class_type'] == 'history': + history_tags[t['name']] = Tag(t['name'], t['tag'], t['id'], t['data_type'], t['change_threshold'], t['guarantee_sec'], mapFn=t['map_function'], ip_address=t['deviceID']['address'], device_type=device_types[t['deviceID']['device_type']]) + + elif t['tag_class']['class_type'] == 'gaugeoff': + gaugeoff_tags[t['name']] = Tag(t['name'], t['tag'], t['id'], t['data_type'], t['change_threshold'], t['guarantee_sec'], mapFn=t['map_function'], ip_address=t['deviceID']['address'], device_type=device_types[t['deviceID']['device_type']]) + + elif t['tag_class']['class_type'] == 'welltest': + welltest_tags[t['name']] = Tag(t['name'], t['tag'], t['id'], t['data_type'], t['change_threshold'], t['guarantee_sec'], mapFn=t['map_function'], ip_address=t['deviceID']['address'], device_type=device_types[t['deviceID']['device_type']]) + + elif t['tag_class']['class_type'] == 'custom': + custom_tags[t['name']] = Tag(t['name'], t['tag'], t['id'], t['data_type'], t['change_threshold'], t['guarantee_sec'], mapFn=t['map_function'], ip_address=t['deviceID']['address'], device_type=device_types[t['deviceID']['device_type']]) + + get_event_request = requests.get('{}/event_config'.format(web_address)) + events = json.loads(get_event_request.text) + for e in events: + if e['event_class']['event_class'] == 'analog': + safety_tags[e['name']] = AnalogAlarm(e['name'], e['tag'], e['id'], ip_address=e['deviceID']['address'], device_type=device_types[e['deviceID']['device_type']]) + elif e['event_class']['event_class'] == 'bit': + bit_tags[e['name']] = bitAlarm(e['name'], e['tag'], e['event_condition'], e['id'], ip_address=e['deviceID']['address'], device_type=device_types[e['deviceID']['device_type']]) + return True + except Exception, e: + print("Error getting tags: {}".format(e)) + return False + + +def getMainPLC(): + get_plc_req_data = {'where': {'id': 0}} + get_plc_request = requests.get('{}/device'.format(web_address), data=get_plc_req_data) + return json.loads(get_plc_request.text) + + +def readPoints(): + global main_plc + num_points = readTag(main_plc['address'], "Card_Past[1].Num_Points")[0] + surf_pos = readArray(main_plc['address'], "Card_Past[1].Surface_Position", num_points + 1)[1:] + surf_pos.append(surf_pos[0]) + surf_lod = readArray(main_plc['address'], "Card_Past[1].Surface_Load", num_points + 1)[1:] + surf_lod.append(surf_lod[0]) + down_pos = readArray(main_plc['address'], "Card_Past[1].Downhole_Position", num_points + 1)[1:] + down_pos.append(down_pos[0]) + down_lod = readArray(main_plc['address'], "Card_Past[1].Downhole_Load", num_points + 1)[1:] + down_lod.append(down_lod[0]) + return([surf_pos, surf_lod, down_pos, down_lod]) + + +def evalTapers(): + return True +# TODO: Read taper data +# global main_plc +# ts = time.time() +# numTapers = int(readTag(main_plc['address'], 'Card_Current.Params.Num_Tapers')[0]) +# for t in range(1, numTapers + 1): +# taper_length = readTag(main_plc['address'], 'Taper.Taper[{}].Setup.Length'.format(t))[0] +# taper_diameter = readTag(main_plc['address'], 'Taper.Taper[{}].Setup.Diameter'.format(t))[0] +# taper_material = readTag(main_plc['address'], 'Taper.Taper[{}].Setup.Material'.format(t))[0] +# if (taper_material == 1): +# taper_material = "Steel" +# elif (taper_material == 2): +# taper_material = "Fiberglass" +# +# tStr = "{{'taper':{}, 'length': {}, 'diameter': {}, 'material':'{}'}}".format(t, taper_length, taper_diameter, taper_material) +# tQuery = 'INSERT INTO well_config (tstamp, type, val) VALUES ({}, "taper", "{}")'.format(ts, tStr) +# print tQuery +# con.connect() +# cur = con.cursor() +# cur.execute(tQuery) +# con.commit() +# +# pump_diameter = readTag(main_plc['address'], 'UnitConfig.Pump_Diameter')[0] +# cfgQuery = "INSERT INTO well_config (tstamp, type, val) VALUES ({}, 'pump_diameter', '{}')".format(ts, pump_diameter) +# con.connect() +# cur = con.cursor() +# cur.execute(cfgQuery) +# con.commit() +# print "TAPER DATA READ!" +# return True + + +def main(): + global main_plc + + readConfig() + if setupTags(): + pass + else: + main() + + status = Status('run_status', 'Pump.Run_Status', 0, 'STRING', 0, 3600, mapFn=maps['statusMap'], ip_address=main_plc['address'], device_type=device_types[main_plc['device_type']]) + read_tapers = False + already_gauged_off = False + already_entered_well_test = False + last_stroke = 0 + last_status = "" + statusChanged = False + + while True: + try: + current_status = status.read("test") + statusChanged = not (current_status == last_status) + if statusChanged: + last_status = current_status + ############# + # CARD DATA # + ############# + stroke_tags['card_id'].read('test') + + if not (last_stroke == stroke_tags['card_id'].value): + sData = {} + last_stroke = stroke_tags['card_id'].value + for t in stroke_tags: + if not t == "card_id": + stroke_tags[t].read(True) + [sData['Surface_Position'], sData['Surface_Load'], sData['Downhole_Position'], sData['Downhole_Load']] = readPoints() + + sData["card_type"] = stroke_tags['card_type'].value + sData["card_id"] = stroke_tags['card_id'].value + sData['sp_string'] = ', '.join(map(str, sData['Surface_Position'])) + sData['sl_string'] = ', '.join(map(str, sData['Surface_Load'])) + sData['dp_string'] = ', '.join(map(str, sData['Downhole_Position'])) + sData['dl_string'] = ', '.join(map(str, sData['Downhole_Load'])) + + card_ins_req_data = { + 'card_id': sData['card_id'], + 'card_type': sData['card_type'], + 'surface_position': sData['sp_string'], + 'surface_load': sData['sl_string'], + 'downhole_position': sData['dp_string'], + 'downhole_load': sData['dl_string'] + } + r = requests.post('{}/card'.format(web_address), data=card_ins_req_data) + resp = json.loads(r.text) + print "CARD NUMBER {} READ AT {}!".format(resp["card_id"], resp['createdAt']) + + ################### + # HISTORICAL DATA # + ################### + + for hist in history_tags: + h = history_tags[hist] + h.read("test") + + for cust in custom_tags: + t = custom_tags[cust] + t.read("test") + + ############## + # TAPER DATA # + ############## + + update_taper = readTag(main_plc['address'], "Write_Tapers")[0] > 0 + if (update_taper == 0): + if read_tapers: + read_tapers = False + print "Update Tapers = False" + + if (update_taper and (not read_tapers)): + print "reading taper file" + read_tapers = evalTapers() + + ################## + # GAUGE OFF DATA # + ################## + gauge_off = readTag(main_plc['address'], "Gauge_Off_Command")[0] + if (gauge_off == 0): + if already_gauged_off: + already_gauged_off = False + print "Already gauged off... Setting gauge_off to False" + + if (gauge_off and (not already_gauged_off)): + print "Gauging off..." + for goff in gaugeoff_tags: + g = gaugeoff_tags[goff] + g.read(True) + + already_gauged_off = True + print "Gauged off!" + + ################## + # WELL TEST DATA # + ################## + + well_test_entered = readTag(main_plc['address'], "Well_Test.Test_Submit")[0] > 0 + if (well_test_entered == 0): + if already_entered_well_test: + already_entered_well_test = False + print "Already entered well Test... Setting well_test_entered to False" + if (well_test_entered and (not already_entered_well_test)): + for wtest in welltest_tags: + w = welltest_tags[wtest] + w.read(True) + already_entered_well_test = True + print "Well Test Stored!" + + ################### + # ALARMS & EVENTS # + ################### + for t in safety_tags: + safety_tags[t].checkStatus(stroke_tags['card_id'].value) + + for b in bit_tags: + bit_tags[b].checkStatus(stroke_tags['card_id'].value) + + time.sleep(.20) + except Exception, e: + print("Error during loop: {}".format(e)) + traceback.print_exc() +if __name__ == '__main__': + main()