# Change notes:
# - fixes status check for logger
# - adds ability to test sample data
# - adds PLC Handshaking capability
# - adds portainer as container manager
# - exposes mysql port for reading database (as 6603)
#
# File info: 212 lines, 8.1 KiB, Python
"""
Tag Logger.

Created on April 7, 2016
@author: Patrick McDonagh
@description: Continuously loops through a list of tags to store values from a PLC
"""
|
|
|
|
import traceback
|
|
import time
|
|
import json
|
|
import requests
|
|
from requests.packages.urllib3.exceptions import InsecureRequestWarning
|
|
# from pycomm_helper.tag import Tag
|
|
from pycomm.ab_comm.clx import Driver as ClxDriver
|
|
from pycomm.cip.cip_base import CommError
|
|
from random import random, getrandbits
|
|
|
|
|
|
# Every request in this file is made with verify=False; presumably this call is
# here to silence the InsecureRequestWarning urllib3 would otherwise emit for them.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
|
|
|
# DEFAULTS

# Where the web/database API lives.
db_address = 'web_db'
db_url = "https://{}:5000".format(db_address)

# Runtime settings; load_data() overwrites these from the "configs" endpoint.
scan_rate = 30  # seconds
save_all = "test"  # use True, False, or any string

# Handshake bookkeeping.
plc_handshake_tags = {}
last_handshake_time = 0

# Working state shared between load_data() and main().
tag_store = {}
tag_list = []
handshake_list = []
device_types = {}
|
|
|
|
|
|
def readFromPLC(addr, tag):
    """Read a value from a PLC.

    Opens a connection to the controller at ``addr``, reads ``tag`` and
    closes the connection again.

    :param addr: address of the PLC; converted with ``str()`` before use.
    :param tag: name of the tag to read; converted with ``str()`` before use.
    :return: the result of ``ClxDriver.read_tag(tag)`` on success, or ``None``
        when the connection could not be opened or the read raised.
    """
    addr = str(addr)
    tag = str(tag)
    c = ClxDriver()

    # Exceptions raised by open() itself propagate to the caller untouched.
    if not c.open(addr):
        c.close()
        return None

    try:
        return c.read_tag(tag)
    except Exception:
        # Best-effort read: report the failure and hand back None.
        print("ERROR RETRIEVING TAG: {} at {}".format(tag, addr))
        print(c.get_status())
        return None
    finally:
        # Close exactly once on every path, including a successful read,
        # so connections are not leaked.
        c.close()
|
|
|
|
|
|
def store_tag(tag):
    """Store the value of a tag in the web_db.

    Sends the tag's ``id`` and ``last_stored`` value as a JSON document in a
    POST request to the ``/api/tag_vals`` endpoint under ``db_url``
    (certificate verification disabled).

    :param tag: mapping providing the ``'id'`` and ``'last_stored'`` keys.
    :return: ``True`` when the response status code is 200, otherwise ``False``.
    """
    endpoint = "{}/api/tag_vals".format(db_url)
    payload = json.dumps({
        "tag_id": tag['id'],
        "value": tag['last_stored']
    })
    response = requests.post(
        endpoint,
        data=payload,
        headers={"Content-Type": "application/json"},
        verify=False
    )
    return response.status_code == 200
|
|
|
|
|
|
def _get_json(path):
    """GET ``db_url + path`` (certificate check disabled) and return the parsed JSON body."""
    response = requests.get('{}{}'.format(db_url, path), verify=False)
    return json.loads(response.text)


