updated code for chunking and reorganized

This commit is contained in:
Nico Melone
2023-12-14 13:16:36 -06:00
parent a75d279bf6
commit 14c29b6718
52 changed files with 7044 additions and 1824 deletions

View File

@@ -1,5 +1,5 @@
# Enter your python code.
import json, os
import json, os, time
from datetime import datetime as dt
from common.Logger import logger
from quickfaas.remotebus import publish
@@ -116,6 +116,16 @@ def checkParameterConfig(cfg):
return cfg
# Helper function to split the payload into chunks
def chunk_payload(payload, chunk_size=20):
chunked_values = list(payload["values"].items())
for i in range(0, len(chunked_values), chunk_size):
yield {
"ts": payload["ts"],
"values": dict(chunked_values[i:i+chunk_size])
}
def sendData(message):
logger.debug(message)
try:
@@ -144,7 +154,11 @@ def sendData(message):
payload["values"]["valve_status"] = 0
else:
payload["values"]["valve_status"] = -1
publish(__topic__, json.dumps(payload), __qos__)
for chunk in chunk_payload(payload=payload):
publish(__topic__, json.dumps(chunk), __qos__)
time.sleep(2)
publish("v1/devices/me/attributes", json.dumps({"latestReportTime": (round(dt.timestamp(dt.now())/600)*600)*1000}), __qos__)
except Exception as e:
logger.error(e)

View File

@@ -1,19 +1,28 @@
import json, time
from datetime import datetime as dt
from quickfaas.measure import recall, write
from quickfaas.remotebus import publish
from common.Logger import logger
# Helper function to split the payload into chunks
def chunk_payload(payload, chunk_size=20):
chunked_values = list(payload["values"].items())
for i in range(0, len(chunked_values), chunk_size):
yield {
"ts": payload["ts"],
"values": dict(chunked_values[i:i+chunk_size])
}
def sync():
#get new values and send
payload = {}
payload = {"ts": round(dt.timestamp(dt.now()))*1000, "values": {}}
topic = "v1/devices/me/telemetry"
try:
data = recall()#json.loads(recall().decode("utf-8"))
except Exception as e:
logger.error(e)
logger.info(data)
logger.debug(data)
for controller in data:
payload = {"ts": int(time.time()*1000), "values": {}}
valve_open = 0
valve_close = 0
for measure in controller["measures"]:
@@ -36,7 +45,10 @@ def sync():
payload["values"]["valve_status"] = "Unknown"
logger.debug("Sending on topic: {}".format(topic))
logger.debug("Sending value: {}".format(payload))
publish(topic, json.dumps(payload))
for chunk in chunk_payload(payload=payload):
publish(topic, json.dumps(chunk), 1)
time.sleep(2)
def writeplctag(value):
try:
#value = json.loads(value.replace("'",'"'))