import json
from datetime import datetime as dt

import requests


class ThingsBoardAPI:
    """Small client for a ThingsBoard REST API.

    Logging in happens in the constructor; the resulting JWT is stored in
    ``self.token`` and sent as the ``X-Authorization`` header on every
    subsequent request.
    """

    def __init__(self, username, password, domain):
        """Store the connection settings and log in immediately.

        Args:
            username: Login name; converted to ``str`` before sending.
            password: Login password; converted to ``str`` before sending.
            domain: Host name used to build ``https://{domain}/api/``.
        """
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        self.credentials = json.dumps(
            {"username": str(username), "password": str(password)}
        )
        self.url_base = f"https://{domain}/api/"
        self.getJWT()

    def getJWT(self):
        """Log in and install the bearer token into ``self.headers``.

        Raises:
            requests.HTTPError: If the login request is answered with an
                error status (instead of failing later with a ``KeyError``
                on the missing ``token`` field).
        """
        response = requests.post(
            self.url_base + "auth/login",
            headers=self.headers,
            data=self.credentials,
        )
        response.raise_for_status()
        self.token = response.json()
        self.headers["X-Authorization"] = "Bearer " + self.token["token"]

    @staticmethod
    def _find_entity(page, name, kind):
        """Return the entry of ``page['data']`` whose ``name`` equals *name*.

        When several entries share the name, the last one wins, which is
        what the original inline loops did.

        Raises:
            LookupError: If no entry carries the requested name.
        """
        found = None
        for entity in page["data"]:
            if entity["name"] == name:
                found = entity
        if found is None:
            raise LookupError(f"{kind} {name!r} not found in the supplied data")
        return found

    def _get(self, path, params=None):
        """Issue an authenticated GET against ``url_base + path``."""
        # Passing the query via ``params`` lets requests URL-encode values
        # (e.g. telemetry keys containing spaces or '&').
        return requests.get(
            self.url_base + path, headers=self.headers, params=params
        )

    def _delete(self, path, params):
        """Issue an authenticated DELETE and decode the reply if there is one."""
        resp = requests.delete(
            self.url_base + path, headers=self.headers, params=params
        )
        # An empty body would make ``resp.json()`` raise, so report it as None.
        return resp.json() if resp.content else None

    def getCustomers(self, page=0, pageSize=10):
        """Fetch one page of customers and return the decoded JSON reply."""
        customers = self._get("customers", {"page": page, "pageSize": pageSize})
        return customers.json()

    def getAssets(self, customers, target_customer, page=0, pageSize=10):
        """Fetch one page of assets belonging to the named customer.

        Args:
            customers: A reply of ``getCustomers`` (a dict with a ``data`` list).
            target_customer: Value of the ``name`` field of the wanted customer.
            page: Zero-based page index.
            pageSize: Number of entries per page.

        Raises:
            LookupError: If *target_customer* is not present in *customers*.
        """
        cid = self._find_entity(customers, target_customer, "Customer")["id"]["id"]
        # Uses the same singular ``customer/`` prefix as getDevices and sends
        # the auth headers, which the original request omitted.
        assets = self._get(
            f"customer/{cid}/assets", {"page": page, "pageSize": pageSize}
        )
        return assets.json()

    def getDevices(self, customers, target_customer, page=0, pageSize=10):
        """Fetch one page of devices belonging to the named customer.

        Args:
            customers: A reply of ``getCustomers`` (a dict with a ``data`` list).
            target_customer: Value of the ``name`` field of the wanted customer.
            page: Zero-based page index.
            pageSize: Number of entries per page.

        Raises:
            LookupError: If *target_customer* is not present in *customers*.
        """
        cid = self._find_entity(customers, target_customer, "Customer")["id"]["id"]
        devices = self._get(
            f"customer/{cid}/devices", {"page": page, "pageSize": pageSize}
        )
        return devices.json()

    def getDeviceKeys(self, devices, target_device):
        """Look up a device by name and fetch its time-series key names.

        Args:
            devices: A reply of ``getDevices`` (a dict with a ``data`` list).
            target_device: Value of the ``name`` field of the wanted device.

        Returns:
            A tuple ``(entity_type, device_id, keys)`` where ``keys`` is the
            decoded JSON reply of the keys endpoint.

        Raises:
            LookupError: If *target_device* is not present in *devices*.
        """
        device = self._find_entity(devices, target_device, "Device")
        did = device["id"]["id"]
        eType = device["id"]["entityType"]
        keys = self._get(f"plugins/telemetry/{eType}/{did}/keys/timeseries")
        return eType, did, keys.json()

    def getTelemetry(self, startTs, endTs, keys, eType, did):
        """Fetch time-series values for *keys* between *startTs* and *endTs*.

        ``startTs``/``endTs`` are passed through unchanged; ``convertDateTimeToMS``
        produces values in the millisecond form presumably expected here.
        """
        telemetry = self._get(
            f"plugins/telemetry/{eType}/{did}/values/timeseries",
            {"startTs": startTs, "endTs": endTs, "keys": keys},
        )
        return telemetry.json()

    def deleteTimeSeriesAll(self, keys, eType, did):
        """Delete all stored data for *keys* on the given entity.

        Returns:
            The decoded JSON reply, or ``None`` when the reply has no body.
        """
        return self._delete(
            f"plugins/telemetry/{eType}/{did}/timeseries/delete",
            {"deleteAllDataForKeys": "true", "keys": keys},
        )

    def deleteTimeSeriesScoped(self, startTs, endTs, keys, eType, did):
        """Delete data for *keys* between *startTs* and *endTs* on the entity.

        Returns:
            The decoded JSON reply, or ``None`` when the reply has no body.
        """
        return self._delete(
            f"plugins/telemetry/{eType}/{did}/timeseries/delete",
            {"startTs": startTs, "endTs": endTs, "keys": keys},
        )

    def convertDateTimeToMS(self, datestring):
        """Convert e.g. ``"01 Jan 2021, 13:45:00"`` to epoch milliseconds.

        The string is parsed as a naive datetime, so it is interpreted in the
        local time zone of the machine running this code.
        """
        date = dt.strptime(datestring, "%d %b %Y, %H:%M:%S")
        return int(date.timestamp() * 1000)

    def __repr__(self):
        """Return the base URL and current token as a two-line string."""
        # Return the text instead of printing it: repr() must not write to
        # stdout, and an empty return value made the object invisible in logs.
        return f"Base URL: {self.url_base}\nToken: {self.token['token']}"