added ma_duece files and new tools for creating data collectors and alarms

This commit is contained in:
Nico Melone
2026-01-29 16:15:01 -06:00
parent d45c2c5746
commit af70b9e903
21 changed files with 140704 additions and 557 deletions

View File

@@ -0,0 +1,249 @@
#!/usr/bin/env python3
"""
add_alarms.py
Add alarm rules to a deviceprofile JSON file based on a CSV file.
Usage:
python3 add_alarms.py --profile <profile.json> --csv <keys.csv> [--out <output.json>]
If --out is omitted, the original profile file is overwritten.
"""
import argparse
import json
import csv
import os
import sys
# ------------------------------------------------------------------
# 1. Helper: read JSON and ensure nested structure exists
# ------------------------------------------------------------------
def load_profile(path):
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
# Safeguard: ensure the path entity > profileData > alarms exists
entity = data.get("entity")
if not entity:
raise KeyError("Missing toplevel 'entity' key")
profile = entity.setdefault("profileData", {})
alarms = profile.setdefault("alarms", [])
return data, alarms
# ------------------------------------------------------------------
# 2. Helper: write JSON back
# ------------------------------------------------------------------
def write_profile(path, data):
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, sort_keys=False)
print(f"Profile written to {path}")
def build_default_name(name):
name = name.replace("_", " ")
if name.lower().endswith(" alm"):
name = name[:-4] + " Alarm"
name = name.title()
if " Psi " in name:
name = name.replace(" Psi ", " PSI ")
if " Hihi " in name:
name = name.replace(" Hihi ", " HiHi ")
if " Lolo " in name:
name = name.replace(" Lolo ", " LoLo ")
if " Vfd " in name:
name = name.replace(" Vfd ", " VFD ")
if "Lp " in name:
name = name.replace("Lp ", "LP ")
if "Hp " in name:
name = name.replace("Hp ", "HP ")
if " Lshh " in name:
name = name.replace(" Lshh ", " LSHH ")
if " Fr " in name:
name = name.replace(" Fr ", " FR ")
if " Dp " in name:
name = name.replace(" Dp ", " DP ")
if "Wtp" in name:
name = name.replace("Wtp", "WTP")
if "Vrt " in name:
name = name.replace("Vrt ", "VRT ")
if "Vru " in name:
name = name.replace("Vru ", "VRU ")
if " Plc " in name:
name = name.replace(" Plc ", " PLC ")
if "Ot " in name:
name = name.replace("Ot ", "OT ")
if "Wt " in name:
name = name.replace("Wt ", "WT ")
if "St " in name:
name = name.replace("St ", "ST ")
if " Alarms " in name:
name = name.replace(" Alarms ", " ")
return name
# ------------------------------------------------------------------
# 3. Build a single alarm rule from a key
# ------------------------------------------------------------------
def build_alarm_from_key(key, default_name: str):
"""
key : e.g. 'leak_1_lo_alm'
default_name : default string to use for alarmType if the user accepts it
"""
# Prompt user
default_name = build_default_name(name=default_name)
prompt = f"Alarm name [{default_name}]: "
try:
user_input = input(prompt).strip()
except KeyboardInterrupt:
print("\nAborted by user")
sys.exit(1)
alarm_type = user_input if user_input else default_name
# Example alarm skeleton adapt if your real structure differs
alarm = {
"configuration" : {
"type" : "ALARM",
"arguments" : {
key : {
"defaultValue" : "",
"refEntityKey" : {
"key" : key,
"type" : "TS_LATEST"
}
}
},
"clearRule" : {
"alarmDetails" : None,
"condition" : {
"type" : "DURATION",
"expression" : {
"type" : "SIMPLE",
"filters" : [ {
"argument" : key,
"operation" : "AND",
"predicates" : [ {
"type" : "BOOLEAN",
"operation" : "EQUAL",
"value" : {
"dynamicValueArgument" : None,
"staticValue" : False
}
} ],
"valueType" : "BOOLEAN"
} ],
"operation" : "AND"
},
"schedule" : None,
"unit" : "MINUTES",
"value" : {
"dynamicValueArgument" : None,
"staticValue" : 30
}
},
"dashboardId" : None
},
"createRules" : {
"CRITICAL" : {
"alarmDetails" : None,
"condition" : {
"type" : "SIMPLE",
"expression" : {
"type" : "SIMPLE",
"filters" : [ {
"argument" : key,
"operation" : "AND",
"predicates" : [ {
"type" : "BOOLEAN",
"operation" : "EQUAL",
"value" : {
"dynamicValueArgument" : None,
"staticValue" : True
}
} ],
"valueType" : "BOOLEAN"
} ],
"operation" : "AND"
},
"schedule" : None
},
"dashboardId" : None
}
},
"output" : None,
"propagate" : False,
"propagateRelationTypes" : None,
"propagateToOwner" : False,
"propagateToOwnerHierarchy" : False,
"propagateToTenant" : False
},
"configurationVersion" : 0,
"debugSettings" : {
"allEnabled" : False,
"allEnabledUntil" : 1769542094557,
"failuresEnabled" : True
},
"name" : alarm_type,
"tenantId" : {
"entityType" : "TENANT",
"id" : "a610ad00-52e2-11ec-89c2-2f343e6c262d"
},
"type" : "ALARM"
}
return alarm
# ------------------------------------------------------------------
# 4. Main flow
# ------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="Add alarms from CSV to a profile")
parser.add_argument("--profile", required=True, help="Path to the device profile JSON")
parser.add_argument("--csv", required=True, help="CSV file containing key names (first column)")
parser.add_argument("--out", help="Output file (defaults to input profile)")
parser.add_argument("--fresh", help="Start the alarm list fresh")
args = parser.parse_args()
# 1) Load the profile
try:
profile_data, alarms_list = load_profile(args.profile)
if not alarms_list or args.fresh:
alarms_list = []
except Exception as e:
print(f"Error loading profile: {e}", file=sys.stderr)
sys.exit(1)
# 2) Read CSV and gather keys that end with '_alm'
new_alarms = []
try:
with open(args.csv, newline="", encoding="utf-8") as f:
reader = csv.reader(f)
for row in reader:
if not row:
continue
key = row[0].strip()
if key.endswith("_alm"):
new_alarms.append((key, key)) # (key, default_name)
except Exception as e:
print(f"Error reading CSV: {e}", file=sys.stderr)
sys.exit(1)
if not new_alarms:
print("No keys ending with '_alm' were found in the CSV.")
sys.exit(0)
# 3) Build alarms interactively
for key, default_name in new_alarms:
print(f"\nCreating alarm for key: {key}")
alarm = build_alarm_from_key(key, default_name)
alarms_list.append(alarm)
print(f"Added alarm '{alarm['name']}'")
alarms_list.sort(key=lambda r: r["name"])
profile_data["calculatedFields"] = alarms_list
# 4) Write back
output_path = args.out if args.out else args.profile
write_profile(output_path, profile_data)
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff

