added overflow pump
This commit is contained in:
26
Pub_Sub/fk_plcpond_gateway/thingsboard/be-pond-transfer.csv
Normal file
26
Pub_Sub/fk_plcpond_gateway/thingsboard/be-pond-transfer.csv
Normal file
@@ -0,0 +1,26 @@
|
||||
MeasuringPointName,ControllerName,GroupName,UploadType,DeadZonePercent,DataType,ArrayIndex,EnableBit,BitIndex,reverseBit,Address,Decimal,Len,ReadWrite,Unit,Description,Transform Type,MaxValue,MinValue,MaxScale,MinScale,Gain,Offset,startBit,endBit,Pt,Ct,Mapping_table,$,,,
|
||||
alarm_enable_cmd,plcpond,default,periodic,,INT,,0,,,Alarm_Enable,,,rw,,,none,,,,,,,,,,,,,,,0
|
||||
auto,plcpond,default,periodic,,BIT,,,,0,Auto,,,rw,,,none,,,,,,,,,,,,,0,,0
|
||||
discharge_out,plcpond,default,periodic,,FLOAT,,,,,DisPSI_Out,2,,ro,,,none,,,,,,,,,,,,,,,0
|
||||
fit_rate,plcpond,default,periodic,,FLOAT,,,,,FIT_Rate,2,,ro,,,none,,,,,,,,,,,,,,,0
|
||||
hand,plcpond,default,periodic,,BIT,,,,0,Hand,,,ro,,,none,,,,,,,,,,,,,0,,0
|
||||
off,plcpond,default,periodic,,BIT,,,,0,OFF,,,ro,,,none,,,,,,,,,,,,,0,,0
|
||||
pond_1_hi_alm,plcpond,default,periodic,,BIT,,,,0,Pond_1_Hi_Alarm,,,ro,,,none,,,,,,,,,,,,,0,,0
|
||||
pond_1_hi_clr_spt,plcpond,default,periodic,,FLOAT,,,,,Pond_1_Hi_Clr_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,0
|
||||
pond_1_hi_spt,plcpond,default,periodic,,FLOAT,,,,,Pond_1_Hi_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,0
|
||||
pond_1_level,plcpond,default,periodic,,FLOAT,,,,,Pond_1_Lev,2,,ro,,,none,,,,,,,,,,,,,,,0
|
||||
pond_1_lo_alm,plcpond,default,periodic,,BIT,,,,0,Pond_1_Lo_Alarm,,,ro,,,none,,,,,,,,,,,,,0,,0
|
||||
pond_1_lo_clr_spt,plcpond,default,periodic,,FLOAT,,,,,Pond_1_Lo_Clr_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,0
|
||||
pond_1_lo_spt,plcpond,default,periodic,,FLOAT,,,,,Pond_1_Lo_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,0
|
||||
pond_1_total_bbls,plcpond,default,periodic,,FLOAT,,,,,Pond_1_Total_Barrels,2,,ro,,,none,,,,,,,,,,,,,,,0
|
||||
pond_2_hi_alm,plcpond,default,periodic,,BIT,,,,0,Pond_2_Hi_Alarm,,,ro,,,none,,,,,,,,,,,,,0,,0
|
||||
pond_2_hi_clr_spt,plcpond,default,periodic,,FLOAT,,,,,Pond_2_Hi_Clr_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,0
|
||||
pond_2_hi_spt,plcpond,default,periodic,,FLOAT,,,,,Pond_2_Hi_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,0
|
||||
pond_2_level,plcpond,default,periodic,,FLOAT,,,,,Pond_2_Lev,2,,ro,,,none,,,,,,,,,,,,,,,0
|
||||
pond_2_lo_alm,plcpond,default,periodic,,BIT,,,,0,Pond_2_Lo_Alarm,,,ro,,,none,,,,,,,,,,,,,0,,0
|
||||
pond_2_lo_clr_spt,plcpond,default,periodic,,FLOAT,,,,,Pond_2_Lo_Clr_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,0
|
||||
pond_2_lo_spt,plcpond,default,periodic,,FLOAT,,,,,Pond_2_Lo_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,0
|
||||
pond_2_total_bbls,plcpond,default,periodic,,FLOAT,,,,,Pond_2_Total_Barrels,2,,ro,,,none,,,,,,,,,,,,,,,0
|
||||
pump_permissive,plcpond,default,periodic,,BIT,,,,0,Pump_Permissive,,,ro,,,none,,,,,,,,,,,,,0,,0
|
||||
pump_status,plcpond,default,periodic,,BIT,,,,0,Pump_Status,,,ro,,,none,,,,,,,,,,,,,0,,0
|
||||
run_permissive,plcpond,default,periodic,,BIT,,,,0,Run_Perm,,,ro,,,none,,,,,,,,,,,,,0,,0
|
||||
|
1908
Pub_Sub/fk_plcpond_gateway/thingsboard/bn_tag_dump.json
Normal file
1908
Pub_Sub/fk_plcpond_gateway/thingsboard/bn_tag_dump.json
Normal file
File diff suppressed because it is too large
Load Diff
322
Pub_Sub/fk_plcpond_gateway/thingsboard/bp-compressor.cfg
Normal file
322
Pub_Sub/fk_plcpond_gateway/thingsboard/bp-compressor.cfg
Normal file
File diff suppressed because one or more lines are too long
666
Pub_Sub/fk_plcpond_gateway/thingsboard/fk_plcpond_tb_v3.cfg
Normal file
666
Pub_Sub/fk_plcpond_gateway/thingsboard/fk_plcpond_tb_v3.cfg
Normal file
File diff suppressed because one or more lines are too long
666
Pub_Sub/fk_plcpond_gateway/thingsboard/fk_plcpond_tb_v4.cfg
Normal file
666
Pub_Sub/fk_plcpond_gateway/thingsboard/fk_plcpond_tb_v4.cfg
Normal file
File diff suppressed because one or more lines are too long
26
Pub_Sub/fk_plcpond_gateway/thingsboard/overflow_measures.csv
Normal file
26
Pub_Sub/fk_plcpond_gateway/thingsboard/overflow_measures.csv
Normal file
@@ -0,0 +1,26 @@
|
||||
MeasuringPointName,ControllerName,GroupName,UploadType,DataType,EnableBit,BitIndex,reverseBit,Address,Decimal,Len,ReadWrite,Unit,Description,Transform Type,MaxValue,MinValue,MaxScale,MinScale,Gain,Offset,startBit,endBit,Pt,Ct,Mapping_table,TransDecimal,bitMap,msecSample,DataEndianReverse,ReadOffset,ReadLength,DataParseMethod,BitId,storageLwTSDB
|
||||
auto,overflow_pump,default,periodic,BIT,,,0,Raw_Auto_Input ,,,ro,, HOA auto status,none,,,,,,,,,,,,,0,,,,,,,1
|
||||
discharge_out,overflow_pump,default,periodic,FLOAT,,,,DisPSI_Out,2,,ro,,,none,,,,,,,,,,,,,,,0,,,,,1
|
||||
fit_rate,overflow_pump,default,periodic,FLOAT,,,,FIT_Rate,2,,ro,,,none,,,,,,,,,,,,,,,0,,,,,1
|
||||
hand,overflow_pump,default,periodic,BIT,,,0,Raw_Hand_Input ,,,ro,, HOA hand status,none,,,,,,,,,,,,,0,,,,,,,1
|
||||
level_override,overflow_pump,default,periodic,BIT,,,0,Level_Override ,,,rw,,"1= runs regardless of level, 0= runs based on start/stop levels",none,,,,,,,,,,,,,0,,,,,,,1
|
||||
overload_status,overflow_pump,default,periodic,BIT,,,0,Raw_Overload_Status , ,,ro,,,none,,,,,,,,,,,,,0,,,,,,,1
|
||||
pond_1_hi_alm,overflow_pump,default,periodic,BIT,,,0,Pond_1_Hi_Alarm,,,ro,,,none,,,,,,,,,,,,,0,,,,,,,1
|
||||
pond_1_hi_clr_spt,overflow_pump,default,periodic,FLOAT,,,,Pond_1_Hi_Clr_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,,,,,,1
|
||||
pond_1_hi_spt,overflow_pump,default,periodic,FLOAT,,,,Pond_1_Hi_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,,,,,,1
|
||||
pond_1_level,overflow_pump,default,periodic,FLOAT,,,,Pond_1_Lev,2,,ro,,,none,,,,,,,,,,,,,,,,,,,,1
|
||||
pond_1_lo_alm,overflow_pump,default,periodic,BIT,,,0,Pond_1_Lo_Alarm,,,ro,,,none,,,,,,,,,,,,,0,,,,,,,1
|
||||
pond_1_lo_clr_spt,overflow_pump,default,periodic,FLOAT,,,,Pond_1_Lo_Clr_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,,,,,,1
|
||||
pond_1_lo_spt,overflow_pump,default,periodic,FLOAT,,,,Pond_1_Lo_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,,,,,,1
|
||||
pond_1_total_bbls,overflow_pump,default,periodic,FLOAT,,,,Pond_1_Total_Barrels,2,,ro,,,none,,,,,,,,,,,,,,,,,,,,1
|
||||
pond_2_hi_alm,overflow_pump,default,periodic,BIT,,,0,Pond_2_Hi_Alarm,,,ro,,,none,,,,,,,,,,,,,0,,,,,,,1
|
||||
pond_2_hi_clr_spt,overflow_pump,default,periodic,FLOAT,,,,Pond_2_Hi_Clr_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,,,,,,1
|
||||
pond_2_hi_spt,overflow_pump,default,periodic,FLOAT,,,,Pond_2_Hi_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,,,,,,1
|
||||
pond_2_level,overflow_pump,default,periodic,FLOAT,,,,Pond_2_Lev,2,,ro,,,none,,,,,,,,,,,,,,,,,,,,1
|
||||
pond_2_lo_alm,overflow_pump,default,periodic,BIT,,,0,Pond_2_Lo_Alarm,,,ro,,,none,,,,,,,,,,,,,0,,,,,,,1
|
||||
pond_2_lo_clr_spt,overflow_pump,default,periodic,FLOAT,,,,Pond_2_Lo_Clr_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,,,,,,1
|
||||
pond_2_lo_spt,overflow_pump,default,periodic,FLOAT,,,,Pond_2_Lo_Setpoint,2,,rw,,,none,,,,,,,,,,,,,,,,,,,,1
|
||||
pond_2_total_bbls,overflow_pump,default,periodic,FLOAT,,,,Pond_2_Total_Barrels,2,,ro,,,none,,,,,,,,,,,,,,,,,,,,1
|
||||
run_status,overflow_pump,default,periodic,BIT,,,0,Raw_Run_Status ,,,ro,, Pump running status,none,,,,,,,,,,,,,0,,,,,,,1
|
||||
start_level_spt,overflow_pump,default,periodic,FLOAT,,,,Start_Level_spt ,2,,rw,, start level input,none,,,,,,,,,,,,,,,,,,,,1
|
||||
stop_level_spt,overflow_pump,default,periodic,FLOAT,,,,Stop_Level_spt ,2,,rw,, stop level input,none,,,,,,,,,,,,,,,,,,,,1
|
||||
|
2348
Pub_Sub/fk_plcpond_gateway/thingsboard/overflow_tag_dump.json
Normal file
2348
Pub_Sub/fk_plcpond_gateway/thingsboard/overflow_tag_dump.json
Normal file
File diff suppressed because it is too large
Load Diff
272
Pub_Sub/fk_plcpond_gateway/thingsboard/pub/sendData.py
Normal file
272
Pub_Sub/fk_plcpond_gateway/thingsboard/pub/sendData.py
Normal file
@@ -0,0 +1,272 @@
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime as dt
|
||||
from common.Logger import logger
|
||||
from quickfaas.remotebus import publish
|
||||
from quickfaas.global_dict import get as get_params
|
||||
from quickfaas.global_dict import _set_global_args
|
||||
from mobiuspi_lib.gps import GPS
|
||||
|
||||
|
||||
def reboot():
    """Restart the device supervisor by signalling supervisord.

    Sends SIGHUP to the PID recorded in /var/run/python/supervisord.pid
    and logs whatever the shell command printed.
    """
    # basic = Basic()
    logger.info("!" * 10 + "REBOOTING DEVICE" + "!" * 10)
    output = os.popen("kill -s SIGHUP `cat /var/run/python/supervisord.pid`").read()
    logger.info(f"REBOOT : {output}")
