"""
Tag Logger.

Created on April 7, 2016
@author: Patrick McDonagh
@description: Continuously loops through a list of tags to store values
              from a PLC
"""

import traceback
import time
import json
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from pycomm.ab_comm.clx import Driver as ClxDriver
from pycomm.cip.cip_base import CommError

# Requests below are made with verify=False, so silence the resulting warning.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# DEFAULTS
db_address = "web_db"
db_url = "https://{}:5000".format(db_address)
scan_rate = 30  # seconds
save_all = "test"  # use True, False, or any string
plc_handshake_tags = {}
last_handshake_time = 0

tag_store = {}
tag_list = []
handshake_list = []
device_types = {}

# Seconds to wait before retrying after a failed configuration fetch or when
# no tags are configured.
RETRY_DELAY_SEC = 10

# tag_class_id values that load_data() sorts newly seen tags by.
TAG_CLASS_NORMAL = 5
TAG_CLASS_HANDSHAKE = 6


def read_from_plc(addr, tag):
    """Read a value from a PLC.

    Args:
        addr: Address of the PLC; converted to ``str`` before use.
        tag: Name of the tag to read; converted to ``str`` before use.

    Returns:
        Whatever ``ClxDriver.read_tag`` returns (callers in this file index
        it as ``[0]`` for the value and ``[1]`` for the data type), or
        ``None`` when the connection could not be opened or the read raised.

    Exceptions raised by ``open()`` itself are not handled here and propagate
    to the caller.
    """
    addr = str(addr)
    tag = str(tag)
    c = ClxDriver()
    try:
        if not c.open(addr):
            return None
        try:
            return c.read_tag(tag)
        except Exception:
            print("ERROR RETRIEVING TAG: {} at {}".format(tag, addr))
            # Read the status before the connection is torn down.
            print(c.get_status())
            return None
    finally:
        # Close exactly once on every path, including the successful read.
        c.close()


def write_to_plc(addr, tag, value):
    """Write a value to a tag in the PLC at the specified address.

    The tag is read first; element ``[1]`` of that read result is passed to
    ``write_tag`` as its third argument.

    Args:
        addr: Address of the PLC.
        tag: Name of the tag to write.
        value: Value to write.

    Returns:
        ``True`` when ``write_tag`` completed without raising, ``False`` when
        the preliminary read returned nothing, the connection could not be
        opened, or the write raised.
    """
    pv = read_from_plc(addr, tag)
    if not pv:
        return False

    c = ClxDriver()
    try:
        if not c.open(addr):
            return False
        try:
            c.write_tag(tag, value, pv[1])
            return True
        except Exception:
            print("ERROR WRITING TAG: {} at {}".format(tag, addr))
            # Read the status before the connection is torn down.
            print(c.get_status())
            return False
    finally:
        c.close()


def store_tag(tag):
    """Store the value of a tag in the web_db.

    Posts ``tag['id']`` and ``tag['last_stored']`` as JSON to
    ``<db_url>/api/tag_vals``.

    Args:
        tag: Tag dictionary with at least the ``id`` and ``last_stored`` keys.

    Returns:
        ``True`` if the server answered with HTTP status 200, else ``False``.
    """
    url = "{}/api/tag_vals".format(db_url)
    tag_val_obj = {
        "tag_id": tag['id'],
        "value": tag['last_stored']
    }
    headers = {"Content-Type": "application/json"}
    r = requests.post(url, data=json.dumps(tag_val_obj), headers=headers, verify=False)
    return r.status_code == 200


def _get_json(url):
    """Fetch ``url`` (certificate verification disabled) and decode the JSON body."""
    return json.loads(requests.get(url, verify=False).text)


def _partition_tags(tag_objects, old_tags, old_handshakes):
    """Build the new tag and handshake lists from the server's tag objects.

    A tag whose name is already present in ``old_tags`` or ``old_handshakes``
    stays in that list and keeps its ``last_stored`` / ``last_store_time``
    values. Any other tag is sorted by ``tag_class_id`` and starts with zeroed
    store state; tags of any other class are dropped.

    Returns:
        Tuple ``(tags, handshakes)``; in each list new entries come first,
        followed by carried-over entries.
    """
    # Index the previous lists by name so each lookup is O(1).
    old_tags_by_name = {t['name']: t for t in old_tags}
    old_handshakes_by_name = {h['name']: h for h in old_handshakes}

    tags_to_add = []
    tags_to_copy = []
    handshakes_to_add = []
    handshakes_to_copy = []

    for n_t in tag_objects:
        name = n_t['name']
        this_tag = {
            "tag": n_t['tag'],
            "id": n_t['id'],
            "name": name,
            "change_threshold": n_t['change_threshold'],
            "guarantee_sec": n_t['guarantee_sec'],
            "ip_address": n_t['device']['address'],
            "device_type": device_types[n_t['device']['device_type_id']],
            "last_stored": 0.0,
            "last_store_time": 0,
        }

        if name in old_tags_by_name:
            previous = old_tags_by_name[name]
            this_tag['last_stored'] = previous['last_stored']
            this_tag['last_store_time'] = previous['last_store_time']
            tags_to_copy.append(this_tag)
        elif name in old_handshakes_by_name:
            previous = old_handshakes_by_name[name]
            this_tag['last_stored'] = previous['last_stored']
            this_tag['last_store_time'] = previous['last_store_time']
            handshakes_to_copy.append(this_tag)
        elif n_t['tag_class_id'] == TAG_CLASS_NORMAL:
            tags_to_add.append(this_tag)
        elif n_t['tag_class_id'] == TAG_CLASS_HANDSHAKE:
            handshakes_to_add.append(this_tag)

    return tags_to_add + tags_to_copy, handshakes_to_add + handshakes_to_copy


