added more functions

This commit is contained in:
Nico Melone
2025-06-04 18:01:43 -05:00
parent 3f73cc27f9
commit ee8d386cd8
3 changed files with 211 additions and 4 deletions

View File

@@ -0,0 +1,114 @@
from model import *
from fastapi import FastAPI, HTTPException, Body
from fastapi.middleware.cors import CORSMiddleware
from typing import Literal
import requests, json
from datetime import datetime as dt
from datetime import timedelta as td
app = FastAPI(
title="Thingsboard API",
version="1.0.0",
description="Provides secure access to Thingsboard API",
)
origins = ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def getJWT():
response = requests.post(url_base + 'auth/login', headers=headers,data=credentials)
token = response.json()
headers["X-Authorization"] = "Bearer " + token['token']
@app.get("/customers", response_model=List[Customer])
def getCustomers(page: int = 0, pageSize: int = 10):
"""
Returns a list of customers from Thingsboard API.
"""
response = requests.get(
url_base + f"customers?page={page}&pageSize={pageSize}", headers=headers)
#print(json.dumps(response.json(), indent=4))
customers = []
customers = response.json()['data']
while(response.json()["hasNext"]):
page += 1
response = requests.get(url_base + f"customers?page={page}&pageSize={pageSize}", headers=headers)
customers += response.json()['data']
#print(customers)
return customers
@app.get("/customerNames", response_model=List[str])
def getCustomerNames(page=0, pageSize=10):
"""
Returns a list of customer names from Thingsboard API.
"""
customers = getCustomers(page=0, pageSize=10)
names = []
for c in customers:
names.append(c['name'])
#print(names)
return names
@app.get("/telemetry", response_model=dict)
def getTelemetry(startTs, endTs, keys, eType, deviceId):
"""
Returns a list of telemetry data from Thingsboard API.
parameters:
startTs: Timestamp of the start time for the data in milliseconds.
endTs: Timestamp of the end time for the data in milliseconds.
keys: List of attribute keys to retrieve (e.g., temperature,humidity,etc.).
eType: Type of entity (e.g., 'ASSET', 'DEVICE').
deviceId: ID of the device as a UUID
"""
if(not keys):
keys = ["temperature", "humidity"]
if(not startTs):
#set startTs to time from 1 hour ago and put it in timestamp format with milliseconds
startTs = dt.timestamp(dt.now() - td(hours=1)) * 1000
if(not endTs):
#set endTs to time from now and put it in timestamp format with milliseconds
endTs = dt.timestamp(dt.now()) * 1000
telemetry = requests.get(
url_base + f"plugins/telemetry/{eType}/{deviceId}/values/timeseries?startTs={startTs}&endTs={endTs}&keys={keys}", headers=headers)
print(telemetry.json())
return telemetry.json()
@app.get("/deviceIdByName", response_model=dict)
def getDeviceByName(textSearch: str, deviceType: str = None, sortProperty: str = None, sortOrder: str = "ASC",page: int = 0, pageSize: int = 10):
"""
Returns a dictionary with the device ID (as a UUID) of a device with a given name in the textSearch param
parameters:
deviceType: optional parameter to specify the device type for search
textSearch: the string or substring of the name of the device being searched for
sortProperty: optional parameter to specify the property to sort by can be one of the following [createdTime, name, deviceProfileName, label, customerTitle]
sortOrder: optional parameter to specify is ascending or descending order [ASC, DESC]
page: the page of results to acquire default is set to 0 to get the first page
pageSize: the number of results per page
"""
response = requests.get(url_base + f"user/devices?page={page}&pageSize={pageSize}&textSearch={textSearch}", headers=headers)
#print(response.url)
#print(response.request)
#print(json.dumps(response.json(),indent=4))
devices = response.json()["data"]
device = {textSearch: devices[0]["id"]["id"]}
return device
headers = {"Content-Type": "application/json",
"Accept": "application/json"}
username = "example@example.com"
password = "super-secure-password"
domain = "thingsboard.cloud"
credentials = json.dumps(
{"username": f"{username}", "password": f"{password}"})
url_base = f"https://{domain}/api/"
getJWT()
#print(headers)

View File

