Files
Device/driver.py

130 lines
5.8 KiB
Python

import json
import time
from datetime import datetime as datetime
import logging
from logging.handlers import RotatingFileHandler
import sys
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import threading
from data_points import plcDataPoint,modbusDataPoint,currentDataPoint,voltageDataPoint
def _configure_logger(device):
    """Return the logger for *device*, writing to ./logs/<device>.log and stdout.

    Handlers are attached only the first time a given device name is seen, so
    calling this again for the same device does not duplicate every log line.
    """
    import os  # function-scope import keeps the new dependency local to this helper

    filelogger = logging.getLogger('{}'.format(device))
    filelogger.setLevel(logging.INFO)
    if filelogger.handlers:
        # Already configured by an earlier call; adding handlers again would
        # emit each record once per call.
        return filelogger

    log_formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s(%(lineno)d) %(message)s')
    log_file = './logs/{}.log'.format(device)

    # The file handler opens its file immediately (delay=0), so the target
    # directory has to exist before the handler is constructed.
    log_dir = os.path.dirname(log_file)
    try:
        os.makedirs(log_dir)
    except OSError:
        # Fine if the directory is already there; anything else is a real error.
        if not os.path.isdir(log_dir):
            raise

    my_handler = RotatingFileHandler(log_file, mode='a', maxBytes=500*1024, backupCount=2, encoding=None, delay=0)
    my_handler.setFormatter(log_formatter)
    my_handler.setLevel(logging.INFO)
    filelogger.addHandler(my_handler)

    console_out = logging.StreamHandler(sys.stdout)
    console_out.setFormatter(log_formatter)
    filelogger.addHandler(console_out)
    return filelogger


def _build_plc_datapoints(plc_config):
    """Build one plcDataPoint for every entry of the ``PLCData`` config section.

    Each entry must provide ``changeThreshold``, ``guaranteed``, ``name``,
    ``plcIP``, ``plcType`` and ``tag``. An optional ``alert`` mapping must then
    provide ``threshold``, ``condition``, ``response``, ``contact``, ``name``,
    ``buffer`` and ``reminder``; a missing key raises ``KeyError``.
    """
    datapoints = []
    for entry in plc_config.values():
        # Alert settings are passed only when configured, so plcDataPoint's own
        # defaults apply otherwise.
        alert_kwargs = {}
        if "alert" in entry:
            alert = entry["alert"]
            alert_kwargs = {
                "alertThreshold": alert["threshold"],
                "alertCondition": alert["condition"],
                "alertResponse": alert["response"],
                "alertContact": alert["contact"],
                "alertName": alert["name"],
                "alertBuffer": alert["buffer"],
                "alertReminder": alert["reminder"],
            }
        datapoints.append(
            plcDataPoint(
                entry["changeThreshold"],
                entry["guaranteed"],
                str(entry["name"]),
                plcIP=str(entry["plcIP"]),
                plcType=str(entry["plcType"]),
                tag=str(entry["tag"]),
                **alert_kwargs
            )
        )
    return datapoints


def run(config, device, port, host, rootCAPath):
    """Connect to AWS IoT Core and poll the configured data points forever.

    Args:
        config: Mapping with the keys ``appname``, ``company``, ``field``,
            ``locationID``, ``certificateID``, ``PLCData``, ``modbusData``,
            ``currentData`` and ``voltageData``. A data section holds either
            the string ``"empty"`` or a mapping of data point definitions.
        device: Device name; used for the logger name and the log file
            ``./logs/<device>.log``.
        port: Port of the AWS IoT endpoint.
        host: Host name of the AWS IoT endpoint.
        rootCAPath: Path to the root CA file.

    The private key and certificate are read from the fixed paths
    ``./device1Cert.key`` and ``./device1CertAndCACert.pem``.

    The call blocks until the polling thread ends. The polling loop has no
    exit condition, so that only happens when ``read()``, ``checkSend()`` or a
    publish raises.

    Raises:
        KeyError: If a required key is missing from ``config``.
    """
    filelogger = _configure_logger(device)
    filelogger.info("Got Config: \n%s", config)

    # Extract data from passed in config
    app = config['appname']
    company = config['company']
    field = config['field']
    locationID = config['locationID']
    certificateID = config['certificateID']

    # Build the data topic, the alarm topic and the last will payload
    dt_topic = "dt/{}/{}/{}/{}".format(app, company, field, locationID)
    alm_topic = "alm/{}/{}/{}/{}".format(app, company, field, locationID)
    lwtPayload = {"connected": 0}

    # Configure connection to AWS IoT Core with proper certificate
    myAWSIoTMQTTClient = AWSIoTMQTTClient(certificateID)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, './device1Cert.key', './device1CertAndCACert.pem')
    myAWSIoTMQTTClient.configureLastWill(dt_topic, json.dumps(lwtPayload), 1)
    try:
        myAWSIoTMQTTClient.connect()
        connectedPayload = {"connected": 1}
        myAWSIoTMQTTClient.publish(dt_topic, json.dumps(connectedPayload), 1)
    except Exception as e:
        # Best effort, as before: a failed connect is logged (now with the
        # traceback) and polling still starts.
        filelogger.exception("Didn't connect: {}".format(e))

    # Build data points: loop through config and use a class per data point.
    # Planned behaviour for the individual sections:
    #   PLCData     - poll tags; use ping and reads as connectivity watchdogs
    #   modbusData  - poll registers; use reads as connectivity watchdog
    #   currentData - poll current; raw value > 3.5 means connected
    #   voltageData - poll voltage; raw value > 0 means connected
    datapoints = []
    if config["PLCData"] != "empty":
        datapoints.extend(_build_plc_datapoints(config["PLCData"]))
    for section in ("modbusData", "currentData", "voltageData"):
        # Not implemented yet; say so instead of silently ignoring the config.
        if config[section] != "empty":
            filelogger.warning("%s is configured but not supported yet; ignoring it", section)

    # Polls every data point, publishes alerts as they occur and publishes one
    # combined data message per cycle with the values that are due to be sent.
    def dataCollection():
        while True:
            message = {}
            for datapoint in datapoints:
                val, alertMessage = datapoint.read()
                if alertMessage is not None:
                    alertMessage["location"] = locationID
                    alertMessage["timestamp"] = datetime.now().isoformat()
                    filelogger.info("Publishing: %s\tTo Topic: %s", alertMessage, alm_topic)
                    myAWSIoTMQTTClient.publish(alm_topic, json.dumps(alertMessage), 1)
                if datapoint.checkSend(val):
                    message[datapoint.name] = val
            if message:
                # Timestamp is added only when there is something to send, so
                # an idle cycle publishes nothing.
                message["timestamp"] = datetime.now().isoformat()
                filelogger.info("Publishing: %s\tTo Topic: %s", message, dt_topic)
                myAWSIoTMQTTClient.publish(dt_topic, json.dumps(message), 1)
            time.sleep(5)

    # list of all threads, so that they can be joined afterwards
    all_threads = []
    data_thread = threading.Thread(target=dataCollection, args=(), name="Thread-data")
    data_thread.start()
    all_threads.append(data_thread)
    for thread in all_threads:
        thread.join()
    #myAWSIoTMQTTClient.disconnect()