Files
greengrass-v2-components/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py
2021-09-22 15:33:08 -05:00

233 lines
9.6 KiB
Python

import asyncio
import builtins
import calendar
import json
import logging
import math
import random
import statistics
import sys
import time
import uuid

from pycomm3 import LogixDriver
from stream_manager import (
    AssetPropertyValue,
    ExportDefinition,
    IoTSiteWiseConfig,
    MessageStreamDefinition,
    PutAssetPropertyValueEntry,
    Quality,
    Persistence,
    ResourceNotFoundException,
    StreamManagerException,
    StrategyOnFull,
    StreamManagerClient,
    TimeInNanos,
    Variant,
)
from stream_manager.util import Util
logging.basicConfig(filename='/tmp/EthernetIP.log', format='%(asctime)s %(message)s', level=logging.INFO)
# The component is launched with its JSON configuration as the first argument.
# NOTE(review): this logs the raw configuration verbatim — confirm it never carries secrets.
logging.info(sys.argv[1])
# Loading config from arguments
config = json.loads(sys.argv[1])
# Create a Stream Manager client, retrying until the local service is reachable.
client = None
while not client:
    try:
        client = StreamManagerClient()
    # ConnectionRefusedError is a subclass of ConnectionError, so the more
    # specific handler must come first (the original order made it unreachable).
    except ConnectionRefusedError:
        logging.error("Connection was refused to StreamManager")
        time.sleep(5)
    except ConnectionError:
        logging.error("Did not connect to StreamManager")
        time.sleep(5)
stream_name = "Default Stream"
# Try deleting the stream (if it exists) so that we have a fresh start
try:
    client.delete_message_stream(stream_name=stream_name)
except ResourceNotFoundException:
    pass
# Read a single tag from a PLC
def read_single(ipaddress, tag):
    """Open a short-lived connection to the PLC at *ipaddress* and read one tag."""
    with LogixDriver(ipaddress) as driver:
        result = driver.read(tag)
    return result
# Read multiple tags in one request
def read_multiple(ipaddress, tags):
    """Read all *tags* from the PLC at *ipaddress* in a single batched request."""
    with LogixDriver(ipaddress) as driver:
        results = driver.read(*tags)
    return results
# Put a single data point in the Stream Manager stream
def sendData(data, property_alias):
    """Append one SiteWise asset-property value to the message stream.

    Returns the timestamp (seconds) stamped on the sample, or False when
    building/serializing/appending the entry fails.
    """
    try:
        # NOTE(review): the timestamp is backdated by a random 0-60 s plus a
        # random nanosecond offset — this looks inherited from AWS sample code;
        # confirm the jitter is actually wanted for production telemetry.
        timestamp = TimeInNanos(
            time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60),
            offset_in_nanos=random.randint(0, 10000),
        )
        # Map the sample's Python type onto the matching SiteWise variant;
        # an unsupported type raises KeyError and falls through to the
        # error path below (returning False), as before.
        variant = {
            float: Variant(double_value=data),
            int: Variant(integer_value=data),
            str: Variant(string_value=data),
            bool: Variant(boolean_value=data),
        }[type(data)]
        sample = AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=timestamp)
        entry = PutAssetPropertyValueEntry(
            entry_id=str(uuid.uuid4()),
            property_alias=property_alias,
            property_values=[sample],
        )
        client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(entry))
        return timestamp.time_in_seconds
    except Exception as e:
        logging.error(f"Error in sendData: {e}")
        return False