|
||||
|
||||
|
||||
def checkFileExist(filename):
    """Ensure /var/user/files/<filename> exists, seeding it with an empty
    JSON object ({}) when missing.

    Parameters:
        filename: bare file name (no directory part) under /var/user/files.
    """
    path = "/var/user/files"
    filepath = path + "/" + filename
    if not os.path.exists(path):
        logger.info("no folder making files folder in var/user")
        # exist_ok guards against a race with another process creating it
        os.makedirs(path, exist_ok=True)
    # Single existence check replaces the old create-then-recheck sequence;
    # "w" (not append) since the file is known to be absent here.
    if not os.path.exists(filepath):
        logger.info("no creds file making creds file")
        with open(filepath, "w") as f:
            json.dump({}, f)
|
||||
|
||||
|
||||
def convertDStoJSON(ds):
    """Flatten a list of {"key": ..., "value": ...} records into a dict."""
    return {entry["key"]: entry["value"] for entry in ds}
|
||||
|
||||
|
||||
def convertJSONtoDS(j):
    """Expand a dict into a list of {"key": ..., "value": ...} records."""
    return [{"key": k, "value": v} for k, v in j.items()]
|
||||
|
||||
|
||||
def checkCredentialConfig():
    """Keep the supervisor config's MQTT credentials and the stored copy in
    /var/user/files/creds.json in sync.

    If the config's clientId/username/passwd are unset or "unknown", restore
    them from creds.json, rewrite the config, and reboot the supervisor.
    Otherwise treat the config as authoritative and write its values back
    into creds.json.
    """
    logger.info("CHECKING CONFIG")
    cfgpath = "/var/user/cfg/device_supervisor/device_supervisor.cfg"
    credspath = "/var/user/files/creds.json"
    cfg = dict()
    with open(cfgpath, "r") as f:
        cfg = json.load(f)
    # NOTE(review): only clouds[0] is ever inspected — assumes a single
    # configured cloud; confirm against device_supervisor.cfg schema.
    clouds = cfg.get("clouds")
    logger.info(clouds)
    # if not configured then try to configure from stored values
    if clouds[0]["args"]["clientId"] == "unknown" or clouds[0]["args"]["username"] == "unknown" or not clouds[0]["args"]["passwd"] or clouds[0]["args"]["passwd"] == "unknown":
        checkFileExist("creds.json")
        with open(credspath, "r") as c:
            creds = json.load(c)
            if creds:
                logger.info("updating config with stored data")
                clouds[0]["args"]["clientId"] = creds["clientId"]
                clouds[0]["args"]["username"] = creds["userName"]
                clouds[0]["args"]["passwd"] = creds["password"]
                cfg["clouds"] = clouds
                cfg = checkParameterConfig(cfg)
                with open(cfgpath, "w", encoding='utf-8') as n:
                    json.dump(cfg, n, indent=1, ensure_ascii=False)
                # restart so the supervisor picks up the restored credentials
                reboot()
    else:
        # assuming clouds is filled out, if data is different then assume someone typed in something new and store it, if creds is empty fill with clouds' data
        checkFileExist("creds.json")
        with open(credspath, "r") as c:
            logger.info("updating stored file with new data")
            # the config file is rewritten (with reconciled params) before
            # the stored credentials are compared/updated below
            cfg = checkParameterConfig(cfg)
            with open(cfgpath, "w", encoding='utf-8') as n:
                json.dump(cfg, n, indent=1, ensure_ascii=False)
            creds = json.load(c)
            if creds:
                if creds["clientId"] != clouds[0]["args"]["clientId"]:
                    creds["clientId"] = clouds[0]["args"]["clientId"]
                if creds["userName"] != clouds[0]["args"]["username"]:
                    creds["userName"] = clouds[0]["args"]["username"]
                if creds["password"] != clouds[0]["args"]["passwd"]:
                    creds["password"] = clouds[0]["args"]["passwd"]
            else:
                creds["clientId"] = clouds[0]["args"]["clientId"]
                creds["userName"] = clouds[0]["args"]["username"]
                creds["password"] = clouds[0]["args"]["passwd"]
            with open(credspath, "w") as cw:
                json.dump(creds, cw)
