"""
Tag Logger.

Created on April 7, 2016
@author: Patrick McDonagh
@description: Continuously loops through a list of tags to store values from a PLC
"""

import traceback
import time
import json

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from pycomm.ab_comm.clx import Driver as ClxDriver
from pycomm.cip.cip_base import CommError

# All requests below are made with verify=False, so silence the resulting
# per-request certificate warning.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# DEFAULTS
DB_ADDRESS = "web_db"
DB_URL = "https://{}:5000".format(DB_ADDRESS)
SCAN_RATE = 30  # seconds
SAVE_ALL = "test"  # use True, False, or any string
REQUEST_TIMEOUT = 10  # seconds to wait on the web server before giving up
RETRY_DELAY = 10  # seconds to wait before retrying a failed load

PLC_HANDSHAKE_TAGS = {}
LAST_HANDSHAKE_TIME = 0
TAG_STORE = {}
TAG_LIST = []
HANDSHAKE_LIST = []
DEVICE_TYPES = {}

# tag_class_id values that decide which list a newly seen tag joins.
TAG_CLASS_ID = 5
HANDSHAKE_CLASS_ID = 6

# Failures treated as "could not load this resource": transport errors,
# undecodable JSON, and responses that do not have the expected layout.
_LOAD_ERRORS = (requests.exceptions.RequestException, ValueError, KeyError, TypeError)


def _close_quietly(clx):
    """Close a PLC driver, reporting (not raising) a CommError from close()."""
    try:
        clx.close()
    except CommError as e:
        print("Error closing PLC connection: {}".format(e))


def read_from_plc(addr, tag):
    """Read a value from a PLC.

    Returns whatever the driver's read_tag() returns (callers index it as
    [0] for the value and [1] for the data type), or None when the
    connection could not be opened or the read failed.  An exception raised
    by open() itself is not caught here and propagates to the caller.
    """
    addr = str(addr)
    tag = str(tag)
    clx = ClxDriver()
    if not clx.open(addr):
        _close_quietly(clx)
        return None
    try:
        return clx.read_tag(tag)
    except Exception:
        print("ERROR RETRIEVING TAG: {} at {}".format(tag, addr))
        print(clx.get_status())
        return None
    finally:
        # Runs on success as well, so the connection is always released
        # exactly once.
        _close_quietly(clx)


def write_to_plc(addr, tag, value):
    """Write a value to a tag in the PLC at the specified address.

    The tag is read first so that its data type (element [1] of the read
    result) can be passed to the driver's write_tag().

    Returns True when the write call completed without raising, and False
    when the tag could not be read, the connection could not be opened, or
    the write raised.
    """
    addr = str(addr)
    tag = str(tag)
    present_value = read_from_plc(addr, tag)
    if not present_value:
        return False

    clx = ClxDriver()
    if not clx.open(addr):
        _close_quietly(clx)
        return False
    try:
        clx.write_tag(tag, value, present_value[1])
        return True
    except Exception:
        print("ERROR WRITING TAG: {} at {}".format(tag, addr))
        print(clx.get_status())
        return False
    finally:
        _close_quietly(clx)


def store_tag(tag):
    """Store the value of a tag in the web_db.

    Posts the tag's id and its 'last_stored' value to /api/tag_vals.
    Returns True when the server answers with HTTP 200.  Transport errors
    (including timeouts) propagate as requests exceptions.
    """
    url = "{}/api/tag_vals".format(DB_URL)
    tag_val_obj = {
        "tag_id": tag['id'],
        "value": tag['last_stored']
    }
    headers = {"Content-Type": "application/json"}
    r = requests.post(url, data=json.dumps(tag_val_obj), headers=headers,
                      verify=False, timeout=REQUEST_TIMEOUT)
    return r.status_code == 200


def _get_json(path):
    """GET DB_URL + path and return the decoded JSON body."""
    response = requests.get("{}{}".format(DB_URL, path), verify=False,
                            timeout=REQUEST_TIMEOUT)
    return json.loads(response.text)


def _retry_until_loaded(loader, description):
    """Call loader() until it succeeds, pausing RETRY_DELAY between attempts.

    This replaces re-entering main() on failure, which nested a new stack
    frame for every failed attempt.
    """
    while True:
        try:
            return loader()
        except _LOAD_ERRORS as e:
            print("Error getting {}: {}".format(description, e))
            time.sleep(RETRY_DELAY)


def _load_tag_objects():
    """Return the list of tag records from the web server."""
    return _get_json("/api/tags")['objects']


def _load_device_types():
    """Return a {device type id: device type} mapping from the web server."""
    return {d['id']: d['device_type'] for d in _get_json("/api/device_types")['objects']}


def _build_tag(n_t, previous=None):
    """Build the logger's record for one tag record from the web server.

    `previous` is the record kept from the prior load, if any; its
    last-stored value and time are carried over so that reloading the
    configuration does not force every tag to be stored again.
    """
    this_tag = {
        "tag": n_t['tag'],
        "id": n_t['id'],
        "name": n_t['name'],
        "change_threshold": n_t['change_threshold'],
        "guarantee_sec": n_t['guarantee_sec'],
        "ip_address": n_t['device']['address'],
        "device_type": DEVICE_TYPES[n_t['device']['device_type_id']],
        "last_stored": 0.0,
        "last_store_time": 0,
    }
    if previous is not None:
        this_tag['last_stored'] = previous['last_stored']
        this_tag['last_store_time'] = previous['last_store_time']
    return this_tag


