Files
ThingsBoard/Report Generator/lambda-python3.12/tbreport/tbreport.py
2025-01-26 10:35:47 -06:00

176 lines
8.4 KiB
Python

from tb_rest_client.rest_client_pe import *
from tb_rest_client.rest import ApiException
import json, xlsxwriter, boto3, os, time
from threading import Lock
from datetime import datetime as dt
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email import encoders
from email.mime.base import MIMEBase
# Define a rate limiter class
class RateLimiter:
    """Blocking rate limiter based on a window of recent call timestamps.

    At most ``max_calls`` calls to :meth:`acquire` are let through within any
    ``period``-second window. A caller that would exceed the cap sleeps until
    the oldest recorded call leaves the window.

    Args:
        max_calls: Maximum number of calls allowed per window (at least 1).
        period: Length of the window, in seconds.

    Raises:
        ValueError: If ``max_calls`` is less than 1. Such a limiter could
            never admit a call and would otherwise fail later inside
            :meth:`acquire` with an ``IndexError``.
    """

    def __init__(self, max_calls, period):
        if max_calls < 1:
            raise ValueError("max_calls must be at least 1")
        self.max_calls = max_calls
        self.period = period
        # Timestamps (from time.monotonic()) of the calls still in the window,
        # oldest first.
        self.call_times = []
        self.lock = Lock()

    def acquire(self):
        """Wait until one more call is allowed, then record it.

        The lock is held for the whole method, including the sleep, so
        concurrent callers are admitted one at a time.
        """
        with self.lock:
            # A monotonic clock keeps the window correct even if the system
            # wall clock is adjusted while the process is running.
            now = time.monotonic()
            # Remove expired calls
            self.call_times = [t for t in self.call_times if t > now - self.period]
            if len(self.call_times) >= self.max_calls:
                # Wait for the oldest call to expire
                time_to_wait = self.period - (now - self.call_times[0])
                # Guard against a zero/negative value caused by floating-point
                # rounding; time.sleep() rejects negative durations.
                if time_to_wait > 0:
                    time.sleep(time_to_wait)
            # Register the current call
            self.call_times.append(time.monotonic())
# Module-level limiter instance: allows at most 10 calls per 1-second window.
# Tune `max_calls` and `period` as needed.
rate_limiter = RateLimiter(
    max_calls=10,
    period=1,
)
def sort_dict_keys(d):
    """Return a copy of ``d`` whose keys are in sorted order at every level.

    Values that are themselves dictionaries are processed recursively; all
    other values (including lists) are carried over unchanged.

    Args:
        d: The dictionary to sort.

    Returns:
        A new plain ``dict`` with sorted keys at each nesting level.
    """
    return {
        key: sort_dict_keys(d[key]) if isinstance(d[key], dict) else d[key]
        for key in sorted(d)
    }
def _collect_report_data(rest_client, config):
    """Fetch the latest telemetry for every device selected by the config.

    Args:
        rest_client: A logged-in ThingsBoard REST client.
        config: The parsed ``config.json`` content: a list with one entry per
            report. Each entry is read for ``name``, ``emails``,
            ``customers``, ``filterDevicesIn`` and ``filterDevicesOut``.

    Returns:
        A ``(report_data, recipients)`` tuple. ``report_data`` is nested as
        report name -> customer name -> device type -> device name ->
        {label: latest telemetry}. ``recipients`` maps each report name to
        its ``emails`` entry.
    """
    report_data = {}
    recipients = {}
    # Loop through each item in config, each item represents a report
    for report in config:
        report_name = report["name"]
        recipients[report_name] = report["emails"]
        for customer_id, customer_cfg in report["customers"].items():
            # Apply rate limiting for API calls
            rate_limiter.acquire()
            devices = rest_client.get_customer_devices(customer_id=customer_id, page=0, page_size=1000)
            if report["filterDevicesIn"]:
                devices.data = [device for device in devices.data if device.id.id in report["filterDevicesIn"]]
            if report["filterDevicesOut"]:
                devices.data = [device for device in devices.data if device.id.id not in report["filterDevicesOut"]]
            # The report entry is created as soon as one customer was queried,
            # even if none of its devices end up in the report.
            customer_reports = report_data.setdefault(report_name, {})
            customer_name = customer_cfg["name"]
            for device in devices.data:
                for type_cfg in customer_cfg["deviceTypes"]:
                    if device.type != type_cfg["deviceType"]:
                        continue
                    rate_limiter.acquire()
                    available_keys = rest_client.get_timeseries_keys_v1(device.id)
                    # Keep only the telemetry keys requested for this device type.
                    keys = [key for key in available_keys if key in type_cfg["dataPoints"]]
                    type_data = customer_reports.setdefault(customer_name, {}).setdefault(device.type, {})
                    if not keys:
                        # The device still gets a row, just without values.
                        type_data[device.name] = {}
                        continue
                    rate_limiter.acquire()
                    device_data = rest_client.get_latest_timeseries(entity_id=device.id, keys=",".join(keys))
                    labels = type_cfg["labels"]
                    # Start with every label present so all devices of a type
                    # share the same columns, then fill in what was returned.
                    labelled_data = {label: {} for label in labels.values()}
                    for key, samples in device_data.items():
                        labelled_data[labels[key]] = samples
                    type_data[device.name] = labelled_data
    return report_data, recipients


