# Listing metadata: 130 lines, 5.8 KiB, Python
import json
import logging
import os
import sys
import threading
import time
from datetime import datetime as datetime
from logging.handlers import RotatingFileHandler

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient

from data_points import plcDataPoint,modbusDataPoint,currentDataPoint,voltageDataPoint


def _get_device_logger(device):
    """Return the INFO-level logger for *device*, attaching handlers once.

    The logger writes to a rotating file ``./logs/<device>.log`` (500 KiB per
    file, 2 backups) and mirrors every record to stdout.
    """
    device_logger = logging.getLogger('{}'.format(device))
    device_logger.setLevel(logging.INFO)
    if device_logger.handlers:
        # logging.getLogger returns the same object for the same name, so a
        # repeated call must not stack a second set of handlers (that would
        # duplicate every log line).
        return device_logger

    log_formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s(%(lineno)d) %(message)s')
    log_file = './logs/{}.log'.format(device)

    # The file handler opens the log file immediately (delay=0), so the
    # directory has to exist before the handler is created.
    log_dir = os.path.dirname(log_file)
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir)

    file_handler = RotatingFileHandler(log_file, mode='a', maxBytes=500*1024, backupCount=2, encoding=None, delay=0)
    file_handler.setFormatter(log_formatter)
    file_handler.setLevel(logging.INFO)
    device_logger.addHandler(file_handler)

    console_out = logging.StreamHandler(sys.stdout)
    console_out.setFormatter(log_formatter)
    device_logger.addHandler(console_out)
    return device_logger


def _build_plc_datapoints(plc_config):
    """Create one plcDataPoint for every entry of the "PLCData" config section.

    Each entry must provide "changeThreshold", "guaranteed", "name", "plcIP",
    "plcType" and "tag". An optional "alert" sub-dict must then provide
    "threshold", "condition", "response", "contact", "name", "buffer" and
    "reminder", which are forwarded as the alert* keyword arguments.
    """
    datapoints = []
    for point_cfg in plc_config.values():
        kwargs = {
            "plcIP": str(point_cfg["plcIP"]),
            "plcType": str(point_cfg["plcType"]),
            "tag": str(point_cfg["tag"]),
        }
        if "alert" in point_cfg:
            alert_cfg = point_cfg["alert"]
            kwargs.update(
                alertThreshold=alert_cfg["threshold"],
                alertCondition=alert_cfg["condition"],
                alertResponse=alert_cfg["response"],
                alertContact=alert_cfg["contact"],
                alertName=alert_cfg["name"],
                alertBuffer=alert_cfg["buffer"],
                alertReminder=alert_cfg["reminder"],
            )
        datapoints.append(
            plcDataPoint(
                point_cfg["changeThreshold"],
                point_cfg["guaranteed"],
                str(point_cfg["name"]),
                **kwargs
            )
        )
    return datapoints


def run(config, device, port, host, rootCAPath):
    """Poll the configured data points and publish them to AWS IoT Core.

    Connects an MQTT client, builds the PLC data points described by
    ``config`` and starts a worker thread that reads every data point every
    5 seconds, publishing alerts to ``alm/<app>/<company>/<field>/<location>``
    and changed values to ``dt/<app>/<company>/<field>/<location>``.

    The call blocks until the worker thread ends; the worker loops forever,
    so in practice this only returns if the worker dies from an exception.

    Args:
        config: dict with the keys "appname", "company", "field",
            "locationID", "certificateID" and "PLCData" (either the string
            "empty" or a dict of data point definitions).
        device: device name; used as logger name and log file name.
        port: port of the AWS IoT endpoint.
        host: host name of the AWS IoT endpoint.
        rootCAPath: path to the root CA file used for the TLS connection.

    Raises:
        KeyError: if a required key is missing from ``config``.
    """
    filelogger = _get_device_logger(device)
    filelogger.info("Got Config: \n%s", config)

    # Extract data from passed in config
    app = config['appname']
    company = config['company']
    field = config['field']
    locationID = config['locationID']
    certificateID = config['certificateID']

    # Build the data/alarm topics and the last will payload
    dt_topic = "dt/{}/{}/{}/{}".format(app, company, field, locationID)
    alm_topic = "alm/{}/{}/{}/{}".format(app, company, field, locationID)
    lwtPayload = {"connected": 0}

    # Configure connection to AWS IoT Core with proper certificate.
    # NOTE(review): the key/cert paths are hard-coded relative paths, so they
    # depend on the process working directory - confirm this is intended.
    myAWSIoTMQTTClient = AWSIoTMQTTClient(certificateID)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, './device1Cert.key', './device1CertAndCACert.pem')
    myAWSIoTMQTTClient.configureLastWill(dt_topic, json.dumps(lwtPayload), 1)
    try:
        myAWSIoTMQTTClient.connect()
        connectedPayload = {"connected": 1}
        myAWSIoTMQTTClient.publish(dt_topic, json.dumps(connectedPayload), 1)
    except Exception as e:
        # Best effort, as before: startup continues without a connection, but
        # the failure is now logged as an error with its traceback instead of
        # an easily missed INFO line.
        filelogger.exception("Didn't connect: %s", e)

    # Build data points: loop through config and use a class per data point
    datapoints = []
    if config["PLCData"] != "empty":
        datapoints.extend(_build_plc_datapoints(config['PLCData']))
    # TODO: no data points are built yet for the "modbusData", "currentData"
    # and "voltageData" config sections. Planned behaviour:
    #   modbus  - poll registers, use reads as connectivity watchdog
    #   current - raw value > 3.5 means good, otherwise disconnected
    #   voltage - raw value > 0 means good, otherwise disconnected

    def dataCollection():
        """Poll every data point forever, publishing alerts and changed values."""
        while True:
            message = {}
            for datapoint in datapoints:
                val, alertMessage = datapoint.read()
                if alertMessage is not None:
                    alertMessage["location"] = locationID
                    alertMessage["timestamp"] = datetime.now().isoformat()
                    filelogger.info("Publishing: %s\tTo Topic: %s", alertMessage, alm_topic)
                    myAWSIoTMQTTClient.publish(alm_topic, json.dumps(alertMessage), 1)
                # Only values that pass the data point's send check are sent
                if datapoint.checkSend(val):
                    message[datapoint.name] = val
            if message:
                # One combined message per polling cycle
                message["timestamp"] = datetime.now().isoformat()
                filelogger.info("Publishing: %s\tTo Topic: %s", message, dt_topic)
                myAWSIoTMQTTClient.publish(dt_topic, json.dumps(message), 1)
            time.sleep(5)

    # list of all threads, so that they can be waited on afterwards
    all_threads = [
        threading.Thread(target=dataCollection, args=(), name="Thread-data"),
    ]
    for thread in all_threads:
        thread.start()
    for thread in all_threads:
        thread.join()

    # NOTE(review): the client is never disconnected after the threads end;
    # confirm whether myAWSIoTMQTTClient.disconnect() should be called here.