diff --git a/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0-mock/EthernetIP.py b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0-mock/EthernetIP.py
new file mode 100644
index 0000000..554bfe7
--- /dev/null
+++ b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0-mock/EthernetIP.py
@@ -0,0 +1,242 @@
+import sys
+import logging
+import json
+from pycomm3 import LogixDriver, Tag
+import time
+import calendar
+import random
+import uuid
+import asyncio
+import math
+import statistics
+from stream_manager import (
+    AssetPropertyValue,
+    ExportDefinition,
+    IoTSiteWiseConfig,
+    MessageStreamDefinition,
+    PutAssetPropertyValueEntry,
+    Quality,
+    Persistence,
+    ResourceNotFoundException,
+    StreamManagerException,
+    StrategyOnFull,
+    StreamManagerClient,
+    TimeInNanos,
+    Variant
+)
+from stream_manager.util import Util
+
+logging.basicConfig(filename='/tmp/EthernetIP.log', format='%(asctime)s %(message)s', level=logging.INFO)
+logging.info(sys.argv[1])
+
+#Loading config from arguments
+config = json.loads(sys.argv[1])
+
+#Create a Stream Manager Client
+#Retry until the Stream Manager service accepts a connection
+client = None
+while not client:
+    try:
+        client = StreamManagerClient()
+    except ConnectionRefusedError:
+        logging.error("Connection was refused to StreamManager")
+    except ConnectionError:
+        logging.error("Did not connect to StreamManager")
+    time.sleep(5)
+stream_name = "Default Stream"
+# Try deleting the stream (if it exists) so that we have a fresh start
+try:
+    client.delete_message_stream(stream_name=stream_name)
+except ResourceNotFoundException:
+    pass
+
+
+#Mock mode returns random values instead of reading from a PLC
+mock = True
+
+#Read a single Tag
+def read_single(ipaddress, tag):
+    if mock:
+        return Tag(tag, random.random()*100, None, None)
+    else:
+        with LogixDriver(ipaddress) as plc:
+            return plc.read(tag)
+#Read multiple tags in an array
+def read_multiple(ipaddress, tags):
+    if mock:
+        return [Tag(tag, random.random()*100, None, None) for tag in tags]
+    else:
+        with LogixDriver(ipaddress) as plc:
+            return plc.read(*tags)
+
+#Put a single data point in a stream
+def sendData(data,property_alias):
+    try:
+        time_in_nanos = TimeInNanos(
+            time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
+        )
+        types = {float: Variant(double_value=data), int: Variant(integer_value=data), str: Variant(string_value=data), bool: Variant(boolean_value=data)}
+        variant = types[type(data)]
+        asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
+        valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset)
+        client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry))
+        return time_in_nanos.time_in_seconds
+    except Exception as e:
+        logging.error(f"Error in sendData: {e}")
+        return False
+
+
+# Function checks if data from a device tag needs to be sent in
+def sendCheckDevice(data, group, device):
+    try:
+        for p in ['last_send_time','guaranteed_send_seconds','last_send_value','change_threshold']:
+            if p not in config[group]["devices"][device]['tag_data'][data.tag]:
+                config[group]["devices"][device]['tag_data'][data.tag][p] = None
+        last_send_time = config[group]["devices"][device]['tag_data'][data.tag]['last_send_time']
+        guaranteed_send_time = config[group]["devices"][device]['tag_data'][data.tag]['guaranteed_send_seconds']
+        last_send_value = config[group]["devices"][device]['tag_data'][data.tag]['last_send_value']
+        change_threshold = config[group]["devices"][device]['tag_data'][data.tag]['change_threshold']
+
+        if last_send_time is None:
+            return True
+        elif guaranteed_send_time is not None and time.time() - last_send_time >= guaranteed_send_time:
+            return True
+        elif last_send_value is None:
+            return True
+        elif type(data.value) in (int, float) and change_threshold is not None and abs(data.value - last_send_value) >= change_threshold:
+            return True
+        elif type(data.value) in (str, bool) and data.value != last_send_value:
+            return True
+        else:
+            return False
+    except Exception as e:
+        logging.error(f"Error in sendCheckDevice: {e}")
+        return False
+
+# Function checks if data from a config function needs to be sent in
+def sendCheckFunction(data,group, func):
+    try:
+        for p in ['last_send_time','guaranteed_send_seconds','last_send_value','change_threshold']:
+            if p not in config[group]["functions"][func]:
+                config[group]["functions"][func][p] = None
+        last_send_time = config[group]["functions"][func]['last_send_time']
+        guaranteed_send_time = config[group]["functions"][func]['guaranteed_send_seconds']
+        last_send_value = config[group]["functions"][func]['last_send_value']
+        change_threshold = config[group]["functions"][func]['change_threshold']
+
+        if last_send_time is None:
+            return True
+        elif guaranteed_send_time is not None and time.time() - last_send_time >= guaranteed_send_time:
+            return True
+        elif last_send_value is None:
+            return True
+        elif type(data) in (int, float) and change_threshold is not None and abs(data - last_send_value) >= change_threshold:
+            return True
+        elif type(data) in (str, bool) and data != last_send_value:
+            return True
+        else:
+            return False
+    except Exception as e:
+        logging.error(f"Error in sendCheckFunction: {e}")
+        return False
+
+#Aggregation helpers; named so they do not shadow the Python builtins they call
+def agg_sum(operands:list):
+    return math.fsum(operands)
+
+def agg_mean(operands:list):
+    return statistics.fmean(operands)
+
+def agg_max(operands:list):
+    return max(operands)
+
+def agg_min(operands:list):
+    return min(operands)
+
+def agg_median(operands:list):
+    return statistics.median(operands)
+
+operations = {
+    "sum": agg_sum,
+    "mean": agg_mean,
+    "max": agg_max,
+    "min": agg_min,
+    "median": agg_median
+}
+#Setup a Stream Manager Message Stream
+try:
+    client.create_message_stream(MessageStreamDefinition(
+        name=stream_name,
+        max_size=536870912, #512MB max size
+        stream_segment_size=16777216, #16MB segments
+        time_to_live_millis=None,
+        strategy_on_full=StrategyOnFull.OverwriteOldestData,
+        persistence=Persistence.File,
+        flush_on_write=True,
+        export_definition=ExportDefinition(
+            iot_sitewise= [IoTSiteWiseConfig(
+                identifier= stream_name + "-SW",
+                batch_size= 10,
+                batch_interval_millis= 60000, # 1 min guaranteed batch send interval
+                priority=5
+            )]
+        )
+    ))
+except StreamManagerException:
+    pass
+except (ConnectionError, asyncio.TimeoutError):
+    pass
+
+#Main Polling Loop
+while(True):
+    #Configurations can have multiple groups of devices
+    for group in config.keys():
+        values = {} #Store values by Tag name for the aggregation calculations later
+        for device in config[group]["devices"].keys(): #For each device, collect the data
+            plc_address = config[group]["devices"][device]['source_address']
+            tag_data = list(config[group]["devices"][device]['tag_data'].keys())
+            if len(tag_data) > 1:
+                try:
+                    response = read_multiple(plc_address,tag_data)
+                    logging.info(f"Got back: {response}")
+                    for tag in response:
+                        #logging.info(f"Processing: {tag.tag}")
+                        if tag.tag in values.keys():
+                            values[tag.tag].append(tag.value)
+                        else:
+                            values[tag.tag] = [tag.value]
+                        #logging.info(f"Values: {values}")
+                        if sendCheckDevice(tag, group, device):
+                            sendDataResp = sendData(tag.value, config[group]["devices"][device]['tag_data'][tag.tag]['property_alias'])
+                            if sendDataResp:
config[group]["devices"][device]['tag_data'][tag.tag]['last_send_value'] = tag.value + config[group]["devices"][device]['tag_data'][tag.tag]['last_send_time'] = sendDataResp + except Exception as e: + logging.error(f"Error polling multiple for {device} in {group}: {e}") + else: + try: + response = read_single(plc_address,tag_data[0]) + logging.info(f"Got back: {response}") + if response.tag in values.keys(): + values[response.tag].append(response.value) + else: + values[response.tag] = [response.value] + if sendCheckDevice(response, group, device): + sendDataResp = sendData(response.value, config[group]["devices"][device]['tag_data'][response.tag]['property_alias']) + if sendDataResp: + config[group]["devices"][device]['tag_data'][response.tag]['last_send_value'] = response.value + config[group]["devices"][device]['tag_data'][response.tag]['last_send_time'] = sendDataResp + except Exception as e: + logging.error(f"Error polling single for {device} in {group}: {e}") + for func in config[group]["functions"].keys(): #Each group has calculations done for data aggregation + operands = [] + try: + for x in config[group]["functions"][func]["operands"]: + operands = operands + values[x] + value = operations[config[group]["functions"][func]["operation"]](operands) + if sendCheckFunction(value, group, func): + sendDataResp = sendData(value, config[group]["functions"][func]["property_alias"]) + if sendDataResp: + config[group]["functions"][func]["last_send_value"] = value + config[group]["functions"][func]["last_send_time"] = sendDataResp + except Exception as e: + logging.error(f"Error in Calculation Loop: {e}") + time.sleep(60) #sleep a bit between polling sessions this isn't mandatory just to manage polling speed + diff --git a/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0-mock/requirements.txt b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0-mock/requirements.txt new file mode 100644 index 0000000..9328918 --- /dev/null +++ b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0-mock/requirements.txt @@ -0,0 +1,2 @@ +cbor2==4.1.2 +pycomm3==1.2.1 diff --git a/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0-mock/stream_manager_sdk.zip b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0-mock/stream_manager_sdk.zip new file mode 100644 index 0000000..1d75c71 Binary files /dev/null and b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0-mock/stream_manager_sdk.zip differ diff --git a/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py index 75a483a..42ebd23 100644 --- a/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py +++ b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py @@ -33,14 +33,22 @@ logging.info(sys.argv[1]) config = json.loads(sys.argv[1]) #Create a Stream Manager Client -client = StreamManagerClient() +#Create a Stream Manager Client +client = None +while not client: + try: + client = StreamManagerClient() + except ConnectionError: + logging.error("Did not connect to StreamManager") + except ConnectionRefusedError: + logging.error("Connection was refused to StreamManager") + time.sleep(5) stream_name = "Default Stream" # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass - #Read a single Tag def read_single(ipaddress, tag): with LogixDriver(ipaddress) as plc: @@ -56,48 +64,68 @@ def sendData(data,property_alias): time_in_nanos = TimeInNanos( 
             time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
         )
