auto-update-mar-10

This commit is contained in:
Nico Melone
2026-03-10 13:22:03 -05:00
parent 0ebc03d84e
commit b19004192c
9 changed files with 55187 additions and 0 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,50 @@
#!/usr/bin/env python3
# --------------------------------------------------------------
# Reinterpret a list of DINTstrings as IEEE754 REALs
# --------------------------------------------------------------
import json, struct
from pathlib import Path
# --------------------------------------------------------------
# 1. Load the JSON you already have
# --------------------------------------------------------------
source_file = Path("/Users/nico/Downloads/response_1772033457123.json") # <-- your file
data = json.loads(source_file.read_text())
# The data you showed is a dict with one key; get the list
raw_ts = data["vessel_2_oil_flow_rate"]
# --------------------------------------------------------------
# 2. Helper: reinterpret a 32bit signed int as a float
# --------------------------------------------------------------
def dint_str_to_real(value_str: str) -> float:
"""Return the IEEE754 float that has the same 4byte bit pattern."""
# Convert the string to a signed 32bit integer
dint = int(value_str) # Python ints are unbounded; pack will truncate
# Pack into 4 bytes (littleendian) and unpack as a float
return struct.unpack("<f", struct.pack("<i", dint))[0]
# --------------------------------------------------------------
# 3. Walk the list and add a new field with the real value
# --------------------------------------------------------------
for point in raw_ts:
try:
point["value"] = dint_str_to_real(point["value"])
except Exception as exc:
# In the unlikely event the string is not an int
point["value_real"] = None
point["value_error"] = str(exc)
# --------------------------------------------------------------
# 4. (Optional) Sort by timestamp the sample you posted was
# in descending order, but most timeseries libs expect ascending.
# --------------------------------------------------------------
raw_ts.sort(key=lambda p: p["ts"])
# --------------------------------------------------------------
# 5. Dump the corrected data back to JSON (or whatever you prefer)
# --------------------------------------------------------------
output_file = Path("/Users/nico/Downloads/response_1772033457123_corrected.json")
output_file.write_text(json.dumps(data, indent=2, sort_keys=False))
print(f"✔ Converted {len(raw_ts)} points written to {output_file}")

View File

