diff --git a/data_points.py b/data_points.py index b7894b8..2a2dc33 100644 --- a/data_points.py +++ b/data_points.py @@ -5,18 +5,19 @@ from pycomm.ab_comm.clx import Driver as clx from pycomm.cip.cip_base import CommError, DataError class DataPoint(object): - def __init__(self,changeThreshold=0,guaranteed=3600, name="datapoint",alertThreshold=[],alertCondition=[],alertResponse=[],alertContact=[]): + def __init__(self,changeThreshold=0,guaranteed=3600, name="datapoint",alertThreshold=[],alertCondition=[],alertResponse=[],alertContact=[], alertName=[]): self.value = None self.lastvalue = None self.lastsend = 0 self.changeThreshold = changeThreshold self.guaranteed = guaranteed self.name = name - self.alerted = False self.alertThreshold = alertThreshold self.alertCondition = alertCondition self.alertResponse = alertResponse self.alertContact = alertContact + self.alertName = alertName + self.alerted = [False] * len(self.alertThreshold) def checkSend(self,value): @@ -37,8 +38,8 @@ class DataPoint(object): "not": "value != threshold" } - for thres,cond in zip(self.alertThreshold,self.alertCondition): - #check value for alert threshold + """for thres,cond,name,contact,alerted in zip(self.alertThreshold,self.alertCondition, self.alertName, self.alertContact,self.alerted): + #check value for alert threshold send back first alarmed value evalVars = { "value": value, "threshold": thres @@ -47,12 +48,32 @@ class DataPoint(object): if func == None: print("Not an available function: {}".format(cond)) else: - if eval(func, evalVars): - return {"message":"Read value for {} is {} threshold value {}".format(self.name,value,thres)} + if eval(func, evalVars) and not alerted: + + return {"alert": name} else: self.alerted = False return None - +""" + for x in range(len(self.alerted)): + #check value for alert threshold send back first alarmed value + evalVars = { + "value": value, + "threshold": self.alertThreshold[x] + } + func = conditions.get(self.alertCondition[x]) + if func == None: + print("Not an available function: {}".format(self.alertCondition[x])) + else: + result = eval(func,evalVars) + if result and not self.alerted[x]: + self.alerted[x] = True + return {"alert": self.alertName[x], "contact": self.alertContact[x]} + elif result and self.alerted[x]: + pass + else: + self.alerted[x] = False + return None class modbusDataPoint(DataPoint): def __init__(self,changeThreshold,guaranteed,name,register=1,baud=19200,stopBits=1,parity=None, device='/dev/ttyS0'): @@ -69,8 +90,8 @@ class modbusDataPoint(DataPoint): pass class plcDataPoint(DataPoint): - def __init__(self,changeThreshold,guaranteed,name,plcIP='192.168.1.10',plcType='Micro800',tag=None,alertThreshold=[],alertCondition=[],alertResponse=[],alertContact=[]): - DataPoint.__init__(self,changeThreshold,guaranteed,name,alertThreshold,alertCondition,alertResponse,alertContact) + def __init__(self,changeThreshold,guaranteed,name,plcIP='192.168.1.10',plcType='Micro800',tag=None,alertThreshold=[],alertCondition=[],alertResponse=[],alertContact=[], alertName=[]): + DataPoint.__init__(self,changeThreshold,guaranteed,name,alertThreshold,alertCondition,alertResponse,alertContact,alertName) self.plcIP = plcIP self.plcType = plcType self.tag = tag diff --git a/driver.py b/driver.py index c5d34ad..0efba22 100644 --- a/driver.py +++ b/driver.py @@ -23,7 +23,6 @@ def run(config, device, port, host, rootCAPath): console_out.setFormatter(log_formatter) filelogger.addHandler(console_out) - filelogger.info("IN Driver") filelogger.info("Got Config: \n{}".format(config)) #Extract data from passed in config @@ -75,7 +74,8 @@ def run(config, device, port, host, rootCAPath): condition = config['PLCData'][key]["alert"]["condition"] response = config['PLCData'][key]["alert"]["response"] contact = config['PLCData'][key]["alert"]["contact"] - datapoint = plcDataPoint(changeThreshold,guaranteed,str(name),plcIP=str(plcIP),plcType=str(plcType),tag=str(tag),alertThreshold=threshold,alertCondition=condition,alertResponse=response,alertContact=contact) + alertName = config['PLCData'][key]["alert"]["name"] + datapoint = plcDataPoint(changeThreshold,guaranteed,str(name),plcIP=str(plcIP),plcType=str(plcType),tag=str(tag),alertThreshold=threshold,alertCondition=condition,alertResponse=response,alertContact=contact, alertName=alertName) else: datapoint = plcDataPoint(changeThreshold,guaranteed,str(name),plcIP=str(plcIP),plcType=str(plcType),tag=str(tag)) datapoints.append(datapoint) @@ -88,18 +88,18 @@ def run(config, device, port, host, rootCAPath): pass - #build alert points - #A function for polling general data can be latent no greater than a min between polls - #loop through list of data points to read and check value changes - #sleep for 30 secs + #A function for polling data and checking alert status per datapoint. + #Latency should remain low on polls but manage thresholds for how often to send data + #Loop through list of data points to read value, check send thresholds, and check alert thresholds def dataCollection(): while True: message = {} for datapoint in datapoints: val,alertMessage = datapoint.read() - if alertMessage != None and not datapoint.alerted : + if alertMessage != None: + alertMessage["location"] = locationID + filelogger.info("Publishin: {}\nTo Topic: {}".format(alertMessage,alm_topic)) myAWSIoTMQTTClient.publish(alm_topic,json.dumps(alertMessage),1) - datapoint.alerted =True if datapoint.checkSend(val): message[datapoint.name] = val if message: @@ -108,20 +108,7 @@ def run(config, device, port, host, rootCAPath): myAWSIoTMQTTClient.publish(dt_topic, json.dumps(message),1) time.sleep(5) - #A function for polling alert data should be very near real time - #if plcdata != to empty then setup polls for tags - #use ping and reads as watchdog values for connectivity - #if modbusdata != to empty then setup polls for registers - #use reads as watchdog values for connectivity - #if currentdata != to empty then setup polls for current - #if raw current value > 3.5 then current is good else current disconnected - #if voltagedata != to empty then setup polls for voltage - #if raw voltage value > 0 then voltage is good else voltage disconnected - #sleep for 1 secs - def alertCollection(): - pass - #Start a thread for data and a thread for alerts - + # list of all threads, so that they can be killed afterwards all_threads = [] @@ -130,11 +117,7 @@ def run(config, device, port, host, rootCAPath): data_thread.start() all_threads.append(data_thread) - alert_thread = threading.Thread(target=alertCollection, args=(), name="Thread-alerts") - alert_thread.start() - all_threads.append(alert_thread) - - + for thread in all_threads: thread.join() diff --git a/utilities.py b/utilities.py index 0f1c223..e69de29 100644 --- a/utilities.py +++ b/utilities.py @@ -1,37 +0,0 @@ -def unmarshal_dynamodb_json(node): - data = dict({}) - data['M'] = node - return _unmarshal_value(data) - - -def _unmarshal_value(node): - if type(node) is not dict: - return node - - for key, value in node.items(): - key = key.lower() - if key == 'bool': - return value - if key == 'null': - return None - if key == 's': - return value - if key == 'n': - if '.' in str(value): - return float(value) - return int(value) - if key in ['m', 'l']: - if key == 'm': - data = {} - for key1, value1 in value.items(): - if key1.lower() == 'l': - data = [_unmarshal_value(n) for n in value1] - else: - if type(value1) is not dict: - return _unmarshal_value(value) - data[key1] = _unmarshal_value(value1) - return data - data = [] - for item in value: - data.append(_unmarshal_value(item)) - return data \ No newline at end of file