diff --git a/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.0.0/EthernetIP.py b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.0.0/EthernetIP.py new file mode 100644 index 0000000..c63eb2c --- /dev/null +++ b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.0.0/EthernetIP.py @@ -0,0 +1,137 @@ +import sys +import logging +import json +from pycomm3 import LogixDriver +import time +import calendar +import random +import uuid +import asyncio +from stream_manager import ( + AssetPropertyValue, + ExportDefinition, + IoTSiteWiseConfig, + MessageStreamDefinition, + PutAssetPropertyValueEntry, + Quality, + Persistence, + ResourceNotFoundException, + StreamManagerException, + StrategyOnFull, + StreamManagerClient, + TimeInNanos, + Variant +) +from stream_manager.util import Util + +logging.basicConfig(filename='/tmp/EthernetIP.log', format='%(asctime)s %(message)s', level=logging.INFO) +logging.info(sys.argv[1]) + +#Loading config from arguements +config = json.loads(sys.argv[1]) + +#Create a Stream Manager Client +client = StreamManagerClient() +stream_name = "Default Stream" +# Try deleting the stream (if it exists) so that we have a fresh start +try: + client.delete_message_stream(stream_name=stream_name) +except ResourceNotFoundException: + pass + +#Read a single Tag +def read_single(ipaddress, tag): + with LogixDriver(ipaddress) as plc: + return plc.read(tag) +#Read multiple tags in an array +def read_multiple(ipaddress, tags): + with LogixDriver(ipaddress)as plc: + return plc.read(*tags) + +#Put a single data point in a stream +def sendData(data,device): + property_alias = config[device]['tag_data'][data.tag]['property_alias'] + last_send_time = config[device]['tag_data'][data.tag]['last_send_time'] + guaranteed_send_time = config[device]['tag_data'][data.tag]['guaranteed_send_time'] + last_send_value = config[device]['tag_data'][data.tag]['last_send_value'] + change_threshold = config[device]['tag_data'][data.tag]['change_threshold'] + if last_send_time == None or time.time() - last_send_time >= guaranteed_send_time or last_send_value == None or abs(data.value - last_send_value) >= change_threshold: + try: + time_in_nanos = TimeInNanos( + time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) + ) + variant = Variant(double_value=data.value) + asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] + valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset) + client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry)) + config[device]['tag_data'][data.tag]['last_send_value'] = data.value + config[device]['tag_data'][data.tag]['last_send_time'] = time_in_nanos + except Exception as e: + logging.error(e) + +#Put multiple data points in a stream +def sendMultipleData(data, device): + for tag in data: + property_alias = config[device]['tag_data'][tag.tag]['property_alias'] + last_send_time = config[device]['tag_data'][tag.tag]['last_send_time'] + guaranteed_send_time = config[device]['tag_data'][tag.tag]['guaranteed_send_time'] + last_send_value = config[device]['tag_data'][tag.tag]['last_send_value'] + change_threshold = config[device]['tag_data'][tag.tag]['change_threshold'] + if last_send_time == None or time.time() - last_send_time >= guaranteed_send_time or last_send_value == None or abs(tag.value - last_send_value) >= change_threshold: + try: + time_in_nanos = TimeInNanos( + time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) + ) + variant = Variant(double_value=tag.value) + asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] + valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset) + client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry)) + config[device]['tag_data'][tag.tag]['last_send_value'] = tag.value + config[device]['tag_data'][tag.tag]['last_send_time'] = time_in_nanos + except Exception as e: + logging.error(e) + +#Setup a Stream Manager Message Stream +try: + client.create_message_stream(MessageStreamDefinition( + name=stream_name, + max_size=536870912, #512MB max size + stream_segment_size=16777216, #16MB segments + time_to_live_millis=None, + strategy_on_full=StrategyOnFull.OverwriteOldestData, + persistence=Persistence.File, + flush_on_write=True, + export_definition=ExportDefinition( + iot_sitewise= [IoTSiteWiseConfig( + identifier= stream_name + "-SW", + batch_size= 10, + batch_interval_millis= 120000, #2Min gaurantee time + priority=5 + )] + ) + )) +except StreamManagerException: + pass +except ConnectionError or asyncio.TimeoutError: + pass + +#Main Polling Loop +while(True): + for device in config.keys(): + plc_address = config[device]['source_address'] + tag_data = list(config[device]['tag_data'].keys()) + if len(tag_data) > 1: + try: + response = read_multiple(plc_address,tag_data) + logging.info(f"Got back: {response}") + sendMultipleData(response, device) + except Exception as e: + logging.error(e) + else: + try: + response = read_single(plc_address,tag_data[0]) + logging.info(f"Got back: {response}") + sendData(response, device) + except Exception as e: + logging.error(e) + time.sleep(5) diff --git a/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.0.0/requirements.txt b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.0.0/requirements.txt new file mode 100644 index 0000000..9328918 --- /dev/null +++ b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.0.0/requirements.txt @@ -0,0 +1,2 @@ +cbor2==4.1.2 +pycomm3==1.2.1 diff --git a/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.0.0/stream_manager_sdk.zip b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.0.0/stream_manager_sdk.zip new file mode 100644 index 0000000..1d75c71 Binary files /dev/null and b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.0.0/stream_manager_sdk.zip differ diff --git a/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py new file mode 100644 index 0000000..4d4caa1 --- /dev/null +++ b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py @@ -0,0 +1,137 @@ +import sys +import logging +import json +from pycomm3 import LogixDriver +import time +import calendar +import random +import uuid +import asyncio +from stream_manager import ( + AssetPropertyValue, + ExportDefinition, + IoTSiteWiseConfig, + MessageStreamDefinition, + PutAssetPropertyValueEntry, + Quality, + Persistence, + ResourceNotFoundException, + StreamManagerException, + StrategyOnFull, + StreamManagerClient, + TimeInNanos, + Variant +) +from stream_manager.util import Util + +logging.basicConfig(filename='/tmp/EthernetIP.log', format='%(asctime)s %(message)s', level=logging.INFO) +logging.info(sys.argv[1]) + +#Loading config from arguements +config = json.loads(sys.argv[1]) + +#Create a Stream Manager Client +client = StreamManagerClient() +stream_name = "Default Stream" +# Try deleting the stream (if it exists) so that we have a fresh start +try: + client.delete_message_stream(stream_name=stream_name) +except ResourceNotFoundException: + pass + +#Read a single Tag +def read_single(ipaddress, tag): + with LogixDriver(ipaddress) as plc: + return plc.read(tag) +#Read multiple tags in an array +def read_multiple(ipaddress, tags): + with LogixDriver(ipaddress)as plc: + return plc.read(*tags) + +#Put a single data point in a stream +def sendData(data,device): + property_alias = config[device]['tag_data'][data.tag]['property_alias'] + last_send_time = config[device]['tag_data'][data.tag]['last_send_time'] + guaranteed_send_time = config[device]['tag_data'][data.tag]['guaranteed_send_seconds'] + last_send_value = config[device]['tag_data'][data.tag]['last_send_value'] + change_threshold = config[device]['tag_data'][data.tag]['change_threshold'] + if last_send_time == None or time.time() - last_send_time >= guaranteed_send_time or last_send_value == None or abs(data.value - last_send_value) >= change_threshold: + try: + time_in_nanos = TimeInNanos( + time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) + ) + variant = Variant(double_value=data.value) + asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] + valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset) + client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry)) + config[device]['tag_data'][data.tag]['last_send_value'] = data.value + config[device]['tag_data'][data.tag]['last_send_time'] = time_in_nanos.time_in_seconds + except Exception as e: + logging.error(e) + +#Put multiple data points in a stream +def sendMultipleData(data, device): + for tag in data: + property_alias = config[device]['tag_data'][tag.tag]['property_alias'] + last_send_time = config[device]['tag_data'][tag.tag]['last_send_time'] + guaranteed_send_time = config[device]['tag_data'][tag.tag]['guaranteed_send_seconds'] + last_send_value = config[device]['tag_data'][tag.tag]['last_send_value'] + change_threshold = config[device]['tag_data'][tag.tag]['change_threshold'] + if last_send_time == None or time.time() - last_send_time >= guaranteed_send_time or last_send_value == None or abs(tag.value - last_send_value) >= change_threshold: + try: + time_in_nanos = TimeInNanos( + time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) + ) + variant = Variant(double_value=tag.value) + asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] + valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset) + client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry)) + config[device]['tag_data'][tag.tag]['last_send_value'] = tag.value + config[device]['tag_data'][tag.tag]['last_send_time'] = time_in_nanos.time_in_seconds + except Exception as e: + logging.error(e) + +#Setup a Stream Manager Message Stream +try: + client.create_message_stream(MessageStreamDefinition( + name=stream_name, + max_size=536870912, #512MB max size + stream_segment_size=16777216, #16MB segments + time_to_live_millis=None, + strategy_on_full=StrategyOnFull.OverwriteOldestData, + persistence=Persistence.File, + flush_on_write=True, + export_definition=ExportDefinition( + iot_sitewise= [IoTSiteWiseConfig( + identifier= stream_name + "-SW", + batch_size= 10, + batch_interval_millis= 60000, #1Min gaurantee time + priority=5 + )] + ) + )) +except StreamManagerException: + pass +except ConnectionError or asyncio.TimeoutError: + pass + +#Main Polling Loop +while(True): + for device in config.keys(): + plc_address = config[device]['source_address'] + tag_data = list(config[device]['tag_data'].keys()) + if len(tag_data) > 1: + try: + response = read_multiple(plc_address,tag_data) + logging.info(f"Got back: {response}") + sendMultipleData(response, device) + except Exception as e: + logging.error(e) + else: + try: + response = read_single(plc_address,tag_data[0]) + logging.info(f"Got back: {response}") + sendData(response, device) + except Exception as e: + logging.error(e) + time.sleep(5) diff --git a/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/requirements.txt b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/requirements.txt new file mode 100644 index 0000000..9328918 --- /dev/null +++ b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/requirements.txt @@ -0,0 +1,2 @@ +cbor2==4.1.2 +pycomm3==1.2.1 diff --git a/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/stream_manager_sdk.zip b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/stream_manager_sdk.zip new file mode 100644 index 0000000..1d75c71 Binary files /dev/null and b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/stream_manager_sdk.zip differ diff --git a/EthernetIP/recipes/hp.greengrass.EthernetIP.json b/EthernetIP/recipes/hp.greengrass.EthernetIP.json new file mode 100644 index 0000000..22a7295 --- /dev/null +++ b/EthernetIP/recipes/hp.greengrass.EthernetIP.json @@ -0,0 +1,55 @@ +{ + "RecipeFormatVersion": "2020-01-25", + "ComponentName": "hp.greengrass.EthernetIP", + "ComponentVersion": "1.1.0", + "ComponentDescription": "A AWS IOT Greengrass component for collecting Ethernet/IP data", + "ComponentPublisher": "HenryPump", + "ComponentConfiguration": { + "DefaultConfiguration": { + "config": { + "device_1": { + "source_address": "127.0.0.1", + "tag_data": { + "exampleTag1": { + "property_alias": "exampleAlias1", + "change_threshold": 1, + "last_send_time": null, + "guarantee_send_seconds": 3600, + "last_send_value": null + }, + "exampleTag2": { + "property_alias": "exampleAlias2", + "change_threshold": 1, + "last_send_time": null, + "guarantee_send_seconds": 3600, + "last_send_value": null + } + } + } + } + } + }, + "Manifests": [ + { + "Platform": { + "os": "linux" + }, + "Lifecycle": { + "Install": "pip3 install -r {artifacts:path}/requirements.txt", + "Run": "export PYTHONPATH=$PTYHONPATH:{artifacts:decompressedPath}/stream_manager_sdk; python3 -u {artifacts:path}/EthernetIP.py '{configuration:/config}'" + }, + "Artifacts": [ + { + "URI": "s3://hp-greengrass-components/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py" + }, + { + "URI": "s3://hp-greengrass-components/artifacts/hp.greengrass.EthernetIP/1.1.0/stream_manager_sdk.zip", + "Unarchive": "ZIP" + }, + { + "URI": "s3://hp-greengrass-components/artifacts/hp.greengrass.EthernetIP/1.1.0/requirements.txt" + } + ] + } + ] +} \ No newline at end of file