# Function checks if a freshly polled device tag needs to be sent in
def sendCheckDevice(data, group, device):
    """Decide whether a polled tag sample should be forwarded to the stream.

    data   -- a pycomm3 Tag result (read via .tag and .value)
    group  -- key into the module-level config dict
    device -- key into config[group]["devices"]

    Returns True when the value must be sent: never sent before, the
    guaranteed-send interval has elapsed, or the value changed past the
    configured threshold. Returns False otherwise, or on any error.
    """
    try:
        tag_cfg = config[group]["devices"][device]['tag_data'][data.tag]
        # Lazily create the bookkeeping fields the first time we see this tag.
        for p in ('last_send_time', 'guaranteed_send_seconds', 'last_send_value', 'change_threshold'):
            tag_cfg.setdefault(p, None)
        last_send_time = tag_cfg['last_send_time']
        guaranteed_send_time = tag_cfg['guaranteed_send_seconds']
        last_send_value = tag_cfg['last_send_value']
        change_threshold = tag_cfg['change_threshold']
        # Never sent before -> always send.
        if last_send_time is None or last_send_value is None:
            return True
        # Heartbeat: resend after the guaranteed interval even without a change.
        # (The original crashed here when guaranteed_send_seconds was None.)
        if guaranteed_send_time is not None and time.time() - last_send_time >= guaranteed_send_time:
            return True
        # Compare the tag's *value*, not the Tag wrapper object. The original
        # compared the Tag result itself (and had `or`/`and` precedence bugs),
        # so change detection never actually fired.
        value = data.value
        # bool is a subclass of int, so check bool (and str) first.
        if isinstance(value, (bool, str)):
            return value != last_send_value
        if isinstance(value, (int, float)):
            if change_threshold is None:
                # No threshold configured: send on any change.
                return value != last_send_value
            return abs(value - last_send_value) >= change_threshold
        return False
    except Exception as e:
        logging.error(f"Error in sendCheckDevice: {e}")
        return False
# Function checks if data from a config function needs to be sent in
def sendCheckFunction(data, group, func):
    """Decide whether an aggregated value should be forwarded to the stream.

    data  -- the plain computed value (float/int/str/bool)
    group -- key into the module-level config dict
    func  -- key into config[group]["functions"]

    Returns True when the value must be sent: never sent before, the
    guaranteed-send interval has elapsed, or the value changed past the
    configured threshold. Returns False otherwise, or on any error.
    """
    try:
        func_cfg = config[group]["functions"][func]
        # Lazily create the bookkeeping fields the first time we see this function.
        for p in ('last_send_time', 'guaranteed_send_seconds', 'last_send_value', 'change_threshold'):
            func_cfg.setdefault(p, None)
        last_send_time = func_cfg['last_send_time']
        guaranteed_send_time = func_cfg['guaranteed_send_seconds']
        last_send_value = func_cfg['last_send_value']
        change_threshold = func_cfg['change_threshold']
        # Never sent before -> always send.
        if last_send_time is None or last_send_value is None:
            return True
        # Heartbeat: resend after the guaranteed interval even without a change.
        # (The original crashed here when guaranteed_send_seconds was None.)
        if guaranteed_send_time is not None and time.time() - last_send_time >= guaranteed_send_time:
            return True
        # The original `type(x) == A or type(x) == B and ...` chains bound
        # `and` tighter than `or`, so any float/str value was always sent.
        # bool is a subclass of int, so check bool (and str) first.
        if isinstance(data, (bool, str)):
            return data != last_send_value
        if isinstance(data, (int, float)):
            if change_threshold is None:
                # No threshold configured: send on any change.
                return data != last_send_value
            return abs(data - last_send_value) >= change_threshold
        return False
    except Exception as e:
        logging.error(f"Error in sendCheckFunction: {e}")
        return False
# Aggregation helpers used by the "functions" section of the config.
# They deliberately shadow the same-named builtins so the config's
# "operation" strings map straight onto them; the real builtins are reached
# through the `builtins` module. The originals of max/min/median called
# themselves and recursed until RecursionError on first use.
def sum(operands: list) -> float:
    """Accurate floating-point sum of the operand list."""
    return math.fsum(operands)

def mean(operands: list) -> float:
    """Arithmetic mean of the operand list."""
    return statistics.fmean(operands)

def max(operands: list):
    """Largest operand (delegates to the shadowed builtin)."""
    return builtins.max(operands)

def min(operands: list):
    """Smallest operand (delegates to the shadowed builtin)."""
    return builtins.min(operands)

