249 lines
7.9 KiB
Python
Executable File
249 lines
7.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
add_alarms.py

Add alarm rules to a device-profile JSON file based on a CSV file.

Usage:
    python3 add_alarms.py --profile <profile.json> --csv <keys.csv> [--out <output.json>]

If --out is omitted, the original profile file is overwritten.
"""
|
||
|
||
import argparse
|
||
import json
|
||
import csv
|
||
import os
|
||
import sys
|
||
|
||
# ------------------------------------------------------------------
|
||
# 1. Helper: read JSON and ensure nested structure exists
|
||
# ------------------------------------------------------------------
|
||
def load_profile(path):
    """Load a device-profile JSON document and locate its alarm list.

    The file at *path* is parsed as UTF-8 JSON.  The nested containers
    ``entity -> profileData -> alarms`` are created in place (as an empty
    dict / empty list) when they are absent, so the returned list is always
    the very object stored inside the returned document.

    Returns:
        A ``(document, alarms)`` tuple: the parsed JSON document and the list
        found (or created) at ``document["entity"]["profileData"]["alarms"]``.

    Raises:
        KeyError: if the document has no ``entity`` value, or it is empty.
    """
    with open(path, "r", encoding="utf-8") as handle:
        document = json.load(handle)

    root = document.get("entity")
    if not root:
        raise KeyError("Missing top‑level 'entity' key")

    # setdefault both returns the existing container and creates a missing one,
    # so the chain below guarantees the full path exists afterwards.
    alarm_rules = root.setdefault("profileData", {}).setdefault("alarms", [])
    return document, alarm_rules
|
||
|
||
# ------------------------------------------------------------------
|
||
# 2. Helper: write JSON back
|
||
# ------------------------------------------------------------------
|
||
def write_profile(path, data):
    """Write *data* to *path* as UTF-8 JSON and announce the destination.

    The document is serialized with a two-space indent and keys are kept in
    their existing order (no sorting).  Any existing file at *path* is
    overwritten.  A confirmation line is printed to stdout afterwards.
    """
    with open(path, mode="w", encoding="utf-8") as out_file:
        # json.dump leaves keys unsorted by default, preserving document order.
        json.dump(data, out_file, indent=2)
    print(f"Profile written to {path}")
|
||
|
||
def build_default_name(name):
    """Turn a snake_case telemetry key into a human-readable alarm name.

    Steps, in order:

    1. Underscores become spaces.
    2. A trailing `` alm`` word (any case) is expanded to `` Alarm``.
    3. The result is title-cased with ``str.title()``.
    4. Known abbreviations that title-casing flattened (``Psi``, ``Vfd``,
       ``Lp`` ...) are restored to their conventional capitalisation.
    5. A standalone `` Alarms `` word in the middle of the name is dropped.

    Example: ``"tank_psi_hihi_alm"`` -> ``"Tank PSI HiHi Alarm"``.

    Args:
        name: the raw key, e.g. ``"leak_1_lo_alm"``.

    Returns:
        The suggested display name as a string.
    """
    name = name.replace("_", " ")
    if name.lower().endswith(" alm"):
        name = name[:-4] + " Alarm"
    name = name.title()

    # Abbreviations restored only when they sit between two spaces, i.e. as a
    # whole word that is neither the first nor the last word of the name.
    between_spaces = (
        (" Psi ", " PSI "),
        (" Hihi ", " HiHi "),
        (" Lolo ", " LoLo "),
        (" Vfd ", " VFD "),
        (" Lshh ", " LSHH "),
        (" Fr ", " FR "),
        (" Dp ", " DP "),
        (" Plc ", " PLC "),
    )
    for old, new in between_spaces:
        name = name.replace(old, new)

    # "Wtp" is deliberately a plain substring match so numbered forms such as
    # "Wtp1" are covered too.
    name = name.replace("Wtp", "WTP")

    # These abbreviations may also be the first word of the name.  They must be
    # matched as whole words: a bare substring test for "St " / "Ot " / "Lp "
    # would also hit the tail of ordinary words ("Test " -> "TeST ",
    # "Hot " -> "HOT ", "Help " -> "HeLP ").  The final word is left untouched,
    # because an abbreviation only counts when another word follows it.
    word_start = {"Lp", "Hp", "Vrt", "Vru", "Ot", "Wt", "St"}
    words = name.split(" ")
    for index, word in enumerate(words[:-1]):
        if word in word_start:
            words[index] = word.upper()
    name = " ".join(words)

    # Drop a redundant "Alarms" group word, e.g. "Wtp1 Alarms Low Alarm".
    name = name.replace(" Alarms ", " ")
    return name
|
||
# ------------------------------------------------------------------
|
||
# 3. Build a single alarm rule from a key
|
||
# ------------------------------------------------------------------
|
||
def build_alarm_from_key(key: str, default_name: str) -> dict:
    """Interactively build one alarm rule for a boolean telemetry key.

    The user is prompted for the alarm name; pressing Enter accepts the
    suggestion derived from *default_name* via ``build_default_name``.

    The generated rule is created (severity ``CRITICAL``) when the latest
    value of *key* equals ``True`` and cleared once it has equalled ``False``
    for a duration of 30 minutes.

    Args:
        key: telemetry key the alarm watches, e.g. ``'leak_1_lo_alm'``.
        default_name: raw text the suggested alarm name is derived from
            (callers in this file pass the key itself).

    Returns:
        A dict describing the alarm, ready to be serialized to JSON.  Key
        order is significant because the profile is written without sorting.

    Exits:
        Terminates the process with status 1 when the prompt is interrupted
        (Ctrl-C) or stdin reaches end-of-file.
    """
    default_name = build_default_name(name=default_name)

    prompt = f"Alarm name [{default_name}]: "
    try:
        user_input = input(prompt).strip()
    except KeyboardInterrupt:
        print("\nAborted by user")
        sys.exit(1)
    except EOFError:
        # stdin closed or exhausted (e.g. piped input ran out): stop cleanly
        # instead of crashing with a traceback in the middle of a run.
        print("\nAborted: no more input available on stdin", file=sys.stderr)
        sys.exit(1)

    alarm_type = user_input if user_input else default_name

    # Example alarm skeleton – adapt if your real structure differs
    alarm = {
        "configuration": {
            "type": "ALARM",
            "arguments": {
                key: {
                    "defaultValue": "",
                    "refEntityKey": {
                        "key": key,
                        "type": "TS_LATEST",
                    },
                },
            },
            "clearRule": {
                "alarmDetails": None,
                "condition": {
                    "type": "DURATION",
                    "expression": _boolean_equals_expression(key, False),
                    "schedule": None,
                    "unit": "MINUTES",
                    "value": {
                        "dynamicValueArgument": None,
                        "staticValue": 30,
                    },
                },
                "dashboardId": None,
            },
            "createRules": {
                "CRITICAL": {
                    "alarmDetails": None,
                    "condition": {
                        "type": "SIMPLE",
                        "expression": _boolean_equals_expression(key, True),
                        "schedule": None,
                    },
                    "dashboardId": None,
                },
            },
            "output": None,
            "propagate": False,
            "propagateRelationTypes": None,
            "propagateToOwner": False,
            "propagateToOwnerHierarchy": False,
            "propagateToTenant": False,
        },
        "configurationVersion": 0,
        "debugSettings": {
            "allEnabled": False,
            # NOTE(review): fixed value, presumably an epoch-milliseconds
            # timestamp copied from an exported rule — confirm it is still wanted.
            "allEnabledUntil": 1769542094557,
            "failuresEnabled": True,
        },
        "name": alarm_type,
        "tenantId": {
            "entityType": "TENANT",
            # NOTE(review): hard-coded tenant id; presumably specific to one
            # deployment — verify before using against another tenant.
            "id": "a610ad00-52e2-11ec-89c2-2f343e6c262d",
        },
        "type": "ALARM",
    }

    return alarm


def _boolean_equals_expression(key, expected):
    """Return the filter expression "argument *key* EQUAL boolean *expected*"."""
    return {
        "type": "SIMPLE",
        "filters": [
            {
                "argument": key,
                "operation": "AND",
                "predicates": [
                    {
                        "type": "BOOLEAN",
                        "operation": "EQUAL",
                        "value": {
                            "dynamicValueArgument": None,
                            "staticValue": expected,
                        },
                    },
                ],
                "valueType": "BOOLEAN",
            },
        ],
        "operation": "AND",
    }
|
||
|
||
# ------------------------------------------------------------------
|
||
# 4. Main flow
|
||
# ------------------------------------------------------------------
|
||
def main():
    """Command-line entry point: add one alarm per ``*_alm`` key in a CSV.

    Flow:
      1. Parse ``--profile``, ``--csv``, ``--out`` and ``--fresh``.
      2. Load the profile JSON and its existing alarm list.
      3. Collect every first-column CSV value that ends with ``_alm``.
      4. Prompt for a name for each key and build its alarm rule.
      5. Sort the rules by name, store them under the top-level
         ``calculatedFields`` key and write the document to ``--out``
         (or back over the input profile when ``--out`` is omitted).

    Exits with status 1 when the profile or CSV cannot be read, and with
    status 0 (writing nothing) when the CSV contains no ``*_alm`` keys.
    """
    parser = argparse.ArgumentParser(description="Add alarms from CSV to a profile")
    parser.add_argument("--profile", required=True, help="Path to the device profile JSON")
    parser.add_argument("--csv", required=True, help="CSV file containing key names (first column)")
    parser.add_argument("--out", help="Output file (defaults to input profile)")
    # nargs="?" lets a bare `--fresh` act as a switch (value True) while still
    # accepting the `--fresh <value>` spelling the option previously required.
    parser.add_argument(
        "--fresh",
        nargs="?",
        const=True,
        default=False,
        help="Start the alarm list fresh",
    )
    args = parser.parse_args()

    # 1) Load the profile
    try:
        profile_data, alarms_list = load_profile(args.profile)
    except Exception as e:
        print(f"Error loading profile: {e}", file=sys.stderr)
        sys.exit(1)

    # NOTE(review): when the existing list is non-empty and --fresh is not
    # given, alarms_list is the same object as entity.profileData.alarms, so
    # the appends below also modify that list — confirm this is intended.
    if not alarms_list or args.fresh:
        alarms_list = []

    # 2) Read CSV and gather keys that end with '_alm'
    try:
        with open(args.csv, newline="", encoding="utf-8") as f:
            first_column = [row[0].strip() for row in csv.reader(f) if row]
    except Exception as e:
        print(f"Error reading CSV: {e}", file=sys.stderr)
        sys.exit(1)

    alarm_keys = [key for key in first_column if key.endswith("_alm")]
    if not alarm_keys:
        print("No keys ending with '_alm' were found in the CSV.")
        sys.exit(0)

    # 3) Build alarms interactively; the key doubles as the source text for
    #    the suggested alarm name.
    for key in alarm_keys:
        print(f"\nCreating alarm for key: {key}")
        alarm = build_alarm_from_key(key, key)
        alarms_list.append(alarm)
        print(f"Added alarm '{alarm['name']}'")

    # Every entry must carry a "name"; pre-existing entries without one would
    # raise KeyError here.
    alarms_list.sort(key=lambda r: r["name"])
    # NOTE(review): the rules are stored under the top-level "calculatedFields"
    # key, replacing whatever was there — verify against the profile format.
    profile_data["calculatedFields"] = alarms_list

    # 4) Write back
    output_path = args.out if args.out else args.profile
    write_profile(output_path, profile_data)
|
||
|
||
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()