import json
import logging
import os
import sys
import threading
import time
from datetime import datetime as datetime
from logging.handlers import RotatingFileHandler

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient

from data_points import plcDataPoint,modbusDataPoint,currentDataPoint,voltageDataPoint


def _get_device_logger(device):
    """Return the INFO-level logger for *device*, attaching handlers only once.

    Records go to a rotating file ``./logs/<device>.log`` (500 KiB per file,
    2 backups) and to stdout, both using the same format.
    """
    logger = logging.getLogger('{}'.format(device))
    logger.setLevel(logging.INFO)
    if logger.handlers:
        # Already configured by an earlier call for this device; adding the
        # handlers again would make every record appear more than once.
        return logger

    log_formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s(%(lineno)d) %(message)s')
    log_file = './logs/{}.log'.format(device)
    # The handler opens the file immediately (delay=0), so the directory has
    # to exist before it is constructed.
    os.makedirs(os.path.dirname(log_file), exist_ok=True)

    my_handler = RotatingFileHandler(log_file, mode='a', maxBytes=500*1024, backupCount=2, encoding=None, delay=0)
    my_handler.setFormatter(log_formatter)
    my_handler.setLevel(logging.INFO)
    logger.addHandler(my_handler)

    console_out = logging.StreamHandler(sys.stdout)
    console_out.setFormatter(log_formatter)
    logger.addHandler(console_out)
    return logger


def _build_plc_datapoints(plc_config):
    """Create one ``plcDataPoint`` per entry of the ``PLCData`` config mapping.

    Each entry must provide ``changeThreshold``, ``guaranteed``, ``plcIP``,
    ``plcType``, ``tag`` and ``name``. An optional ``alert`` sub-mapping
    (``threshold``, ``condition``, ``response``, ``contact``, ``name``) is
    forwarded as the ``alert*`` keyword arguments.

    Raises:
        KeyError: if an entry (or its ``alert`` mapping) lacks a required key.
    """
    datapoints = []
    for point_cfg in plc_config.values():
        changeThreshold = point_cfg["changeThreshold"]
        guaranteed = point_cfg["guaranteed"]
        plcIP = point_cfg["plcIP"]
        plcType = point_cfg["plcType"]
        tag = point_cfg["tag"]
        name = point_cfg["name"]

        alert_kwargs = {}
        if "alert" in point_cfg:
            alert_cfg = point_cfg["alert"]
            alert_kwargs = {
                "alertThreshold": alert_cfg["threshold"],
                "alertCondition": alert_cfg["condition"],
                "alertResponse": alert_cfg["response"],
                "alertContact": alert_cfg["contact"],
                "alertName": alert_cfg["name"],
            }

        datapoints.append(
            plcDataPoint(
                changeThreshold,
                guaranteed,
                str(name),
                plcIP=str(plcIP),
                plcType=str(plcType),
                tag=str(tag),
                **alert_kwargs
            )
        )
    return datapoints


def run(config, device, port, host, rootCAPath):
    """Poll the configured data points and publish them to AWS IoT Core.

    Sets up logging for *device*, connects an MQTT client, builds the data
    points described in *config*, then starts a polling thread and joins it.
    The polling loop never exits on its own, so this call only returns if
    that thread terminates (for example through an unhandled exception).

    Args:
        config: Mapping with the keys ``appname``, ``company``, ``field``,
            ``locationID``, ``certificateID``, ``PLCData``, ``modbusData``,
            ``currentData`` and ``voltageData``. Each ``*Data`` value is
            either the string ``"empty"`` or a mapping of point definitions.
        device: Device name; used for the logger name and log file name.
        port: Port of the AWS IoT endpoint.
        host: Host name of the AWS IoT endpoint.
        rootCAPath: Path to the root CA file passed to the MQTT client.

    Raises:
        KeyError: if *config* lacks one of the keys listed above.
    """
    filelogger = _get_device_logger(device)
    filelogger.info("Got Config: \n{}".format(config))

    # Extract data from the passed-in config.
    app = config['appname']
    company = config['company']
    field = config['field']
    locationID = config['locationID']
    #deviceType = config['deviceType']
    certificateID = config['certificateID']

    # Build the data/alarm topics and the last-will payload.
    dt_topic = "dt/{}/{}/{}/{}".format(app, company, field, locationID)
    alm_topic = "alm/{}/{}/{}/{}".format(app,company, field, locationID)
    lwtPayload = {"connected": 0}

    # TODO: generate a cert if needed.
    # Configure the connection to AWS IoT Core. The private key and
    # certificate paths are currently hard-coded.
    myAWSIoTMQTTClient = AWSIoTMQTTClient(certificateID)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, './device1Cert.key', './device1CertAndCACert.pem')
    myAWSIoTMQTTClient.configureLastWill(dt_topic,json.dumps(lwtPayload),1)
    try:
        myAWSIoTMQTTClient.connect()
        connectedPayload = {"connected": 1}
        myAWSIoTMQTTClient.publish(dt_topic, json.dumps(connectedPayload),1)
    except Exception as e:
        # Best effort, as before: record the failure (now with traceback)
        # and carry on to the polling loop.
        filelogger.exception("Didn't connect: {}".format(e))

    # Build data points: loop through the config and use a class per point.
    #   PLC data     -> set up polls for tags; use ping and reads as
    #                   watchdog values for connectivity.
    #   Modbus data  -> set up polls for registers; use reads as watchdog
    #                   values for connectivity.
    #   Current data -> set up polls for current; raw value > 3.5 means the
    #                   current loop is good, otherwise disconnected.
    #   Voltage data -> set up polls for voltage; raw value > 0 means the
    #                   voltage is good, otherwise disconnected.
    datapoints = []
    if config["PLCData"] != "empty":
        datapoints.extend(_build_plc_datapoints(config['PLCData']))
    if config["modbusData"] != "empty":
        pass  # not implemented yet
    if config["currentData"] != "empty":
        pass  # not implemented yet
    if config["voltageData"] != "empty":
        pass  # not implemented yet

    def dataCollection():
        """Poll every data point forever, publishing alerts and changed values.

        Latency should remain low on polls; each data point's ``checkSend``
        decides how often its value is actually sent.
        """
        while True:
            message = {}
            for datapoint in datapoints:
                val, alertMessage = datapoint.read()
                if alertMessage is not None:
                    alertMessage["location"] = locationID
                    filelogger.info("Publishin: %s\nTo Topic: %s", alertMessage, alm_topic)
                    myAWSIoTMQTTClient.publish(alm_topic,json.dumps(alertMessage),1)
                if datapoint.checkSend(val):
                    message[datapoint.name] = val
            if message:
                # Only publish when at least one point passed its send check.
                message["timestamp"] = datetime.now().isoformat()
                filelogger.info("Publishing: %s\nTo Topic: %s", message, dt_topic)
                myAWSIoTMQTTClient.publish(dt_topic, json.dumps(message),1)
            time.sleep(5)

    # List of all threads, so that they can be managed together afterwards.
    all_threads = []
    data_thread = threading.Thread(target=dataCollection, args=(), name="Thread-data")
    data_thread.start()
    all_threads.append(data_thread)

    for thread in all_threads:
        thread.join()

    #myAWSIoTMQTTClient.disconnect()