def load_data():
    """Load configuration data from the web server.

    Refreshes DEVICE_TYPES, SCAN_RATE, SAVE_ALL, TAG_LIST and HANDSHAKE_LIST.
    Tags and device types are required: this function keeps retrying every
    RETRY_DELAY seconds until both have been loaded.  Configs are optional:
    on failure the error is printed and the current settings are kept.
    """
    global SCAN_RATE, SAVE_ALL, TAG_LIST, HANDSHAKE_LIST

    tag_objects = _retry_until_loaded(_load_tag_objects, "tags")
    DEVICE_TYPES.update(_retry_until_loaded(_load_device_types, "device types"))

    try:
        for c in _get_json("/api/configs")['objects']:
            if c['parameter'] == "scan_rate":
                SCAN_RATE = float(c['val'])
            elif c['parameter'] == "save_all":
                SAVE_ALL = c['val']
    except _LOAD_ERRORS as e:
        print("Error getting configs: {}".format(e))

    # Index the current records by name once instead of rescanning both
    # lists for every tag.
    existing_tags = {t['name']: t for t in TAG_LIST}
    existing_handshakes = {h['name']: h for h in HANDSHAKE_LIST}

    tags_to_add = []
    handshakes_to_add = []
    tags_to_copy = []
    handshakes_to_copy = []

    for n_t in tag_objects:
        name = n_t['name']
        if name in existing_tags:
            # Already tracked: stays in the list it was in.
            tags_to_copy.append(_build_tag(n_t, existing_tags[name]))
        elif name in existing_handshakes:
            handshakes_to_copy.append(_build_tag(n_t, existing_handshakes[name]))
        elif n_t['tag_class_id'] == TAG_CLASS_ID:
            tags_to_add.append(_build_tag(n_t))
        elif n_t['tag_class_id'] == HANDSHAKE_CLASS_ID:
            handshakes_to_add.append(_build_tag(n_t))
        # New tags of any other class are not tracked by this logger.

    TAG_LIST = tags_to_add + tags_to_copy
    HANDSHAKE_LIST = handshakes_to_add + handshakes_to_copy


def _save_all_enabled():
    """Return True when SAVE_ALL is the boolean True or the string "true"."""
    return str(SAVE_ALL).lower() == "true"


def _scan_tag(tag):
    """Read one tag from its PLC and store the value if it is due.

    A value is stored when it moved by more than 'change_threshold' since
    the last stored value, when more than 'guarantee_sec' seconds passed
    since the last store, or when SAVE_ALL is enabled.
    """
    try:
        # read_from_plc returns None on failure; indexing it raises the
        # TypeError handled below.
        val = read_from_plc(tag['ip_address'], tag['tag'])[0]
        now = time.time()
        value_delta = abs(val - tag['last_stored'])
        time_delta = now - tag['last_store_time']
        store_value = value_delta > tag['change_threshold']
        store_time = time_delta > tag['guarantee_sec']
        save_all = _save_all_enabled()
        if not (store_value or store_time or save_all):
            return

        if store_time:
            store_reason = "time delta = {} > {}".format(time_delta, tag['guarantee_sec'])
        elif store_value:
            store_reason = "value delta = {} > {}".format(value_delta, tag['change_threshold'])
        else:
            store_reason = "save all parameter"

        # store_tag() posts tag['last_stored'], so it has to be set before
        # the call; remember the old values so a failed post can be undone.
        previous = (tag['last_stored'], tag['last_store_time'])
        tag['last_stored'] = val
        tag['last_store_time'] = now
        try:
            store_tag(tag)
        except requests.exceptions.RequestException as e:
            tag['last_stored'], tag['last_store_time'] = previous
            print("Error storing {}: {}".format(tag['name'], e))
            return
        print("Stored {} for {} at {} due to {}".format(val, tag['name'], now, store_reason))
    except CommError:
        print("CommError: Error connecting to {} for {}".format(tag['ip_address'], tag['name']))
    except TypeError:
        print("Error reading {}".format(tag['name']))


def _send_handshake(handshake):
    """Write 1 to a handshake tag once its 'guarantee_sec' interval has elapsed."""
    now = time.time()
    if (now - handshake['last_store_time']) <= handshake['guarantee_sec']:
        return
    try:
        if write_to_plc(str(handshake['ip_address']), str(handshake['tag']), 1):
            handshake['last_store_time'] = now
            print("Handshake with {} - {} at {}".format(handshake['ip_address'], handshake['tag'], now))
        else:
            # Leave last_store_time alone so the handshake is retried on
            # the next scan.
            print("Handshake failed with {} - {} at {}".format(handshake['ip_address'], handshake['tag'], now))
    except CommError:
        print("CommError: Error connecting to {} for {}".format(handshake['ip_address'], handshake['name']))
    except TypeError:
        print("Error writing {}".format(handshake['name']))


def main():
    """Run the main routine.

    Loops forever: reload the configuration, scan every tag, send every due
    handshake, then sleep for SCAN_RATE seconds.
    """
    while True:
        load_data()
        if not TAG_LIST and not HANDSHAKE_LIST:
            print("No tags configured. Trying again in 10 seconds.")
            time.sleep(RETRY_DELAY)
            continue

        for tag in TAG_LIST:
            _scan_tag(tag)

        for handshake in HANDSHAKE_LIST:
            _send_handshake(handshake)

        time.sleep(SCAN_RATE)


if __name__ == '__main__':
    main()