From 064be4fc73bc9dc281348faa514df01d0f6620cf Mon Sep 17 00:00:00 2001 From: Patrick McDonagh Date: Tue, 9 May 2017 15:19:46 -0500 Subject: [PATCH] Updates taglogger to working state --- daq/Dockerfile.rpi | 2 +- daq/taglogger.py | 208 ++++++++++++++++++++++++++++++------------ web_db/Dockerfile.rpi | 5 +- 3 files changed, 153 insertions(+), 62 deletions(-) diff --git a/daq/Dockerfile.rpi b/daq/Dockerfile.rpi index a0e0fd3..2e1489e 100644 --- a/daq/Dockerfile.rpi +++ b/daq/Dockerfile.rpi @@ -1,4 +1,4 @@ -FROM armv7/armhf-ubuntu +FROM resin/rpi-raspbian:jessie # Copy source files RUN mkdir /root/tag-logger diff --git a/daq/taglogger.py b/daq/taglogger.py index bb9288a..84e1d99 100644 --- a/daq/taglogger.py +++ b/daq/taglogger.py @@ -1,20 +1,25 @@ -#!/usr/bin/env python +""" +Tag Logger. -''' -Tag Logger Created on April 7, 2016 @author: Patrick McDonagh @description: Continuously loops through a list of tags to store values from a PLC -''' +""" import traceback import time import json import requests -from pycomm_helper.tag import Tag +from requests.packages.urllib3.exceptions import InsecureRequestWarning +# from pycomm_helper.tag import Tag +from pycomm.ab_comm.clx import Driver as ClxDriver +from pycomm.cip.cip_base import CommError + + +requests.packages.urllib3.disable_warnings(InsecureRequestWarning) # DEFAULTS -db_address = "web_db:3000" +db_address = "10.0.0.103" db_url = "https://{}".format(db_address) scan_rate = 30 # seconds save_all = "test" # use True, False, or any string @@ -22,15 +27,49 @@ plc_handshake_tags = {} last_handshake_time = 0 tag_store = {} +tag_list = [] +handshake_list = [] device_types = {} -def main(): - global db_address, scan_rate, save_all, tag_store, device_types, plc_handshake_tags, last_handshake_time +def readFromPLC(addr, tag): + """Read a value from a PLC.""" + addr = str(addr) + tag = str(tag) + c = ClxDriver() + if c.open(addr): + try: + v = c.read_tag(tag) + return v + except Exception: + print("ERROR RETRIEVING TAG: {} at {}".format(tag, addr)) + err = c.get_status() + c.close() + print(err) + pass + c.close() + + +def store_tag(tag): + """Store the value of a tag in the web_db.""" + global db_url + url = "{}/api/tag_vals".format(db_url) + tag_val_obj = { + "tag_id": tag['id'], + "value": tag['last_stored'] + } + headers = {"Content-Type": "application/json"} + r = requests.post(url, data=json.dumps(tag_val_obj), headers=headers, verify=False) + return r.status_code == 200 + + +def load_data(): + """Load configuration data from the web server.""" + global db_url, scan_rate, save_all, tag_list, handshake_list try: # Get tags stored in database - get_tag_request_data = {'where': '{"tag_class": 5}'} - get_tag_request = requests.get('{}/tag'.format(db_url), params=get_tag_request_data, verify=False) + url = '{}/api/tags'.format(db_url) + get_tag_request = requests.get(url, verify=False) tags = json.loads(get_tag_request.text) except Exception as e: print("Error getting tags: {}".format(e)) @@ -38,70 +77,121 @@ def main(): main() try: - # Get tags stored in database - - get_device_type_request = requests.get('{}/device_type'.format(db_url), verify=False) + # Get device types stored in database + get_device_type_request = requests.get('{}/api/device_types'.format(db_url), verify=False) device_types_json = json.loads(get_device_type_request.text) - for t in device_types_json: - device_types[t['id']] = t['dType'] + for t in device_types_json['objects']: + device_types[t['id']] = t['device_type'] except Exception as e: print("Error getting tags: {}".format(e)) time.sleep(10) main() try: - sr_req_data = 'where={"parameter": "scan_rate"}' - sr_req = requests.get('{}/config?{}'.format(db_url, sr_req_data), verify=False) - sr_try = json.loads(sr_req.text) - if len(sr_try) > 0: - scan_rate = int(sr_try[0]['val']) - except Exception as e: - print("Error getting scan rate: {}".format(e)) - print("I'll just use {} seconds as the scan rate...".format(scan_rate)) + config_req = requests.get('{}/api/configs'.format(db_url), verify=False) + config_json = json.loads(config_req.text) + config_list = config_json['objects'] - try: - sa_req_data = {"where": {"parameter": "save_all"}} - sa_req = requests.get('{}/config'.format(db_url), params=sa_req_data, verify=False) - sa_try = json.loads(sa_req.text) - if len(sa_try) > 0: - if sa_try[0]['val'].lower() == "true": - save_all = True - elif sa_try[0]['val'].lower() == "false": - save_all = False + if len(config_list) > 0: + for c in config_list: + if c['parameter'] == "scan_rate": + scan_rate = float(c['val']) + elif c['parameter'] == "save_all": + save_all = c['val'] except Exception as e: - print("Error getting save-all: {}".format(e)) - print("I'll just use {} as the save-all parameter...".format(save_all)) + print("Error getting configs: {}".format(e)) - try: - # Get tags stored in database - get_hs_request_data = {'where': '{"tag_class": 6}'} - get_hs_request = requests.get('{}/tag'.format(db_url), params=get_hs_request_data, verify=False) - hs_tags = json.loads(get_hs_request.text) - if len(hs_tags) > 0: - for hs in hs_tags: - plc_handshake_tags[hs['name']] = Tag(hs['name'], hs['tag'], hs['id'], hs['data_type'], hs['change_threshold'], hs['guarantee_sec'], mapFn=hs['map_function'], ip_address=hs['deviceID']['address'], device_type=device_types[hs['deviceID']['device_type']]) - except Exception as e: - print("Error getting handshake tags: {}".format(e)) + new_tags = [t['name'] for t in tags['objects']] + existing_tags = [t['name'] for t in tag_list] + existing_handshakes = [t['name'] for h in handshake_list] - for t in tags: - # name, tag, db_id, data_type, change_threshold, guarantee_sec, mapFn=None, device_type='CLX', ip_address='192.168.1.10'): - tag_store[t['name']] = Tag(t['name'], t['tag'], t['id'], t['data_type'], t['change_threshold'], t['guarantee_sec'], mapFn=t['map_function'], ip_address=t['deviceID']['address'], device_type=device_types[t['deviceID']['device_type']], db_address=db_address) + tags_to_add = [] + handshakes_to_add = [] + tags_to_copy = [] + handshakes_to_copy = [] + for t in new_tags: + this_tag = {} + for n_t in tags['objects']: + if n_t['name'] == t: + this_tag["tag"] = n_t['tag'] + this_tag["id"] = n_t["id"] + this_tag["name"] = n_t['name'] + this_tag["change_threshold"] = n_t['change_threshold'] + this_tag["guarantee_sec"] = n_t['guarantee_sec'] + this_tag["ip_address"] = n_t['device']['address'] + this_tag["device_type"] = device_types[n_t['device']['device_type_id']] + this_tag["last_stored"] = 0.0 + this_tag["last_store_time"] = 0 + if t in existing_tags: + for e_t in tag_list: + if e_t['name'] == t: + this_tag['last_stored'] = e_t['last_stored'] + this_tag['last_store_time'] = e_t['last_store_time'] + tags_to_copy.append(this_tag) + elif t in existing_handshakes: + for e_h in handshake_list: + if e_h['name'] == h: + this_tag['last_stored'] = e_h['last_stored'] + this_tag['last_store_time'] = e_h['last_store_time'] + handshakes_to_copy.append(this_tag) + else: + if n_t['tag_class_id'] == 5: + tags_to_add.append(this_tag) + elif n_t['tag_class_id'] == 6: + handshakes_to_add.append(this_tag) + tag_list = tags_to_add + tags_to_copy + handshake_list = handshakes_to_add + handshakes_to_copy + + +def main(): + """Run the main routine.""" + global scan_rate, tag_store, device_types, tag_list, handshake_list, save_all while True: - for tag in tag_store: - try: - tag_store[tag].read('test') - except: - print("ERROR EVALUATING {}".format(tag)) - traceback.print_exc() + load_data() + # print(tag_list) + if len(tag_list) > 0: + for i in range(0, len(tag_list)): + try: - if plc_handshake_tags: - if time.time() - last_handshake_time > 30.0: - for hs_tag in plc_handshake_tags: - plc_handshake_tags[hs_tag].write(1) - print("Handshake with {} - {}".format(plc_handshake_tags[hs_tag].address, hs_tag)) - last_handshake_time = time.time() + val = readFromPLC(tag_list[i]['ip_address'], tag_list[i]['tag'])[0] + now = time.time() + + store_value = abs(val - tag_list[i]['last_stored']) > tag_list[i]['change_threshold'] + store_time = (now - tag_list[i]['last_store_time']) > tag_list[i]['guarantee_sec'] + + if store_value or store_time or (save_all == "true"): + store_reason = "" + if store_time: + store_reason = "time delta = {} > {}".format(now - tag_list[i]['last_store_time'], tag_list[i]['guarantee_sec']) + elif store_value: + store_reason = "value delta = {} > {}".format(abs(val - tag_list[i]['last_stored']), tag_list[i]['change_threshold']) + elif save_all == "true": + store_reason = "save all parameter" + + tag_list[i]['last_stored'] = val + tag_list[i]['last_store_time'] = now + store_tag(tag_list[i]) + + print("Stored {} for {} at {} due to {}".format(val, tag_list[i]['name'], now, store_reason)) + except CommError: + print("CommError: Error connecting to {} for {}".format(tag_list[i]['ip_address'], tag_list[i]['name'])) + traceback.print_exc() + except TypeError: + print("Error reading {}".format(tag_list[i]['name'])) + else: + print("No tags in tag_list. Trying again in 10 seconds.") + time.sleep(10) + main() + + # if plc_handshake_tags: + # if time.time() - last_handshake_time > 30.0: + # for hs_tag in plc_handshake_tags: + # plc_handshake_tags[hs_tag].write(1) + # print("Handshake with {} - {}".format(plc_handshake_tags[hs_tag].address, hs_tag)) + # last_handshake_time = time.time() time.sleep(scan_rate) + if __name__ == '__main__': main() diff --git a/web_db/Dockerfile.rpi b/web_db/Dockerfile.rpi index 8a23b08..b6cf129 100644 --- a/web_db/Dockerfile.rpi +++ b/web_db/Dockerfile.rpi @@ -1,9 +1,9 @@ -FROM armv7/armhf-ubuntu +FROM resin/rpi-raspbian:jessie RUN apt-get -y update COPY mysql-install.sh /tmp/mysql-install.sh RUN chmod +x /tmp/mysql-install.sh && /tmp/mysql-install.sh -RUN apt-get install -y python python-pip git libffi-dev libssl-dev +RUN apt-get install -y python python-dev python-pip git libffi-dev libssl-dev gcc RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* RUN mkdir /root/tag-logger @@ -15,6 +15,7 @@ RUN cd /tmp/mysql && python setup.py install && cd ~ COPY startup.sh /root/startup.sh RUN chmod +x /root/startup.sh +RUN pip install --upgrade pip setuptools RUN pip install flask flask-restless flask-sqlalchemy pyopenssl RUN apt-get clean