diff --git a/dataLogger.py b/dataLogger.py index 4fab31a..347ff5f 100644 --- a/dataLogger.py +++ b/dataLogger.py @@ -3,15 +3,19 @@ import time from pycomm.ab_comm.clx import Driver as ClxDriver -from tag.tag import Tag -from tag.alarm import AnalogAlarm -from tag.alarm import bitAlarm +from pycomm_helper.tag import Tag +from pycomm_helper.alarm import AnalogAlarm, bitAlarm import traceback import json import requests # DEFAULTS -web_address = "https://localhost:3000" +API_METHOD = "https" +API_ADDRESS = "localhost" +API_PORT = 5000 + +API_BASE_URL = "{}://{}:{}/api".format(API_METHOD, API_ADDRESS, API_PORT) + scan_rate = 30 # seconds save_all = "test" # use True, False, or any string plc_handshake_tags = {} @@ -65,10 +69,7 @@ maps = { 'null': None } # ---------- TAGS ---------- # -stroke_tags = {} # Tags stored for every single stroke -history_tags = {} # Tags stored on value change or age -gaugeoff_tags = {} # Tags stored at gauge off -welltest_tags = {} # Tags stored at well test submit +tag_list = {} bit_tags = {} safety_tags = {} custom_tags = {} @@ -76,54 +77,56 @@ custom_tags = {} class Status(Tag): def sendToDB(self): - post_data = {'status': self.value} - r = requests.post('{}/run_status'.format(web_address), data=post_data, verify=False) + global API_BASE_URL + post_data = {'run_status': self.value} + r = requests.post('{}/run_status_log'.format(API_BASE_URL), data=json.dumps(post_data), headers={'Content-Type': 'application/json'}, verify=False) resp = json.loads(r.text) - print("Stored {} for {} at {}".format(resp['status'], self.name, resp['createdAt'])) + print("Stored {} for Run Status at {}".format(resp['run_status'], self.name, resp['created_on'])) self.last_send_time = time.time() -def getDeviceTypes(): - global web_address - device_types = {} - try: - get_device_type_request = requests.get('{}/device_type'.format(web_address), verify=False) - device_types_json = json.loads(get_device_type_request.text) - for t in device_types_json: - device_types[t['id']] = t['dType'] - return device_types - except Exception, e: - print("Error getting tags: {}".format(e)) - return False - - def readConfig(): - global web_address, scan_rate, save_all - try: - sr_req_data = 'where={"parameter": "scan_rate"}' - sr_req = requests.get('{}/config?{}'.format(web_address, sr_req_data), verify=False) - sr_try = json.loads(sr_req.text) - if len(sr_try) > 0: - scan_rate = int(sr_try[0]['val']) - except Exception, e: - print("Error getting scan rate: {}".format(e)) - print("I'll just use {} seconds as the scan rate...".format(scan_rate)) + global API_BASE_URL, scan_rate, save_all + req = requests.get('{}/configs'.format(API_BASE_URL), verify=False) + res = json.loads(req.text)['objects'] - try: - sa_req_data = {"where": {"parameter": "save_all"}} - sa_req = requests.get('{}/config'.format(web_address), params=sa_req_data, verify=False) - sa_try = json.loads(sa_req.text) - if len(sa_try) > 0: - if sa_try[0]['val'].lower() == "true": - save_all = True - elif sa_try[0]['val'].lower() == "false": - save_all = False - except Exception, e: - print("Error getting save-all: {}".format(e)) - print("I'll just use {} as the save-all parameter...".format(save_all)) + if len(res) > 0: + for x in res: + if x['parameter'] == "scan_rate": + try: + scan_rate = int(x['val']) + except Exception as e: + print("Error setting scan_rate to {}".format(x['val'])) + + elif x['parameter'] == "save_all": + try: + if x['val'].lower() == 'true': + save_all = True + elif x['val'].lower() == 'true': + save_all = False + else: + print("Invalid save_all parameter: {}".format(x['val'])) + except Exception as e: + print("Error setting save_all to {}".format(x['val'])) + else: + print("No configuration data found.") return True +def getDeviceTypes(): + global API_BASE_URL + req = requests.get('{}/device_types'.format(API_BASE_URL), verify=False) + res = json.loads(req.text)['objects'] + device_types = {} + if len(res) > 0: + for x in res: + device_types[x['_id']] = x['device_type'] + return device_types + else: + print("No device type data found.") + + return False + def readTag(addr, tag): c = ClxDriver() @@ -148,65 +151,78 @@ def readArray(addr, arr, length): print("ERROR RETRIEVING ARRAY: {}".format(arr)) err = c.get_status() c.close() - print err + print(err) traceback.print_exc() c.close() def setupTags(): - global device_types, web_address, stroke_tags, gaugeoff_tags, history_tags, welltest_tags, custom_tags - try: - # Get tags stored in database - get_tag_request = requests.get('{}/tag'.format(web_address), verify=False) - tags = json.loads(get_tag_request.text) - for t in tags: - if t['tag_class']['class_type'] == 'stroke': - stroke_tags[t['name']] = Tag(t['name'], t['tag'], t['id'], t['data_type']['plc_type'], t['change_threshold'], t['guarantee_sec'], mapFn=maps[t['map_function']], ip_address=t['deviceID']['address'], device_type=device_types[t['deviceID']['device_type']]) + global device_types, API_BASE_URL, tag_list, safety_tags, bit_tags + # try: + # Get tags stored in database + get_tag_request = requests.get('{}/tags'.format(API_BASE_URL), verify=False) + tags = json.loads(get_tag_request.text)['objects'] + for t in tags: - elif t['tag_class']['class_type'] == 'history': - history_tags[t['name']] = Tag(t['name'], t['tag'], t['id'], t['data_type']['plc_type'], t['change_threshold'], t['guarantee_sec'], mapFn=maps[t['map_function']], ip_address=t['deviceID']['address'], device_type=device_types[t['deviceID']['device_type']]) + tag_list[t['name']] = Tag(t['name'], t['tag'], t['_id'], t['data_type'], t['change_threshold'], t['guarantee_sec'], mapFn=maps[t['map_function']], ip_address=t['device']['address'], device_type=device_types[t['device']['device_type_id']]) - elif t['tag_class']['class_type'] == 'gaugeoff': - gaugeoff_tags[t['name']] = Tag(t['name'], t['tag'], t['id'], t['data_type']['plc_type'], t['change_threshold'], t['guarantee_sec'], mapFn=maps[t['map_function']], ip_address=t['deviceID']['address'], device_type=device_types[t['deviceID']['device_type']]) - - elif t['tag_class']['class_type'] == 'welltest': - welltest_tags[t['name']] = Tag(t['name'], t['tag'], t['id'], t['data_type']['plc_type'], t['change_threshold'], t['guarantee_sec'], mapFn=maps[t['map_function']], ip_address=t['deviceID']['address'], device_type=device_types[t['deviceID']['device_type']]) - - elif t['tag_class']['class_type'] == 'custom': - custom_tags[t['name']] = Tag(t['name'], t['tag'], t['id'], t['data_type']['plc_type'], t['change_threshold'], t['guarantee_sec'], mapFn=maps[t['map_function']], ip_address=t['deviceID']['address'], device_type=device_types[t['deviceID']['device_type']]) - - get_event_request = requests.get('{}/event_config'.format(web_address), verify=False) - events = json.loads(get_event_request.text) - for e in events: - if e['event_class']['event_class'] == 'analog': - safety_tags[e['name']] = AnalogAlarm(e['name'], e['tag'], e['id'], ip_address=e['deviceID']['address'], device_type=device_types[e['deviceID']['device_type']]) - elif e['event_class']['event_class'] == 'bit': - bit_tags[e['name']] = bitAlarm(e['name'], e['tag'], e['event_condition'], e['id'], ip_address=e['deviceID']['address'], device_type=device_types[e['deviceID']['device_type']]) - return True - except Exception, e: - print("Error getting tags: {}".format(e)) - return False + get_event_request = requests.get('{}/event_configs'.format(API_BASE_URL), verify=False) + events = json.loads(get_event_request.text)['objects'] + for e in events: + if e['event_type'] == 'analog': + safety_tags[e['name']] = AnalogAlarm(e['name'], e['tag'], e['_id'], ip_address=e['device']['address'], device_type=device_types[e['device']['device_type_id']]) + elif e['event_type'] == 'bit': + bit_tags[e['name']] = bitAlarm(e['name'], e['tag'], e['condition'], e['_id'], ip_address=e['device']['address'], device_type=device_types[e['device']['device_type_id']]) + return True + # except Exception as e: + # print("Error getting tags: {}".format(e)) + # return False def getMainPLC(): - get_plc_req_data = {'where': {'id': 0}} - get_plc_request = requests.get('{}/device'.format(web_address), data=get_plc_req_data, verify=False) - return json.loads(get_plc_request.text)[0] + global API_BASE_URL + get_plc_request = requests.get('{}/devices'.format(API_BASE_URL), verify=False) + return json.loads(get_plc_request.text)['objects'][0] -def readPoints(): + + + +def readGaugeOffData(): global main_plc - num_points = readTag(main_plc['address'], "Card_Past[1].Num_Points")[0] - surf_pos = readArray(main_plc['address'], "Card_Past[1].Surface_Position", num_points + 1)[1:] - surf_pos.append(surf_pos[0]) - surf_lod = readArray(main_plc['address'], "Card_Past[1].Surface_Load", num_points + 1)[1:] - surf_lod.append(surf_lod[0]) - down_pos = readArray(main_plc['address'], "Card_Past[1].Downhole_Position", num_points + 1)[1:] - down_pos.append(down_pos[0]) - down_lod = readArray(main_plc['address'], "Card_Past[1].Downhole_Load", num_points + 1)[1:] - down_lod.append(down_lod[0]) - return([surf_pos, surf_lod, down_pos, down_lod]) + try: + gaugeOffData = { + 'spm_average': readTag(main_plc['address'], 'GAUGEOFF_Average_SPM')[0], + 'downhole_gross_stroke_average': readTag(main_plc['address'], 'GAUGEOFF_Downhole_GrossStroke')[0], + 'downhole_net_stroke_average': readTag(main_plc['address'], 'GAUGEOFF_Downhole_NetStroke')[0], + 'electricity_cost_total': readTag(main_plc['address'], 'GAUGEOFF_Electricity_Cost')[0], + 'fluid_level_average': readTag(main_plc['address'], 'GAUGEOFF_Fluid_Above_Pump')[0], + 'full_card_production_total': readTag(main_plc['address'], 'GAUGEOFF_Full_Card_Production')[0], + 'inflow_rate_average': readTag(main_plc['address'], 'GAUGEOFF_Inflow_Rate')[0], + 'kWh_used_total': readTag(main_plc['address'], 'GAUGEOFF_kWh')[0], + 'kWh_regen_total': readTag(main_plc['address'], 'GAUGEOFF_kWh_Regen')[0], + 'lifting_cost_average': readTag(main_plc['address'], 'GAUGEOFF_Lifting_Cost')[0], + 'peak_pr_load': readTag(main_plc['address'], 'GAUGEOFF_Max_Load')[0], + 'min_pr_load': readTag(main_plc['address'], 'GAUGEOFF_Min_Load')[0], + 'percent_run': readTag(main_plc['address'], 'GAUGEOFF_Percent_Run')[0], + 'polished_rod_hp_average': readTag(main_plc['address'], 'GAUGEOFF_Polished_Rod_HP')[0], + 'pump_hp_average': readTag(main_plc['address'], 'GAUGEOFF_Production_Calculated')[0], + 'production_total': readTag(main_plc['address'], 'GAUGEOFF_Pump_HP')[0], + 'pump_intake_pressure_average': readTag(main_plc['address'], 'GAUGEOFF_Pump_Intake_Pressure')[0], + 'surface_stroke_length_average': readTag(main_plc['address'], 'GAUGEOFF_Surface_StrokeLength')[0], + 'tubing_movement_average': readTag(main_plc['address'], 'GAUGEOFF_Tubing_Movement')[0] + } + except Exception as e: + print("Could not get all gauge off tags: {}".format(e)) + return False + post_req = requests.post(API_BASE_URL + "/gauge_off_vals", data=json.dumps(gaugeOffData), headers={'Content-Type': 'application/json'}, verify=False) + try: + post_res_id = json.loads(post_req.text)['_id'] + return True + except Exception as e: + print("Did not get a valid JSON object back, got: {}".format(post_req.text)) + return False def evalTapers(): return True @@ -225,7 +241,7 @@ def evalTapers(): # # tStr = "{{'taper':{}, 'length': {}, 'diameter': {}, 'material':'{}'}}".format(t, taper_length, taper_diameter, taper_material) # tQuery = 'INSERT INTO well_config (tstamp, type, val) VALUES ({}, "taper", "{}")'.format(ts, tStr) -# print tQuery +# print(tQuery) # con.connect() # cur = con.cursor() # cur.execute(tQuery) @@ -237,26 +253,82 @@ def evalTapers(): # cur = con.cursor() # cur.execute(cfgQuery) # con.commit() -# print "TAPER DATA READ!" +# print("TAPER DATA READ!") # return True +def readPoints(): + global main_plc + num_points = readTag(main_plc['address'], "Card_Past[1].Num_Points")[0] + surf_pos = readArray(main_plc['address'], "Card_Past[1].Surface_Position", num_points + 1)[1:] + surf_pos.append(surf_pos[0]) + surf_lod = readArray(main_plc['address'], "Card_Past[1].Surface_Load", num_points + 1)[1:] + surf_lod.append(surf_lod[0]) + down_pos = readArray(main_plc['address'], "Card_Past[1].Downhole_Position", num_points + 1)[1:] + down_pos.append(down_pos[0]) + down_lod = readArray(main_plc['address'], "Card_Past[1].Downhole_Load", num_points + 1)[1:] + down_lod.append(down_lod[0]) + return([surf_pos, surf_lod, down_pos, down_lod]) + + +def checkCardDataAndStore(last_card_id): + ''' + Check to see if a new stroke has been made and stores the stroke in the database. + Returns the current ID of the card + ''' + + global maps, main_plc, API_BASE_URL + + try: + current_card_id = readTag(main_plc['address'], 'Card_Past[1].ID')[0] + if not (last_card_id == current_card_id): + [surface_position, surface_load, downhole_position, downhole_load] = readPoints() + card_type = maps['card_type_map'][readTag(main_plc['address'], 'Card_Past[1].Card_Type')[0]] + + card_data = { + 'stroke_number': current_card_id, + 'stroke_type': card_type, + 'surf_pos': str(surface_position), + 'surf_lod': str(surface_load), + 'down_pos': str(downhole_position), + 'down_lod': str(downhole_load) + } + r = requests.post('{}/cards'.format(API_BASE_URL), data=json.dumps(card_data), headers={'Content-Type': 'application/json'}, verify=False) + resp = json.loads(r.text) + print("CARD NUMBER {} READ AT {}!".format(resp["stroke_number"], resp['created_on'])) + last_card_id = current_card_id + return current_card_id + else: + return last_card_id + except Exception as e: + print("Exception during checkCardDataAndStore: {}".format(e)) + return last_card_id + def main(): global main_plc, device_types main_plc = getMainPLC() - readConfig() + + rc_attempts = 0 + rc = readConfig() + while not rc and rc_attempts < 10: + rc = readConfig() + + device_type_attempts = 0 device_types = getDeviceTypes() + while not device_types and attempts < 10: + device_types = getDeviceTypes() + if setupTags(): pass else: print("Unable to read tags... Restarting.") main() - status = Status('run_status', 'Pump.Run_Status', 0, 'STRING', 0, 3600, mapFn=maps['statusMap'], ip_address=main_plc['address'], device_type=device_types[main_plc['device_type']['id']]) + status = Status('run_status', 'Pump.Run_Status', 0, 'STRING', 0, 3600, mapFn=maps['statusMap'], ip_address=main_plc['address'], device_type=device_types[main_plc['device_type']['_id']]) read_tapers = False already_gauged_off = False already_entered_well_test = False - last_stroke = 0 + last_card_id = 0 last_status = "" statusChanged = False @@ -266,97 +338,49 @@ def main(): statusChanged = not (current_status == last_status) if statusChanged: last_status = current_status - ############# - # CARD DATA # - ############# - stroke_tags['Card ID'].read('test') - if not (last_stroke == stroke_tags['Card ID'].value): - sData = {} - last_stroke = stroke_tags['Card ID'].value - for t in stroke_tags: - if not t == "Card ID": - stroke_tags[t].read(True) - [sData['Surface_Position'], sData['Surface_Load'], sData['Downhole_Position'], sData['Downhole_Load']] = readPoints() + last_card_id = checkCardDataAndStore(last_card_id) - sData["card_type"] = stroke_tags['Card Type'].value - sData["card_id"] = stroke_tags['Card ID'].value - sData['sp_string'] = ', '.join(map(str, sData['Surface_Position'])) - sData['sl_string'] = ', '.join(map(str, sData['Surface_Load'])) - sData['dp_string'] = ', '.join(map(str, sData['Downhole_Position'])) - sData['dl_string'] = ', '.join(map(str, sData['Downhole_Load'])) - - card_ins_req_data = { - 'card_id': sData['card_id'], - 'card_type': sData['card_type'], - 'surface_position': sData['sp_string'], - 'surface_load': sData['sl_string'], - 'downhole_position': sData['dp_string'], - 'downhole_load': sData['dl_string'] - } - r = requests.post('{}/card'.format(web_address), data=card_ins_req_data, verify=False) - resp = json.loads(r.text) - print "CARD NUMBER {} READ AT {}!".format(resp["card_id"], resp['createdAt']) - - ################### - # HISTORICAL DATA # - ################### - - for hist in history_tags: - h = history_tags[hist] - h.read("test") - - for cust in custom_tags: - t = custom_tags[cust] - t.read("test") - - ############## - # TAPER DATA # - ############## + # read tags in tag_list and store if values require saving + for t in tag_list: + tag = tag_list[t] + tag.read(save_all) + # check if taper data has changed and store taper parameters if it has update_taper = readTag(main_plc['address'], "Write_Tapers")[0] > 0 if (update_taper == 0): if read_tapers: read_tapers = False - print "Update Tapers = False" + print("Update Tapers = False") if (update_taper and (not read_tapers)): - print "reading taper file" + print("reading taper file") read_tapers = evalTapers() - ################## - # GAUGE OFF DATA # - ################## + # store gauge-off data once it is set gauge_off = readTag(main_plc['address'], "Gauge_Off_Command")[0] if (gauge_off == 0): if already_gauged_off: already_gauged_off = False - print "Already gauged off... Setting gauge_off to False" + print("Already gauged off... Setting gauge_off to False") if (gauge_off and (not already_gauged_off)): - print "Gauging off..." - for goff in gaugeoff_tags: - g = gaugeoff_tags[goff] - g.read(True) - - already_gauged_off = True - print "Gauged off!" - - ################## - # WELL TEST DATA # - ################## + print("Gauging off...") + already_gauged_off = readGaugeOffData() + print("Gauged off!") + # well_test_entered = readTag(main_plc['address'], "Well_Test.Test_Submit")[0] > 0 if well_test_entered: if already_entered_well_test: already_entered_well_test = False - print "Already entered well Test... Setting well_test_entered to False" + print("Already entered well Test... Setting well_test_entered to False") if (well_test_entered and (not already_entered_well_test)): for wtest in welltest_tags: w = welltest_tags[wtest] w.read(True) already_entered_well_test = True - print "Well Test Stored!" + print("Well Test Stored!") ################### # ALARMS & EVENTS # @@ -368,7 +392,7 @@ def main(): bit_tags[b].checkStatus(stroke_tags['Card ID'].value) time.sleep(.20) - except Exception, e: + except Exception as e: print("Error during loop: {}".format(e)) traceback.print_exc() if __name__ == '__main__':