updated code for creating collectors from plc tags
This commit is contained in:
79
code snippets/csvToJSON.py
Executable file
79
code snippets/csvToJSON.py
Executable file
@@ -0,0 +1,79 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
csv_to_json.py
|
||||
|
||||
Convert a CSV into a JSON array where each element is
|
||||
|
||||
{
|
||||
"<value from column 2>": {
|
||||
"data_type": "<value from column 4>",
|
||||
"tag_name": "<value from column 2>"
|
||||
}
|
||||
}
|
||||
|
||||
Only rows whose first column is exactly "TAG" are included.
|
||||
"""
|
||||
|
||||
import csv
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
def csv_to_tag_json(csv_path: Path, json_path: Path) -> None:
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
csv_path : Path
|
||||
Path to the source CSV file.
|
||||
json_path : Path
|
||||
Path where the output JSON should be written.
|
||||
"""
|
||||
tag_rows = {}
|
||||
|
||||
# open the CSV file. Assume UTF‑8, but you can change if needed.
|
||||
with csv_path.open(newline='', encoding='utf-8') as f:
|
||||
reader = csv.reader(f)
|
||||
for row_index, row in enumerate(reader, start=1):
|
||||
# guard against empty rows
|
||||
if not row:
|
||||
continue
|
||||
|
||||
# Ensure we have at least 5 columns (index 0..4)
|
||||
if len(row) < 5:
|
||||
print(f"⚠️ Skipping line {row_index}: not enough columns", file=sys.stderr)
|
||||
continue
|
||||
|
||||
# Row 0 must be exactly "TAG" (case‑sensitive)
|
||||
if row[0] != "TAG":
|
||||
continue
|
||||
|
||||
tag_name = row[2]
|
||||
data_type = row[4]
|
||||
|
||||
# Build the object exactly as you asked
|
||||
tag_rows[tag_name] = {
|
||||
"data_type": data_type,
|
||||
"tag_name": tag_name
|
||||
}
|
||||
|
||||
|
||||
# Write the list of objects as pretty‑printed JSON
|
||||
with json_path.open('w', encoding='utf-8') as f:
|
||||
json.dump(tag_rows, f, indent=2, ensure_ascii=False)
|
||||
|
||||
print(f"✅ Wrote {len(tag_rows)} rows to {json_path}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Usage: python csv_to_json.py input.csv output.json
|
||||
if len(sys.argv) != 3:
|
||||
print(f"Usage: {sys.argv[0]} <input.csv> <output.json>", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
csv_file = Path(sys.argv[1])
|
||||
json_file = Path(sys.argv[2])
|
||||
|
||||
if not csv_file.exists():
|
||||
print(f"❌ File not found: {csv_file}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
csv_to_tag_json(csv_file, json_file)
|
||||
@@ -15,12 +15,12 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 3,
|
||||
"execution_count": 10,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"ip_address = \"166.193.23.21\"# \"ngrok.iot.inhandnetworks.com:3054\" # \"166.141.90.208\"\n",
|
||||
"device_type = \"fk_leak_detection\"\n",
|
||||
"ip_address = \"166.141.136.69\"# \"ngrok.iot.inhandnetworks.com:3054\" # \"166.141.90.208\"\n",
|
||||
"device_type = \"ba_facility\"\n",
|
||||
"today = dt.now().strftime(\"%Y_%B_%d\")\n",
|
||||
"filename = f\"tag_dump_{today}.json\"\n",
|
||||
"path = f'/Users/nico/Documents/GitHub/HP_InHand_IG502/Pub_Sub/{device_type}/thingsboard/' # code snippets/tag_dump.json'"
|
||||
@@ -28,7 +28,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"execution_count": 3,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
@@ -43,7 +43,17 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"execution_count": 11,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"with open(\"/Users/nico/Documents/GitHub/HP_InHand_IG502/Pub_Sub/ba_facility/thingsboard/ma_deuce_output.json\", \"r\") as tags:\n",
|
||||
" data = json.load(tags)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 7,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
@@ -78,11 +88,19 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"execution_count": 13,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"CSV file created: /Users/nico/Documents/GitHub/HP_InHand_IG502/Pub_Sub/ba_facility/thingsboard/ma_deuce.csv\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"controller_name = \"leak_detection\"\n",
|
||||
"controller_name = \"ma_deuce\"\n",
|
||||
"filename = f\"{controller_name}.csv\"\n",
|
||||
"header = [\n",
|
||||
" \"MeasuringPointName\",\"ControllerName\",\"GroupName\",\"UploadType\",\"DeadZoneType\",\"DeadZonePercent\",\n",
|
||||
@@ -111,7 +129,9 @@
|
||||
" if info[\"tag_name\"].startswith(\"_IO\"):\n",
|
||||
" continue\n",
|
||||
"\n",
|
||||
" dtype = data_type_map.get(info[\"data_type\"].upper(), info[\"data_type\"].upper())\n",
|
||||
" dtype = data_type_map.get(info[\"data_type\"].upper(), \"\")\n",
|
||||
" if not dtype:\n",
|
||||
" continue\n",
|
||||
" rw = \"rw\" if (\"SPT\" in tag.upper() or \"CMD\" in tag.upper()) else \"ro\"\n",
|
||||
" address_name = info[\"tag_name\"]\n",
|
||||
" measuring_point = format_measuring_point(address_name)\n",
|
||||
|
||||
Reference in New Issue
Block a user