96 lines
3.2 KiB
Python
96 lines
3.2 KiB
Python
import json
|
|
import httpx
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.responses import JSONResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
from typing import Dict, Any
|
|
import uvicorn
|
|
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
|
|
app = FastAPI()
|
|
|
|
origins = ["*"]
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=origins,
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
|
|
# Load OpenAPI spec from IBM COS API
|
|
OPENAPI_URL = "https://thingsboard.cloud/v3/api-docs/thingsboard"
|
|
|
|
class ToolCallInput(BaseModel):
|
|
tool_name: str
|
|
input: Dict[str, Any]
|
|
|
|
# In-memory registry of tools
|
|
tool_registry = {}
|
|
|
|
async def fetch_openapi_spec():
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.get(OPENAPI_URL)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def generate_tools_from_openapi(openapi: Dict[str, Any]):
|
|
paths = openapi.get("paths", {})
|
|
if not paths:
|
|
raise ValueError(f"Path is empty or invalid for {service_name}")
|
|
|
|
for path, methods in paths.items():
|
|
for method, details in methods.items():
|
|
operation_id = details.get("operationId") or f"{method}_{path.replace('/', '_')}"
|
|
summary = details.get("summary", "")
|
|
|
|
# Create a basic tool function with a name and HTTP method
|
|
def make_tool(p, m):
|
|
async def tool_func(input_data):
|
|
region = input_data.get("region", "us-south")
|
|
headers = input_data.get("headers", {})
|
|
body = input_data.get("body", None)
|
|
params = input_data.get("params", None)
|
|
params = input_data.get("params", {})
|
|
formatted_path = p
|
|
for key, value in params.items():
|
|
formatted_path = formatted_path.replace(f"{{{key}}}", value)
|
|
url = f"https://thingsboard.cloud/v3{formatted_path}"
|
|
async with httpx.AsyncClient() as client:
|
|
req = client.build_request(m.upper(), url, headers=headers, json=body, params=params)
|
|
res = await client.send(req)
|
|
return {"status_code": res.status_code, "body": res.text}
|
|
return tool_func
|
|
|
|
tool_registry[operation_id] = make_tool(path, method)
|
|
|
|
@app.post("/invoke")
|
|
async def invoke_tool(call: ToolCallInput):
|
|
tool_name = call.tool_name
|
|
input_data = call.input
|
|
print(input_data)
|
|
if tool_name not in tool_registry:
|
|
return JSONResponse(status_code=404, content={"error": "Tool not found"})
|
|
|
|
tool_func = tool_registry[tool_name]
|
|
try:
|
|
result = await tool_func(input_data)
|
|
return JSONResponse(content={"output": result})
|
|
except Exception as e:
|
|
return JSONResponse(status_code=500, content={"error": str(e)})
|
|
|
|
@app.get("/tools")
|
|
async def list_tools():
|
|
return JSONResponse(content={"tools": list(tool_registry.keys())})
|
|
|
|
@app.on_event("startup")
|
|
async def startup():
|
|
openapi = await fetch_openapi_spec()
|
|
generate_tools_from_openapi(openapi)
|
|
print(f"Registered tools: {list(tool_registry.keys())}")
|