# Extraction header (not program code): 257 lines, 12 KiB, Python
import json
import os
import time
import traceback
from collections import deque
from datetime import datetime as dt
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from threading import Lock

import boto3
import requests
import xlsxwriter

# Base URL that every ThingsBoard API request in this module is sent to.
THINGSBOARD_URL = "https://hp.henrypump.cloud"
# Login credentials come from the process environment; a missing variable
# raises KeyError as soon as the module is imported.
USERNAME = os.environ["username"]
PASSWORD = os.environ["password"]
# JSON file holding the report definitions that lambda_handler loads.
CONFIG_PATH = './config.json'

# Define a rate limiter class
class RateLimiter:
    """Sliding-window limiter allowing at most ``max_calls`` per ``period``.

    ``acquire`` blocks the calling thread until a slot in the window is free.
    Timestamps come from ``time.monotonic`` so that wall-clock adjustments
    cannot stretch or shrink the computed wait.

    Args:
        max_calls: Maximum number of calls allowed inside one window (>= 1).
        period: Window length in seconds.

    Raises:
        ValueError: If ``max_calls`` is smaller than 1.
    """

    def __init__(self, max_calls, period):
        # With max_calls < 1 no slot could ever be granted.
        if max_calls < 1:
            raise ValueError("max_calls must be at least 1")
        self.max_calls = max_calls
        self.period = period
        # Timestamps of the calls still inside the window, oldest first.
        self.call_times = deque()
        self.lock = Lock()

    def _drop_expired(self, now):
        """Remove timestamps that have fallen out of the window ending at ``now``."""
        cutoff = now - self.period
        while self.call_times and self.call_times[0] <= cutoff:
            self.call_times.popleft()

    def acquire(self):
        """Block until a call is permitted, then record it."""
        # The lock is held while sleeping so that concurrent callers queue up
        # behind the one that is waiting.
        with self.lock:
            while True:
                now = time.monotonic()
                self._drop_expired(now)
                if len(self.call_times) < self.max_calls:
                    break
                # Wait for the oldest call to expire, then re-check the window.
                time_to_wait = self.period - (now - self.call_times[0])
                time.sleep(max(time_to_wait, 0))
            # Register the current call
            self.call_times.append(time.monotonic())

# Initialize a rate limiter
# One shared instance: the ThingsBoard GET helpers below call
# RATE_LIMITER.acquire() before each request (20 calls per 1-second window).
RATE_LIMITER = RateLimiter(max_calls=20, period=1)  # Adjust `max_calls` and `period` as needed

def sort_dict_keys(d: dict) -> dict:
    """Return a copy of ``d`` whose keys are sorted at every dictionary level.

    Only values that are themselves dictionaries are descended into; any
    other value (including a list that contains dictionaries) is carried
    over unchanged.

    Args:
        d: The input dictionary. It is not modified.

    Returns:
        A new dictionary with sorted keys at each level.
    """
    return {
        key: sort_dict_keys(d[key]) if isinstance(d[key], dict) else d[key]
        for key in sorted(d)
    }