|
||||
|
||||
|
||||
def checkParameterConfig(cfg):
    """Reconcile the config's "labels" parameters with the copy stored in
    /var/user/files/params.json.

    Config values win unless they are "unknown"; keys present only in the
    config are copied into the stored file. The reconciled set is pushed
    into the quickfaas global dict and written back to disk.

    Parameters:
        cfg: parsed device_supervisor.cfg dict (must contain "labels").
    Returns:
        cfg with its "labels" updated to the reconciled parameter set.
    """
    logger.info("Checking Parameters!!!!")
    paramspath = "/var/user/files/params.json"
    cfgparams = convertDStoJSON(cfg.get("labels"))
    # check stored values
    checkFileExist("params.json")
    with open(paramspath, "r") as f:
        logger.info("Opened param storage file")
        params = json.load(f)
        if params:
            if cfgparams != params:
                # go through each param
                # if not "unknown" and cfg and params aren't the same take from cfg likely updated manually
                # if key in cfg but not in params copy to params
                logger.info("equalizing params between cfg and stored")
                for key in cfgparams.keys():
                    try:
                        if cfgparams[key] != params[key] and cfgparams[key] != "unknown":
                            params[key] = cfgparams[key]
                    except KeyError:
                        # key exists in cfg but not in the stored file; the
                        # old bare `except:` also hid unrelated errors
                        params[key] = cfgparams[key]
                cfg["labels"] = convertJSONtoDS(params)
                _set_global_args(convertJSONtoDS(params))
                with open(paramspath, "w") as p:
                    json.dump(params, p)
        else:
            # stored file is empty: seed it from the params in memory
            with open(paramspath, "w") as p:
                logger.info("initializing param file with params in memory")
                json.dump(convertDStoJSON(get_params()), p)
            cfg["labels"] = get_params()

    return cfg
|
||||
|
||||
|
||||
def getGPS():
    """Read the router's GPS fix and return (latitude, longitude, speed_mph)
    as formatted strings.

    Assumes position_status reports latitude/longitude as space-separated
    "degrees minutes hemisphere" strings with a trailing unit character on
    the numeric parts, and speed in knots — TODO confirm against the
    mobiuspi_lib.gps output format.
    """
    # Create a gps instance
    gps = GPS()

    # Retrieve GPS information
    position_status = gps.get_position_status()
    logger.debug("position_status: ")
    logger.debug(position_status)
    latitude = position_status["latitude"].split(" ")
    longitude = position_status["longitude"].split(" ")
    # decimal degrees = degrees + minutes/60; [:-1] strips the unit suffix
    lat_dec = int(latitude[0][:-1]) + (float(latitude[1][:-1])/60)
    lon_dec = int(longitude[0][:-1]) + (float(longitude[1][:-1])/60)
    # southern / western hemispheres are negative in decimal degrees
    if latitude[2] == "S":
        lat_dec = lat_dec * -1
    if longitude[2] == "W":
        lon_dec = lon_dec * -1
    # lat_dec = round(lat_dec, 7)
    # lon_dec = round(lon_dec, 7)
    logger.info("HERE IS THE GPS COORDS")
    logger.info(f"LATITUDE: {lat_dec}, LONGITUDE: {lon_dec}")
    speedKnots = position_status["speed"].split(" ")
    # 1 knot ≈ 1.151 mph
    speedMPH = float(speedKnots[0]) * 1.151
    return (f"{lat_dec:.8f}", f"{lon_dec:.8f}", f"{speedMPH:.2f}")
|
||||
|
||||
|
||||
def chunk_payload(payload, chunk_size=20):
    """Yield *payload* in pieces of at most *chunk_size* key/value pairs.

    Handles two shapes: telemetry ({"ts": ..., "values": {...}}), where the
    values dict is split and the timestamp repeated per chunk, and a flat
    attributes dict, where the top-level keys are split.
    """
    if "values" in payload:
        # Original format: {"ts": ..., "values": {...}}
        pairs = list(payload["values"].items())
        for start in range(0, len(pairs), chunk_size):
            yield {
                "ts": payload["ts"],
                "values": dict(pairs[start:start + chunk_size]),
            }
    else:
        # New format: {"key1": "value1", "key2": "value2"}
        keys = list(payload)
        for start in range(0, len(keys), chunk_size):
            yield {key: payload[key] for key in keys[start:start + chunk_size]}
|
||||
|
||||
|
||||
def chunk_payload_devices(payload, chunk_size=20, is_attributes_payload=False):
    """Yield gateway payloads in ThingsBoard-sized chunks.

    Attributes mode splits {"device": {...}} by device; telemetry mode
    splits each device entry's "values" dict, repeating its timestamp.
    """
    if is_attributes_payload:
        # For attributes payload, chunk the controllers
        devices = list(payload.items())
        for start in range(0, len(devices), chunk_size):
            yield dict(devices[start:start + chunk_size])
    else:
        # For data payload, chunk the values within each controller
        for device, entries in payload.items():
            for entry in entries:
                stamp = entry['ts']
                pairs = list(entry['values'].items())
                for start in range(0, len(pairs), chunk_size):
                    chunked = dict(pairs[start:start + chunk_size])
                    yield {device: [{"ts": stamp, "values": chunked}]}
|
||||
|
||||
|
||||
def controlName(name):
    """Map an internal controller name to its ThingsBoard device name.

    Only "overflow_pump" is mapped (via the global params); every other
    controller is reported as the gateway itself ("Gateway").
    """
    logger.debug(name)
    params = convertDStoJSON(get_params())
    logger.debug(params)
    mapping = {
        "overflow_pump": f"{params['overflow_pump']}",
    }
    return mapping.get(name, "Gateway")