@@ -5,6 +5,7 @@ from typing import Literal
import requests, json
from datetime import datetime as dt
from datetime import timedelta as td
from datetime import timezone as tz
app = FastAPI(
title="Thingsboard API",
version="1.0.0",
@@ -69,19 +70,46 @@ def getTelemetry(startTs, endTs, keys, eType, deviceId):
eType: Type of entity (e.g., 'ASSET', 'DEVICE').
deviceId: ID of the device as a UUID
"""
if(not keys):
if not keys:
keys = ["temperature", "humidity"]
if(not startTs):
if not startTs:
#set startTs to time from 1 hour ago and put it in timestamp format with milliseconds
startTs = dt.timestamp(dt.now() - td(hours=1)) * 1000
if(not endTs):
if not endTs:
#set endTs to time from now and put it in timestamp format with milliseconds
endTs = dt.timestamp(dt.now()) * 1000
telemetry = requests.get(
url_base + f"plugins/telemetry/{eType}/{deviceId}/values/timeseries?startTs={startTs}&endTs={endTs}&keys={keys}", headers=headers)
print(telemetry.json())
#print(telemetry.json())
return telemetry.json()
@app.get("/attributes", response_model=dict)
def getAttributes(device_id: str, scope: str = "SERVER_SCOPE"):
"""
Retrieves attributes for a given device from ThingsBoard.
Parameters:
device_id (str): UUID of the device.
scope (str): Attribute scope to retrieve. Options:
- CLIENT_SCOPE
- SERVER_SCOPE (default)
- SHARED_SCOPE
Returns:
dict: Attributes for the device.
"""
url = url_base + \
f"plugins/telemetry/DEVICE/{device_id}/values/attributes?scope={scope}"
response = requests.get(url, headers=headers)
if response.status_code != 200:
raise HTTPException(
status_code=response.status_code, detail=response.text)
return response.json()
@app.get("/deviceIdByName", response_model=dict)
def getDeviceByName(textSearch: str, deviceType: str = None, sortProperty: str = None, sortOrder: str = "ASC",page: int = 0, pageSize: int = 10):
"""
@@ -101,8 +129,73 @@ def getDeviceByName(textSearch: str, deviceType: str = None, sortProperty: str =
devices = response.json()["data"]
device = {textSearch: devices[0]["id"]["id"]}
return device
def getDevicesByCustomerId(customer_id):
response = requests.get(
url_base + f"customer/{customer_id}/devices?pageSize=4&page=0", headers=headers
)
#print(response.json())
return response.json().get('data', [])
def getLastTelemetryTimestamp(device_id, keys=None):
end_ts = int(dt.now(tz.utc).timestamp() * 1000)
start_ts = int((dt.now(tz.utc) - td(hours=1)).timestamp() * 1000)
data = getTelemetry(startTs=start_ts, endTs=end_ts,
keys=keys, eType="DEVICE", deviceId=device_id)
timestamps = []
for key_data in data.values():
for entry in key_data:
timestamps.append(entry.get('ts', 0))
return max(timestamps, default=None)
def summarizeCustomerDevices():
summary = {}
customers = getCustomers()
#print(customers)
now_ms = int(dt.now(tz.utc).timestamp() * 1000)
stale_threshold = now_ms - (30 * 60 * 1000) # 30 minutes ago
for customer in customers:
customer_id = customer["id"]["id"]
devices = getDevicesByCustomerId(customer_id)
active = 0
inactive = 0
stale_devices = []
for device in devices:
#print(device)
isActive = False
last_ts = 0
for key in getAttributes(device_id=device["id"]["id"]):
if key.get("key") == "active":
isActive = key.get("value")
if key.get("key") == "latestReportTime":
last_ts = key.get("value")
if isActive:
active += 1
if last_ts is None or last_ts < stale_threshold:
stale_devices.append(device["name"])
else:
inactive += 1
summary[customer["name"]] = {
"active_devices": active,
"inactive_devices": inactive,
"stale_active_devices": stale_devices
}
return summary
@app.get("/summary")
def getDeviceSummary():
return summarizeCustomerDevices()
headers = {"Content-Type": "application/json",
"Accept": "application/json"}
username = "nmelone@henry-pump.com"
password = "gzU6$26v42mU%3jDzTJf"
domain = "thingsboard.cloud"