added aggregation functions
This commit is contained in:
@@ -7,6 +7,8 @@ import calendar
|
|||||||
import random
|
import random
|
||||||
import uuid
|
import uuid
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import math
|
||||||
|
import statistics
|
||||||
from stream_manager import (
|
from stream_manager import (
|
||||||
AssetPropertyValue,
|
AssetPropertyValue,
|
||||||
ExportDefinition,
|
ExportDefinition,
|
||||||
@@ -49,48 +51,60 @@ def read_multiple(ipaddress, tags):
|
|||||||
return plc.read(*tags)
|
return plc.read(*tags)
|
||||||
|
|
||||||
#Put a single data point in a stream
|
#Put a single data point in a stream
|
||||||
def sendData(data,device):
|
def sendData(data,property_alias):
|
||||||
property_alias = config[device]['tag_data'][data.tag]['property_alias']
|
try:
|
||||||
last_send_time = config[device]['tag_data'][data.tag]['last_send_time']
|
time_in_nanos = TimeInNanos(
|
||||||
guaranteed_send_time = config[device]['tag_data'][data.tag]['guaranteed_send_seconds']
|
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
|
||||||
last_send_value = config[device]['tag_data'][data.tag]['last_send_value']
|
)
|
||||||
change_threshold = config[device]['tag_data'][data.tag]['change_threshold']
|
variant = Variant(double_value=data.value)
|
||||||
if last_send_time == None or time.time() - last_send_time >= guaranteed_send_time or last_send_value == None or abs(data.value - last_send_value) >= change_threshold:
|
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
|
||||||
try:
|
valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset)
|
||||||
time_in_nanos = TimeInNanos(
|
client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry))
|
||||||
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
|
return time_in_nanos.time_in_seconds
|
||||||
)
|
except Exception as e:
|
||||||
variant = Variant(double_value=data.value)
|
logging.error(e)
|
||||||
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
|
return False
|
||||||
valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset)
|
|
||||||
client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry))
|
|
||||||
config[device]['tag_data'][data.tag]['last_send_value'] = data.value
|
|
||||||
config[device]['tag_data'][data.tag]['last_send_time'] = time_in_nanos.time_in_seconds
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(e)
|
|
||||||
|
|
||||||
#Put multiple data points in a stream
|
|
||||||
def sendMultipleData(data, device):
|
|
||||||
for tag in data:
|
|
||||||
property_alias = config[device]['tag_data'][tag.tag]['property_alias']
|
|
||||||
last_send_time = config[device]['tag_data'][tag.tag]['last_send_time']
|
|
||||||
guaranteed_send_time = config[device]['tag_data'][tag.tag]['guaranteed_send_seconds']
|
|
||||||
last_send_value = config[device]['tag_data'][tag.tag]['last_send_value']
|
|
||||||
change_threshold = config[device]['tag_data'][tag.tag]['change_threshold']
|
|
||||||
if last_send_time == None or time.time() - last_send_time >= guaranteed_send_time or last_send_value == None or abs(tag.value - last_send_value) >= change_threshold:
|
|
||||||
try:
|
|
||||||
time_in_nanos = TimeInNanos(
|
|
||||||
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
|
|
||||||
)
|
|
||||||
variant = Variant(double_value=tag.value)
|
|
||||||
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
|
|
||||||
valueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=property_alias, property_values=asset)
|
|
||||||
client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(valueEntry))
|
|
||||||
config[device]['tag_data'][tag.tag]['last_send_value'] = tag.value
|
|
||||||
config[device]['tag_data'][tag.tag]['last_send_time'] = time_in_nanos.time_in_seconds
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(e)
|
|
||||||
|
|
||||||
|
def sendCheck(data, device):
|
||||||
|
last_send_time = config["devices"][device]['tag_data'][data.tag]['last_send_time']
|
||||||
|
guaranteed_send_time = config["devices"][device]['tag_data'][data.tag]['guaranteed_send_seconds']
|
||||||
|
last_send_value = config["devices"][device]['tag_data'][data.tag]['last_send_value']
|
||||||
|
change_threshold = config["devices"][device]['tag_data'][data.tag]['change_threshold']
|
||||||
|
|
||||||
|
if last_send_time == None:
|
||||||
|
return True
|
||||||
|
elif time.time() - last_send_time >= guaranteed_send_time:
|
||||||
|
return True
|
||||||
|
elif last_send_value == None:
|
||||||
|
return True
|
||||||
|
elif abs(data.value - last_send_value) >= change_threshold:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def sum(operands:list):
|
||||||
|
return math.fsum(operands)
|
||||||
|
|
||||||
|
def mean(operands:list):
|
||||||
|
return statistics.fmean(operands)
|
||||||
|
|
||||||
|
def max(operands:list):
|
||||||
|
return max(operands)
|
||||||
|
|
||||||
|
def min(operands:list):
|
||||||
|
return min(operands)
|
||||||
|
|
||||||
|
def median(operands:list):
|
||||||
|
return median(operands)
|
||||||
|
|
||||||
|
operations = {
|
||||||
|
"sum": sum,
|
||||||
|
"mean": mean,
|
||||||
|
"max": max,
|
||||||
|
"min": min,
|
||||||
|
"median": median
|
||||||
|
}
|
||||||
#Setup a Stream Manager Message Stream
|
#Setup a Stream Manager Message Stream
|
||||||
try:
|
try:
|
||||||
client.create_message_stream(MessageStreamDefinition(
|
client.create_message_stream(MessageStreamDefinition(
|
||||||
@@ -117,21 +131,50 @@ except ConnectionError or asyncio.TimeoutError:
|
|||||||
|
|
||||||
#Main Polling Loop
|
#Main Polling Loop
|
||||||
while(True):
|
while(True):
|
||||||
for device in config.keys():
|
values = {}
|
||||||
plc_address = config[device]['source_address']
|
for device in config["devices"].keys():
|
||||||
tag_data = list(config[device]['tag_data'].keys())
|
plc_address = config["devices"][device]['source_address']
|
||||||
|
tag_data = list(config["devices"][device]['tag_data'].keys())
|
||||||
if len(tag_data) > 1:
|
if len(tag_data) > 1:
|
||||||
try:
|
try:
|
||||||
response = read_multiple(plc_address,tag_data)
|
response = read_multiple(plc_address,tag_data)
|
||||||
logging.info(f"Got back: {response}")
|
logging.info(f"Got back: {response}")
|
||||||
sendMultipleData(response, device)
|
for tag in response:
|
||||||
|
if tag.tag in values.keys():
|
||||||
|
values[tag.tag].append(tag.value)
|
||||||
|
else:
|
||||||
|
values[tag.tag] = [tag.value]
|
||||||
|
if sendCheck(tag, device):
|
||||||
|
sendDataResp = sendData(tag, config["devices"][device]['tag_data'][response.tag]['property_alias'])
|
||||||
|
if sendDataResp:
|
||||||
|
config["devices"][device]['tag_data'][response.tag]['last_send_value'] = response.value
|
||||||
|
config["devices"][device]['tag_data'][response.tag]['last_send_time'] = sendDataResp
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(e)
|
logging.error(e)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
response = read_single(plc_address,tag_data[0])
|
response = read_single(plc_address,tag_data[0])
|
||||||
logging.info(f"Got back: {response}")
|
logging.info(f"Got back: {response}")
|
||||||
sendData(response, device)
|
if response.tag in values.keys():
|
||||||
|
values[response.tag].append(response.value)
|
||||||
|
else:
|
||||||
|
values[response.tag] = [response.value]
|
||||||
|
if sendCheck(response, device):
|
||||||
|
sendDataResp = sendData(response, config["devices"][device]['tag_data'][response.tag]['property_alias'])
|
||||||
|
if sendDataResp:
|
||||||
|
config["devices"][device]['tag_data'][response.tag]['last_send_value'] = response.value
|
||||||
|
config["devices"][device]['tag_data'][response.tag]['last_send_time'] = sendDataResp
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(e)
|
logging.error(e)
|
||||||
|
for func in config["functions"].keys():
|
||||||
|
operands = []
|
||||||
|
for x in func["operands"]:
|
||||||
|
operands = operands + values[x]
|
||||||
|
value = operations[func["operation"]](operands)
|
||||||
|
#if sendCheck(response, device):
|
||||||
|
sendDataResp = sendData(value, config["functions"][func]["property_alias"])
|
||||||
|
if sendDataResp:
|
||||||
|
config["functions"][func]["last_send_value"] = value
|
||||||
|
config["fucntions"][func]["last_send_time"] = sendDataResp
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user