|
||||
|
||||
|
||||
def sendData(message):
    """Publish one device-supervisor measurement batch to ThingsBoard.

    Measurements mapping to "Gateway" go on the device's own telemetry/
    attributes topics; all others are grouped per device and sent via the
    gateway API topics. Names containing "_spt" are treated as setpoints
    and published as attributes rather than telemetry.

    Parameters:
        message: supervisor batch dict with a "measures" list; each measure
                 has "ctrlName", "name", "value" and "health" fields.
    """
    # logger.info(message)
    grouped_data = {}
    grouped_attributes = {}
    # timestamp rounded to the nearest 10 minutes, in milliseconds
    now = (round(dt.timestamp(dt.now())/600)*600)*1000
    payload = {"ts": now, "values": {}}
    attributes_payload = {}
    for measure in message["measures"]:
        try:
            logger.debug(measure)
            ctrlName = controlName(measure["ctrlName"])
            logger.debug(ctrlName)
            if ctrlName == "Gateway":
                # send to gateway with v1/devices/me/telemetry
                if measure["health"] == 1:
                    if "_spt" in measure["name"]:
                        attributes_payload[measure["name"]] = measure["value"]
                    else:
                        payload["values"][measure["name"]] = measure["value"]
            else:
                name = measure['name']
                value = measure['value']
                health = measure['health']
                # Add controller for telemetry if it doesn't exist
                if ctrlName not in grouped_data:
                    grouped_data[ctrlName] = {}
                # Add controller for attributes if it doesn't exist
                if ctrlName not in grouped_attributes:
                    grouped_attributes[ctrlName] = {}
                grouped_attributes[ctrlName]["latestReportTime"] = now
                # Add data to temp payload if datapoint health is good
                if health:
                    if "_spt" in name:
                        grouped_attributes[ctrlName][name] = value
                    else:
                        grouped_data[ctrlName][name] = value
        except Exception as e:
            logger.error(e)

    # Transform the grouped data to desired structure
    payload_devices = {}
    for key, value in grouped_data.items():
        if value:
            payload_devices[key] = [{"ts": now, "values": value}]

    attributes_payload_devices = {}
    for key, value in grouped_attributes.items():
        if value:
            attributes_payload_devices[key] = value
    logger.debug(payload_devices)
    logger.debug(attributes_payload_devices)

    # Send data belonging to Gateway
    for chunk in chunk_payload(payload=payload):
        publish(__topic__, json.dumps(chunk), __qos__)
        time.sleep(2)

    attributes_payload["latestReportTime"] = (
        round(dt.timestamp(dt.now())/600)*600)*1000
    for chunk in chunk_payload(payload=attributes_payload):
        publish("v1/devices/me/attributes", json.dumps(chunk), __qos__)
        time.sleep(2)

    # Send gateway devices data
    for chunk in chunk_payload_devices(payload=payload_devices):
        publish("v1/gateway/telemetry", json.dumps(chunk), __qos__)
        time.sleep(2)

    for chunk in chunk_payload_devices(payload=attributes_payload_devices, is_attributes_payload=True):
        # BUG FIX: previously published the entire attributes_payload_devices
        # dict on every iteration instead of the chunk, defeating chunking
        # and sending duplicate oversized payloads.
        publish("v1/gateway/attributes", json.dumps(chunk), __qos__)
        time.sleep(2)
|
||||
184
Pub_Sub/fk_plcpond_gateway/thingsboard/sub/receiveAttribute.py
Normal file
184
Pub_Sub/fk_plcpond_gateway/thingsboard/sub/receiveAttribute.py
Normal file
@@ -0,0 +1,184 @@
|
||||
import json, time
|
||||
from datetime import datetime as dt
|
||||
from quickfaas.measure import recall, write
|
||||
from quickfaas.remotebus import publish
|
||||
from common.Logger import logger
|
||||
from quickfaas.global_dict import get as get_params
|
||||
|
||||
def convertDStoJSON(ds):
    """Flatten a list of {"key": ..., "value": ...} records into a dict."""
    return {entry["key"]: entry["value"] for entry in ds}
|
||||
|
||||
def chunk_payload(payload, chunk_size=20):
    """Yield *payload* in pieces of at most *chunk_size* key/value pairs.

    Telemetry payloads ({"ts": ..., "values": {...}}) have their values dict
    split with the timestamp repeated; flat attribute dicts are split by key.
    """
    if "values" in payload:
        # Original format: {"ts": ..., "values": {...}}
        pairs = list(payload["values"].items())
        for start in range(0, len(pairs), chunk_size):
            yield {
                "ts": payload["ts"],
                "values": dict(pairs[start:start + chunk_size]),
            }
    else:
        # New format: {"key1": "value1", "key2": "value2"}
        keys = list(payload)
        for start in range(0, len(keys), chunk_size):
            yield {key: payload[key] for key in keys[start:start + chunk_size]}
|
||||
|
||||
def chunk_payload_gateway(payload, chunk_size=20, is_attributes_payload=False):
    """Yield gateway payloads in ThingsBoard-sized chunks.

    Attributes mode splits {"device": {...}} by device; telemetry mode
    splits each device entry's "values" dict, repeating its timestamp.
    """
    if is_attributes_payload:
        # For attributes payload, chunk the controllers
        devices = list(payload.items())
        for start in range(0, len(devices), chunk_size):
            yield dict(devices[start:start + chunk_size])
    else:
        # For data payload, chunk the values within each controller
        for device, entries in payload.items():
            for entry in entries:
                stamp = entry['ts']
                pairs = list(entry['values'].items())
                for start in range(0, len(pairs), chunk_size):
                    chunked = dict(pairs[start:start + chunk_size])
                    yield {device: [{"ts": stamp, "values": chunked}]}
|
||||
|
||||
|
||||
def controlName(name):
    """Map an internal controller name to its ThingsBoard device name.

    Only "overflow_pump" is mapped (via the global params); every other
    controller is reported as the gateway itself ("Gateway").
    """
    logger.debug(name)
    params = convertDStoJSON(get_params())
    logger.debug(params)
    mapping = {
        "overflow_pump": f"{params['overflow_pump']}",
    }
    return mapping.get(name, "Gateway")
|
||||
|
||||
|
||||
# Filter payloads based on device_filter
|
||||
def filter_payload(payload, device_filter):
    """Return only the entries of *payload* whose device name is selected.

    Parameters:
        payload: dict keyed by device name.
        device_filter: iterable of device names, or a single device-name
                       string; empty/None selects every device.
    """
    if not device_filter:  # If filter is empty, include all devices
        return payload
    # A bare string (as passed by receiveAttribute via sync(p["device"]))
    # would otherwise do substring matching through `in`; wrap it so a
    # single device name matches exactly.
    if isinstance(device_filter, str):
        device_filter = [device_filter]
    return {key: value for key, value in payload.items() if key in device_filter}
