Files
ThingsBoard/Report Generator/email-reports-v2.ipynb

348 lines
14 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import requests, json, time, traceback, boto3, xlsxwriter\n",
"from threading import Lock\n",
"from datetime import datetime as dt"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"THINGSBOARD_URL = \"https://hp.henrypump.cloud\"\n",
"USERNAME = \"nmelone@henry-pump.com\"\n",
"PASSWORD = \"gzU6$26v42mU%3jDzTJf\"\n",
"CONFIG_PATH = '/Users/nico/Documents/GitHub/ThingsBoard/Report Generator/lambda-python3.12/tbreport/config.json'"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# Define a rate limiter class\n",
"class RateLimiter:\n",
" def __init__(self, max_calls, period):\n",
" self.max_calls = max_calls\n",
" self.period = period\n",
" self.call_times = []\n",
" self.lock = Lock()\n",
"\n",
" def acquire(self):\n",
" with self.lock:\n",
" current_time = time.time()\n",
" # Remove expired calls\n",
" self.call_times = [t for t in self.call_times if t > current_time - self.period]\n",
" if len(self.call_times) >= self.max_calls:\n",
" # Wait for the oldest call to expire\n",
" time_to_wait = self.period - (current_time - self.call_times[0])\n",
" time.sleep(time_to_wait)\n",
" # Register the current call\n",
" self.call_times.append(time.time())\n",
"\n",
"# Initialize a rate limiter\n",
"RATE_LIMITER = RateLimiter(max_calls=10, period=2) # Adjust `max_calls` and `period` as needed"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def sort_dict_keys(d):\n",
" \"\"\"Sorts the keys of all nested dictionaries in a given dictionary.\n",
"\n",
" Args:\n",
" d: The input dictionary.\n",
"\n",
" Returns:\n",
" A new dictionary with sorted keys at each level.\n",
" \"\"\"\n",
" sorted_d = {}\n",
" for k, v in d.items():\n",
" if isinstance(v, dict):\n",
" sorted_d[k] = sort_dict_keys(v)\n",
" else:\n",
" sorted_d[k] = v\n",
" return dict(sorted(sorted_d.items()))"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# Authenticate to get the JWT token\n",
"def get_jwt_token():\n",
" url = f\"{THINGSBOARD_URL}/api/auth/login\"\n",
" payload = {\"username\": USERNAME, \"password\": PASSWORD}\n",
" response = requests.post(url, json=payload)\n",
" \n",
" if response.status_code == 200:\n",
" return response.json().get(\"token\")\n",
" else:\n",
" raise Exception(f\"Authentication failed: {response.text}\")"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"def get_all_customer_devices(jwt_token, customer_id):\n",
" \"\"\"Retrieve all devices for a customer, handling pagination.\"\"\"\n",
" devices = []\n",
" page = 0\n",
" page_size = 100 # Adjust if needed\n",
" \n",
" while True:\n",
" url = f\"{THINGSBOARD_URL}/api/customer/{customer_id}/devices?pageSize={page_size}&page={page}\"\n",
" headers = {\"X-Authorization\": f\"Bearer {jwt_token}\"}\n",
" RATE_LIMITER.acquire()\n",
" response = requests.get(url, headers=headers)\n",
" \n",
" if response.status_code == 200:\n",
" data = response.json()\n",
" devices.extend(data.get(\"data\", [])) # Add devices from current page\n",
" \n",
" if page >= data.get(\"totalPages\", 1) - 1:\n",
" break # Exit loop if this is the last page\n",
" \n",
" page += 1 # Move to next page\n",
" else:\n",
" raise Exception(f\"Failed to get customer devices: {response.text}\")\n",
" \n",
" return devices\n"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"def get_timeseries_keys(jwt_token, device_id):\n",
" \"\"\"Retrieve available time-series keys (telemetry keys) for a given device ID.\"\"\"\n",
" url = f\"{THINGSBOARD_URL}/api/plugins/telemetry/DEVICE/{device_id}/keys/timeseries\"\n",
" headers = {\"X-Authorization\": f\"Bearer {jwt_token}\"}\n",
" RATE_LIMITER.acquire()\n",
" response = requests.get(url, headers=headers)\n",
" \n",
" if response.status_code == 200:\n",
" return response.json() # Returns a list of telemetry keys\n",
" else:\n",
" raise Exception(f\"Failed to get timeseries keys: {response.text}\")"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"def get_latest_telemetry(jwt_token, device_id, keys=None):\n",
" \"\"\"Retrieve the latest telemetry data for a given device ID.\n",
" \n",
" Args:\n",
" jwt_token (str): The authentication token.\n",
" device_id (str): The ID of the device.\n",
" keys (list, optional): A list of telemetry keys to fetch. Defaults to None (fetch all).\n",
" \n",
" Returns:\n",
" dict: The latest telemetry data for the device.\n",
" \"\"\"\n",
" if keys:\n",
" key_str = \",\".join(keys) # Convert list to comma-separated string\n",
" url = f\"{THINGSBOARD_URL}/api/plugins/telemetry/DEVICE/{device_id}/values/timeseries?keys={key_str}\"\n",
" else:\n",
" url = f\"{THINGSBOARD_URL}/api/plugins/telemetry/DEVICE/{device_id}/values/timeseries\"\n",
" \n",
" headers = {\"X-Authorization\": f\"Bearer {jwt_token}\"}\n",
" RATE_LIMITER.acquire()\n",
" response = requests.get(url, headers=headers)\n",
" \n",
" if response.status_code == 200:\n",
" return response.json() # Returns a dictionary with keys and their latest values\n",
" else:\n",
" raise Exception(f\"Failed to get latest telemetry: {response.text}\")\n"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"# Main execution\n",
"try:\n",
" token = get_jwt_token()\n",
" with open(CONFIG_PATH) as f:\n",
" config = json.load(f)\n",
" reportData = {}\n",
" reportToList = {}\n",
" # Loop through each item in config, each item represents a report\n",
" for report in config:\n",
" reportToList[report[\"name\"]] = report[\"emails\"]\n",
" for customer in report[\"customers\"].keys():\n",
" reportDeviceTypes = report[\"customers\"][customer][\"deviceTypes\"]\n",
" devices = get_all_customer_devices(token, customer)\n",
" if report[\"filterDevicesIn\"]:\n",
" devices = [device for device in devices if device[\"id\"][\"id\"] in report[\"filterDevicesIn\"]]\n",
" if report[\"filterDevicesOut\"]:\n",
" devices = [device for device in devices if device[\"id\"][\"id\"] not in report[\"filterDevicesOut\"]]\n",
" if not reportData.get(report[\"name\"], None):\n",
" reportData[report[\"name\"]] = {}\n",
" for device in devices:\n",
" deviceId = device[\"id\"][\"id\"]\n",
" deviceType = device[\"type\"]\n",
" deviceName = device[\"name\"]\n",
" for reportDeviceType in reportDeviceTypes:\n",
" if reportDeviceType[\"deviceType\"] == deviceType:\n",
" keys = get_timeseries_keys(token, deviceId)\n",
" keys = list(filter(lambda x: x in reportDeviceType[\"dataPoints\"], keys))\n",
" #Check for report customer\n",
" if not reportData[report[\"name\"]].get(report[\"customers\"][customer][\"name\"], None):\n",
" reportData[report[\"name\"]][report[\"customers\"][customer][\"name\"]] = {}\n",
" #Check for device type in config\n",
" if deviceType in list(map(lambda x: x[\"deviceType\"], reportDeviceTypes)):\n",
" #Check if deviceType in report\n",
" if not reportData[report[\"name\"]][report[\"customers\"][customer][\"name\"]].get(deviceType, None):\n",
" reportData[report[\"name\"]][report[\"customers\"][customer][\"name\"]][deviceType] = {}\n",
" if keys:\n",
" deviceData = get_latest_telemetry(token, deviceId, keys)\n",
" for x in reportDeviceTypes:\n",
" if x[\"deviceType\"] == deviceType:\n",
" labels = x[\"labels\"]\n",
" labelled_data = {}\n",
" for k,v in labels.items():\n",
" labelled_data[v] = {}\n",
" for k,v in deviceData.items():\n",
" labelled_data[labels[k]] = v\n",
" reportData[report[\"name\"]][report[\"customers\"][customer][\"name\"]][deviceType][deviceName] = labelled_data\n",
" else:\n",
" reportData[report[\"name\"]][report[\"customers\"][customer][\"name\"]][deviceType][deviceName] = {} \n",
" #Sort Data\n",
" reportDataSorted = sort_dict_keys(reportData)\n",
" #print(json.dumps(reportDataSorted,indent=4))\n",
"except KeyError as ke:\n",
" print(\"KeyError:\", ke)\n",
" traceback.print_exc()\n",
"except Exception as e:\n",
" print(\"Error:\", e)\n",
" traceback.print_exc()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# Create an AWS SES client\n",
"ses_client = boto3.client('ses', region_name='us-east-1')\n",
"s3 = boto3.resource('s3')\n",
"BUCKET_NAME = \"thingsboard-email-reports\""
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
" # Create a workbook for each report\n",
"for report_name, report_data in reportDataSorted.items():\n",
" #will generate an email lower down\n",
" spreadsheets = []\n",
" # Create a worksheet for each company\n",
" for company_name, company_data in report_data.items():\n",
" workbook = xlsxwriter.Workbook(f\"/Users/nico/Documents/test/{report_name}-{company_name}-{dt.today().strftime('%Y-%m-%d')}.xlsx\",{'strings_to_numbers': True})\n",
" bold = workbook.add_format({'bold': True})\n",
" # Create a sheet for each device type\n",
" for device_type, device_data in company_data.items():\n",
" worksheet = workbook.add_worksheet(device_type)\n",
" \n",
" # Set the header column with device types\n",
" device_names = list(device_data.keys())\n",
" worksheet.write_column(1, 0, device_names,bold)\n",
" #TODO Fix header row and ensure data is put in correct column\n",
" # Write the data to the sheet\n",
" for i, (telemetry_name, telemetry_data) in enumerate(device_data.items()):\n",
" # Set the header row with telemetry names\n",
" telemetry_names = list(telemetry_data.keys())\n",
" worksheet.write_row(0, 1, telemetry_names, bold)\n",
" for j, (data_name, data) in enumerate(telemetry_data.items()):\n",
" values = [d[\"value\"] for d in data]\n",
" worksheet.write_row(i + 1, j+ 1, values)\n",
" worksheet.autofit()\n",
" workbook.close()\n",
" spreadsheets.append(workbook)\n",
" \n",
" \"\"\"# Store the generated report in S3.\n",
" s3.Object(BUCKET_NAME, f'{report_name}-{company_name}-{dt.today().strftime('%Y-%m-%d')}.xlsx').put(Body=open(f\"/Users/nico/Documents/test/{report_name}-{company_name}-{dt.today().strftime('%Y-%m-%d')}.xlsx\", 'rb'))\n",
" # Create an email message\n",
" msg = MIMEMultipart()\n",
" msg['Subject'] = report_name\n",
" msg['From'] = 'alerts@henry-pump.com'\n",
" msg['To'] = \", \".join(reportToList[report_name])\n",
"\n",
" # Add a text body to the message (optional)\n",
" body_text = 'Please find the attached spreadsheets.'\n",
" msg.attach(MIMEText(body_text, 'plain'))\n",
"\n",
" # Attach each workbook in the spreadsheets array\n",
" for spreadsheet in spreadsheets:\n",
" # Attach the file to the email message\n",
" attachment = MIMEBase('application', 'octet-stream')\n",
" attachment.set_payload(open(spreadsheet.filename, \"rb\").read())\n",
" encoders.encode_base64(attachment)\n",
" attachment.add_header('Content-Disposition', 'attachment', filename=spreadsheet.filename[5:])\n",
"\n",
" msg.attach(attachment)\n",
" # Send the email using AWS SES\n",
" response = ses_client.send_raw_email(\n",
" \n",
" RawMessage={'Data': msg.as_string()}\n",
" )\n",
"\n",
" print(response)\n",
" print(spreadsheets)\n",
" \"\"\""
]
}
],
"metadata": {
"kernelspec": {
"display_name": "tbreport",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.1"
}
},
"nbformat": 4,
"nbformat_minor": 2
}