Files
ThingsBoard/thingsboardAPI.py
2022-07-21 13:58:09 -05:00

62 lines
2.9 KiB
Python

import requests
import json
from datetime import datetime as dt
class ThingsBoardAPI:
    """Small client for a ThingsBoard server's REST API.

    Logs in with a username/password on construction and sends the returned
    JWT in an ``X-Authorization: Bearer <token>`` header on later requests.
    """

    def __init__(self, username, password, domain):
        """Store the connection settings and log in immediately.

        Args:
            username: Login name sent to ``auth/login``.
            password: Password sent to ``auth/login``.
            domain: Host part of the server URL; every request goes to
                ``https://{domain}/api/``.
        """
        self.headers = {"Content-Type": "application/json", "Accept": "application/json"}
        self.credentials = json.dumps({"username": f"{username}", "password": f"{password}"})
        self.url_base = f"https://{domain}/api/"
        self.getJWT()

    @staticmethod
    def _find_by_name(page, target_name, kind):
        """Return the entry of ``page['data']`` whose ``name`` is *target_name*.

        Raises:
            LookupError: If no entry on the supplied page has that name.
        """
        match = None
        for entity in page["data"]:
            if entity["name"] == target_name:
                # Keep scanning: with duplicate names the last one wins,
                # which is what the original loop did.
                match = entity
        if match is None:
            raise LookupError(f"No {kind} named {target_name!r} in the supplied page")
        return match

    @staticmethod
    def _join_keys(keys):
        """Return *keys* as the comma-separated string used in the query."""
        if isinstance(keys, (list, tuple, set, frozenset)):
            return ",".join(str(key) for key in keys)
        # Strings (already comma-separated) and anything else are formatted
        # exactly as the previous f-string interpolation did.
        return f"{keys}"

    @staticmethod
    def _json_or_none(response):
        """Decode the JSON body of *response*, or return None if it is empty."""
        return response.json() if response.content else None

    def getJWT(self):
        """Log in and install the bearer token into ``self.headers``.

        The decoded login response is kept in ``self.token``.

        Raises:
            requests.HTTPError: If the server rejects the login.
        """
        response = requests.post(
            self.url_base + "auth/login", headers=self.headers, data=self.credentials
        )
        # Surface a failed login as an HTTP error rather than KeyError('token').
        response.raise_for_status()
        self.token = response.json()
        self.headers["X-Authorization"] = "Bearer " + self.token["token"]

    def getCustomers(self, page=0, pageSize=10):
        """Return the decoded JSON of one page of ``customers``."""
        customers = requests.get(
            self.url_base + "customers",
            headers=self.headers,
            params={"page": page, "pageSize": pageSize},
        )
        return customers.json()

    def getAssets(self, customers, target_customer, page=0, pageSize=10):
        """Return one page of assets for the customer named *target_customer*.

        Args:
            customers: A customers page as returned by ``getCustomers``; its
                ``data`` entries must carry ``name`` and ``id.id``.
            target_customer: Customer name to look up in *customers*.
            page: Zero-based page index passed to the server.
            pageSize: Page size passed to the server.

        Raises:
            LookupError: If *target_customer* is not in *customers*.
        """
        cid = self._find_by_name(customers, target_customer, "customer")["id"]["id"]
        # Same singular "customer/{id}/..." path as getDevices, and the auth
        # headers are sent (the original request carried neither).
        assets = requests.get(
            self.url_base + f"customer/{cid}/assets",
            headers=self.headers,
            params={"page": page, "pageSize": pageSize},
        )
        return assets.json()

    def getDevices(self, customers, target_customer, page=0, pageSize=10):
        """Return one page of devices for the customer named *target_customer*.

        Args:
            customers: A customers page as returned by ``getCustomers``.
            target_customer: Customer name to look up in *customers*.
            page: Zero-based page index passed to the server.
            pageSize: Page size passed to the server.

        Raises:
            LookupError: If *target_customer* is not in *customers*.
        """
        cid = self._find_by_name(customers, target_customer, "customer")["id"]["id"]
        devices = requests.get(
            self.url_base + f"customer/{cid}/devices",
            headers=self.headers,
            params={"page": page, "pageSize": pageSize},
        )
        return devices.json()

    def getDeviceKeys(self, devices, target_device):
        """Look up *target_device* and fetch its timeseries key names.

        Args:
            devices: A devices page as returned by ``getDevices``.
            target_device: Device name to look up in *devices*.

        Returns:
            A tuple ``(entity_type, device_id, keys)`` where *keys* is the
            decoded JSON of the ``keys/timeseries`` response.

        Raises:
            LookupError: If *target_device* is not in *devices*.
        """
        device = self._find_by_name(devices, target_device, "device")
        did = device["id"]["id"]
        eType = device["id"]["entityType"]
        keys = requests.get(
            self.url_base + f"plugins/telemetry/{eType}/{did}/keys/timeseries",
            headers=self.headers,
        )
        return eType, did, keys.json()

    def getTelemetry(self, startTs, endTs, keys, eType, did):
        """Return timeseries values for *keys* between *startTs* and *endTs*.

        Args:
            startTs: Start of the interval (see ``convertDateTimeToMS``).
            endTs: End of the interval.
            keys: Comma-separated key names, or a list/tuple/set of them.
            eType: Entity type, as returned by ``getDeviceKeys``.
            did: Entity id, as returned by ``getDeviceKeys``.
        """
        telemetry = requests.get(
            self.url_base + f"plugins/telemetry/{eType}/{did}/values/timeseries",
            headers=self.headers,
            params={"startTs": startTs, "endTs": endTs, "keys": self._join_keys(keys)},
        )
        return telemetry.json()

    def deleteTimeSeriesAll(self, keys, eType, did):
        """Delete every stored value of *keys* for the given entity.

        Returns:
            The decoded JSON body, or None when the server sends no body.
        """
        resp = requests.delete(
            self.url_base + f"plugins/telemetry/{eType}/{did}/timeseries/delete",
            headers=self.headers,
            params={"deleteAllDataForKeys": "true", "keys": self._join_keys(keys)},
        )
        # NOTE(review): a successful delete appears to come back with an empty
        # body, on which .json() would raise - confirm against the server.
        return self._json_or_none(resp)

    def deleteTimeSeriesScoped(self, startTs, endTs, keys, eType, did):
        """Delete the values of *keys* between *startTs* and *endTs*.

        Returns:
            The decoded JSON body, or None when the server sends no body.
        """
        resp = requests.delete(
            self.url_base + f"plugins/telemetry/{eType}/{did}/timeseries/delete",
            headers=self.headers,
            params={"startTs": startTs, "endTs": endTs, "keys": self._join_keys(keys)},
        )
        return self._json_or_none(resp)

    def convertDateTimeToMS(self, datestring):
        """Convert a ``"21 Jul 2022, 13:58:09"`` style string to epoch milliseconds.

        The string is parsed as a naive datetime, so ``timestamp()``
        interprets it in the machine's local timezone.
        """
        date = dt.strptime(datestring, "%d %b %Y, %H:%M:%S")
        return int(date.timestamp() * 1000)

    def __repr__(self):
        """Return the base URL and current token as text (no printing)."""
        # NOTE(review): this exposes the bearer token wherever the object is
        # logged; kept because the original output included it.
        return f"Base URL: {self.url_base}\nToken: {self.token['token']}"