|
||||
|
||||
def sync(device_filter=None):
    """Re-read every current measurement via recall() and republish it to
    ThingsBoard, mirroring sendData()'s routing and chunking.

    Parameters:
        device_filter: optional collection of gateway device names; only
                       those devices' telemetry/attributes are republished
                       (empty/None means all).
    """
    if device_filter is None:
        # mutable default argument ([]) replaced with a None sentinel
        device_filter = []
    # get new values and send
    now = round(dt.timestamp(dt.now()))*1000
    try:
        data = recall()  # json.loads(recall().decode("utf-8"))
    except Exception as e:
        logger.error(e)
        # BUG FIX: `data` is unbound when recall() fails; previously
        # execution fell through and crashed with a NameError below.
        return
    logger.debug(data)
    logger.info("SYNCING")
    grouped_data = {}
    grouped_attributes = {}
    payload = {"ts": now, "values": {}}
    attributes_payload = {}
    try:
        for controller in data:
            for measure in controller["measures"]:
                ctrlName = controlName(measure["ctrlName"])
                if ctrlName == "Gateway":
                    # send to gateway with v1/devices/me/telemetry
                    if measure["health"] == 1:
                        if "_spt" in measure["name"]:
                            attributes_payload[measure["name"]] = measure["value"]
                        else:
                            payload["values"][measure["name"]] = measure["value"]
                else:
                    # drops the first two "_"-separated tokens of the name —
                    # TODO confirm the prefix convention this assumes
                    name = "_".join(measure['name'].split("_")[2:])
                    value = measure['value']
                    health = measure['health']
                    # Add controller for telemetry if it doesn't exist
                    if ctrlName not in grouped_data:
                        grouped_data[ctrlName] = {}
                    # Add controller for attributes if it doesn't exist
                    if ctrlName not in grouped_attributes:
                        grouped_attributes[ctrlName] = {}
                    grouped_attributes[ctrlName]["latestReportTime"] = now
                    # Add data to temp payload if datapoint health is good
                    if health:
                        if "_spt" in name:
                            grouped_attributes[ctrlName][name] = value
                        else:
                            grouped_data[ctrlName][name] = value
    except Exception as e:
        logger.error(e)
    try:
        # Transform the grouped data to desired structure
        payload_gateway = {}
        for key, value in grouped_data.items():
            if value:
                payload_gateway[key] = [{"ts": now, "values": value}]

        attributes_payload_gateway = {}
        for key, value in grouped_attributes.items():
            if value:
                attributes_payload_gateway[key] = value

        # Apply the filter
        filtered_payload_gateway = filter_payload(payload_gateway, device_filter)
        filtered_attributes_payload_gateway = filter_payload(attributes_payload_gateway, device_filter)

        # Send data belonging to Gateway
        for chunk in chunk_payload(payload=payload):
            publish("v1/devices/me/telemetry", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)

        attributes_payload["latestReportTime"] = (round(dt.timestamp(dt.now())/600)*600)*1000
        for chunk in chunk_payload(payload=attributes_payload):
            publish("v1/devices/me/attributes", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)

        # Send gateway devices data
        for chunk in chunk_payload_gateway(payload=filtered_payload_gateway):
            publish("v1/gateway/telemetry", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)

        for chunk in chunk_payload_gateway(payload=filtered_attributes_payload_gateway, is_attributes_payload=True):
            publish("v1/gateway/attributes", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)

    except Exception as e:
        logger.error(e)
|
||||
|
||||
def writeplctag(value):
    """Write one measurement value back to the PLC.

    Parameters:
        value: {"measurement": <measurement_name>, "value": <value to write>}
    Returns:
        True when the write call succeeded, False otherwise.
    """
    try:
        # value = json.loads(value.replace("'",'"'))
        logger.info(value)
        # write() payload format:
        # [{"name": "advvfdipp", "measures": [{"name": "manualfrequencysetpoint", "value": 49}]}]
        request = [{"name": "plcpond", "measures": [{"name": value["measurement"], "value": value["value"]}]}]
        resp = write(request)
        logger.info("RETURN FROM WRITE: {}".format(resp))
        return True
    except Exception as e:
        logger.error(e)
        return False
|
||||
|
||||
def receiveAttribute(topic, payload):
    # Callback for ThingsBoard shared-attribute updates: write each received
    # key/value to the PLC, then re-sync so the platform sees the new values.
    try:
        logger.debug(topic)
        logger.info(json.loads(payload))
        p = json.loads(payload)

        for key, value in p.items():
            try:
                result = writeplctag({"measurement": key, "value": value})
                logger.debug(result)
            except Exception as e:
                logger.error(e)
        #logger.debug(command)
        # give the PLC time to apply the writes before reading back
        time.sleep(5)
        try:
            # NOTE(review): p is iterated as plain key/value pairs above, yet
            # a "device" key is expected here — confirm the payload actually
            # carries one; the except below masks the KeyError when it doesn't.
            sync(p["device"])
        except Exception as e:
            logger.error(f"Could not sync: {e}")

    except Exception as e:
        logger.debug(e)
|
||||
@@ -0,0 +1,198 @@
|
||||
import json, time
|
||||
from datetime import datetime as dt
|
||||
from quickfaas.measure import recall, write
|
||||
from quickfaas.remotebus import publish
|
||||
from common.Logger import logger
|
||||
from quickfaas.global_dict import get as get_params
|
||||
|
||||
def convertDStoJSON(ds):
    """Flatten a list of {"key": ..., "value": ...} records into a dict."""
    return {entry["key"]: entry["value"] for entry in ds}
|
||||
|
||||
def formatPLCPayload(device, key, value):
    """Build the writeplctag() request dict for one attribute update.

    Maps the ThingsBoard device name back to its internal controller name
    using the global params; unknown devices map to "".
    """
    params = convertDStoJSON(get_params())
    # reverse of controlName(): ThingsBoard name -> internal controller name
    reverse_map = {
        f"{params['overflow_pump']}": "overflow_pump",
    }
    return {
        "measurement": key,
        "value": value,
        "device": reverse_map.get(device, ""),
    }
|
||||
|
||||
|
||||
def chunk_payload(payload, chunk_size=20):
    """Yield *payload* in pieces of at most *chunk_size* key/value pairs.

    Telemetry payloads ({"ts": ..., "values": {...}}) have their values dict
    split with the timestamp repeated; flat attribute dicts are split by key.
    """
    if "values" in payload:
        # Original format: {"ts": ..., "values": {...}}
        pairs = list(payload["values"].items())
        for start in range(0, len(pairs), chunk_size):
            yield {
                "ts": payload["ts"],
                "values": dict(pairs[start:start + chunk_size]),
            }
    else:
        # New format: {"key1": "value1", "key2": "value2"}
        keys = list(payload)
        for start in range(0, len(keys), chunk_size):
            yield {key: payload[key] for key in keys[start:start + chunk_size]}
|
||||
|
||||
def chunk_payload_gateway(payload, chunk_size=20, is_attributes_payload=False):
    """Yield gateway payloads in ThingsBoard-sized chunks.

    Attributes mode splits {"device": {...}} by device; telemetry mode
    splits each device entry's "values" dict, repeating its timestamp.
    """
    if is_attributes_payload:
        # For attributes payload, chunk the controllers
        devices = list(payload.items())
        for start in range(0, len(devices), chunk_size):
            yield dict(devices[start:start + chunk_size])
    else:
        # For data payload, chunk the values within each controller
        for device, entries in payload.items():
            for entry in entries:
                stamp = entry['ts']
                pairs = list(entry['values'].items())
                for start in range(0, len(pairs), chunk_size):
                    chunked = dict(pairs[start:start + chunk_size])
                    yield {device: [{"ts": stamp, "values": chunked}]}
|
||||
|
||||
|
||||
def controlName(name):
    """Map an internal controller name to its ThingsBoard device name.

    Only "overflow_pump" is mapped (via the global params); every other
    controller is reported as the gateway itself ("Gateway").
    """
    logger.debug(name)
    params = convertDStoJSON(get_params())
    logger.debug(params)
    mapping = {
        "overflow_pump": f"{params['overflow_pump']}",
    }
    return mapping.get(name, "Gateway")
