"""FastAPI facade over the ThingsBoard REST API.

The module logs in to ThingsBoard once at import time (see ``getJWT``) and
exposes a handful of read-only endpoints (customers, telemetry, attributes,
device lookup and a per-customer device summary) that forward to the
ThingsBoard API using the shared ``headers`` dictionary.
"""
# NOTE: the wildcard import stays first so that the explicit imports below
# take precedence over anything `model` happens to re-export, exactly as in
# the original file. `Customer` is expected to come from here.
from model import *

import json
import logging
import os
from datetime import datetime as dt
from datetime import timedelta as td
from datetime import timezone as tz
from typing import List, Literal

import requests
from fastapi import FastAPI, HTTPException, Body
from fastapi.middleware.cors import CORSMiddleware

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Shared request headers. ``getJWT`` adds the "X-Authorization" bearer token.
headers = {"Content-Type": "application/json", "Accept": "application/json"}

# SECURITY: credentials were hard-coded in this file. They can now be supplied
# through the TB_USERNAME / TB_PASSWORD / TB_DOMAIN environment variables; the
# literals below are kept only as backward-compatible fallbacks and should be
# rotated and removed from source control.
username = os.environ.get("TB_USERNAME", "nmelone@henry-pump.com")
password = os.environ.get("TB_PASSWORD", "gzU6$26v42mU%3jDzTJf")
domain = os.environ.get("TB_DOMAIN", "thingsboard.cloud")
credentials = json.dumps({"username": username, "password": password})
url_base = f"https://{domain}/api/"

# Seconds to wait for ThingsBoard before giving up; without a timeout a stalled
# connection would block the worker thread indefinitely.
REQUEST_TIMEOUT = 30

# Telemetry keys requested when the caller does not specify any.
DEFAULT_TELEMETRY_KEYS = ("temperature", "humidity")

# ---------------------------------------------------------------------------
# Application
# ---------------------------------------------------------------------------

app = FastAPI(
    title="Thingsboard API",
    version="1.0.0",
    description="Provides secure access to Thingsboard API",
)

# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive; left unchanged to preserve behavior, but consider restricting it.
origins = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ---------------------------------------------------------------------------
# ThingsBoard helpers
# ---------------------------------------------------------------------------

