from model import *  # expected to provide the Customer response model
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Literal, Optional
import json
import requests
from datetime import datetime as dt
from datetime import timedelta as td

app = FastAPI(
    title="Thingsboard API",
    version="1.0.0",
    description="Provides secure access to Thingsboard API",
)

origins = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Connection settings. These are defined before the functions that use them.
headers = {"Content-Type": "application/json", "Accept": "application/json"}
username = "example@example.com"
password = "super-secure-password"
domain = "thingsboard.cloud"
credentials = json.dumps({"username": username, "password": password})
url_base = f"https://{domain}/api/"


def getJWT():
    """Logs in to Thingsboard and stores the JWT in the shared request headers."""
    response = requests.post(url_base + "auth/login", headers=headers, data=credentials)
    response.raise_for_status()
    token = response.json()
    headers["X-Authorization"] = "Bearer " + token["token"]


def tbGet(path: str, params: dict) -> dict:
    """Sends a GET request to Thingsboard and returns the decoded JSON body."""
    # Passing params lets requests URL-encode the query string.
    response = requests.get(url_base + path, headers=headers, params=params)
    if not response.ok:
        raise HTTPException(status_code=response.status_code, detail=response.text)
    return response.json()


@app.get("/customers", response_model=List[Customer])
def getCustomers(page: int = 0, pageSize: int = 10):
    """
    Returns a list of customers from Thingsboard API.
    Starts at the given page and follows every later page until none remain.
    """
    body = tbGet("customers", {"page": page, "pageSize": pageSize})
    customers = body["data"]
    while body["hasNext"]:
        page += 1
        body = tbGet("customers", {"page": page, "pageSize": pageSize})
        customers += body["data"]
    return customers


@app.get("/customerNames", response_model=List[str])
def getCustomerNames(page: int = 0, pageSize: int = 10):
    """
    Returns a list of customer names from Thingsboard API.
    """
    # Forward the paging arguments instead of silently ignoring them.
    customers = getCustomers(page=page, pageSize=pageSize)
    return [c["name"] for c in customers]


@app.get("/telemetry", response_model=dict)
def getTelemetry(
    eType: str,
    deviceId: str,
    startTs: Optional[int] = None,
    endTs: Optional[int] = None,
    keys: Optional[str] = None,
):
    """
    Returns telemetry data from Thingsboard API.
    parameters:
        eType: Type of entity (e.g., 'ASSET', 'DEVICE').
        deviceId: ID of the device as a UUID.
        startTs: Start of the time range as a timestamp in milliseconds (default: one hour ago).
        endTs: End of the time range as a timestamp in milliseconds (default: now).
        keys: Comma-separated telemetry keys to retrieve (default: temperature,humidity).
    """
    if not keys:
        keys = "temperature,humidity"
    if not startTs:
        # Default to one hour ago, as an integer millisecond timestamp.
        startTs = int(dt.timestamp(dt.now() - td(hours=1)) * 1000)
    if not endTs:
        # Default to now, as an integer millisecond timestamp.
        endTs = int(dt.timestamp(dt.now()) * 1000)
    return tbGet(
        f"plugins/telemetry/{eType}/{deviceId}/values/timeseries",
        {"startTs": startTs, "endTs": endTs, "keys": keys},
    )


@app.get("/deviceIdByName", response_model=dict)
def getDeviceByName(
    textSearch: str,
    deviceType: Optional[str] = None,
    sortProperty: Optional[str] = None,
    sortOrder: Literal["ASC", "DESC"] = "ASC",
    page: int = 0,
    pageSize: int = 10,
):
    """
    Returns a dictionary that maps textSearch to the ID (as a UUID) of the first matching device.
    parameters:
        textSearch: the name, or part of the name, of the device being searched for
        deviceType: optional device type to restrict the search to
        sortProperty: optional property to sort by, one of
            [createdTime, name, deviceProfileName, label, customerTitle]
        sortOrder: ascending or descending order [ASC, DESC]
        page: the page of results to fetch; the default of 0 is the first page
        pageSize: the number of results per page
    """
    params = {"page": page, "pageSize": pageSize, "textSearch": textSearch}
    if sortProperty:
        # Only send the sort options when a sort property was requested.
        params["sortProperty"] = sortProperty
        params["sortOrder"] = sortOrder
    if deviceType:
        # Assumption: this endpoint filters by device type through the "type" query parameter.
        params["type"] = deviceType
    devices = tbGet("user/devices", params)["data"]
    if not devices:
        raise HTTPException(status_code=404, detail=f"No device matches '{textSearch}'")
    return {textSearch: devices[0]["id"]["id"]}


# Authenticate once at import time so every later request carries the JWT.
getJWT()