def median(operands: list):
    """Median of the operand list."""
    return statistics.median(operands)

# Dispatch table: config "operation" string -> aggregation callable.
operations = {
    "sum": sum,
    "mean": mean,
    "max": max,
    "min": min,
    "median": median
}
# Setup a Stream Manager message stream exporting to IoT SiteWise
try:
    client.create_message_stream(MessageStreamDefinition(
        name=stream_name,
        max_size=536870912,  # 512MB max size
        stream_segment_size=16777216,  # 16MB segments
        time_to_live_millis=None,
        strategy_on_full=StrategyOnFull.OverwriteOldestData,
        persistence=Persistence.File,
        flush_on_write=True,
        export_definition=ExportDefinition(
            iot_sitewise=[IoTSiteWiseConfig(
                identifier=stream_name + "-SW",
                batch_size=10,
                batch_interval_millis=60000,  # 1 min guaranteed batch send time
                priority=5
            )]
        )
    ))
except StreamManagerException:
    # Most likely the stream already exists from a previous run; keep using it.
    pass
# The original wrote `except ConnectionError or asyncio.TimeoutError:` —
# that expression evaluates to ConnectionError alone, so timeouts were
# never caught. A tuple catches both, as intended.
except (ConnectionError, asyncio.TimeoutError):
    pass
# Main polling loop: poll every device in every group, forward tag values
# that pass the send check, then evaluate each group's aggregate functions.
while True:
    # Configurations can have multiple groups of devices
    for group in config:
        values = {}  # tag name -> list of sampled values, consumed by the aggregates below
        # Using .get() so a group without "devices"/"functions" is skipped
        # instead of crashing the whole loop with a KeyError (original bug).
        for device in config[group].get("devices", {}):
            device_cfg = config[group]["devices"][device]
            plc_address = device_cfg['source_address']
            tag_names = list(device_cfg['tag_data'].keys())
            if len(tag_names) > 1:
                try:
                    response = read_multiple(plc_address, tag_names)
                    logging.info(f"Got back: {response}")
                    for tag in response:
                        values.setdefault(tag.tag, []).append(tag.value)
                        if sendCheckDevice(tag, group, device):
                            # sendData returns the send timestamp, or False on failure;
                            # only record the send when it actually succeeded.
                            sendDataResp = sendData(tag.value, device_cfg['tag_data'][tag.tag]['property_alias'])
                            if sendDataResp:
                                device_cfg['tag_data'][tag.tag]['last_send_value'] = tag.value
                                device_cfg['tag_data'][tag.tag]['last_send_time'] = sendDataResp
                except Exception as e:
                    logging.error(f"Error polling multiple for {device} in {group}: {e}")
            else:
                try:
                    response = read_single(plc_address, tag_names[0])
                    logging.info(f"Got back: {response}")
                    values.setdefault(response.tag, []).append(response.value)
                    if sendCheckDevice(response, group, device):
                        sendDataResp = sendData(response.value, device_cfg['tag_data'][response.tag]['property_alias'])
                        if sendDataResp:
                            device_cfg['tag_data'][response.tag]['last_send_value'] = response.value
                            device_cfg['tag_data'][response.tag]['last_send_time'] = sendDataResp
                except Exception as e:
                    logging.error(f"Error polling single for {device} in {group}: {e}")
        # Each group can define aggregate functions over the collected values.
        for func in config[group].get("functions", {}):
            func_cfg = config[group]["functions"][func]
            try:
                operands = []
                for tag_name in func_cfg["operands"]:
                    operands.extend(values[tag_name])
                value = operations[func_cfg["operation"]](operands)
                if sendCheckFunction(value, group, func):
                    sendDataResp = sendData(value, func_cfg["property_alias"])
                    if sendDataResp:
                        func_cfg["last_send_value"] = value
                        func_cfg["last_send_time"] = sendDataResp
            except Exception as e:
                logging.error(f"Error in Calculation Loop: {e}")
    time.sleep(5)  # pace the polling; not mandatory, just throttles polling speed