# Files
# DataLogger-Generic/daq/tag/alarm.py
# 2016-10-13 17:18:52 -05:00
#
# 127 lines
# 3.9 KiB
# Python

#! /usr/bin/python
# from datetime import datetime
import time
from pycomm.ab_comm.clx import Driver as ClxDriver
import tag.micro800.micro800 as u800
import requests
import json
# import traceback
# import pickle
# Base URL of the web service; alarm events are POSTed to "<web_address>/event".
web_address = "http://localhost:3000"
def readTag(addr, tag):
    """Read one tag from the PLC at ``addr`` through a pycomm ``ClxDriver``.

    A new driver connection is opened for every call and is always closed
    again before returning, whether the read succeeded or not.

    Args:
        addr: Address handed to ``ClxDriver.open``.
        tag: Name of the tag handed to ``ClxDriver.read_tag``.

    Returns:
        Whatever ``read_tag`` returns on success (callers in this file use
        element ``[0]`` as the value). ``None`` when the connection could not
        be opened or the read raised.
    """
    # Short pause before every connection attempt.
    # NOTE(review): presumably throttles back-to-back reads -- confirm.
    time.sleep(0.01)
    c = ClxDriver()
    if not c.open(addr):
        c.close()
        return None
    try:
        return c.read_tag(tag)
    except Exception:
        # Best-effort read: report the failure and let the caller see None.
        print("ERROR RETRIEVING TAG: {} at {}".format(tag, addr))
        # Query the driver status while the connection is still open.
        print(c.get_status())
        return None
    finally:
        # Runs on success and failure alike, so the connection is released
        # exactly once on every path.
        c.close()
class AnalogAlarm(object):
    """Edge-triggered watcher for the ``.Alarm`` / ``.Warning`` members of a tag.

    Each call to :meth:`checkStatus` reads both members from the PLC. When
    either one has just turned on, the alarm code and value are read as well
    and an event is POSTed to ``<web_address>/event``.
    """

    def __init__(self, name, tag, db_id, device_type='CLX', ip_address='192.168.1.10'):
        """Set up the watcher.

        Args:
            name: Label for this alarm (stored, not otherwise used here).
            tag: Base tag name; members are read as ``<tag>.<member>``.
            db_id: Identifier sent as ``alarmID`` with every event.
            device_type: ``"u800"`` selects ``u800.readTag``; any other value
                uses the module-level ``readTag``.
            ip_address: Address passed to the read function.
        """
        self.name = name
        self.tag = tag
        self.db_id = db_id
        self.device_type = device_type
        self.ip_address = ip_address

        # Current state as of the latest successful poll.
        self.alarm = False
        self.warning = False
        # State seen on the previous poll, used for edge detection.
        self.lastAlarmCheckVal = False
        self.lastWarningCheckVal = False

        self.readFn = u800.readTag if self.device_type == "u800" else readTag

        # Numeric alarm code (from ``<tag>.Alarm_Code``) -> condition label.
        self.condMapFn = {
            20: "Low",
            21: "High",
            24: "LoLo",
            25: "HiHi",
            32: "Input Failure",
            34: "Configuration Error",
            16: "Failure to Stop",
            17: "Failure to Start",
            18: "Drive Fault",
        }

    def _read_member(self, member):
        """Return the value of ``<tag>.<member>``, or None if the read failed."""
        result = self.readFn(self.ip_address, '{}.{}'.format(self.tag, member))
        if result is None:
            return None
        return result[0]

    def checkStatus(self, stroke_number):
        """Poll the alarm/warning bits and report a newly raised condition.

        Args:
            stroke_number: Value sent along with the event as ``stroke_number``.

        A failed read (the read function returned None) skips this poll
        without touching the remembered state, so the same edge is detected
        again on the next call.
        """
        alarm_raw = self._read_member('Alarm')
        if alarm_raw is None:
            return
        warning_raw = self._read_member('Warning')
        if warning_raw is None:
            return

        self.alarm = alarm_raw > 0
        self.warning = warning_raw > 0
        alarmChanged = self.alarm != self.lastAlarmCheckVal
        warningChanged = self.warning != self.lastWarningCheckVal

        alarm_rose = alarmChanged and self.alarm
        warning_rose = warningChanged and self.warning

        if alarm_rose or warning_rose:
            code = self._read_member('Alarm_Code')
            value = self._read_member('Alarm_Value')
            if code is None or value is None:
                # Leave the last-seen state alone so the event is retried.
                return

            code = int(code)
            # Codes missing from the map are still reported instead of
            # aborting the poll with a KeyError.
            condition = self.condMapFn.get(code, 'Unknown ({})'.format(code))

            data = {
                'alarmID': self.db_id,
                # Only label the event a warning when the warning bit is the
                # one that turned on; a warning that merely cleared in the
                # same poll as a rising alarm must not relabel the alarm.
                'type': 'Warning' if warning_rose else 'Alarm',
                'cond': condition,
                'value': value,
                'stroke_number': stroke_number
            }
            r = requests.post('{}/event'.format(web_address), data=data)
            resp = json.loads(r.text)
            print("Stored Event {} at {}".format(resp['id'], resp['createdAt']))

        # Remember both rising and falling edges so a cleared condition can
        # trigger again later.
        if warningChanged:
            self.lastWarningCheckVal = self.warning
        if alarmChanged:
            self.lastAlarmCheckVal = self.alarm
class bitAlarm(object):
    """Edge-triggered watcher for a single boolean-like PLC tag.

    Each call to :meth:`checkStatus` reads the tag; when it has just turned
    on, an ``"Info"`` event carrying the configured condition text is POSTed
    to ``<web_address>/event``.
    """

    def __init__(self, name, tag, condition, db_id, device_type='CLX', ip_address='192.168.1.10'):
        """Set up the watcher.

        Args:
            name: Label for this alarm (stored, not otherwise used here).
            tag: Tag name passed to the read function.
            condition: Text sent as ``cond`` with every event.
            db_id: Identifier sent as ``alarmID`` with every event.
            device_type: ``"u800"`` selects ``u800.readTag``; any other value
                uses the module-level ``readTag``.
            ip_address: Address passed to the read function.
        """
        self.name = name
        self.tag = tag
        self.condition = condition
        self.db_id = db_id
        self.device_type = device_type
        self.ip_address = ip_address

        # Current state, and the state seen on the previous poll.
        self.status = False
        self.lastStatusCheckVal = False

        self.readFn = u800.readTag if self.device_type == "u800" else readTag

    def checkStatus(self, stroke_number):
        """Poll the tag and report it when it has just turned on.

        Args:
            stroke_number: Value sent along with the event as ``stroke_number``.

        A failed read (the read function returned None) skips this poll
        without touching the remembered state.
        """
        result = self.readFn(self.ip_address, self.tag)
        if result is None:
            return

        self.status = result[0] > 0
        statusChanged = self.status != self.lastStatusCheckVal

        if statusChanged and self.status:
            data = {
                'alarmID': self.db_id,
                'type': "Info",
                'cond': self.condition,
                'value': 0.0,
                'stroke_number': stroke_number
            }
            r = requests.post('{}/event'.format(web_address), data=data)
            resp = json.loads(r.text)
            print("Stored Event {} at {}".format(resp['id'], resp['createdAt']))

        # Track falling edges too, so the next rising edge is reported.
        if statusChanged:
            self.lastStatusCheckVal = self.status