# Authenticate to get the JWT token
def get_jwt_token(timeout=30):
    """Log in to ThingsBoard and return the JWT token from the response.

    Posts ``USERNAME``/``PASSWORD`` to ``/api/auth/login``.

    Args:
        timeout: Seconds to wait for the server before ``requests`` gives up,
            so a stalled connection cannot block the caller indefinitely.

    Returns:
        The value of the ``"token"`` field of the login response.

    Raises:
        Exception: If the status code is not 200 or the response carries no
            token.
    """
    url = f"{THINGSBOARD_URL}/api/auth/login"
    payload = {"username": USERNAME, "password": PASSWORD}
    response = requests.post(url, json=payload, timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Authentication failed: {response.text}")

    token = response.json().get("token")
    if not token:
        # Fail here rather than sending "Bearer None" on every later request.
        raise Exception("Authentication failed: no token in login response")
    return token

def get_all_customer_devices(jwt_token, customer_id, timeout=30):
    """Retrieve all devices for a customer, handling pagination.

    Pages are requested one at a time until the page index reaches the
    ``totalPages`` value reported by the response (treated as 1 if absent).

    Args:
        jwt_token: Token sent in the ``X-Authorization`` header.
        customer_id: Customer ID placed in the request path.
        timeout: Seconds to wait for each request before giving up.

    Returns:
        list: The concatenated ``"data"`` entries of every page.

    Raises:
        Exception: If any page request returns a status other than 200.
    """
    url = f"{THINGSBOARD_URL}/api/customer/{customer_id}/devices"
    headers = {"X-Authorization": f"Bearer {jwt_token}"}
    page_size = 100  # Adjust if needed
    devices = []
    page = 0

    while True:
        RATE_LIMITER.acquire()
        response = requests.get(
            url,
            headers=headers,
            params={"pageSize": page_size, "page": page},
            timeout=timeout,
        )
        if response.status_code != 200:
            raise Exception(f"Failed to get customer devices: {response.text}")

        data = response.json()
        devices.extend(data.get("data", []))  # Add devices from current page

        if page >= data.get("totalPages", 1) - 1:
            break  # Exit loop if this is the last page
        page += 1  # Move to next page

    return devices

def get_timeseries_keys(jwt_token, device_id, timeout=30):
    """Retrieve available time-series keys (telemetry keys) for a given device ID.

    Args:
        jwt_token: Token sent in the ``X-Authorization`` header.
        device_id: Device ID placed in the request path.
        timeout: Seconds to wait for the request before giving up.

    Returns:
        The decoded JSON body of the response (a list of telemetry keys).

    Raises:
        Exception: If the response status is not 200.
    """
    url = f"{THINGSBOARD_URL}/api/plugins/telemetry/DEVICE/{device_id}/keys/timeseries"
    headers = {"X-Authorization": f"Bearer {jwt_token}"}
    RATE_LIMITER.acquire()
    response = requests.get(url, headers=headers, timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Failed to get timeseries keys: {response.text}")
    return response.json()  # Returns a list of telemetry keys

def get_latest_telemetry(jwt_token, device_id, keys=None, timeout=30):
    """Retrieve the latest telemetry data for a given device ID.

    Args:
        jwt_token (str): The authentication token.
        device_id (str): The ID of the device.
        keys (list, optional): A list of telemetry keys to fetch. Defaults to
            None (no ``keys`` query parameter is sent).
        timeout: Seconds to wait for the request before giving up.

    Returns:
        dict: The decoded JSON body: telemetry keys mapped to their latest
        values.

    Raises:
        Exception: If the response status is not 200.
    """
    url = f"{THINGSBOARD_URL}/api/plugins/telemetry/DEVICE/{device_id}/values/timeseries"
    headers = {"X-Authorization": f"Bearer {jwt_token}"}
    # Passing the keys through `params` lets requests URL-encode them, so a
    # key containing a space, '&' or '#' cannot corrupt the query string.
    params = {"keys": ",".join(keys)} if keys else None

    RATE_LIMITER.acquire()
    response = requests.get(url, headers=headers, params=params, timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Failed to get latest telemetry: {response.text}")
    return response.json()  # Returns a dictionary with keys and their latest values

def _find_device_type_config(device_type_configs, device_type):
    """Return the last entry whose "deviceType" equals ``device_type``, else None."""
    match = None
    for entry in device_type_configs:
        if entry["deviceType"] == device_type:
            match = entry
    return match


def _collect_report_data(token, config):
    """Fetch the latest telemetry for every report in ``config`` that is due.

    Returns:
        tuple: ``(report_data, recipients)``. ``report_data`` is nested as
        report name -> customer name -> sheet name -> device name ->
        {label: telemetry}; ``recipients`` maps report name to its email list.
    """
    report_data = {}
    recipients = {}
    for report in config:
        # Monthly reports only run on the first day of the month.
        if report["period"] == "Monthly" and dt.now().day != 1:
            continue
        # A report nobody receives is not generated at all.
        if not report["emails"]:
            continue

        report_name = report["name"]
        recipients[report_name] = report["emails"]

        for customer_id, customer in report["customers"].items():
            device_type_configs = customer["deviceTypes"]
            devices = get_all_customer_devices(token, customer_id)
            if report["filterDevicesIn"]:
                devices = [d for d in devices if d["id"]["id"] in report["filterDevicesIn"]]
            if report["filterDevicesOut"]:
                devices = [d for d in devices if d["id"]["id"] not in report["filterDevicesOut"]]

            customers = report_data.setdefault(report_name, {})

            for device in devices:
                device_id = device["id"]["id"]
                device_name = device["name"]
                type_config = _find_device_type_config(device_type_configs, device["type"])

                # NOTE(review): the original indentation was lost. This assumes
                # the customer entry is created for every device, even when its
                # type is not part of the report. Confirm against the deployed
                # version.
                sheets = customers.setdefault(customer["name"], {})
                if type_config is None:
                    continue

                # Keep only the telemetry keys the report asks for.
                wanted = type_config["dataPoints"]
                keys = [k for k in get_timeseries_keys(token, device_id) if k in wanted]

                sheet = sheets.setdefault(type_config["sheetName"], {})
                if not keys:
                    sheet[device_name] = {}
                    continue

                device_data = get_latest_telemetry(token, device_id, keys)
                labels = type_config["labels"]
                # Start with every label so each device has the same columns.
                labelled_data = {label: {} for label in labels.values()}
                for key, points in device_data.items():
                    # A data point without a label falls back to its raw key
                    # instead of aborting the whole run with a KeyError.
                    labelled_data[labels.get(key, key)] = points
                sheet[device_name] = labelled_data
    return report_data, recipients


def _write_workbook(path, company_data):
    """Write one .xlsx file at ``path`` with a worksheet per sheet in ``company_data``."""
    with xlsxwriter.Workbook(path, {'strings_to_numbers': True}) as workbook:
        bold = workbook.add_format({'bold': True})
        for sheet_name, device_data in company_data.items():
            worksheet = workbook.add_worksheet(sheet_name)

            # First column: device names, one per row below the header.
            worksheet.write_column(1, 0, list(device_data), bold)

            # Header row: every telemetry name used by any device on the sheet.
            columns = sorted({name for telemetry in device_data.values() for name in telemetry})
            worksheet.write_row(0, 1, columns, bold)
            column_of = {name: index for index, name in enumerate(columns, start=1)}

            # Values are placed by column name so rows stay aligned even if
            # devices do not share the same set of telemetry names.
            for row, telemetry in enumerate(device_data.values(), start=1):
                for name, points in telemetry.items():
                    values = [point["value"] for point in points]
                    worksheet.write_row(row, column_of[name], values)
            worksheet.autofit()


def _build_report_email(report_name, to_addresses, attachment_paths):
    """Build the MIME message for a report with every spreadsheet attached."""
    msg = MIMEMultipart()
    msg['Subject'] = report_name
    msg['From'] = 'alerts@henry-pump.com'
    msg['To'] = ", ".join(to_addresses)

    # Add a text body to the message (optional)
    msg.attach(MIMEText('Please find the attached spreadsheets.', 'plain'))

    for path in attachment_paths:
        attachment = MIMEBase('application', 'octet-stream')
        with open(path, "rb") as spreadsheet:
            attachment.set_payload(spreadsheet.read())
        encoders.encode_base64(attachment)
        # Only the file name (without the /tmp/ directory) is shown to the recipient.
        attachment.add_header('Content-Disposition', 'attachment', filename=os.path.basename(path))
        msg.attach(attachment)
    return msg


def lambda_handler(event, context):
    """Build the configured telemetry reports, store them in S3 and email them.

    Phase one logs in, loads ``CONFIG_PATH`` and collects the latest telemetry
    for every due report. Phase two writes one workbook per report/customer
    under ``/tmp``, uploads each to the ``thingsboard-email-reports`` bucket
    and sends one email per report through SES. Errors in either phase are
    printed with a traceback and not re-raised.

    Args:
        event: Lambda event payload (not used).
        context: Lambda context object (not used).

    Returns:
        None.
    """
    try:
        token = get_jwt_token()
        with open(CONFIG_PATH) as f:
            config = json.load(f)
        report_data, recipients = _collect_report_data(token, config)
        # Sort Data
        report_data_sorted = sort_dict_keys(report_data)
    except KeyError as ke:
        print("KeyError:", ke)
        traceback.print_exc()
        # Nothing was collected, so there is nothing to write or send.
        return
    except Exception as e:
        print("Error:", e)
        traceback.print_exc()
        return

    # Create an AWS SES client
    ses_client = boto3.client('ses', region_name='us-east-1')
    s3 = boto3.resource('s3')
    bucket_name = "thingsboard-email-reports"
    # Computed once so the written file, the S3 key and the attachment always
    # agree, even if the run crosses midnight.
    today = dt.today().strftime('%Y-%m-%d')

    try:
        # One email per report, one workbook per company inside it.
        for report_name, companies in report_data_sorted.items():
            attachment_paths = []
            for company_name, company_data in companies.items():
                file_name = f"{report_name}-{company_name}-{today}.xlsx"
                path = f"/tmp/{file_name}"
                _write_workbook(path, company_data)
                attachment_paths.append(path)

                # Store the generated report in S3.
                with open(path, 'rb') as body:
                    s3.Object(bucket_name, file_name).put(Body=body)

            if recipients[report_name]:
                msg = _build_report_email(report_name, recipients[report_name], attachment_paths)
                # Send the email using AWS SES
                ses_client.send_raw_email(RawMessage={'Data': msg.as_string()})
    except Exception as e:
        print(f"Something went wrong: {e}")
        traceback.print_exc()