added commands
This commit is contained in:
@@ -24,12 +24,6 @@ tp_2_lo_discharge_alm,rr_facility,tp_2_lo_discharge_alm,5,eq,1,none,eq,,Failure,
|
|||||||
tp_2_lo_oil_alm,rr_facility,tp_2_lo_oil_alm,5,eq,1,none,eq,,Failure,default
|
tp_2_lo_oil_alm,rr_facility,tp_2_lo_oil_alm,5,eq,1,none,eq,,Failure,default
|
||||||
tp_2_lo_suction_alm,rr_facility,tp_2_lo_suction_alm,5,eq,1,none,eq,,Failure,default
|
tp_2_lo_suction_alm,rr_facility,tp_2_lo_suction_alm,5,eq,1,none,eq,,Failure,default
|
||||||
tp_2_oil_cooler_failed_to_start_alm,rr_facility,tp_2_oil_cooler_failed_to_start_alm,5,eq,1,none,eq,,Failure,default
|
tp_2_oil_cooler_failed_to_start_alm,rr_facility,tp_2_oil_cooler_failed_to_start_alm,5,eq,1,none,eq,,Failure,default
|
||||||
wtp_1_discharge_alm,rr_facility,wtp_1_discharge_alm,5,eq,1,none,eq,,wtp 1 discharge input failure,default
|
|
||||||
wtp_1_suction_alm,rr_facility,wtp_1_suction_alm,5,eq,1,none,eq,,wtp 1 suction input failure,default
|
|
||||||
wtp_1_vibration_alm,rr_facility,wtp_1_vibration_alm,5,eq,1,none,eq,,wtp 1 vibration failure,default
|
|
||||||
wtp_2_discharge_alm,rr_facility,wtp_2_discharge_alm,5,eq,1,none,eq,,wtp 2 discharge input failure,default
|
|
||||||
wtp_2_suction_alm,rr_facility,wtp_2_suction_alm,5,eq,1,none,eq,,wtp 2 suction input failure,default
|
|
||||||
wtp_2_vibration_alm,rr_facility,wtp_2_vibration_alm,5,eq,1,none,eq,,wtp 2 vibration failure,default
|
|
||||||
ww_1_comms_alm,rr_facility,ww_1_comms_alm,5,eq,1,none,eq,,water well 1 comms failure,default
|
ww_1_comms_alm,rr_facility,ww_1_comms_alm,5,eq,1,none,eq,,water well 1 comms failure,default
|
||||||
ww_1_control_power_alm,rr_facility,ww_1_control_power_alm,5,eq,1,none,eq,,Failure,default
|
ww_1_control_power_alm,rr_facility,ww_1_control_power_alm,5,eq,1,none,eq,,Failure,default
|
||||||
ww_1_hi_discharge_alm,rr_facility,ww_1_hi_discharge_alm,5,eq,1,none,eq,,Failure,default
|
ww_1_hi_discharge_alm,rr_facility,ww_1_hi_discharge_alm,5,eq,1,none,eq,,Failure,default
|
||||||
|
|||||||
|
196
Pub_Sub/rr_facility/thingsboard/sub/receiveAttributeGateway.py
Normal file
196
Pub_Sub/rr_facility/thingsboard/sub/receiveAttributeGateway.py
Normal file
@@ -0,0 +1,196 @@
|
|||||||
|
import json, time
|
||||||
|
from datetime import datetime as dt
|
||||||
|
from quickfaas.measure import recall, write
|
||||||
|
from quickfaas.remotebus import publish
|
||||||
|
from common.Logger import logger
|
||||||
|
from quickfaas.global_dict import get as get_params
|
||||||
|
|
||||||
|
def convertDStoJSON(ds):
|
||||||
|
j = dict()
|
||||||
|
for x in ds:
|
||||||
|
j[x["key"]] = x["value"]
|
||||||
|
return j
|
||||||
|
|
||||||
|
def formatPLCPayload(device, key, value):
|
||||||
|
params = convertDStoJSON(get_params())
|
||||||
|
nameMap = {
|
||||||
|
f"{params['facilityName']} Transfer Pump #1": "tp_1_",
|
||||||
|
f"{params['facilityName']} Transfer Pump #2": "tp_2_",
|
||||||
|
f"{params['facilityName']} Water Well #1": "ww_1_",
|
||||||
|
f"{params['facilityName']} Water Well #2": "ww_2_",
|
||||||
|
f"{params['facilityName']} Water Well #3": "ww_3_",
|
||||||
|
f"{params['facilityName']} Water Well #4": "ww_4_",
|
||||||
|
f"{params['facilityName']} Water Well #5": "ww_5_",
|
||||||
|
f"{params['facilityName']} Water Well #6": "ww_6_"
|
||||||
|
}
|
||||||
|
measure = nameMap.get(device, "") + key
|
||||||
|
output = {"measurement": measure, "value": value}
|
||||||
|
return output
|
||||||
|
|
||||||
|
|
||||||
|
def chunk_payload(payload, chunk_size=20):
|
||||||
|
if "values" in payload:
|
||||||
|
# Original format: {"ts": ..., "values": {...}}
|
||||||
|
chunked_values = list(payload["values"].items())
|
||||||
|
for i in range(0, len(chunked_values), chunk_size):
|
||||||
|
yield {
|
||||||
|
"ts": payload["ts"],
|
||||||
|
"values": dict(chunked_values[i:i+chunk_size])
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
# New format: {"key1": "value1", "key2": "value2"}
|
||||||
|
chunked_keys = list(payload.keys())
|
||||||
|
for i in range(0, len(chunked_keys), chunk_size):
|
||||||
|
yield {k: payload[k] for k in chunked_keys[i:i+chunk_size]}
|
||||||
|
|
||||||
|
def chunk_payload_gateway(payload, chunk_size=20, is_attributes_payload=False):
|
||||||
|
if is_attributes_payload:
|
||||||
|
# For attributes payload, chunk the controllers
|
||||||
|
controllers = list(payload.items())
|
||||||
|
for i in range(0, len(controllers), chunk_size):
|
||||||
|
yield dict(controllers[i:i + chunk_size])
|
||||||
|
else:
|
||||||
|
# For data payload, chunk the values within each controller
|
||||||
|
for controller, data in payload.items():
|
||||||
|
for entry in data:
|
||||||
|
ts = entry['ts']
|
||||||
|
values = entry['values']
|
||||||
|
chunked_values = list(values.items())
|
||||||
|
for i in range(0, len(chunked_values), chunk_size):
|
||||||
|
yield {
|
||||||
|
controller: [{
|
||||||
|
"ts": ts,
|
||||||
|
"values": dict(chunked_values[i:i + chunk_size])
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
|
||||||
|
def controlName(name):
|
||||||
|
params = convertDStoJSON(get_params())
|
||||||
|
nameMap = {
|
||||||
|
"tp_1": f"{params['facilityName']} Transfer Pump #1",
|
||||||
|
"tp_2": f"{params['facilityName']} Transfer Pump #2",
|
||||||
|
"ww_1": f"{params['facilityName']} Water Well #1",
|
||||||
|
"ww_2": f"{params['facilityName']} Water Well #2",
|
||||||
|
"ww_3": f"{params['facilityName']} Water Well #3",
|
||||||
|
"ww_4": f"{params['facilityName']} Water Well #4",
|
||||||
|
"ww_5": f"{params['facilityName']} Water Well #5",
|
||||||
|
"ww_6": f"{params['facilityName']} Water Well #6"
|
||||||
|
}
|
||||||
|
parts = "_".join(name.split("_")[:2])
|
||||||
|
return nameMap.get(parts, "Gateway")
|
||||||
|
|
||||||
|
|
||||||
|
def sync():
|
||||||
|
#get new values and send
|
||||||
|
now = round(dt.timestamp(dt.now()))*1000
|
||||||
|
topic = "v1/gateway/telemetry"
|
||||||
|
try:
|
||||||
|
data = recall()#json.loads(recall().decode("utf-8"))
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
logger.debug(data)
|
||||||
|
logger.info("SYNCING")
|
||||||
|
grouped_data = {}
|
||||||
|
grouped_attributes = {}
|
||||||
|
payload = {"ts": now, "values":{}}
|
||||||
|
attributes_payload = {}
|
||||||
|
try:
|
||||||
|
for controller in data:
|
||||||
|
for measure in controller["measures"]:
|
||||||
|
ctrlName = controlName(measure["name"])
|
||||||
|
if ctrlName == "Gateway":
|
||||||
|
#send to gateway with v1/devices/me/telemetry
|
||||||
|
if measure["health"] == 1:
|
||||||
|
if "_spt" in measure["name"]:
|
||||||
|
attributes_payload[measure["name"]] = measure["value"]
|
||||||
|
else:
|
||||||
|
payload["values"][measure["name"]] = measure["value"]
|
||||||
|
else:
|
||||||
|
name = "_".join(measure['name'].split("_")[2:])
|
||||||
|
value = measure['value']
|
||||||
|
health = measure['health']
|
||||||
|
#Add controller for telemetry if it doesn't exist
|
||||||
|
if ctrlName not in grouped_data:
|
||||||
|
grouped_data[ctrlName] = {}
|
||||||
|
#Add controller for attributes if it doesn't exist
|
||||||
|
if ctrlName not in grouped_attributes:
|
||||||
|
grouped_attributes[ctrlName] = {}
|
||||||
|
grouped_attributes[ctrlName]["latestReportTime"] = now
|
||||||
|
#Add data to temp payload if datapoint health is good
|
||||||
|
if health:
|
||||||
|
if "_spt" in name:
|
||||||
|
grouped_attributes[ctrlName][name] = value
|
||||||
|
else:
|
||||||
|
grouped_data[ctrlName][name] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
try:
|
||||||
|
# Transform the grouped data to desired structure
|
||||||
|
payload_gateway = {}
|
||||||
|
|
||||||
|
for key, value in grouped_data.items():
|
||||||
|
if value:
|
||||||
|
payload_gateway[key] = [{"ts": now ,"values": value}]
|
||||||
|
|
||||||
|
attributes_payload_gateway = {}
|
||||||
|
for key, value in grouped_attributes.items():
|
||||||
|
if value:
|
||||||
|
attributes_payload_gateway[key] = value
|
||||||
|
|
||||||
|
#Send data belonging to Gateway
|
||||||
|
for chunk in chunk_payload(payload=payload):
|
||||||
|
publish("v1/devices/me/telemetry", json.dumps(chunk), qos=1, cloud_name="default")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
attributes_payload["latestReportTime"] = (round(dt.timestamp(dt.now())/600)*600)*1000
|
||||||
|
for chunk in chunk_payload(payload=attributes_payload):
|
||||||
|
publish("v1/devices/me/attributes", json.dumps(chunk), qos=1, cloud_name="default")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
#Send gateway devices data
|
||||||
|
for chunk in chunk_payload_gateway(payload=payload_gateway):
|
||||||
|
publish("v1/gateway/telemetry", json.dumps(chunk), qos=1, cloud_name="default")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
for chunk in chunk_payload_gateway(payload=attributes_payload_gateway, is_attributes_payload=True):
|
||||||
|
publish("v1/gateway/attributes", json.dumps(attributes_payload_gateway), qos=1, cloud_name="default")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def writeplctag(value):
|
||||||
|
#value in the form {"measurement": <measurement_name>, "value": <value to write>}
|
||||||
|
try:
|
||||||
|
logger.debug(value)
|
||||||
|
#payload format: [{"name": "advvfdipp", "measures": [{"name": "manualfrequencysetpoint", "value": 49}]}]
|
||||||
|
message = [{"name": "rr_facility", "measures":[{"name":value["measurement"], "value": value["value"]}]}]
|
||||||
|
resp = write(message)
|
||||||
|
logger.debug("RETURN FROM WRITE: {}".format(resp))
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def receiveAttribute(topic, payload):
|
||||||
|
try:
|
||||||
|
logger.debug(topic)
|
||||||
|
logger.debug(json.loads(payload))
|
||||||
|
p = json.loads(payload)
|
||||||
|
device = p["device"]
|
||||||
|
for key, value in p["data"].items():
|
||||||
|
try:
|
||||||
|
measure = formatPLCPayload(device, key, value)
|
||||||
|
result = writeplctag(measure)
|
||||||
|
logger.debug(result)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
#logger.debug(command)
|
||||||
|
sync()
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(e)
|
||||||
|
|
||||||
207
Pub_Sub/rr_facility/thingsboard/sub/receiveCommandGateway.py
Normal file
207
Pub_Sub/rr_facility/thingsboard/sub/receiveCommandGateway.py
Normal file
@@ -0,0 +1,207 @@
|
|||||||
|
import json, time
|
||||||
|
from datetime import datetime as dt
|
||||||
|
from quickfaas.measure import recall, write
|
||||||
|
from quickfaas.remotebus import publish
|
||||||
|
from common.Logger import logger
|
||||||
|
from quickfaas.global_dict import get as get_params
|
||||||
|
|
||||||
|
def convertDStoJSON(ds):
|
||||||
|
j = dict()
|
||||||
|
for x in ds:
|
||||||
|
j[x["key"]] = x["value"]
|
||||||
|
return j
|
||||||
|
|
||||||
|
def formatPLCPayload(payload):
|
||||||
|
params = convertDStoJSON(get_params())
|
||||||
|
nameMap = {
|
||||||
|
f"{params['facilityName']} Transfer Pump #1": "tp_1_",
|
||||||
|
f"{params['facilityName']} Transfer Pump #2": "tp_2_",
|
||||||
|
f"{params['facilityName']} Water Well #1": "ww_1_",
|
||||||
|
f"{params['facilityName']} Water Well #2": "ww_2_",
|
||||||
|
f"{params['facilityName']} Water Well #3": "ww_3_",
|
||||||
|
f"{params['facilityName']} Water Well #4": "ww_4_",
|
||||||
|
f"{params['facilityName']} Water Well #5": "ww_5_",
|
||||||
|
f"{params['facilityName']} Water Well #6": "ww_6_"
|
||||||
|
}
|
||||||
|
measure = nameMap.get(payload["device"], "") + payload["data"]["params"]["measurement"]
|
||||||
|
output = {"measurement": measure, "value": payload["data"]["params"]["value"]}
|
||||||
|
return output
|
||||||
|
|
||||||
|
|
||||||
|
def chunk_payload(payload, chunk_size=20):
|
||||||
|
if "values" in payload:
|
||||||
|
# Original format: {"ts": ..., "values": {...}}
|
||||||
|
chunked_values = list(payload["values"].items())
|
||||||
|
for i in range(0, len(chunked_values), chunk_size):
|
||||||
|
yield {
|
||||||
|
"ts": payload["ts"],
|
||||||
|
"values": dict(chunked_values[i:i+chunk_size])
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
# New format: {"key1": "value1", "key2": "value2"}
|
||||||
|
chunked_keys = list(payload.keys())
|
||||||
|
for i in range(0, len(chunked_keys), chunk_size):
|
||||||
|
yield {k: payload[k] for k in chunked_keys[i:i+chunk_size]}
|
||||||
|
|
||||||
|
def chunk_payload_gateway(payload, chunk_size=20, is_attributes_payload=False):
|
||||||
|
if is_attributes_payload:
|
||||||
|
# For attributes payload, chunk the controllers
|
||||||
|
controllers = list(payload.items())
|
||||||
|
for i in range(0, len(controllers), chunk_size):
|
||||||
|
yield dict(controllers[i:i + chunk_size])
|
||||||
|
else:
|
||||||
|
# For data payload, chunk the values within each controller
|
||||||
|
for controller, data in payload.items():
|
||||||
|
for entry in data:
|
||||||
|
ts = entry['ts']
|
||||||
|
values = entry['values']
|
||||||
|
chunked_values = list(values.items())
|
||||||
|
for i in range(0, len(chunked_values), chunk_size):
|
||||||
|
yield {
|
||||||
|
controller: [{
|
||||||
|
"ts": ts,
|
||||||
|
"values": dict(chunked_values[i:i + chunk_size])
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
|
||||||
|
def controlName(name):
|
||||||
|
params = convertDStoJSON(get_params())
|
||||||
|
nameMap = {
|
||||||
|
"tp_1": f"{params['facilityName']} Transfer Pump #1",
|
||||||
|
"tp_2": f"{params['facilityName']} Transfer Pump #2",
|
||||||
|
"ww_1": f"{params['facilityName']} Water Well #1",
|
||||||
|
"ww_2": f"{params['facilityName']} Water Well #2",
|
||||||
|
"ww_3": f"{params['facilityName']} Water Well #3",
|
||||||
|
"ww_4": f"{params['facilityName']} Water Well #4",
|
||||||
|
"ww_5": f"{params['facilityName']} Water Well #5",
|
||||||
|
"ww_6": f"{params['facilityName']} Water Well #6"
|
||||||
|
}
|
||||||
|
parts = "_".join(name.split("_")[:2])
|
||||||
|
return nameMap.get(parts, "Gateway")
|
||||||
|
|
||||||
|
|
||||||
|
def sync():
|
||||||
|
#get new values and send
|
||||||
|
now = round(dt.timestamp(dt.now()))*1000
|
||||||
|
topic = "v1/gateway/telemetry"
|
||||||
|
try:
|
||||||
|
data = recall()#json.loads(recall().decode("utf-8"))
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
logger.debug(data)
|
||||||
|
logger.info("SYNCING")
|
||||||
|
grouped_data = {}
|
||||||
|
grouped_attributes = {}
|
||||||
|
payload = {"ts": now, "values":{}}
|
||||||
|
attributes_payload = {}
|
||||||
|
try:
|
||||||
|
for controller in data:
|
||||||
|
for measure in controller["measures"]:
|
||||||
|
ctrlName = controlName(measure["name"])
|
||||||
|
if ctrlName == "Gateway":
|
||||||
|
#send to gateway with v1/devices/me/telemetry
|
||||||
|
if measure["health"] == 1:
|
||||||
|
if "_spt" in measure["name"]:
|
||||||
|
attributes_payload[measure["name"]] = measure["value"]
|
||||||
|
else:
|
||||||
|
payload["values"][measure["name"]] = measure["value"]
|
||||||
|
else:
|
||||||
|
name = "_".join(measure['name'].split("_")[2:])
|
||||||
|
value = measure['value']
|
||||||
|
health = measure['health']
|
||||||
|
#Add controller for telemetry if it doesn't exist
|
||||||
|
if ctrlName not in grouped_data:
|
||||||
|
grouped_data[ctrlName] = {}
|
||||||
|
#Add controller for attributes if it doesn't exist
|
||||||
|
if ctrlName not in grouped_attributes:
|
||||||
|
grouped_attributes[ctrlName] = {}
|
||||||
|
grouped_attributes[ctrlName]["latestReportTime"] = now
|
||||||
|
#Add data to temp payload if datapoint health is good
|
||||||
|
if health:
|
||||||
|
if "_spt" in name:
|
||||||
|
grouped_attributes[ctrlName][name] = value
|
||||||
|
else:
|
||||||
|
grouped_data[ctrlName][name] = value
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
try:
|
||||||
|
# Transform the grouped data to desired structure
|
||||||
|
payload_gateway = {}
|
||||||
|
|
||||||
|
for key, value in grouped_data.items():
|
||||||
|
if value:
|
||||||
|
payload_gateway[key] = [{"ts": now ,"values": value}]
|
||||||
|
|
||||||
|
attributes_payload_gateway = {}
|
||||||
|
for key, value in grouped_attributes.items():
|
||||||
|
if value:
|
||||||
|
attributes_payload_gateway[key] = value
|
||||||
|
|
||||||
|
#Send data belonging to Gateway
|
||||||
|
for chunk in chunk_payload(payload=payload):
|
||||||
|
publish("v1/devices/me/telemetry", json.dumps(chunk), qos=1, cloud_name="default")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
attributes_payload["latestReportTime"] = (round(dt.timestamp(dt.now())/600)*600)*1000
|
||||||
|
for chunk in chunk_payload(payload=attributes_payload):
|
||||||
|
publish("v1/devices/me/attributes", json.dumps(chunk), qos=1, cloud_name="default")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
#Send gateway devices data
|
||||||
|
for chunk in chunk_payload_gateway(payload=payload_gateway):
|
||||||
|
publish("v1/gateway/telemetry", json.dumps(chunk), qos=1, cloud_name="default")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
for chunk in chunk_payload_gateway(payload=attributes_payload_gateway, is_attributes_payload=True):
|
||||||
|
publish("v1/gateway/attributes", json.dumps(attributes_payload_gateway), qos=1, cloud_name="default")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def writeplctag(value):
|
||||||
|
#value in the form {"measurement": <measurement_name>, "value": <value to write>}
|
||||||
|
try:
|
||||||
|
#logger.info(value)
|
||||||
|
#payload format: [{"name": "advvfdipp", "measures": [{"name": "manualfrequencysetpoint", "value": 49}]}]
|
||||||
|
message = [{"name": "rr_facility", "measures":[{"name":value["measurement"], "value": value["value"]}]}]
|
||||||
|
resp = write(message)
|
||||||
|
logger.debug("RETURN FROM WRITE: {}".format(resp))
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def receiveCommand(topic, payload):
|
||||||
|
try:
|
||||||
|
logger.debug(topic)
|
||||||
|
logger.debug(json.loads(payload))
|
||||||
|
p = json.loads(payload)
|
||||||
|
#logger.info(p)
|
||||||
|
command = p["data"]["method"]
|
||||||
|
commands = {
|
||||||
|
"sync": sync,
|
||||||
|
"writeplctag": writeplctag,
|
||||||
|
}
|
||||||
|
if command == "setPLCTag":
|
||||||
|
try:
|
||||||
|
params = formatPLCPayload(p)
|
||||||
|
#logger.info(params)
|
||||||
|
result = commands["writeplctag"](params)
|
||||||
|
logger.debug(result)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
ackPayload = {"device": p["device"], "id": p["data"]["id"], "data": {"success": True}}
|
||||||
|
ack(ackPayload)
|
||||||
|
time.sleep(5)
|
||||||
|
sync()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
|
||||||
|
|
||||||
|
def ack(message):
|
||||||
|
publish("v1/gateway/rpc", json.dumps(message), 1, cloud_name="default")
|
||||||
68204
Pub_Sub/rr_facility/thingsboard/tags.json
Normal file
68204
Pub_Sub/rr_facility/thingsboard/tags.json
Normal file
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user