def getJWT():
    """Log in to ThingsBoard and store the bearer token in ``headers``.

    Raises:
        requests.HTTPError: if the login request is rejected.
    """
    # Do not send a previous (possibly expired) token along with the login.
    login_headers = {k: v for k, v in headers.items() if k != "X-Authorization"}
    response = requests.post(
        url_base + "auth/login",
        headers=login_headers,
        data=credentials,
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    token = response.json()
    headers["X-Authorization"] = "Bearer " + token["token"]


def _tb_get(path, params=None):
    """GET ``url_base + path`` and return the decoded JSON body.

    ``params`` is passed to ``requests`` so values are URL-encoded. If the
    server answers 401 the token is refreshed once and the request retried,
    because the token obtained at import time eventually expires.

    Raises:
        HTTPException: with ThingsBoard's status code and body when the final
            response is not 200.
    """
    url = url_base + path
    response = requests.get(
        url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
    if response.status_code == 401:
        getJWT()
        response = requests.get(
            url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
    if response.status_code != 200:
        raise HTTPException(
            status_code=response.status_code, detail=response.text)
    return response.json()


def _tb_get_all_pages(path, page=0, pageSize=10, params=None):
    """Collect the ``data`` items of a paginated listing.

    Starts at ``page`` and keeps requesting the next page while the response
    reports ``hasNext``.
    """
    items = []
    while True:
        query = dict(params or {}, page=page, pageSize=pageSize)
        body = _tb_get(path, query)
        items.extend(body.get("data", []))
        if not body.get("hasNext"):
            return items
        page += 1


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

@app.get("/customers", response_model=List[Customer])
def getCustomers(page: int = 0, pageSize: int = 10):
    """
    Returns a list of customers from Thingsboard API.

    parameters:
        page: first page to fetch; all following pages are fetched as well.
        pageSize: number of customers requested per page.
    """
    return _tb_get_all_pages("customers", page=page, pageSize=pageSize)


@app.get("/customerNames", response_model=List[str])
def getCustomerNames(page: int = 0, pageSize: int = 10):
    """
    Returns a list of customer names from Thingsboard API.

    parameters:
        page: first page to fetch; all following pages are fetched as well.
        pageSize: number of customers requested per page.
    """
    # The paging arguments used to be ignored; forward them to getCustomers.
    customers = getCustomers(page=page, pageSize=pageSize)
    return [customer["name"] for customer in customers]


@app.get("/telemetry", response_model=dict)
def getTelemetry(startTs, endTs, keys, eType, deviceId):
    """
    Returns telemetry data from Thingsboard API.

    parameters:
        startTs: Timestamp of the start time for the data in milliseconds.
            Falsy values default to one hour ago.
        endTs: Timestamp of the end time for the data in milliseconds.
            Falsy values default to now.
        keys: Comma-separated telemetry keys (e.g., temperature,humidity) or
            a list of keys. Falsy values default to temperature,humidity.
        eType: Type of entity (e.g., 'ASSET', 'DEVICE').
        deviceId: ID of the device as a UUID
    """
    if not keys:
        keys = DEFAULT_TELEMETRY_KEYS
    if not isinstance(keys, str):
        # A list must be sent comma-separated, not as its Python repr.
        keys = ",".join(keys)

    now = dt.now(tz.utc)
    if not startTs:
        # Whole milliseconds: the float form carried a fractional part.
        startTs = int((now - td(hours=1)).timestamp() * 1000)
    if not endTs:
        endTs = int(now.timestamp() * 1000)

    return _tb_get(
        f"plugins/telemetry/{eType}/{deviceId}/values/timeseries",
        {"startTs": startTs, "endTs": endTs, "keys": keys},
    )


# The summary code below iterates this result as a list of
# {"key": ..., "value": ...} entries, so the response model is a list of
# dicts rather than a single dict.
@app.get("/attributes", response_model=List[dict])
def getAttributes(device_id: str, scope: str = "SERVER_SCOPE"):
    """
    Retrieves attributes for a given device from ThingsBoard.

    Parameters:
        device_id (str): UUID of the device.
        scope (str): Attribute scope to retrieve. Options:
            - CLIENT_SCOPE
            - SERVER_SCOPE (default)
            - SHARED_SCOPE

    Returns:
        The attribute entries reported by ThingsBoard for the device.

    Raises:
        HTTPException: when ThingsBoard does not answer with status 200.
    """
    return _tb_get(
        f"plugins/telemetry/DEVICE/{device_id}/values/attributes",
        {"scope": scope},
    )


@app.get("/deviceIdByName", response_model=dict)
def getDeviceByName(textSearch: str, deviceType: str = None,
                    sortProperty: str = None, sortOrder: str = "ASC",
                    page: int = 0, pageSize: int = 10):
    """
    Returns a dictionary mapping textSearch to the device ID (as a UUID) of
    the first device matching it.

    parameters:
        deviceType: optional parameter to specify the device type for search
        textSearch: the string or substring of the name of the device being
            searched for
        sortProperty: optional parameter to specify the property to sort by
            can be one of the following [createdTime, name,
            deviceProfileName, label, customerTitle]
        sortOrder: optional parameter to specify is ascending or descending
            order [ASC, DESC]; only sent together with sortProperty
        page: the page of results to acquire default is set to 0 to get the
            first page
        pageSize: the number of results per page

    raises:
        HTTPException: 404 when no device matches textSearch.
    """
    params = {"page": page, "pageSize": pageSize, "textSearch": textSearch}
    # These options were documented but never forwarded to ThingsBoard.
    if deviceType:
        params["type"] = deviceType
    if sortProperty:
        params["sortProperty"] = sortProperty
        params["sortOrder"] = sortOrder

    devices = _tb_get("user/devices", params).get("data", [])
    if not devices:
        # Previously an IndexError surfaced as an opaque 500.
        raise HTTPException(
            status_code=404,
            detail=f"No device found matching '{textSearch}'")
    return {textSearch: devices[0]["id"]["id"]}


def getDevicesByCustomerId(customer_id):
    """Return every device assigned to ``customer_id``.

    Pages through the listing (100 devices per request). Kept best-effort as
    in the original: if ThingsBoard rejects the request an empty list is
    returned, and the failure is logged instead of being silent.
    """
    try:
        return _tb_get_all_pages(
            f"customer/{customer_id}/devices", page=0, pageSize=100)
    except HTTPException as err:
        logger.warning(
            "Could not list devices for customer %s: %s",
            customer_id, err.detail)
        return []


def getLastTelemetryTimestamp(device_id, keys=None):
    """Return the newest telemetry timestamp (ms) seen in the last hour.

    Returns None when the device reported nothing in that window.
    """
    now = dt.now(tz.utc)
    end_ts = int(now.timestamp() * 1000)
    start_ts = int((now - td(hours=1)).timestamp() * 1000)
    data = getTelemetry(startTs=start_ts, endTs=end_ts, keys=keys,
                        eType="DEVICE", deviceId=device_id)
    timestamps = [
        entry.get("ts", 0)
        for key_data in data.values()
        for entry in key_data
    ]
    return max(timestamps, default=None)


def summarizeCustomerDevices():
    """Build a per-customer summary of device activity.

    For each customer the result holds the number of active and inactive
    devices (from the "active" attribute) and the names of active devices
    whose "latestReportTime" attribute is older than 30 minutes or missing.
    """
    summary = {}
    now_ms = int(dt.now(tz.utc).timestamp() * 1000)
    stale_threshold = now_ms - (30 * 60 * 1000)  # 30 minutes ago

    for customer in getCustomers():
        devices = getDevicesByCustomerId(customer["id"]["id"])
        active = 0
        inactive = 0
        stale_devices = []

        for device in devices:
            isActive = False
            last_ts = 0
            for attribute in getAttributes(device_id=device["id"]["id"]):
                name = attribute.get("key")
                if name == "active":
                    isActive = attribute.get("value")
                if name == "latestReportTime":
                    last_ts = attribute.get("value")

            if not isActive:
                inactive += 1
                continue

            active += 1
            # A missing or non-numeric report time counts as stale instead of
            # raising on the comparison.
            if (not isinstance(last_ts, (int, float))
                    or last_ts < stale_threshold):
                stale_devices.append(device["name"])

        summary[customer["name"]] = {
            "active_devices": active,
            "inactive_devices": inactive,
            "stale_active_devices": stale_devices,
        }
    return summary


@app.get("/summary")
def getDeviceSummary():
    """Returns the per-customer device activity summary."""
    return summarizeCustomerDevices()


# Authenticate once at import so the first request already carries a token.
getJWT()