"""Build a per-customer device telemetry report from a ThingsBoard server.

The entry point is :func:`lambda_handler`. It logs in, reads the report
definitions from ``CONFIG_PATH``, fetches the latest telemetry for every
matching device and prints the result as key-sorted JSON.
"""
import json
import os
import time
import traceback
from collections import defaultdict
from datetime import datetime as dt
from threading import Lock

import requests

THINGSBOARD_URL = "https://hp.henrypump.cloud"

# SECURITY(review): these credentials are committed in source. The environment
# variables take precedence; the literal fallbacks are kept only so existing
# runs keep working. Rotate the password and remove the fallbacks.
USERNAME = os.environ.get("THINGSBOARD_USERNAME", "nmelone@henry-pump.com")
PASSWORD = os.environ.get("THINGSBOARD_PASSWORD", "gzU6$26v42mU%3jDzTJf")

# The fallback is a developer-local absolute path; set TBREPORT_CONFIG_PATH
# when running anywhere else.
CONFIG_PATH = os.environ.get(
    "TBREPORT_CONFIG_PATH",
    '/Users/nico/Documents/GitHub/ThingsBoard/Report Generator/lambda-python3.12/tbreport/config.json',
)

# Seconds to wait on any single HTTP request before giving up, so a stalled
# connection cannot block the handler indefinitely.
REQUEST_TIMEOUT = 30


class RateLimiter:
    """Allow at most ``max_calls`` acquisitions within any ``period`` seconds.

    Args:
        max_calls: Maximum number of calls permitted per window (>= 1).
        period: Window length in seconds.

    Raises:
        ValueError: If ``max_calls`` is smaller than 1.
    """

    def __init__(self, max_calls, period):
        if max_calls < 1:
            raise ValueError("max_calls must be at least 1")
        self.max_calls = max_calls
        self.period = period
        self.call_times = []  # monotonic timestamps of recent calls, oldest first
        self.lock = Lock()

    def _drop_expired(self, now):
        """Forget calls that are older than one period relative to ``now``."""
        cutoff = now - self.period
        self.call_times = [t for t in self.call_times if t > cutoff]

    def acquire(self):
        """Block until another call is allowed, then record it.

        The lock is held while sleeping, so concurrent callers queue up
        behind the waiting one instead of racing past the limit.
        """
        with self.lock:
            # A monotonic clock is immune to wall-clock adjustments.
            now = time.monotonic()
            self._drop_expired(now)
            if len(self.call_times) >= self.max_calls:
                # Wait for the oldest recorded call to leave the window.
                time_to_wait = self.period - (now - self.call_times[0])
                if time_to_wait > 0:
                    time.sleep(time_to_wait)
                now = time.monotonic()
                self._drop_expired(now)
            self.call_times.append(now)


# Shared limiter for all GET requests; adjust `max_calls` and `period` as needed.
RATE_LIMITER = RateLimiter(max_calls=20, period=1)


def sort_dict_keys(d):
    """Return a copy of ``d`` with keys sorted at every nested dict level.

    Only values that are themselves dictionaries are recursed into; any other
    value (including lists) is carried over unchanged.

    Args:
        d: The input dictionary.

    Returns:
        A new plain ``dict`` with sorted keys at each level.
    """
    return {
        key: sort_dict_keys(d[key]) if isinstance(d[key], dict) else d[key]
        for key in sorted(d)
    }


def get_jwt_token():
    """Log in with ``USERNAME``/``PASSWORD`` and return the JWT token.

    Returns:
        The ``token`` field of the login response (``None`` if absent).

    Raises:
        Exception: If the server does not answer with HTTP 200.
    """
    url = f"{THINGSBOARD_URL}/api/auth/login"
    payload = {"username": USERNAME, "password": PASSWORD}
    response = requests.post(url, json=payload, timeout=REQUEST_TIMEOUT)
    if response.status_code != 200:
        raise Exception(f"Authentication failed: {response.text}")
    return response.json().get("token")