|
||||
|
||||
# Filter payloads based on device_filter
|
||||
def filter_payload(payload, device_filter):
    """Return only the entries of *payload* whose device name is selected.

    Parameters:
        payload: dict keyed by device name.
        device_filter: iterable of device names, or a single device-name
                       string; empty/None selects every device.
    """
    if not device_filter:  # If filter is empty, include all devices
        return payload
    # A bare string (as passed by receiveAttribute via sync(p["device"]))
    # would otherwise do substring matching through `in`; wrap it so a
    # single device name matches exactly.
    if isinstance(device_filter, str):
        device_filter = [device_filter]
    return {key: value for key, value in payload.items() if key in device_filter}
|
||||
|
||||
|
||||
def sync(device_filter=None):
    """Re-read every current measurement via recall() and republish it to
    ThingsBoard, mirroring the live-send path's routing and chunking.

    Parameters:
        device_filter: optional collection of gateway device names; only
                       those devices' telemetry/attributes are republished
                       (empty/None means all).
    """
    if device_filter is None:
        # mutable default argument ([]) replaced with a None sentinel
        device_filter = []
    # get new values and send
    now = round(dt.timestamp(dt.now()))*1000
    try:
        data = recall()  # json.loads(recall().decode("utf-8"))
    except Exception as e:
        logger.error(e)
        # BUG FIX: `data` is unbound when recall() fails; previously
        # execution fell through and crashed with a NameError below.
        return
    logger.debug(data)
    logger.info("SYNCING")
    grouped_data = {}
    grouped_attributes = {}
    payload = {"ts": now, "values": {}}
    attributes_payload = {}
    try:
        for controller in data:
            for measure in controller["measures"]:
                # BUG FIX: was controlName(measure["name"]), which never
                # matches a controller name, so every point was misrouted
                # to "Gateway" (inconsistent with the other sync/sendData).
                ctrlName = controlName(measure["ctrlName"])
                if ctrlName == "Gateway":
                    # send to gateway with v1/devices/me/telemetry
                    if measure["health"] == 1:
                        if "_spt" in measure["name"]:
                            attributes_payload[measure["name"]] = measure["value"]
                        else:
                            payload["values"][measure["name"]] = measure["value"]
                else:
                    # drops the first two "_"-separated tokens of the name —
                    # TODO confirm the prefix convention this assumes
                    name = "_".join(measure['name'].split("_")[2:])
                    value = measure['value']
                    health = measure['health']
                    # Add controller for telemetry if it doesn't exist
                    if ctrlName not in grouped_data:
                        grouped_data[ctrlName] = {}
                    # Add controller for attributes if it doesn't exist
                    if ctrlName not in grouped_attributes:
                        grouped_attributes[ctrlName] = {}
                    grouped_attributes[ctrlName]["latestReportTime"] = now
                    # Add data to temp payload if datapoint health is good
                    if health:
                        if "_spt" in name:
                            grouped_attributes[ctrlName][name] = value
                        else:
                            grouped_data[ctrlName][name] = value
    except Exception as e:
        logger.error(e)
    try:
        # Transform the grouped data to desired structure
        payload_gateway = {}
        for key, value in grouped_data.items():
            if value:
                payload_gateway[key] = [{"ts": now, "values": value}]

        attributes_payload_gateway = {}
        for key, value in grouped_attributes.items():
            if value:
                attributes_payload_gateway[key] = value

        # Apply the filter
        filtered_payload_gateway = filter_payload(payload_gateway, device_filter)
        filtered_attributes_payload_gateway = filter_payload(attributes_payload_gateway, device_filter)

        # Send data belonging to Gateway
        for chunk in chunk_payload(payload=payload):
            publish("v1/devices/me/telemetry", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)

        attributes_payload["latestReportTime"] = (round(dt.timestamp(dt.now())/600)*600)*1000
        for chunk in chunk_payload(payload=attributes_payload):
            publish("v1/devices/me/attributes", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)

        # Send gateway devices data
        for chunk in chunk_payload_gateway(payload=filtered_payload_gateway):
            publish("v1/gateway/telemetry", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)

        for chunk in chunk_payload_gateway(payload=filtered_attributes_payload_gateway, is_attributes_payload=True):
            publish("v1/gateway/attributes", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)

    except Exception as e:
        logger.error(e)
|
||||
|
||||
|
||||
|
||||
def writeplctag(value):
    """Write one measurement value to the PLC controller named in the request.

    Parameters:
        value: {"measurement": <measurement_name>, "value": <value to write>,
                "device": <controller name>}
    Returns:
        True when the write call succeeded, False otherwise.
    """
    try:
        logger.debug(value)
        # write() payload format:
        # [{"name": "advvfdipp", "measures": [{"name": "manualfrequencysetpoint", "value": 49}]}]
        request = [{"name": value["device"], "measures": [{"name": value["measurement"], "value": value["value"]}]}]
        resp = write(request)
        logger.debug("RETURN FROM WRITE: {}".format(resp))
        return True
    except Exception as e:
        logger.error(e)
        return False
|
||||
|
||||
|
||||
def receiveAttribute(topic, payload):
    # Callback for ThingsBoard shared-attribute updates addressed to one
    # gateway device: payload = {"device": <name>, "data": {key: value, ...}}.
    # Writes each value to the PLC, then re-syncs that device.
    try:
        logger.debug(topic)
        logger.debug(json.loads(payload))
        p = json.loads(payload)
        device = p["device"]
        for key, value in p["data"].items():
            try:
                # translate the ThingsBoard device name back to the internal
                # controller name before writing
                measure = formatPLCPayload(device, key, value)
                result = writeplctag(measure)
                logger.debug(result)
            except Exception as e:
                logger.error(e)
        #logger.debug(command)
        # give the PLC time to apply the writes before reading back
        time.sleep(5)
        try:
            # NOTE(review): sync's filter receives the bare device-name
            # string; `key in <str>` then does substring matching — confirm
            # intended.
            sync(p["device"])
        except Exception as e:
            logger.error(f"Could not sync: {e}")
    except Exception as e:
        logger.debug(e)
|
||||
|
||||
75
Pub_Sub/fk_plcpond_gateway/thingsboard/sub/receiveCommand.py
Normal file
75
Pub_Sub/fk_plcpond_gateway/thingsboard/sub/receiveCommand.py
Normal file
@@ -0,0 +1,75 @@
|
||||
import json, time
|
||||
from datetime import datetime as dt
|
||||
from quickfaas.measure import recall, write
|
||||
from quickfaas.remotebus import publish
|
||||
from common.Logger import logger
|
||||
|
||||
# Helper function to split the payload into chunks
|
||||
def chunk_payload(payload, chunk_size=20):
    """Yield the telemetry payload split into pieces of at most chunk_size values.

    payload: {"ts": <epoch ms>, "values": {<name>: <value>, ...}}.
    Each yielded chunk carries the same timestamp with a slice of the values.
    """
    pairs = list(payload["values"].items())
    ts = payload["ts"]
    for start in range(0, len(pairs), chunk_size):
        yield {"ts": ts, "values": dict(pairs[start:start + chunk_size])}
|
||||
|
||||
def sync():
    """Read every measure from the PLC and publish them as device telemetry.

    Collects all controller measures returned by recall() into one
    {"ts": ..., "values": {...}} payload and publishes it in chunks on
    v1/devices/me/telemetry, pausing between chunks.
    """
    #get new values and send
    payload = {"ts": round(dt.timestamp(dt.now()))*1000, "values": {}}
    topic = "v1/devices/me/telemetry"
    try:
        data = recall()#json.loads(recall().decode("utf-8"))
    except Exception as e:
        # recall() failed: nothing to publish. Returning here avoids the
        # NameError the old code raised by using the unbound `data` below.
        logger.error(e)
        return
    logger.debug(data)
    for controller in data:
        for measure in controller["measures"]:
            #publish measure
            payload["values"][measure["name"]] = measure["value"]
    logger.debug("Sending on topic: {}".format(topic))
    logger.debug("Sending value: {}".format(payload))
    for chunk in chunk_payload(payload=payload):
        publish(topic, json.dumps(chunk), 1)
        time.sleep(2)  # pace uploads so the broker is not flooded
