Files
HP_InHand_IG502/code snippets/addAlarmsDeviceProfile.py

249 lines
7.9 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
add_alarms.py
Add alarm rules to a deviceprofile JSON file based on a CSV file.
Usage:
python3 add_alarms.py --profile <profile.json> --csv <keys.csv> [--out <output.json>]
If --out is omitted, the original profile file is overwritten.
"""
import argparse
import json
import csv
import os
import sys
# ------------------------------------------------------------------
# 1. Helper: read JSON and ensure nested structure exists
# ------------------------------------------------------------------
def load_profile(path):
    """Load a device-profile JSON file and make sure its alarm list exists.

    Args:
        path: Path of the JSON file to read (decoded as UTF-8).

    Returns:
        A ``(data, alarms)`` tuple. ``data`` is the whole parsed document and
        ``alarms`` is the value stored at
        ``data["entity"]["profileData"]["alarms"]``. Missing ``profileData``
        or ``alarms`` keys are created inside ``data`` (as ``{}`` / ``[]``),
        so a newly created ``alarms`` list is the very object held by
        ``data``. A key that is already present keeps its value untouched;
        an explicit JSON ``null`` for ``alarms`` is therefore returned as
        ``None``.

    Raises:
        KeyError: If the top-level ``"entity"`` key is missing or empty.
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Safeguard: ensure the path entity -> profileData -> alarms exists.
    entity = data.get("entity")
    if not entity:
        raise KeyError("Missing top-level 'entity' key")
    profile = entity.setdefault("profileData", {})
    alarms = profile.setdefault("alarms", [])
    return data, alarms
# ------------------------------------------------------------------
# 2. Helper: write JSON back
# ------------------------------------------------------------------
def write_profile(path, data):
    """Serialize *data* as indented JSON, write it to *path* and report it.

    Args:
        path: Destination file; it is created or overwritten (UTF-8).
        data: JSON-serializable object to store. Key order is preserved.

    Raises:
        TypeError, ValueError: If *data* cannot be serialized. In that case
            the destination file is left untouched.
        OSError: If the file cannot be opened or written.
    """
    # Serialize *before* opening the file: opening with "w" truncates it, so
    # a serialization error raised afterwards would leave the destination
    # (possibly the very profile that was just loaded) empty or half-written.
    text = json.dumps(data, indent=2, sort_keys=False)
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)
    print(f"Profile written to {path}")
def build_default_name(name):
    """Turn a telemetry key into a human-readable default alarm name.

    Steps, in order:
      1. underscores become spaces;
      2. a trailing " alm" (any letter case) becomes " Alarm";
      3. the text is title-cased with ``str.title()``;
      4. known abbreviations are restored to their conventional casing
         (e.g. "Psi" -> "PSI", "Hihi" -> "HiHi");
      5. "Wtp" is upper-cased wherever it occurs and a standalone word
         "Alarms" in the middle of the name is dropped.

    Abbreviations are matched as whole space-separated words, so they are
    recognised at the start and end of the name as well as in the middle
    (e.g. "vfd_fault_alm" -> "VFD Fault Alarm").

    Args:
        name: Raw key, e.g. ``"leak_1_lo_alm"``.

    Returns:
        The suggested display name, e.g. ``"Leak 1 Lo Alarm"``.
    """
    # Title-cased spelling -> spelling to show to the user.
    abbreviations = {
        "Psi": "PSI",
        "Hihi": "HiHi",
        "Lolo": "LoLo",
        "Vfd": "VFD",
        "Lp": "LP",
        "Hp": "HP",
        "Lshh": "LSHH",
        "Fr": "FR",
        "Dp": "DP",
        "Vrt": "VRT",
        "Vru": "VRU",
        "Plc": "PLC",
        "Ot": "OT",
        "Wt": "WT",
        "St": "ST",
    }

    name = name.replace("_", " ")
    if name.lower().endswith(" alm"):
        name = name[:-4] + " Alarm"
    name = name.title()

    # split(" ") / " ".join() round-trips the spacing exactly, so only the
    # words found in the table change.
    name = " ".join(abbreviations.get(word, word) for word in name.split(" "))

    # "Wtp" is replaced as a substring so variants such as "Wtp1" are covered.
    name = name.replace("Wtp", "WTP")
    # Drop a redundant "Alarms" word that sits between two other words.
    name = name.replace(" Alarms ", " ")
    return name