def _get_json(jwt_token, path, failure_message, params=None):
    """Perform a rate-limited, authenticated GET and return the decoded JSON.

    Args:
        jwt_token: Token sent in the ``X-Authorization`` header.
        path: URL path appended to ``THINGSBOARD_URL``.
        failure_message: Prefix of the exception message on a non-200 reply.
        params: Optional query parameters; ``requests`` URL-encodes them.

    Raises:
        Exception: If the server does not answer with HTTP 200.
    """
    RATE_LIMITER.acquire()
    response = requests.get(
        f"{THINGSBOARD_URL}{path}",
        headers={"X-Authorization": f"Bearer {jwt_token}"},
        params=params,
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        raise Exception(f"{failure_message}: {response.text}")
    return response.json()


def get_all_customer_devices(jwt_token, customer_id):
    """Retrieve all devices for a customer, handling pagination.

    Args:
        jwt_token: The authentication token.
        customer_id: The ID of the customer whose devices are listed.

    Returns:
        A list with the ``data`` entries of every page, in page order.

    Raises:
        Exception: If any page request does not return HTTP 200.
    """
    devices = []
    page = 0
    page_size = 100  # Adjust if needed
    while True:
        data = _get_json(
            jwt_token,
            f"/api/customer/{customer_id}/devices",
            "Failed to get customer devices",
            params={"pageSize": page_size, "page": page},
        )
        devices.extend(data.get("data", []))
        # A response without "totalPages" is treated as a single page.
        if page >= data.get("totalPages", 1) - 1:
            break
        page += 1
    return devices


def get_timeseries_keys(jwt_token, device_id):
    """Retrieve available time-series (telemetry) keys for a device.

    Args:
        jwt_token: The authentication token.
        device_id: The ID of the device.

    Returns:
        The decoded JSON response (a list of telemetry keys).

    Raises:
        Exception: If the request does not return HTTP 200.
    """
    return _get_json(
        jwt_token,
        f"/api/plugins/telemetry/DEVICE/{device_id}/keys/timeseries",
        "Failed to get timeseries keys",
    )


def get_latest_telemetry(jwt_token, device_id, keys=None):
    """Retrieve the latest telemetry data for a given device ID.

    Args:
        jwt_token (str): The authentication token.
        device_id (str): The ID of the device.
        keys (list, optional): Telemetry keys to fetch. Defaults to None;
            a missing or empty list sends no ``keys`` filter (fetch all).

    Returns:
        dict: The latest telemetry data for the device, keyed by telemetry key.

    Raises:
        Exception: If the request does not return HTTP 200.
    """
    # Passing the keys through `params` lets requests URL-encode them.
    params = {"keys": ",".join(keys)} if keys else None
    return _get_json(
        jwt_token,
        f"/api/plugins/telemetry/DEVICE/{device_id}/values/timeseries",
        "Failed to get latest telemetry",
        params=params,
    )


def _filter_devices(devices, filter_in, filter_out):
    """Keep devices allowed by the include set and not in the exclude set.

    An empty set disables the corresponding filter.
    """
    return [
        device
        for device in devices
        if (not filter_in or device["id"]["id"] in filter_in)
        and (not filter_out or device["id"]["id"] not in filter_out)
    ]


def _fetch_labelled_telemetry(token, device_id, spec):
    """Return ``{label: telemetry}`` for the data points ``spec`` asks for.

    Only keys the device actually reports are requested. When none of the
    configured data points exist on the device, no telemetry request is made
    and an empty dict is returned.
    """
    wanted = spec["dataPoints"]
    keys = [key for key in get_timeseries_keys(token, device_id) if key in wanted]
    labels = spec["labels"]
    if not keys:
        return {}
    telemetry = get_latest_telemetry(token, device_id, keys)
    return {labels[key]: value for key, value in telemetry.items() if key in labels}


def lambda_handler(event, context):
    """Generate the configured reports and print them as sorted JSON.

    The result is nested as report name -> customer name -> device type ->
    device name -> labelled telemetry. Errors are not propagated: they are
    printed together with a traceback and the handler returns ``None``.

    Args:
        event: Invocation payload; not used.
        context: Invocation context; not used.
    """
    try:
        token = get_jwt_token()
        with open(CONFIG_PATH, encoding="utf-8") as f:
            config = json.load(f)

        report_data = defaultdict(lambda: defaultdict(lambda: defaultdict(dict)))
        # Recipients per report; collected here but not consumed in this file.
        report_to_list = {}

        for report in config:
            report_to_list[report["name"]] = report["emails"]
            # The filters are report-level, so build them once per report.
            filter_in = set(report.get("filterDevicesIn", []))
            filter_out = set(report.get("filterDevicesOut", []))

            for customer, customer_data in report["customers"].items():
                # First entry per device type wins, matching a first-match scan.
                specs_by_type = {}
                for spec in customer_data["deviceTypes"]:
                    specs_by_type.setdefault(spec["deviceType"], spec)

                devices = _filter_devices(
                    get_all_customer_devices(token, customer), filter_in, filter_out
                )
                for device in devices:
                    device_id = device["id"]["id"]
                    device_type = device["type"]
                    device_name = device["name"]
                    if device_type not in specs_by_type:
                        continue
                    labelled = _fetch_labelled_telemetry(
                        token, device_id, specs_by_type[device_type]
                    )
                    report_data[report["name"]][customer_data["name"]][device_type][device_name] = labelled

        print(json.dumps(sort_dict_keys(report_data), indent=4))
    except KeyError as ke:
        print("KeyError:", ke)
        traceback.print_exc()
    except Exception as e:
        print("Error:", e)
        traceback.print_exc()