@@ -0,0 +1,62 @@
#!/usr/bin/env python3
"""
Transform:
{
"<property>": [
{"ts": 123, "value": 10},
{"ts": 456, "value": 20},
...
],
...
}
into:
[
{"ts": 123, "values": {"<property>": 10}},
{"ts": 456, "values": {"<property>": 20}},
...
]
"""
import json
import sys
from collections import defaultdict
def transform(data: dict) -> list:
"""
Accepts a dict where each key is a property name and the value is a
list of {ts, value} objects. Returns a list of objects sorted by
timestamp, each with a 'values' dict that maps the property name to
its value for that timestamp.
"""
# Group values by timestamp first (so if you have several properties
# theyll be merged on the same ts).
grouped = defaultdict(dict)
for prop, items in data.items():
for item in items:
ts = item["ts"]
grouped[ts][prop] = item["value"]
# Build the final list, sorted by ts for deterministic order
result = [{"ts": ts, "values": grouped[ts]} for ts in sorted(grouped)]
return result
def main():
# Load JSON from stdin or a file
if len(sys.argv) > 1: # optional filename argument
with open(sys.argv[1], "r", encoding="utf-8") as f:
raw = json.load(f)
else:
raw = json.load(sys.stdin)
out = transform(raw)
# Prettyprint to stdout
json.dump(out, sys.stdout, indent=2, ensure_ascii=False)
print() # add final newline
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""
extract_tags.py
Usage:
python extract_tags.py <input.json> <output.csv>
Example:
python extract_tags.py tags.json tags.csv
"""
import json
import csv
import sys
from pathlib import Path
# --------------------------------------------------------------------
# Helper Recursive walk
# --------------------------------------------------------------------
def walk_tag(base_name, tag_obj, writer, include_structs=False):
"""
Recursively walk a tag object and write every atomic leaf
to the CSV writer.
Parameters
----------
base_name : str
The full name of the tag up to the current point
(e.g. "WT2_Alarms.HiHI").
tag_obj : dict
The JSON dictionary for this tag.
writer : csv.writer
The CSV writer to write rows into.
include_structs : bool
If True, write a row for the struct itself
(e.g. "WT2_Alarms.HiHI,TIMER").
"""
tag_type = tag_obj.get("tag_type")
if tag_type == "atomic":
# Leaf write it
writer.writerow([base_name, tag_obj.get("data_type")])
return
if tag_type == "struct":
# Optionally write the struct tag itself
if include_structs:
writer.writerow([base_name, tag_obj.get("data_type_name")])
# struct data lives under `data_type`
struct_data = tag_obj.get("data_type", {})
internal_tags = struct_data.get("internal_tags", {})
attributes = struct_data.get("attributes", [])
# Walk every attribute in the order the struct defines it
for attr_name in attributes:
subtag = internal_tags.get(attr_name)
if not subtag:
# Defensive: attribute mentioned but not defined
continue
sub_name = f"{base_name}.{attr_name}"
# Recurse subtag may itself be a struct
walk_tag(sub_name, subtag, writer, include_structs)
# --------------------------------------------------------------------
# Main routine
# --------------------------------------------------------------------
def main(json_path, csv_path, include_structs=False):
# Load the entire JSON file
with open(json_path, "r", encoding="utf-8") as fp:
data = json.load(fp)
# Open CSV for writing
with open(csv_path, "w", newline="", encoding="utf-8") as fp:
writer = csv.writer(fp)
# Optional header
writer.writerow(["Tag Name", "Data Type"])
# The toplevel JSON is a dictionary where each key
# is the tag name and its value is the tag object.
for top_tag, tag_obj in data.items():
walk_tag(top_tag, tag_obj, writer, include_structs)
print(f"✅ Wrote {csv_path} with all tags")
# --------------------------------------------------------------------
# Commandline interface
# --------------------------------------------------------------------
if __name__ == "__main__":
if len(sys.argv) < 3:
print(__doc__)
sys.exit(1)
json_file = Path(sys.argv[1])
csv_file = Path(sys.argv[2])
# Optional flag to also output struct tags
include_struct = "--include-structs" in sys.argv
main(json_file, csv_file, include_struct)

View File

@@ -0,0 +1,104 @@
#!/usr/bin/env python3
"""
extract_tags.py
Usage:
python extract_tags.py <input.json> <output.csv> [--sort] [--include-structs]
--sort Sort the final rows alphabetically by tag name.
--include-structs Write a row for the struct itself (e.g. "WT2_Alarms.HiHI,TIMER").
Example:
python extract_tags.py tags.json tags.csv --sort
"""
import json
import csv
import sys
from pathlib import Path
# --------------------------------------------------------------------------- #
# Helper Recursive walk
# --------------------------------------------------------------------------- #
def walk_tag(base_name, tag_obj, rows, include_structs=False):
"""
Recursively walk a tag object and append every atomic leaf
to the rows list.
Parameters
----------
base_name : str
Full tag name up to the current point
(e.g. "WT2_Alarms.HiHI").
tag_obj : dict
JSON dictionary for this tag.
rows : list
Accumulator for [tag_name, data_type] rows.
include_structs : bool
If True, also append the struct itself.
"""
tag_type = tag_obj.get("tag_type")
if tag_type == "atomic":
rows.append([base_name, tag_obj.get("data_type")])
return
if tag_type == "struct":
if include_structs:
rows.append([base_name, tag_obj.get("data_type_name")])
struct_data = tag_obj.get("data_type", {})
internal_tags = struct_data.get("internal_tags", {})
attributes = struct_data.get("attributes", [])
for attr_name in attributes:
subtag = internal_tags.get(attr_name)
if not subtag:
continue
sub_name = f"{base_name}.{attr_name}"
walk_tag(sub_name, subtag, rows, include_structs)
# --------------------------------------------------------------------------- #
# Main routine
# --------------------------------------------------------------------------- #
def main(json_path, csv_path, sort_rows=False, include_structs=False):
# Load the entire JSON file
with open(json_path, "r", encoding="utf-8") as fp:
data = json.load(fp)
rows = [] # will hold all [tag_name, data_type] rows
# Iterate the toplevel tags
for top_tag, tag_obj in data.items():
walk_tag(top_tag, tag_obj, rows, include_structs)
# Optional sorting
if sort_rows:
rows.sort(key=lambda r: r[0].lower()) # caseinsensitive
# Write CSV
with open(csv_path, "w", newline="", encoding="utf-8") as fp:
writer = csv.writer(fp)
writer.writerow(["Tag Name", "Data Type"])
writer.writerows(rows)
print(f"✅ Wrote {csv_path} with {len(rows)} rows.")
# --------------------------------------------------------------------------- #
# Commandline interface
# --------------------------------------------------------------------------- #
if __name__ == "__main__":
if len(sys.argv) < 3:
print(__doc__)
sys.exit(1)
json_file = Path(sys.argv[1])
csv_file = Path(sys.argv[2])
# Flags
sort_flag = "--sort" in sys.argv
struct_flag = "--include-structs" in sys.argv
main(json_file, csv_file, sort_rows=sort_flag, include_structs=struct_flag)