Files
ThingsBoard/LLM Integration/mcp_functions.py
2025-06-04 18:01:43 -05:00

115 lines
4.3 KiB
Python

from model import *
from fastapi import FastAPI, HTTPException, Body
from fastapi.middleware.cors import CORSMiddleware
from typing import Literal
import requests, json
from datetime import datetime as dt
from datetime import timedelta as td
app = FastAPI(
title="Thingsboard API",
version="1.0.0",
description="Provides secure access to Thingsboard API",
)
origins = ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def getJWT():
response = requests.post(url_base + 'auth/login', headers=headers,data=credentials)
token = response.json()
headers["X-Authorization"] = "Bearer " + token['token']
@app.get("/customers", response_model=List[Customer])
def getCustomers(page: int = 0, pageSize: int = 10):
"""
Returns a list of customers from Thingsboard API.
"""
response = requests.get(
url_base + f"customers?page={page}&pageSize={pageSize}", headers=headers)
#print(json.dumps(response.json(), indent=4))
customers = []
customers = response.json()['data']
while(response.json()["hasNext"]):
page += 1
response = requests.get(url_base + f"customers?page={page}&pageSize={pageSize}", headers=headers)
customers += response.json()['data']
#print(customers)
return customers
@app.get("/customerNames", response_model=List[str])
def getCustomerNames(page=0, pageSize=10):
"""
Returns a list of customer names from Thingsboard API.
"""
customers = getCustomers(page=0, pageSize=10)
names = []
for c in customers:
names.append(c['name'])
#print(names)
return names
@app.get("/telemetry", response_model=dict)
def getTelemetry(startTs, endTs, keys, eType, deviceId):
"""
Returns a list of telemetry data from Thingsboard API.
parameters:
startTs: Timestamp of the start time for the data in milliseconds.
endTs: Timestamp of the end time for the data in milliseconds.
keys: List of attribute keys to retrieve (e.g., temperature,humidity,etc.).
eType: Type of entity (e.g., 'ASSET', 'DEVICE').
deviceId: ID of the device as a UUID
"""
if(not keys):
keys = ["temperature", "humidity"]
if(not startTs):
#set startTs to time from 1 hour ago and put it in timestamp format with milliseconds
startTs = dt.timestamp(dt.now() - td(hours=1)) * 1000
if(not endTs):
#set endTs to time from now and put it in timestamp format with milliseconds
endTs = dt.timestamp(dt.now()) * 1000
telemetry = requests.get(
url_base + f"plugins/telemetry/{eType}/{deviceId}/values/timeseries?startTs={startTs}&endTs={endTs}&keys={keys}", headers=headers)
print(telemetry.json())
return telemetry.json()
@app.get("/deviceIdByName", response_model=dict)
def getDeviceByName(textSearch: str, deviceType: str = None, sortProperty: str = None, sortOrder: str = "ASC",page: int = 0, pageSize: int = 10):
"""
Returns a dictionary with the device ID (as a UUID) of a device with a given name in the textSearch param
parameters:
deviceType: optional parameter to specify the device type for search
textSearch: the string or substring of the name of the device being searched for
sortProperty: optional parameter to specify the property to sort by can be one of the following [createdTime, name, deviceProfileName, label, customerTitle]
sortOrder: optional parameter to specify is ascending or descending order [ASC, DESC]
page: the page of results to acquire default is set to 0 to get the first page
pageSize: the number of results per page
"""
response = requests.get(url_base + f"user/devices?page={page}&pageSize={pageSize}&textSearch={textSearch}", headers=headers)
#print(response.url)
#print(response.request)
#print(json.dumps(response.json(),indent=4))
devices = response.json()["data"]
device = {textSearch: devices[0]["id"]["id"]}
return device
headers = {"Content-Type": "application/json",
"Accept": "application/json"}
username = "example@example.com"
password = "super-secure-password"
domain = "thingsboard.cloud"
credentials = json.dumps(
{"username": f"{username}", "password": f"{password}"})
url_base = f"https://{domain}/api/"
getJWT()
#print(headers)