def load_data():
    """Load configuration data from the web server.

    Refreshes the module-level state used by the main loop:

    * ``device_types`` - filled from ``/api/device_types`` (id -> device_type).
    * ``scan_rate`` / ``save_all`` - overwritten from ``/api/configs`` when the
      matching parameters are present; a failure here is only reported.
    * ``tag_list`` / ``handshake_list`` - rebuilt from ``/api/tags``. A tag
      whose name is already in one of the lists stays in that list and keeps
      its ``last_stored`` / ``last_store_time``; a new tag is added to
      ``tag_list`` when its ``tag_class_id`` is 5, to ``handshake_list`` when
      it is 6, and ignored otherwise.

    Fetching tags or device types is retried every 10 seconds until it
    succeeds, so this function blocks while the web server is unreachable.
    """
    global scan_rate, save_all, tag_list, handshake_list

    # Retry in place instead of re-entering main(): main() never returns, so
    # calling it from here would grow the call stack on every failure.
    while True:
        try:
            # Get tags stored in database
            tag_objects = _get_json('/api/tags')['objects']
            break
        except Exception as e:
            print("Error getting tags: {}".format(e))
            time.sleep(10)

    while True:
        try:
            # Get device types stored in database
            for d_t in _get_json('/api/device_types')['objects']:
                device_types[d_t['id']] = d_t['device_type']
            break
        except Exception as e:
            print("Error getting device types: {}".format(e))
            time.sleep(10)

    try:
        for c in _get_json('/api/configs')['objects']:
            if c['parameter'] == "scan_rate":
                scan_rate = float(c['val'])
            elif c['parameter'] == "save_all":
                save_all = c['val']
    except Exception as e:
        # Configs are optional: keep the current values and carry on.
        print("Error getting configs: {}".format(e))

    # Index the current entries by name so their history can be carried over.
    previous_tags = {t['name']: t for t in tag_list}
    previous_handshakes = {h['name']: h for h in handshake_list}

    tags_to_add = []
    handshakes_to_add = []
    tags_to_copy = []
    handshakes_to_copy = []
    for n_t in tag_objects:
        this_tag = {
            "tag": n_t['tag'],
            "id": n_t['id'],
            "name": n_t['name'],
            "change_threshold": n_t['change_threshold'],
            "guarantee_sec": n_t['guarantee_sec'],
            "ip_address": n_t['device']['address'],
            "device_type": device_types[n_t['device']['device_type_id']],
            "last_stored": 0.0,
            "last_store_time": 0,
        }
        name = this_tag['name']

        if name in previous_tags:
            this_tag['last_stored'] = previous_tags[name]['last_stored']
            this_tag['last_store_time'] = previous_tags[name]['last_store_time']
            tags_to_copy.append(this_tag)
        elif name in previous_handshakes:
            this_tag['last_stored'] = previous_handshakes[name]['last_stored']
            this_tag['last_store_time'] = previous_handshakes[name]['last_store_time']
            handshakes_to_copy.append(this_tag)
        # Classify using this tag's own class id (not whichever record
        # happened to be last in the response).
        elif n_t['tag_class_id'] == 5:
            tags_to_add.append(this_tag)
        elif n_t['tag_class_id'] == 6:
            handshakes_to_add.append(this_tag)

    tag_list = tags_to_add + tags_to_copy
    handshake_list = handshakes_to_add + handshakes_to_copy
|
|
|
|
|
|
def main():
    """Run the main routine.

    Loops forever. Each pass:

    1. Reloads the configuration with ``load_data()``.
    2. If neither ``tag_list`` nor ``handshake_list`` has entries, waits
       10 seconds and starts the next pass.
    3. For every entry in ``tag_list``, generates a sample value (the last
       stored value plus or minus a random amount below 100) and stores it via
       ``store_tag()`` when the value moved by more than ``change_threshold``,
       when more than ``guarantee_sec`` seconds passed since the last store,
       or when ``save_all`` equals ``"true"``.
    4. For every entry in ``handshake_list`` whose ``guarantee_sec`` has
       elapsed, records the time and prints a handshake message.
    5. Sleeps ``scan_rate`` seconds.
    """
    while True:
        load_data()
        # print(tag_list)
        if not tag_list and not handshake_list:
            print("No tags configured. Trying again in 10 seconds.")
            time.sleep(10)
            # Start the next pass of this loop; calling main() again here
            # would nest one more frame on the stack for every retry.
            continue

        for tag in tag_list:
            try:
                pos_neg = -1.0 if getrandbits(1) else 1.0
                # random must be *called*; multiplying the function object
                # raises TypeError and no sample would ever be stored.
                val = tag['last_stored'] + pos_neg * random() * 100.0
                now = time.time()

                value_delta = abs(val - tag['last_stored'])
                time_delta = now - tag['last_store_time']
                store_value = value_delta > tag['change_threshold']
                store_time = time_delta > tag['guarantee_sec']

                if store_value or store_time or (save_all == "true"):
                    # Report only the first applicable reason: time, then value, then save_all.
                    if store_time:
                        store_reason = "time delta = {} > {}".format(time_delta, tag['guarantee_sec'])
                    elif store_value:
                        store_reason = "value delta = {} > {}".format(value_delta, tag['change_threshold'])
                    else:
                        store_reason = "save all parameter"

                    tag['last_stored'] = val
                    tag['last_store_time'] = now
                    store_tag(tag)

                    print("Stored {} for {} at {} due to {}".format(val, tag['name'], now, store_reason))
            except CommError:
                print("CommError: Error connecting to {} for {}".format(tag['ip_address'], tag['name']))
            except TypeError:
                print("Error reading {}".format(tag['name']))

        for handshake in handshake_list:
            now = time.time()
            if (now - handshake['last_store_time']) > handshake['guarantee_sec']:
                try:
                    handshake['last_store_time'] = now
                    print("Handshake with {} - {} at {}".format(handshake['ip_address'], handshake['tag'], now))
                except CommError:
                    print("CommError: Error connecting to {} for {}".format(handshake['ip_address'],
                                                                            handshake['name']))
                except TypeError:
                    # Name the handshake that failed, not an entry of tag_list.
                    print("Error writing {}".format(handshake['name']))

        time.sleep(scan_rate)
|
|
|
|
|
|
# Start the logger only when this file is executed as a script.
if '__main__' == __name__:
    main()
|