||||
|
||||
def writeplctag(value):
    """Write one measurement to the PLC; controller name is fixed to "rr_facility".

    value: {"measurement": <tag name>, "value": <new value>}.
    Returns True on a completed write() call, False when it raised.
    """
    try:
        logger.info(value)
        #payload format: [{"name": "advvfdipp", "measures": [{"name": "manualfrequencysetpoint", "value": 49}]}]
        measures = [{"name": value["measurement"], "value": value["value"]}]
        resp = write([{"name": "rr_facility", "measures": measures}])
        logger.info("RETURN FROM WRITE: {}".format(resp))
        return True
    except Exception as e:
        logger.error(e)
        return False
|
||||
|
||||
def receiveCommand(topic, payload, wizard_api):
    """Handle an RPC request delivered over MQTT.

    Only the "setPLCTag" method is acted on: its params are written to the
    PLC and, on a successful write, sync() pushes fresh values back up.
    Every request is acknowledged with the message id taken from the last
    topic segment.
    """
    try:
        logger.debug(topic)
        request = json.loads(payload)
        logger.info(request)
        method = request["method"]
        if method == "setPLCTag":
            if writeplctag(request["params"]):
                sync()
        #commands[command](p["mac"].lower(),p["payload"]["value"], wizard_api)
        #logger.debug(command)
        ack(topic.split("/")[-1], wizard_api)
    except Exception as e:
        logger.error(e)
|
||||
|
||||
|
||||
def ack(msgid, wizard_api):
    """Acknowledge RPC request msgid by publishing an empty response message."""
    response = {"msg": {"time": time.time()}, "metadata": "", "msgType": ""}
    wizard_api.mqtt_publish("v1/devices/me/rpc/response/" + str(msgid), json.dumps(response))
|
||||
@@ -0,0 +1,244 @@
|
||||
import json, time
|
||||
from datetime import datetime as dt
|
||||
from quickfaas.measure import recall, write
|
||||
from quickfaas.remotebus import publish
|
||||
from common.Logger import logger
|
||||
from quickfaas.global_dict import get as get_params
|
||||
|
||||
def convertDStoJSON(ds):
    """Flatten a list of {"key": ..., "value": ...} records into a plain dict.

    Later records win when keys repeat.
    """
    return {record["key"]: record["value"] for record in ds}
|
||||
|
||||
def formatPLCPayload(payload):
    """Translate an RPC payload into the local PLC tag namespace.

    payload: {"device": <display name>, "data": {"params": {"measurement": ..., "value": ...}}}.
    The device display name ("<facility> Transfer Pump #N" / "<facility>
    Water Well #N") maps to a short tag prefix ("tp_N_" / "ww_N_"); unknown
    devices get no prefix.

    Returns {"measurement": <prefixed tag>, "value": <value>}.
    """
    params = convertDStoJSON(get_params())
    facility = params['facilityName']
    prefixes = {f"{facility} Transfer Pump #{n}": f"tp_{n}_" for n in (1, 2)}
    prefixes.update({f"{facility} Water Well #{n}": f"ww_{n}_" for n in range(1, 7)})
    request = payload["data"]["params"]
    measure = prefixes.get(payload["device"], "") + request["measurement"]
    return {"measurement": measure, "value": request["value"]}
|
||||
|
||||
|
||||
def chunk_payload(payload, chunk_size=20):
    """Split a telemetry or attributes payload into chunks of at most chunk_size.

    Two shapes are accepted:
      * {"ts": ..., "values": {...}} — each chunk keeps the timestamp and
        carries a slice of the values;
      * a flat {key: value, ...} dict — each chunk is a slice of the dict.
    """
    if "values" in payload:
        # Timestamped telemetry format.
        pairs = list(payload["values"].items())
        for start in range(0, len(pairs), chunk_size):
            yield {"ts": payload["ts"], "values": dict(pairs[start:start + chunk_size])}
    else:
        # Flat attributes format.
        names = list(payload.keys())
        for start in range(0, len(names), chunk_size):
            yield {name: payload[name] for name in names[start:start + chunk_size]}
|
||||
|
||||
def chunk_payload_gateway(payload, chunk_size=20, is_attributes_payload=False):
    """Split a gateway payload into publishable chunks.

    Attributes payloads ({controller: {...}}) are chunked by controller.
    Telemetry payloads ({controller: [{"ts": ..., "values": {...}}, ...]})
    are chunked by the values inside each timestamped entry, one controller
    per yielded chunk.
    """
    if is_attributes_payload:
        # Chunk whole controllers at a time.
        controllers = list(payload.items())
        for start in range(0, len(controllers), chunk_size):
            yield dict(controllers[start:start + chunk_size])
        return
    # Chunk the values within each controller entry.
    for controller, entries in payload.items():
        for entry in entries:
            stamp = entry['ts']
            pairs = list(entry['values'].items())
            for start in range(0, len(pairs), chunk_size):
                yield {
                    controller: [{
                        "ts": stamp,
                        "values": dict(pairs[start:start + chunk_size]),
                    }]
                }
|
||||
|
||||
def controlName(name):
    """Resolve a tag name's "<dev>_<n>_" prefix to the controller display name.

    "tp_N" maps to "<facility> Transfer Pump #N" and "ww_N" to
    "<facility> Water Well #N"; anything else belongs to the "Gateway".
    """
    params = convertDStoJSON(get_params())
    facility = params['facilityName']
    nameMap = {f"tp_{n}": f"{facility} Transfer Pump #{n}" for n in (1, 2)}
    nameMap.update({f"ww_{n}": f"{facility} Water Well #{n}" for n in range(1, 7)})
    # The first two underscore-separated pieces identify the device.
    prefix = "_".join(name.split("_")[:2])
    return nameMap.get(prefix, "Gateway")
|
||||
|
||||
# Filter payloads based on device_filter
|
||||
def filter_payload(payload, device_filter):
    """Return only the payload entries whose keys appear in device_filter.

    An empty/falsy filter means no filtering: the payload is returned as-is.
    """
    if device_filter:
        return {name: data for name, data in payload.items() if name in device_filter}
    return payload