-        variant = Variant(double_value=data)
+        types = {float: Variant(double_value=data), int: Variant(integer_value=data), str: Variant(string_value=data), bool: Variant(boolean_value=data)}
+        variant = types[type(data)]
         asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
         valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset)
         client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry))
         return time_in_nanos.time_in_seconds
     except Exception as e:
-        logging.error(e)
+        logging.error(f"Error in sendData: {e}")
         return False
 
 
 def sendCheckDevice(data, group, device):
-    last_send_time = config[group]["devices"][device]['tag_data'][data.tag]['last_send_time']
-    guaranteed_send_time = config[group]["devices"][device]['tag_data'][data.tag]['guaranteed_send_seconds']
-    last_send_value = config[group]["devices"][device]['tag_data'][data.tag]['last_send_value']
-    change_threshold = config[group]["devices"][device]['tag_data'][data.tag]['change_threshold']
+    try:
+        for p in ['last_send_time','guaranteed_send_seconds','last_send_value','change_threshold']:
+            if p not in config[group]["devices"][device]['tag_data'][data.tag]:
+                config[group]["devices"][device]['tag_data'][data.tag][p] = None
+        last_send_time = config[group]["devices"][device]['tag_data'][data.tag]['last_send_time']
+        guaranteed_send_time = config[group]["devices"][device]['tag_data'][data.tag]['guaranteed_send_seconds']
+        last_send_value = config[group]["devices"][device]['tag_data'][data.tag]['last_send_value']
+        change_threshold = config[group]["devices"][device]['tag_data'][data.tag]['change_threshold']
 