def load_data():
    """Load configuration data from the web server.

    Fetches the tag definitions and device types, retrying every
    ``RETRY_DELAY_SEC`` seconds until both succeed, then reads the optional
    ``scan_rate`` and ``save_all`` parameters from ``/api/configs``. A failure
    there is only reported and the current values are kept.

    Updates the module-level ``device_types``, ``scan_rate``, ``save_all``,
    ``tag_list`` and ``handshake_list``.
    """
    global scan_rate, save_all, tag_list, handshake_list

    # Retry in place instead of re-entering main(), so the call stack does not
    # grow while the server is unreachable.
    while True:
        try:
            # Get tags stored in database
            tags = _get_json('{}/api/tags'.format(db_url))
        except Exception as e:
            print("Error getting tags: {}".format(e))
            time.sleep(RETRY_DELAY_SEC)
            continue

        try:
            # Get device types stored in database
            device_types_json = _get_json('{}/api/device_types'.format(db_url))
            for t in device_types_json['objects']:
                device_types[t['id']] = t['device_type']
        except Exception as e:
            print("Error getting device types: {}".format(e))
            time.sleep(RETRY_DELAY_SEC)
            continue

        break

    try:
        config_json = _get_json('{}/api/configs'.format(db_url))
        for c in config_json['objects']:
            if c['parameter'] == "scan_rate":
                scan_rate = float(c['val'])
            elif c['parameter'] == "save_all":
                save_all = c['val']
    except Exception as e:
        print("Error getting configs: {}".format(e))

    tag_list, handshake_list = _partition_tags(tags['objects'], tag_list, handshake_list)


def _scan_tag(tag):
    """Read one tag and store it if its value or age warrants it.

    The value is stored when it moved more than ``change_threshold`` away from
    the last stored value, when more than ``guarantee_sec`` seconds passed
    since the last store, or when ``save_all`` equals ``"true"``.
    """
    try:
        # read_from_plc returns None on failure; indexing it raises the
        # TypeError handled below.
        val = read_from_plc(tag['ip_address'], tag['tag'])[0]
        now = time.time()

        value_delta = abs(val - tag['last_stored'])
        time_delta = now - tag['last_store_time']
        store_value = value_delta > tag['change_threshold']
        store_time = time_delta > tag['guarantee_sec']

        if store_value or store_time or (save_all == "true"):
            store_reason = ""
            if store_time:
                store_reason = "time delta = {} > {}".format(time_delta, tag['guarantee_sec'])
            elif store_value:
                store_reason = "value delta = {} > {}".format(value_delta, tag['change_threshold'])
            elif save_all == "true":
                store_reason = "save all parameter"

            tag['last_stored'] = val
            tag['last_store_time'] = now
            store_tag(tag)
            print("Stored {} for {} at {} due to {}".format(val, tag['name'], now, store_reason))
    except CommError:
        print("CommError: Error connecting to {} for {}".format(tag['ip_address'], tag['name']))
    except TypeError:
        print("Error reading {}".format(tag['name']))


def _send_handshake(handshake):
    """Write 1 to a handshake tag once its ``guarantee_sec`` interval has elapsed."""
    now = time.time()
    if (now - handshake['last_store_time']) <= handshake['guarantee_sec']:
        return

    try:
        write_to_plc(str(handshake['ip_address']), str(handshake['tag']), 1)
        handshake['last_store_time'] = now
        print("Handshake with {} - {} at {}".format(handshake['ip_address'], handshake['tag'], now))
    except CommError:
        print("CommError: Error connecting to {} for {}".format(handshake['ip_address'], handshake['name']))
    except TypeError:
        # Report the handshake that failed, not an entry of tag_list.
        print("Error writing {}".format(handshake['name']))


def main():
    """Run the main routine.

    Loops forever: reloads the configuration, scans every tag, services every
    handshake tag, then sleeps for ``scan_rate`` seconds. When nothing is
    configured it waits ``RETRY_DELAY_SEC`` seconds and reloads.
    """
    while True:
        load_data()

        if not tag_list and not handshake_list:
            print("No tags configured. Trying again in 10 seconds.")
            time.sleep(RETRY_DELAY_SEC)
            # Loop again rather than calling main() recursively.
            continue

        for tag in tag_list:
            _scan_tag(tag)

        for handshake in handshake_list:
            _send_handshake(handshake)

        time.sleep(scan_rate)


if __name__ == '__main__':
    main()