def _write_workbook(path, company_data):
    """Write one company's data to an .xlsx file with a sheet per device type.

    Each sheet has device names down column 0 (from row 1) and telemetry
    labels across row 0 (from column 1).

    Args:
        path: Destination path of the workbook.
        company_data: Mapping of device type -> device name ->
            {label: list of samples, each read for its ``"value"`` key}.
    """
    # The context manager closes (and thereby saves) the workbook even if
    # writing a sheet fails part-way.
    with xlsxwriter.Workbook(path, {'strings_to_numbers': True}) as workbook:
        bold = workbook.add_format({'bold': True})
        # Create a sheet for each device type
        for device_type, device_data in company_data.items():
            worksheet = workbook.add_worksheet(device_type)
            # Set the header column with device names
            worksheet.write_column(1, 0, list(device_data.keys()), bold)
            for row, telemetry_data in enumerate(device_data.values(), start=1):
                # Set the header row with telemetry names
                worksheet.write_row(0, 1, list(telemetry_data.keys()), bold)
                for col, samples in enumerate(telemetry_data.values(), start=1):
                    # `samples` is an empty dict for labels without data,
                    # which yields no values and leaves the cell blank.
                    worksheet.write_row(row, col, [sample["value"] for sample in samples])
            worksheet.autofit()


def _email_report(ses_client, report_name, recipients, file_names):
    """Send one email with the given spreadsheets attached through AWS SES.

    Args:
        ses_client: A boto3 SES client.
        report_name: Used as the email subject.
        recipients: List of destination email addresses.
        file_names: Names of the spreadsheets, located under ``/tmp/``.
    """
    # Create an email message
    msg = MIMEMultipart()
    msg['Subject'] = report_name
    msg['From'] = 'alerts@henry-pump.com'
    msg['To'] = ", ".join(recipients)
    msg.attach(MIMEText('Please find the attached spreadsheets.', 'plain'))
    for file_name in file_names:
        attachment = MIMEBase('application', 'octet-stream')
        with open(f"/tmp/{file_name}", "rb") as spreadsheet_file:
            attachment.set_payload(spreadsheet_file.read())
        encoders.encode_base64(attachment)
        attachment.add_header('Content-Disposition', 'attachment', filename=file_name)
        msg.attach(attachment)
    # Send the email using AWS SES
    ses_client.send_raw_email(RawMessage={'Data': msg.as_string()})


def lambda_handler(event, context):
    """AWS Lambda entry point: build the ThingsBoard reports and deliver them.

    Logs in to ThingsBoard with the ``username`` / ``password`` environment
    variables, reads ``./config.json``, collects the latest telemetry, writes
    one workbook per report and customer to ``/tmp``, uploads each workbook
    to S3 and, for reports with recipients, emails the workbooks via SES.

    Args:
        event: Lambda event payload (not used).
        context: Lambda context object (not used).

    Raises:
        ApiException: If a ThingsBoard API call fails.
        Exception: Any other error raised while collecting the data. The
            error is printed and re-raised so the invocation fails with its
            real cause instead of a later ``UnboundLocalError``.
    """
    # Creating Rest Client for ThingsBoard
    with RestClientPE(base_url="https://hp.henrypump.cloud") as rest_client:
        try:
            rest_client.login(username=os.environ["username"], password=os.environ["password"])
            # Loading Config from file
            with open("./config.json") as f:
                config = json.load(f)
            report_data, report_to_list = _collect_report_data(rest_client, config)
            # Sort Data
            report_data_sorted = sort_dict_keys(report_data)
        except ApiException as e:
            print(f"API Exception: {e}")
            raise
        except Exception as e:
            print(f"Other Exception in getting data:\n{e}")
            raise

    # Create an AWS SES client
    ses_client = boto3.client('ses', region_name='us-east-1')
    s3 = boto3.resource('s3')
    BUCKET_NAME = "thingsboard-email-reports"
    # Computed once so the written file, the S3 key and the attachment name
    # always agree, even if the run crosses midnight.
    date_stamp = dt.today().strftime('%Y-%m-%d')

    # Each report produces one workbook per company and at most one email
    for report_name, report_data in report_data_sorted.items():
        file_names = []
        for company_name, company_data in report_data.items():
            file_name = f"{report_name}-{company_name}-{date_stamp}.xlsx"
            path = f"/tmp/{file_name}"
            _write_workbook(path, company_data)
            file_names.append(file_name)
            # Store the generated report in S3.
            with open(path, 'rb') as workbook_file:
                s3.Object(BUCKET_NAME, file_name).put(Body=workbook_file)
        # Reports with an empty recipient list are only stored, not emailed.
        if report_to_list[report_name]:
            _email_report(ses_client, report_name, report_to_list[report_name], file_names)