-    if last_send_time == None:
-        return True
-    elif time.time() - last_send_time >= guaranteed_send_time:
-        return True
-    elif last_send_value == None:
-        return True
-    elif abs(data.value - last_send_value) >= change_threshold:
-        return True
-    else:
+        if last_send_time is None:
+            return True
+        elif guaranteed_send_time is not None and time.time() - last_send_time >= guaranteed_send_time:
+            return True
+        elif last_send_value is None:
+            return True
+        elif type(data.value) in (int, float) and change_threshold is not None and abs(data.value - last_send_value) >= change_threshold:
+            return True
+        elif type(data.value) in (str, bool) and data.value != last_send_value:
+            return True
+        else:
+            return False
+    except Exception as e:
+        logging.error(f"Error in sendCheckDevice: {e}")
         return False
 
+# Function checks if data from a config function needs to be sent in
 def sendCheckFunction(data,group, func):
-    last_send_time = config[group]["functions"][func]['last_send_time']
-    guaranteed_send_time = config[group]["functions"][func]['guaranteed_send_seconds']
-    last_send_value = config[group]["functions"][func]['last_send_value']
-    change_threshold = config[group]["functions"][func]['change_threshold']
-
-    if last_send_time == None:
-        return True
-    elif time.time() - last_send_time >= guaranteed_send_time:
-        return True
-    elif last_send_value == None:
-        return True
-    elif abs(data - last_send_value) >= change_threshold:
-        return True
-    else:
+    try:
+        for p in ['last_send_time','guaranteed_send_seconds','last_send_value','change_threshold']:
+            if p not in config[group]["functions"][func]:
+                config[group]["functions"][func][p] = None
+        last_send_time = config[group]["functions"][func]['last_send_time']
+        guaranteed_send_time = config[group]["functions"][func]['guaranteed_send_seconds']
config[group]["functions"][func]['guaranteed_send_seconds'] + last_send_value = config[group]["functions"][func]['last_send_value'] + change_threshold = config[group]["functions"][func]['change_threshold'] + + if last_send_time == None: + return True + elif time.time() - last_send_time >= guaranteed_send_time: + return True + elif last_send_value == None: + return True + elif type(data) == float or type(data) == int and abs(data - last_send_value) >= change_threshold: + return True + elif type(data) == str or type(data) == bool and data != last_send_value: + return True + else: + return False + except Exception as e: + logging.error(f"Error in sendCheckFunction: {e}") return False def sum(operands:list): @@ -136,7 +164,7 @@ try: iot_sitewise= [IoTSiteWiseConfig( identifier= stream_name + "-SW", batch_size= 10, - batch_interval_millis= 60000, #1Min gaurantee time + batch_interval_millis= 60000, # 1 Min gaurantee batch send time priority=5 )] ) @@ -148,9 +176,10 @@ except ConnectionError or asyncio.TimeoutError: #Main Polling Loop while(True): + #Configurations can have multiple groups of devices for group in config.keys(): - values = {} - for device in config[group]["devices"].keys(): + values = {} #Store values based on Tag name for doing calculations later + for device in config[group]["devices"].keys(): #for each device collect the data plc_address = config[group]["devices"][device]['source_address'] tag_data = list(config[group]["devices"][device]['tag_data'].keys()) if len(tag_data) > 1: @@ -170,7 +199,7 @@ while(True): config[group]["devices"][device]['tag_data'][tag.tag]['last_send_value'] = tag.value config[group]["devices"][device]['tag_data'][tag.tag]['last_send_time'] = sendDataResp except Exception as e: - logging.error(e) + logging.error(f"Error polling multiple for {device} in {group}: {e}") else: try: response = read_single(plc_address,tag_data[0]) @@ -185,16 +214,19 @@ while(True): config[group]["devices"][device]['tag_data'][response.tag]['last_send_value'] = response.value config[group]["devices"][device]['tag_data'][response.tag]['last_send_time'] = sendDataResp except Exception as e: - logging.error(e) - for func in config[group]["functions"].keys(): + logging.error(f"Error polling single for {device} in {group}: {e}") + for func in config[group]["functions"].keys(): #Each group has calculations done for data aggregation operands = [] - for x in config[group]["functions"][func]["operands"]: - operands = operands + values[x] - value = operations[config[group]["functions"][func]["operation"]](operands) - if sendCheckFunction(value, group, func): - sendDataResp = sendData(value, config[group]["functions"][func]["property_alias"]) - if sendDataResp: - config[group]["functions"][func]["last_send_value"] = value - config[group]["functions"][func]["last_send_time"] = sendDataResp - time.sleep(5) + try: + for x in config[group]["functions"][func]["operands"]: + operands = operands + values[x] + value = operations[config[group]["functions"][func]["operation"]](operands) + if sendCheckFunction(value, group, func): + sendDataResp = sendData(value, config[group]["functions"][func]["property_alias"]) + if sendDataResp: + config[group]["functions"][func]["last_send_value"] = value + config[group]["functions"][func]["last_send_time"] = sendDataResp + except Exception as e: + logging.error(f"Error in Calculation Loop: {e}") + time.sleep(5) #sleep a bit between polling sessions this isn't mandatory just to manage polling speed