diff --git a/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py index 4d4caa1..a111e3e 100644 --- a/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py +++ b/EthernetIP/artifacts/hp.greengrass.EthernetIP/1.1.0/EthernetIP.py @@ -7,6 +7,8 @@ import calendar import random import uuid import asyncio +import math +import statistics from stream_manager import ( AssetPropertyValue, ExportDefinition, @@ -49,48 +51,60 @@ def read_multiple(ipaddress, tags): return plc.read(*tags) #Put a single data point in a stream -def sendData(data,device): - property_alias = config[device]['tag_data'][data.tag]['property_alias'] - last_send_time = config[device]['tag_data'][data.tag]['last_send_time'] - guaranteed_send_time = config[device]['tag_data'][data.tag]['guaranteed_send_seconds'] - last_send_value = config[device]['tag_data'][data.tag]['last_send_value'] - change_threshold = config[device]['tag_data'][data.tag]['change_threshold'] - if last_send_time == None or time.time() - last_send_time >= guaranteed_send_time or last_send_value == None or abs(data.value - last_send_value) >= change_threshold: - try: - time_in_nanos = TimeInNanos( - time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) - ) - variant = Variant(double_value=data.value) - asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] - valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset) - client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry)) - config[device]['tag_data'][data.tag]['last_send_value'] = data.value - config[device]['tag_data'][data.tag]['last_send_time'] = time_in_nanos.time_in_seconds - except Exception as e: - logging.error(e) +def sendData(data,property_alias): + try: + time_in_nanos = TimeInNanos( + time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) + ) + variant = Variant(double_value=data.value) + asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] + valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset) + client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry)) + return time_in_nanos.time_in_seconds + except Exception as e: + logging.error(e) + return False -#Put multiple data points in a stream -def sendMultipleData(data, device): - for tag in data: - property_alias = config[device]['tag_data'][tag.tag]['property_alias'] - last_send_time = config[device]['tag_data'][tag.tag]['last_send_time'] - guaranteed_send_time = config[device]['tag_data'][tag.tag]['guaranteed_send_seconds'] - last_send_value = config[device]['tag_data'][tag.tag]['last_send_value'] - change_threshold = config[device]['tag_data'][tag.tag]['change_threshold'] - if last_send_time == None or time.time() - last_send_time >= guaranteed_send_time or last_send_value == None or abs(tag.value - last_send_value) >= change_threshold: - try: - time_in_nanos = TimeInNanos( - time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) - ) - variant = Variant(double_value=tag.value) - asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] - valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset) - client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry)) - config[device]['tag_data'][tag.tag]['last_send_value'] = tag.value - config[device]['tag_data'][tag.tag]['last_send_time'] = time_in_nanos.time_in_seconds - except Exception as e: - logging.error(e) +def sendCheck(data, device): + last_send_time = config["devices"][device]['tag_data'][data.tag]['last_send_time'] + guaranteed_send_time = config["devices"][device]['tag_data'][data.tag]['guaranteed_send_seconds'] + last_send_value = config["devices"][device]['tag_data'][data.tag]['last_send_value'] + change_threshold = config["devices"][device]['tag_data'][data.tag]['change_threshold'] + + if last_send_time == None: + return True + elif time.time() - last_send_time >= guaranteed_send_time: + return True + elif last_send_value == None: + return True + elif abs(data.value - last_send_value) >= change_threshold: + return True + else: + return False + +def sum(operands:list): + return math.fsum(operands) + +def mean(operands:list): + return statistics.fmean(operands) + +def max(operands:list): + return max(operands) + +def min(operands:list): + return min(operands) + +def median(operands:list): + return median(operands) + +operations = { + "sum": sum, + "mean": mean, + "max": max, + "min": min, + "median": median +} #Setup a Stream Manager Message Stream try: client.create_message_stream(MessageStreamDefinition( @@ -117,21 +131,50 @@ except ConnectionError or asyncio.TimeoutError: #Main Polling Loop while(True): - for device in config.keys(): - plc_address = config[device]['source_address'] - tag_data = list(config[device]['tag_data'].keys()) + values = {} + for device in config["devices"].keys(): + plc_address = config["devices"][device]['source_address'] + tag_data = list(config["devices"][device]['tag_data'].keys()) if len(tag_data) > 1: try: response = read_multiple(plc_address,tag_data) logging.info(f"Got back: {response}") - sendMultipleData(response, device) + for tag in response: + if tag.tag in values.keys(): + values[tag.tag].append(tag.value) + else: + values[tag.tag] = [tag.value] + if sendCheck(tag, device): + sendDataResp = sendData(tag, config["devices"][device]['tag_data'][response.tag]['property_alias']) + if sendDataResp: + config["devices"][device]['tag_data'][response.tag]['last_send_value'] = response.value + config["devices"][device]['tag_data'][response.tag]['last_send_time'] = sendDataResp except Exception as e: logging.error(e) else: try: response = read_single(plc_address,tag_data[0]) logging.info(f"Got back: {response}") - sendData(response, device) + if response.tag in values.keys(): + values[response.tag].append(response.value) + else: + values[response.tag] = [response.value] + if sendCheck(response, device): + sendDataResp = sendData(response, config["devices"][device]['tag_data'][response.tag]['property_alias']) + if sendDataResp: + config["devices"][device]['tag_data'][response.tag]['last_send_value'] = response.value + config["devices"][device]['tag_data'][response.tag]['last_send_time'] = sendDataResp except Exception as e: logging.error(e) + for func in config["functions"].keys(): + operands = [] + for x in func["operands"]: + operands = operands + values[x] + value = operations[func["operation"]](operands) + #if sendCheck(response, device): + sendDataResp = sendData(value, config["functions"][func]["property_alias"]) + if sendDataResp: + config["functions"][func]["last_send_value"] = value + config["fucntions"][func]["last_send_time"] = sendDataResp time.sleep(5) +