# ------------------------------------------------------------------
# 3. Build a single alarm rule from a key
# ------------------------------------------------------------------
def build_alarm_from_key(key, default_name: str):
    """Interactively build one alarm definition for a boolean telemetry key.

    The user is prompted for the alarm name; pressing Enter accepts the
    suggestion derived from *default_name* via ``build_default_name``.
    Ctrl-C or end-of-input (closed/piped stdin) aborts the whole program
    with exit status 1.

    The returned dict contains:
      * a ``CRITICAL`` create rule whose condition is ``key == True``;
      * a clear rule of type ``DURATION`` whose condition is ``key == False``
        with a static value of 30 ``MINUTES`` (presumably: clear once the key
        has been False for 30 minutes - confirm against the target platform).

    Args:
        key: Telemetry key, e.g. ``'leak_1_lo_alm'``. Used both as the
            argument name and as the referenced ``TS_LATEST`` key.
        default_name: Raw text from which the suggested alarm name is built.

    Returns:
        The alarm definition as a JSON-serializable dict; its ``"name"`` is
        the name chosen by the user.
    """
    # Prompt user
    default_name = build_default_name(name=default_name)
    prompt = f"Alarm name [{default_name}]: "
    try:
        user_input = input(prompt).strip()
    except (KeyboardInterrupt, EOFError):
        # EOFError: stdin was closed or exhausted, so no answer can ever
        # arrive; treat it like an explicit abort instead of a traceback.
        print("\nAborted by user")
        sys.exit(1)
    alarm_type = user_input if user_input else default_name

    # Example alarm skeleton - adapt if your real structure differs.
    # Key order is kept as-is because the dict is dumped without sorting.
    alarm = {
        "configuration": {
            "type": "ALARM",
            "arguments": {
                key: {
                    "defaultValue": "",
                    "refEntityKey": {
                        "key": key,
                        "type": "TS_LATEST",
                    },
                },
            },
            "clearRule": {
                "alarmDetails": None,
                "condition": {
                    "type": "DURATION",
                    "expression": _boolean_expression(key, False),
                    "schedule": None,
                    "unit": "MINUTES",
                    "value": {
                        "dynamicValueArgument": None,
                        "staticValue": 30,
                    },
                },
                "dashboardId": None,
            },
            "createRules": {
                "CRITICAL": {
                    "alarmDetails": None,
                    "condition": {
                        "type": "SIMPLE",
                        "expression": _boolean_expression(key, True),
                        "schedule": None,
                    },
                    "dashboardId": None,
                },
            },
            "output": None,
            "propagate": False,
            "propagateRelationTypes": None,
            "propagateToOwner": False,
            "propagateToOwnerHierarchy": False,
            "propagateToTenant": False,
        },
        "configurationVersion": 0,
        "debugSettings": {
            "allEnabled": False,
            # NOTE(review): hard-coded value, presumably an epoch timestamp
            # in milliseconds copied from an export - confirm it is wanted.
            "allEnabledUntil": 1769542094557,
            "failuresEnabled": True,
        },
        "name": alarm_type,
        "tenantId": {
            "entityType": "TENANT",
            # NOTE(review): hard-coded tenant id; profiles meant for another
            # tenant presumably need a different value - confirm.
            "id": "a610ad00-52e2-11ec-89c2-2f343e6c262d",
        },
        "type": "ALARM",
    }
    return alarm


def _boolean_expression(key, expected):
    """Return the SIMPLE expression dict testing that *key* equals *expected*."""
    return {
        "type": "SIMPLE",
        "filters": [
            {
                "argument": key,
                "operation": "AND",
                "predicates": [
                    {
                        "type": "BOOLEAN",
                        "operation": "EQUAL",
                        "value": {
                            "dynamicValueArgument": None,
                            "staticValue": expected,
                        },
                    },
                ],
                "valueType": "BOOLEAN",
            },
        ],
        "operation": "AND",
    }
# ------------------------------------------------------------------
# 4. Main flow
# ------------------------------------------------------------------
def main():
    """Command-line entry point: add alarms from a CSV to a profile file.

    Flow:
      1. load the profile given by ``--profile``;
      2. collect every first-column CSV value that ends with ``_alm``;
      3. prompt for a name for each key and build its alarm definition;
      4. sort the alarms by name, store them under the top-level
         ``"calculatedFields"`` key and write the result to ``--out``
         (or back to ``--profile`` when ``--out`` is omitted).

    Exits with status 1 when the profile or the CSV cannot be read and with
    status 0 (without writing anything) when the CSV yields no ``_alm`` key.
    """
    parser = argparse.ArgumentParser(description="Add alarms from CSV to a profile")
    parser.add_argument("--profile", required=True, help="Path to the device profile JSON")
    parser.add_argument("--csv", required=True, help="CSV file containing key names (first column)")
    parser.add_argument("--out", help="Output file (defaults to input profile)")
    # nargs="?" + const lets a bare "--fresh" work as a flag while still
    # accepting the older "--fresh <value>" spelling.
    parser.add_argument(
        "--fresh",
        nargs="?",
        const=True,
        default=False,
        help="Start the alarm list fresh",
    )
    args = parser.parse_args()

    # 1) Load the profile
    try:
        profile_data, alarms_list = load_profile(args.profile)
    except Exception as e:
        print(f"Error loading profile: {e}", file=sys.stderr)
        sys.exit(1)
    if not alarms_list or args.fresh:
        alarms_list = []
    # NOTE(review): when the profile already has alarms and --fresh is not
    # given, alarms_list is the list stored at entity.profileData.alarms, so
    # the new entries are appended there *and* written to "calculatedFields"
    # below. Confirm that duplicating them in both places is intended.

    # 2) Read CSV and gather keys that end with '_alm'
    new_alarms = []
    try:
        # utf-8-sig reads plain UTF-8 too, but also drops a leading BOM that
        # would otherwise become an invisible part of the first key.
        with open(args.csv, newline="", encoding="utf-8-sig") as f:
            reader = csv.reader(f)
            for row in reader:
                if not row:
                    continue
                key = row[0].strip()
                if key.endswith("_alm"):
                    new_alarms.append((key, key))  # (key, default_name)
    except Exception as e:
        print(f"Error reading CSV: {e}", file=sys.stderr)
        sys.exit(1)

    if not new_alarms:
        print("No keys ending with '_alm' were found in the CSV.")
        sys.exit(0)

    # 3) Build alarms interactively
    for key, default_name in new_alarms:
        print(f"\nCreating alarm for key: {key}")
        alarm = build_alarm_from_key(key, default_name)
        alarms_list.append(alarm)
        print(f"Added alarm '{alarm['name']}'")

    # Pre-existing entries may lack a "name"; sort them first instead of
    # raising KeyError after all the interactive input has been collected.
    alarms_list.sort(key=lambda r: r.get("name") or "")
    profile_data["calculatedFields"] = alarms_list

    # 4) Write back
    output_path = args.out if args.out else args.profile
    write_profile(output_path, profile_data)
if __name__ == "__main__":
    # Run the CLI only when executed as a script, not when imported.
    main()