View File

@@ -15,11 +15,11 @@
},
{
"cell_type": "code",
"execution_count": 10,
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"ip_address = \"166.141.136.69\"# \"ngrok.iot.inhandnetworks.com:3054\" # \"166.141.90.208\"\n",
"ip_address = \"63.46.60.220\"# \"ngrok.iot.inhandnetworks.com:3054\" # \"166.141.90.208\"\n",
"device_type = \"ba_facility\"\n",
"today = dt.now().strftime(\"%Y_%B_%d\")\n",
"filename = f\"tag_dump_{today}.json\"\n",
@@ -28,7 +28,7 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [

View File

@@ -0,0 +1,4 @@
AlarmName,ControllerName,MeasuringPointName,AlarmLevel,Condition1,Operand1,CombineMethod,Condition2,Operand2,AlarmContent,AlarmTag
flare_vessel_lshh_tripped_alm,facility,flare_vessel_lshh_tripped_alm,5,eq,1,none,eq,,ALARMED,default
flare_vessel_psi_hi_alm,facility,flare_vessel_psi_hi_alm,5,eq,1,none,eq,,ALARMED,default
flare_vessel_psi_lo_alm,facility,flare_vessel_psi_lo_alm,5,eq,1,none,eq,,ALARMED,default
1 AlarmName ControllerName MeasuringPointName AlarmLevel Condition1 Operand1 CombineMethod Condition2 Operand2 AlarmContent AlarmTag
2 flare_vessel_lshh_tripped_alm facility flare_vessel_lshh_tripped_alm 5 eq 1 none eq ALARMED default
3 flare_vessel_psi_hi_alm facility flare_vessel_psi_hi_alm 5 eq 1 none eq ALARMED default
4 flare_vessel_psi_lo_alm facility flare_vessel_psi_lo_alm 5 eq 1 none eq ALARMED default

View File

@@ -0,0 +1,272 @@
#!/usr/bin/env python3
"""
tag2csv.py
Converts the JSON “tagdictionary” you posted into the PLCCSV format.
Implements the additional normalisation rules you asked for.
Requirements: Python 3.8+
Usage
-----
1. Put the JSON into <INPUT_JSON> (default: tags.json).
2. Edit CONTROLLER_NAME if you want a different controller name.
3. Run: python3 tag2csv.py
4. Resulting CSV is written to <OUTPUT_CSV> (default: tags.csv).
"""
import json
import csv
import re
from pathlib import Path
import sys
import traceback
# --------------------------------------------------------------------------- #
# HELPER: NORMALISATION
# --------------------------------------------------------------------------- #
def normalise_tag_name(tag: str) -> str:
"""
Convert a tag name into the snake_case form you want.
Rules applied (in order):
1. Lowercase everything.
2. Replace special words:
Todays → today
Yest, Yesterdays → yesterday
CurrentMonth, CurrMonth → month
3. Split PascalCase (insert underscore before a capital that follows a
lowercase letter or a digit).
4. Split a letter followed by a number *unless* that number is the last
token (e.g. "T1" at the end stays "t1").
5. Deal with leading prefixes:
- "CMD_" → move to the end and add "_cmd"
- "VAL_" → drop the prefix
- "AL0_" → drop the prefix and add "_alm"
6. Collapse multiple underscores, strip leading/trailing ones.
"""
original = tag # keep a copy for later
# 1. lowercase
tag = tag.lower()
# 2. replace special words
tag = re.sub(r"\btodays\b", "today", tag)
tag = re.sub(r"\byest(er|erdays)?\b", "yesterday", tag)
tag = re.sub(r"\b(currentmonth|currmonth)\b", "month", tag)
# 3. split PascalCase
tag = re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", tag)
# 4. split numbers that are *not* the final token
tag = re.sub(r"([a-z])([0-9]+)(?=[a-z])", r"\1_\2", tag)
# 5. handle leading prefixes
suffix = ""
if tag.startswith("cmd_"):
tag = tag[4:] # drop prefix
suffix = "_cmd"
elif tag.startswith("val_"):
tag = tag[4:] # drop prefix
suffix = "" # nothing appended
elif tag.startswith("fbk_"):
tag = tag[4:] # drop prefix
suffix = "" # nothing appended
elif tag.startswith("al0_"):
tag = tag[4:] # drop prefix
suffix = "_alm"
tag = tag + suffix
# 6. collapse/trim underscores
tag = re.sub(r"__+", "_", tag)
tag = tag.strip("_")
# If something went wrong, fallback to the original (lowercased) name
if not tag:
tag = original.lower()
return tag
# --------------------------------------------------------------------------- #
# DATA TYPE MAPPING (unchanged from previous script)
# --------------------------------------------------------------------------- #
def map_data_type(j_type: str) -> str:
j_type = j_type.upper()
if j_type == "BOOL":
return "BIT"
if j_type == "REAL":
return "FLOAT"
if j_type in {"INT", "UINT"}:
return "INT"
if j_type in {"DINT", "UDINT"}:
return "DINT"
return j_type
# --------------------------------------------------------------------------- #
# RO / RW logic (unchanged)
# --------------------------------------------------------------------------- #
def is_rw(tag_name: str) -> bool:
return "spt" in tag_name.lower() or "cmd" in tag_name.lower()
# --------------------------------------------------------------------------- #
# MAIN
# --------------------------------------------------------------------------- #
def main(input_json: Path, output_csv: Path, controller_name: str):
# Load tags
with input_json.open("r", encoding="utf-8") as f:
tags = json.load(f)
# Column order
columns = [
"MeasuringPointName",
"ControllerName",
"GroupName",
"UploadType",
"DeadZoneType",
"DeadZonePercent",
"DataType",
"ArrayIndex",
"EnableBit",
"BitIndex",
"reverseBit",
"Address",
"Decimal",
"Len",
"CodeType",
"ReadWrite",
"Unit",
"Description",
"Transform Type",
"MaxValue",
"MinValue",
"MaxScale",
"MinScale",
"Gain",
"Offset",
"startBit",
"endBit",
"Pt",
"Ct",
"Mapping_table",
"TransDecimal",
"bitMap",
"msecSample",
"storageLwTSDB",
"DataEndianReverse",
"ReadOffset",
"ReadLength",
"WriteOffset",
"WriteLength",
"DataParseMethod",
"BitId",
"pollCycle",
"EnableRequestCount",
"RequestCount",
]
rows = []
for i, (tag_name, attrs) in enumerate(tags.items(), start=1):
# Normalised measuringpoint name
measuring_point = normalise_tag_name(tag_name)
# Raw tag name is kept as plc_data_point_X
plc_name = tag_name
# Mandatory columns
data_type = map_data_type(attrs.get("data_type", ""))
address = attrs.get("tag_name", "")
read_write = "rw" if is_rw(tag_name) else "ro"
row = {
"MeasuringPointName": measuring_point,
"ControllerName": controller_name,
"GroupName": "default",
"UploadType": "periodic",
"DeadZoneType": "",
"DeadZonePercent": "",
"DataType": data_type,
"ArrayIndex": "",
"EnableBit": "0",
"BitIndex": "",
"reverseBit": "",
"Address": tag_name,
"Decimal": "",
"Len": "",
"CodeType": "",
"ReadWrite": read_write,
"Unit": "",
"Description": "",
"Transform Type": "",
"MaxValue": "",
"MinValue": "",
"MaxScale": "",
"MinScale": "",
"Gain": "",
"Offset": "",
"startBit": "",
"endBit": "",
"Pt": "",
"Ct": "",
"Mapping_table": "",
"TransDecimal": "",
"bitMap": "",
"msecSample": "",
"storageLwTSDB": "1",
"DataEndianReverse": "",
"ReadOffset": "",
"ReadLength": "",
"WriteOffset": "",
"WriteLength": "",
"DataParseMethod": "",
"BitId": "",
"pollCycle": "",
"EnableRequestCount": "",
"RequestCount": "",
}
rows.append(row)
#print(rows)
# Sort rows by MeasuringPointName (first column)
rows.sort(key=lambda r: r["MeasuringPointName"])
# Write CSV
with output_csv.open("w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=columns)
writer.writeheader()
writer.writerows(rows)
print(f"✅ Wrote {len(rows)} rows to {output_csv}")
# --------------------------------------------------------------------------- #
# ENTRY POINT
# --------------------------------------------------------------------------- #
if __name__ == "__main__":
# Usage: python csv_to_json.py input.json output.csv
#if len(sys.argv) != 4:
# print(f"Usage: {sys.argv[0]} <input.json> <output.csv> <controllerName", file=sys.stderr)
# sys.exit(1)
input_json = Path("/Users/nico/Documents/GitHub/HP_InHand_IG502/Pub_Sub/ba_facility/thingsboard/ma_deuce_output_3.json")#Path(sys.argv[1]) # source JSON file
output_csv = Path("/Users/nico/Documents/GitHub/HP_InHand_IG502/Pub_Sub/ba_facility/thingsboard/madeuce_2.csv")#Path(sys.argv[2]) # destination CSV file
controller_name = "facility" #sys.argv[3]
if not input_json.exists():
print(f"❌ File not found: {input_json}", file=sys.stderr)
sys.exit(1)
try:
main(input_json,output_csv,controller_name)
except Exception as exc:
traceback.print_exc()
sys.exit(1)

View File

@@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""
filter_and_expand_csv.py
Reads a CSV, keeps rows whose first column ends in "_alm",
and writes a new CSV with the desired columns and boilerplate values.
"""
import csv
import sys
from pathlib import Path
import traceback
# ------------------------------------------------------------
# The boilerplate column values (in the order requested)
DEFAULTS = {
"AlarmLevel" : "5",
"Condition1" : "eq",
"Operand1" : "1",
"CombineMethod": "none",
"Condition2" : "eq",
"Operand2" : "", # empty string -> CSV blank
"AlarmContent" : "ALARMED",
"AlarmTag" : "default"
}
# ------------------------------------------------------------
def main(input_csv: Path, output_csv: Path):
with open(input_csv, newline='', encoding='utf-8') as fin, \
open(output_csv, 'w', newline='', encoding='utf-8') as fout:
reader = csv.reader(fin)
writer = csv.writer(fout)
# write the header
writer.writerow([
"AlarmName", "ControllerName", "MeasuringPointName",
"AlarmLevel", "Condition1", "Operand1", "CombineMethod",
"Condition2", "Operand2", "AlarmContent", "AlarmTag"
])
# process each row
for row in reader:
if not row: # skip empty rows
continue
alarm_name = row[0].strip()
# keep only rows that end with "_alm"
if not alarm_name.endswith("_alm"):
continue
# Column 2 might not exist guard against it
controller = row[1].strip() if len(row) > 1 else ""
# Build the new row
new_row = [
alarm_name, # AlarmName
controller, # ControllerName
alarm_name, # MeasuringPointName
DEFAULTS["AlarmLevel"],
DEFAULTS["Condition1"],
DEFAULTS["Operand1"],
DEFAULTS["CombineMethod"],
DEFAULTS["Condition2"],
DEFAULTS["Operand2"],
DEFAULTS["AlarmContent"],
DEFAULTS["AlarmTag"]
]
writer.writerow(new_row)
print(f"✓ Finished. Result written to {output_csv}")
# ------------------------------------------------------------
if __name__ == "__main__":
# Usage: python csv_to_json.py input.json output.csv
if len(sys.argv) != 3:
print(f"Usage: {sys.argv[0]} <input.csv> <output.csv>", file=sys.stderr)
sys.exit(1)
# ------------------------------------------------------------
# Configuration change if your input is somewhere else
input_csv = Path(sys.argv[1]) # source JSON file
output_csv = Path(sys.argv[2]) # destination CSV file
if not input_csv.exists():
print(f"❌ File not found: {input_csv}", file=sys.stderr)
sys.exit(1)
try:
main(input_csv,output_csv)
except Exception as exc:
traceback.print_exc()
sys.exit(1)