|
||||
|
||||
|
||||
def sync(device_filter=None):
    """Read every measure from the PLC and publish telemetry and attributes.

    Gateway-local measures (controlName() == "Gateway") are published on the
    v1/devices/me/* topics; measures belonging to a named controller are
    grouped per device and sent on the v1/gateway/* topics. Names containing
    "_spt" are treated as attributes (setpoints), everything else as
    telemetry; only datapoints with good health are forwarded.

    device_filter: optional collection of controller display names. A falsy
    value sends everything — backward-compatible with the old `[]` default,
    which was a shared mutable default argument.
    """
    #get new values and send
    now = round(dt.timestamp(dt.now()))*1000
    try:
        data = recall()#json.loads(recall().decode("utf-8"))
    except Exception as e:
        # Nothing was read, so there is nothing to publish. Returning avoids
        # the NameError the old code hit by touching the unbound `data`.
        logger.error(e)
        return
    logger.debug(data)
    logger.info("SYNCING")
    grouped_data = {}
    grouped_attributes = {}
    payload = {"ts": now, "values":{}}
    attributes_payload = {}
    try:
        for controller in data:
            for measure in controller["measures"]:
                ctrlName = controlName(measure["name"])
                if ctrlName == "Gateway":
                    #send to gateway with v1/devices/me/telemetry
                    if measure["health"] == 1:
                        if "_spt" in measure["name"]:
                            attributes_payload[measure["name"]] = measure["value"]
                        else:
                            payload["values"][measure["name"]] = measure["value"]
                else:
                    # Strip the "<dev>_<n>_" prefix to recover the tag name.
                    name = "_".join(measure['name'].split("_")[2:])
                    value = measure['value']
                    health = measure['health']
                    #Add controller for telemetry if it doesn't exist
                    if ctrlName not in grouped_data:
                        grouped_data[ctrlName] = {}
                    #Add controller for attributes if it doesn't exist
                    if ctrlName not in grouped_attributes:
                        grouped_attributes[ctrlName] = {}
                        grouped_attributes[ctrlName]["latestReportTime"] = now
                    #Add data to temp payload if datapoint health is good
                    if health:
                        if "_spt" in name:
                            grouped_attributes[ctrlName][name] = value
                        else:
                            grouped_data[ctrlName][name] = value
    except Exception as e:
        logger.error(e)
    try:
        # Transform the grouped data to the gateway wire structure.
        payload_gateway = {}
        for key, value in grouped_data.items():
            if value:
                payload_gateway[key] = [{"ts": now, "values": value}]

        attributes_payload_gateway = {}
        for key, value in grouped_attributes.items():
            if value:
                attributes_payload_gateway[key] = value

        # Apply the optional device filter (falsy filter sends everything).
        filtered_payload_gateway = filter_payload(payload_gateway, device_filter)
        filtered_attributes_payload_gateway = filter_payload(attributes_payload_gateway, device_filter)

        #Send data belonging to Gateway
        for chunk in chunk_payload(payload=payload):
            publish("v1/devices/me/telemetry", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)  # pace uploads so the broker is not flooded

        # Attribute report time is rounded to the nearest 10 minutes.
        attributes_payload["latestReportTime"] = (round(dt.timestamp(dt.now())/600)*600)*1000
        for chunk in chunk_payload(payload=attributes_payload):
            publish("v1/devices/me/attributes", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)

        #Send gateway devices data
        for chunk in chunk_payload_gateway(payload=filtered_payload_gateway):
            publish("v1/gateway/telemetry", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)

        for chunk in chunk_payload_gateway(payload=filtered_attributes_payload_gateway, is_attributes_payload=True):
            publish("v1/gateway/attributes", json.dumps(chunk), qos=1, cloud_name="default")
            time.sleep(2)
    except Exception as e:
        logger.error(e)
|
||||
|
||||
|
||||
|
||||
|
||||
def writeplctag(value):
    """Write one measurement to the PLC; controller name is fixed to "rr_facility".

    value: {"measurement": <tag name>, "value": <new value>}.
    Returns True on a completed write() call, False when it raised.
    """
    try:
        #payload format: [{"name": "advvfdipp", "measures": [{"name": "manualfrequencysetpoint", "value": 49}]}]
        measures = [{"name": value["measurement"], "value": value["value"]}]
        resp = write([{"name": "rr_facility", "measures": measures}])
        logger.debug("RETURN FROM WRITE: {}".format(resp))
        return True
    except Exception as e:
        logger.error(e)
        return False
|
||||
|
||||
|
||||
def receiveCommand(topic, payload):
    """Handle a gateway RPC request published by the platform.

    Expected payload shape: {"device": <controller display name>,
    "data": {"id": <rpc id>, "method": <name>, "params": {...}}}.

    Supported methods:
      * "setPLCTag"        — write one tag (params formatted via formatPLCPayload);
      * "startWW"          — drop auto control, pause, then assert a manual run;
      * "manualAutoSwitch" — flip between manual and auto per params["direction"].

    Every request is acked, then the device is re-synced after a settling
    delay so the platform sees the values that actually took effect.
    """
    try:
        logger.debug(topic)
        logger.debug(json.loads(payload))
        p = json.loads(payload)
        #logger.info(p)
        command = p["data"]["method"]
        # Dispatch table; only the "writeplctag" entry is used below.
        commands = {
            "sync": sync,
            "writeplctag": writeplctag,
        }
        if command == "setPLCTag":
            try:
                params = formatPLCPayload(p)
                #logger.info(params)
                result = commands["writeplctag"](params)
                logger.debug(result)
            except Exception as e:
                logger.error(e)
        elif command == "startWW":
            try:
                # Take the well out of auto first, pause, then command a manual run.
                params = formatPLCPayload({"device": p["device"], "data": {"params": {"measurement": "auto_cmd", "value": 0}}})
                result = commands["writeplctag"](params)
                time.sleep(1)
                params = formatPLCPayload({"device": p["device"], "data": {"params": {"measurement": "manual_run_cmd", "value": 1}}})
                result = commands["writeplctag"](params)
            except Exception as e:
                logger.error(f"Error in startWW: {e}")
        elif command == "manualAutoSwitch":
            try:
                if p["data"]["params"]["direction"] == "manualToAuto":
                    # Clear the manual run first, pause, then enable auto.
                    params = formatPLCPayload({"device": p["device"], "data": {"params": {"measurement": "manual_run_cmd", "value": 0}}})
                    result = commands["writeplctag"](params)
                    time.sleep(1)
                    params = formatPLCPayload({"device": p["device"], "data": {"params": {"measurement": "auto_cmd", "value": 1}}})
                    result = commands["writeplctag"](params)
                elif p["data"]["params"]["direction"] == "autoToManual":
                    params = formatPLCPayload({"device": p["device"], "data": {"params": {"measurement": "auto_cmd", "value": 0}}})
                    result = commands["writeplctag"](params)
                else:
                    logger.error(f'Invalid input in manualAutoSwitch: {p["data"]["params"]["direction"]}')
            except Exception as e:
                logger.error(f"Error in manualToAuto: {e}")
        # NOTE(review): success is acked unconditionally, even for unknown
        # methods or failed writes — confirm this is intended.
        ackPayload = {"device": p["device"], "id": p["data"]["id"], "data": {"success": True}}
        ack(ackPayload)
        # Give the PLC time to apply the writes before reading back.
        time.sleep(5)
        try:
            sync(p["device"])
        except Exception as e:
            logger.error(f"Could not sync: {e}")
    except Exception as e:
        logger.error(e)
|
||||
|
||||
|
||||
def ack(message):
    """Publish an RPC acknowledgement for a gateway device on v1/gateway/rpc."""
    body = json.dumps(message)
    publish("v1/gateway/rpc", body, 1, cloud_name="default")
|
||||
@@ -16,13 +16,13 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"ip_address = \"166.195.18.153\" # \"ngrok.iot.inhandnetworks.com:3021\"\n",
|
||||
"path = '/Users/nico/Documents/GitHub/HP_InHand_IG502/Pub_Sub/plcfreshwater_advvfdipp/thingsboard/as11_tag_dump.json' # code snippets/tag_dump.json'"
|
||||
"ip_address = \"166.193.23.21\" # \"ngrok.iot.inhandnetworks.com:3021\"\n",
|
||||
"path = '/Users/nico/Documents/GitHub/HP_InHand_IG502/Pub_Sub/fk_plcpond/thingsboard/overflow_tag_dump.json' # code snippets/tag_dump.json'"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"execution_count": 5,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
|
||||
Reference in New Issue
Block a user