added stream manager
This commit is contained in:
@@ -0,0 +1,137 @@
|
|||||||
|
import sys
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
from pycomm3 import LogixDriver
|
||||||
|
import time
|
||||||
|
import calendar
|
||||||
|
import random
|
||||||
|
import uuid
|
||||||
|
import asyncio
|
||||||
|
from stream_manager import (
|
||||||
|
AssetPropertyValue,
|
||||||
|
ExportDefinition,
|
||||||
|
IoTSiteWiseConfig,
|
||||||
|
MessageStreamDefinition,
|
||||||
|
PutAssetPropertyValueEntry,
|
||||||
|
Quality,
|
||||||
|
Persistence,
|
||||||
|
ResourceNotFoundException,
|
||||||
|
StreamManagerException,
|
||||||
|
StrategyOnFull,
|
||||||
|
StreamManagerClient,
|
||||||
|
TimeInNanos,
|
||||||
|
Variant
|
||||||
|
)
|
||||||
|
from stream_manager.util import Util
|
||||||
|
|
||||||
|
logging.basicConfig(filename='/tmp/EthernetIP.log', format='%(asctime)s %(message)s', level=logging.INFO)
|
||||||
|
logging.info(sys.argv[1])
|
||||||
|
|
||||||
|
#Loading config from arguements
|
||||||
|
config = json.loads(sys.argv[1])
|
||||||
|
|
||||||
|
#Create a Stream Manager Client
|
||||||
|
client = StreamManagerClient()
|
||||||
|
stream_name = "Default Stream"
|
||||||
|
# Try deleting the stream (if it exists) so that we have a fresh start
|
||||||
|
try:
|
||||||
|
client.delete_message_stream(stream_name=stream_name)
|
||||||
|
except ResourceNotFoundException:
|
||||||
|
pass
|
||||||
|
|
||||||
|
#Read a single Tag
|
||||||
|
def read_single(ipaddress, tag):
|
||||||
|
with LogixDriver(ipaddress) as plc:
|
||||||
|
return plc.read(tag)
|
||||||
|
#Read multiple tags in an array
|
||||||
|
def read_multiple(ipaddress, tags):
|
||||||
|
with LogixDriver(ipaddress)as plc:
|
||||||
|
return plc.read(*tags)
|
||||||
|
|
||||||
|
#Put a single data point in a stream
|
||||||
|
def sendData(data,device):
|
||||||
|
property_alias = config[device]['tag_data'][data.tag]['property_alias']
|
||||||
|
last_send_time = config[device]['tag_data'][data.tag]['last_send_time']
|
||||||
|
guaranteed_send_time = config[device]['tag_data'][data.tag]['guaranteed_send_time']
|
||||||
|
last_send_value = config[device]['tag_data'][data.tag]['last_send_value']
|
||||||
|
change_threshold = config[device]['tag_data'][data.tag]['change_threshold']
|
||||||
|
if last_send_time == None or time.time() - last_send_time >= guaranteed_send_time or last_send_value == None or abs(data.value - last_send_value) >= change_threshold:
|
||||||
|
try:
|
||||||
|
time_in_nanos = TimeInNanos(
|
||||||
|
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
|
||||||
|
)
|
||||||
|
variant = Variant(double_value=data.value)
|
||||||
|
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
|
||||||
|
valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset)
|
||||||
|
client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry))
|
||||||
|
config[device]['tag_data'][data.tag]['last_send_value'] = data.value
|
||||||
|
config[device]['tag_data'][data.tag]['last_send_time'] = time_in_nanos
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(e)
|
||||||
|
|
||||||
|
#Put multiple data points in a stream
|
||||||
|
def sendMultipleData(data, device):
|
||||||
|
for tag in data:
|
||||||
|
property_alias = config[device]['tag_data'][tag.tag]['property_alias']
|
||||||
|
last_send_time = config[device]['tag_data'][tag.tag]['last_send_time']
|
||||||
|
guaranteed_send_time = config[device]['tag_data'][tag.tag]['guaranteed_send_time']
|
||||||
|
last_send_value = config[device]['tag_data'][tag.tag]['last_send_value']
|
||||||
|
change_threshold = config[device]['tag_data'][tag.tag]['change_threshold']
|
||||||
|
if last_send_time == None or time.time() - last_send_time >= guaranteed_send_time or last_send_value == None or abs(tag.value - last_send_value) >= change_threshold:
|
||||||
|
try:
|
||||||
|
time_in_nanos = TimeInNanos(
|
||||||
|
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
|
||||||
|
)
|
||||||
|
variant = Variant(double_value=tag.value)
|
||||||
|
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
|
||||||
|
valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset)
|
||||||
|
client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry))
|
||||||
|
config[device]['tag_data'][tag.tag]['last_send_value'] = tag.value
|
||||||
|
config[device]['tag_data'][tag.tag]['last_send_time'] = time_in_nanos
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(e)
|
||||||
|
|
||||||
|
#Setup a Stream Manager Message Stream
|
||||||
|
try:
|
||||||
|
client.create_message_stream(MessageStreamDefinition(
|
||||||
|
name=stream_name,
|
||||||
|
max_size=536870912, #512MB max size
|
||||||
|
stream_segment_size=16777216, #16MB segments
|
||||||
|
time_to_live_millis=None,
|
||||||
|
strategy_on_full=StrategyOnFull.OverwriteOldestData,
|
||||||
|
persistence=Persistence.File,
|
||||||
|
flush_on_write=True,
|
||||||
|
export_definition=ExportDefinition(
|
||||||
|
iot_sitewise= [IoTSiteWiseConfig(
|
||||||
|
identifier= stream_name + "-SW",
|
||||||
|
batch_size= 10,
|
||||||
|
batch_interval_millis= 120000, #2Min gaurantee time
|
||||||
|
priority=5
|
||||||
|
)]
|
||||||
|
)
|
||||||
|
))
|
||||||
|
except StreamManagerException:
|
||||||
|
pass
|
||||||
|
except ConnectionError or asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
#Main Polling Loop
|
||||||
|
while(True):
|
||||||
|
for device in config.keys():
|
||||||
|
plc_address = config[device]['source_address']
|
||||||
|
tag_data = list(config[device]['tag_data'].keys())
|
||||||
|
if len(tag_data) > 1:
|
||||||
|
try:
|
||||||
|
response = read_multiple(plc_address,tag_data)
|
||||||
|
logging.info(f"Got back: {response}")
|
||||||
|
sendMultipleData(response, device)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(e)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
response = read_single(plc_address,tag_data[0])
|
||||||
|
logging.info(f"Got back: {response}")
|
||||||
|
sendData(response, device)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(e)
|
||||||
|
time.sleep(5)
|
||||||
@@ -0,0 +1,2 @@
|
|||||||
|
cbor2==4.1.2
|
||||||
|
pycomm3==1.2.1
|
||||||
Binary file not shown.
@@ -0,0 +1,137 @@
|
|||||||
|
import sys
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
from pycomm3 import LogixDriver
|
||||||
|
import time
|
||||||
|
import calendar
|
||||||
|
import random
|
||||||
|
import uuid
|
||||||
|
import asyncio
|
||||||
|
from stream_manager import (
|
||||||
|
AssetPropertyValue,
|
||||||
|
ExportDefinition,
|
||||||
|
IoTSiteWiseConfig,
|
||||||
|
MessageStreamDefinition,
|
||||||
|
PutAssetPropertyValueEntry,
|
||||||
|
Quality,
|
||||||
|
Persistence,
|
||||||
|
ResourceNotFoundException,
|
||||||
|
StreamManagerException,
|
||||||
|
StrategyOnFull,
|
||||||
|
StreamManagerClient,
|
||||||
|
TimeInNanos,
|
||||||
|
Variant
|
||||||
|
)
|
||||||
|
from stream_manager.util import Util
|
||||||
|
|
||||||
|
logging.basicConfig(filename='/tmp/EthernetIP.log', format='%(asctime)s %(message)s', level=logging.INFO)
|
||||||
|
logging.info(sys.argv[1])
|
||||||
|
|
||||||
|
#Loading config from arguements
|
||||||
|
config = json.loads(sys.argv[1])
|
||||||
|
|
||||||
|
#Create a Stream Manager Client
|
||||||
|
client = StreamManagerClient()
|
||||||
|
stream_name = "Default Stream"
|
||||||
|
# Try deleting the stream (if it exists) so that we have a fresh start
|
||||||
|
try:
|
||||||
|
client.delete_message_stream(stream_name=stream_name)
|
||||||
|
except ResourceNotFoundException:
|
||||||
|
pass
|
||||||
|
|
||||||
|
#Read a single Tag
|
||||||
|
def read_single(ipaddress, tag):
|
||||||
|
with LogixDriver(ipaddress) as plc:
|
||||||
|
return plc.read(tag)
|
||||||
|
#Read multiple tags in an array
|
||||||
|
def read_multiple(ipaddress, tags):
|
||||||
|
with LogixDriver(ipaddress)as plc:
|
||||||
|
return plc.read(*tags)
|
||||||
|
|
||||||
|
#Put a single data point in a stream
|
||||||
|
def sendData(data,device):
|
||||||
|
property_alias = config[device]['tag_data'][data.tag]['property_alias']
|
||||||
|
last_send_time = config[device]['tag_data'][data.tag]['last_send_time']
|
||||||
|
guaranteed_send_time = config[device]['tag_data'][data.tag]['guaranteed_send_seconds']
|
||||||
|
last_send_value = config[device]['tag_data'][data.tag]['last_send_value']
|
||||||
|
change_threshold = config[device]['tag_data'][data.tag]['change_threshold']
|
||||||
|
if last_send_time == None or time.time() - last_send_time >= guaranteed_send_time or last_send_value == None or abs(data.value - last_send_value) >= change_threshold:
|
||||||
|
try:
|
||||||
|
time_in_nanos = TimeInNanos(
|
||||||
|
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
|
||||||
|
)
|
||||||
|
variant = Variant(double_value=data.value)
|
||||||
|
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
|
||||||
|
valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset)
|
||||||
|
client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry))
|
||||||
|
config[device]['tag_data'][data.tag]['last_send_value'] = data.value
|
||||||
|
config[device]['tag_data'][data.tag]['last_send_time'] = time_in_nanos.time_in_seconds
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(e)
|
||||||
|
|
||||||
|
#Put multiple data points in a stream
|
||||||
|
def sendMultipleData(data, device):
|
||||||
|
for tag in data:
|
||||||
|
property_alias = config[device]['tag_data'][tag.tag]['property_alias']
|
||||||
|
last_send_time = config[device]['tag_data'][tag.tag]['last_send_time']
|
||||||
|
guaranteed_send_time = config[device]['tag_data'][tag.tag]['guaranteed_send_seconds']
|
||||||
|
last_send_value = config[device]['tag_data'][tag.tag]['last_send_value']
|
||||||
|
change_threshold = config[device]['tag_data'][tag.tag]['change_threshold']
|
||||||
|
if last_send_time == None or time.time() - last_send_time >= guaranteed_send_time or last_send_value == None or abs(tag.value - last_send_value) >= change_threshold:
|
||||||
|
try:
|
||||||
|
time_in_nanos = TimeInNanos(
|
||||||
|
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
|
||||||
|
)
|
||||||
|
variant = Variant(double_value=tag.value)
|
||||||
|
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
|
||||||
|
valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset)
|
||||||
|
client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry))
|
||||||
|
config[device]['tag_data'][tag.tag]['last_send_value'] = tag.value
|
||||||
|
config[device]['tag_data'][tag.tag]['last_send_time'] = time_in_nanos.time_in_seconds
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(e)
|
||||||
|
|
||||||
|
#Setup a Stream Manager Message Stream
|
||||||
|
try:
|
||||||
|
client.create_message_stream(MessageStreamDefinition(
|
||||||
|
name=stream_name,
|
||||||
|
max_size=536870912, #512MB max size
|
||||||
|
stream_segment_size=16777216, #16MB segments
|
||||||
|
time_to_live_millis=None,
|
||||||
|
strategy_on_full=StrategyOnFull.OverwriteOldestData,
|
||||||
|
persistence=Persistence.File,
|
||||||
|
flush_on_write=True,
|
||||||
|
export_definition=ExportDefinition(
|
||||||
|
iot_sitewise= [IoTSiteWiseConfig(
|
||||||
|
identifier= stream_name + "-SW",
|
||||||
|
batch_size= 10,
|
||||||
|
batch_interval_millis= 60000, #1Min gaurantee time
|
||||||
|
priority=5
|
||||||
|
)]
|
||||||
|
)
|
||||||
|
))
|
||||||
|
except StreamManagerException:
|
||||||
|
pass
|
||||||
|
except ConnectionError or asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
#Main Polling Loop
|
||||||
|
while(True):
|
||||||
|
for device in config.keys():
|
||||||
|
plc_address = config[device]['source_address']
|
||||||
|
tag_data = list(config[device]['tag_data'].keys())
|
||||||
|
if len(tag_data) > 1:
|
||||||
|
try:
|
||||||
|
response = read_multiple(plc_address,tag_data)
|
||||||
|
logging.info(f"Got back: {response}")
|
||||||
|
sendMultipleData(response, device)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(e)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
response = read_single(plc_address,tag_data[0])
|
||||||
|
logging.info(f"Got back: {response}")
|
||||||
|
sendData(response, device)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(e)
|
||||||
|
time.sleep(5)
|
||||||
@@ -0,0 +1,2 @@
|
|||||||
|
cbor2==4.1.2
|
||||||
|
pycomm3==1.2.1
|
||||||
Binary file not shown.
55
EthernetIP/recipes/hp.greengrass.EthernetIP.json
Normal file
55
EthernetIP/recipes/hp.greengrass.EthernetIP.json
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
{
|
||||||
|
"RecipeFormatVersion": "2020-01-25",
|
||||||
|
"ComponentName": "hp.greengrass.EthernetIP",
|
||||||
|
"ComponentVersion": "1.1.0",
|
||||||
|
"ComponentDescription": "A AWS IOT Greengrass component for collecting Ethernet/IP data",
|
||||||
|
"ComponentPublisher": "HenryPump",
|
||||||
|
"ComponentConfiguration": {
|
||||||
|
"DefaultConfiguration": {
|
||||||
|
"config": {
|
||||||
|
"device_1": {
|
||||||
|
"source_address": "127.0.0.1",
|
||||||
|
"tag_data": {
|
||||||
|
"exampleTag1": {
|
||||||
|
"property_alias": "exampleAlias1",
|
||||||
|
"change_threshold": 1,
|
||||||
|
"last_send_time": null,
|
||||||
|
"guarantee_send_seconds": 3600,
|
||||||
|
"last_send_value": null
|
||||||
|
},
|
||||||
|
"exampleTag2": {
|
||||||
|
"property_alias": "exampleAlias2",
|
||||||
|
"change_threshold": 1,
|
||||||
|
"last_send_time": null,
|
||||||
|
"guarantee_send_seconds": 3600,
|
||||||
|
"last_send_value": null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"Manifests": [
|
||||||
|
{
|
||||||
|
"Platform": {
|
||||||
|
"os": "linux"
|
||||||
|
},
|
||||||
|
"Lifecycle": {
|
||||||
|
"Install": "pip3 install -r {artifacts:path}/requirements.txt",
|
||||||
|
"Run": "export PYTHONPATH=$PTYHONPATH:{artifacts:decompressedPath}/stream_manager_sdk; python3 -u {artifacts:path}/EthernetIP.py '{configuration:/config}'"
|
||||||
|
},
|
||||||
|
"Artifacts": [
|
||||||
|
{
|
||||||
|
"URI": "s3://hp-greengrass-components/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"URI": "s3://hp-greengrass-components/artifacts/hp.greengrass.EthernetIP/1.1.0/stream_manager_sdk.zip",
|
||||||
|
"Unarchive": "ZIP"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"URI": "s3://hp-greengrass-components/artifacts/hp.greengrass.EthernetIP/1.1.0/requirements.txt"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user