411 lines
19 KiB
Python
411 lines
19 KiB
Python
import logging, boto3, pytz, os, shutil
|
|
from openpyxl.utils.datetime import to_excel
|
|
import pandas as pd
|
|
from datetime import datetime as dt
|
|
from datetime import timedelta as td
|
|
import datetime as dtf
|
|
from tb_rest_client.rest_client_ce import *
|
|
from tb_rest_client.rest import ApiException
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email import encoders
|
|
from email.mime.base import MIMEBase
|
|
from itertools import product
|
|
|
|
# Verbose root-logger setup: DEBUG level, with module name and line number in
# every record so Lambda/CloudWatch logs are traceable back to this file.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(module)s - %(lineno)d - %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')


# ThingsBoard REST API URL
# NOTE: lambda_handler() defines its own local `url` with the same value,
# shadowing this module-level one inside the handler.
url = "https://www.enxlekkocloud.com" #"https://hp.henrypump.cloud"
|
|
|
|
|
|
def getDevices(rest_client, customers, target_customer, page=0, pageSize=500):
    """Return one page of devices owned by *target_customer*.

    Scans *customers* (a customers page object) for the first entry whose
    name matches, then asks ThingsBoard for that customer's devices.
    Returns an empty ``PageDataDevice`` when no customer matches or when
    any call raises (the error is logged).
    """
    try:
        for customer in customers.data:
            if customer.name != target_customer:
                continue
            # First name match wins; fetch its device page and stop.
            return rest_client.get_customer_devices(
                customer_id=customer.id.id, page_size=pageSize, page=page)
        return PageDataDevice()
    except Exception as e:
        logging.error(f"Error occured in getDevices: {e}")
        return PageDataDevice()
|
|
|
|
|
|
|
|
def getDeviceKeys(rest_client, devices, target_device):
    """Find *target_device* by name in a devices page and list its telemetry keys.

    Returns a ``(device, keys, error)`` triple: ``error`` is None on success,
    the string "Device Not Found" when no name matches, or the caught
    exception when the lookup raises.
    """
    try:
        match = next((d for d in devices.data if d.name == target_device), None)
        if match is None:
            return None, None, "Device Not Found"
        return match, rest_client.get_timeseries_keys_v1(match.id), None
    except Exception as e:
        logging.error("Something went wrong in getDeviceKeys")
        logging.error(e)
        return (None, None, e)
|
|
|
|
|
|
def getTelemetry(rest_client, device, keys, start_ts, end_ts, limit):
    """Fetch time-series samples for *device* between two epoch-ms bounds.

    *keys* is a comma-separated key list; at most *limit* samples per key
    are returned.  Returns the client payload, or False when the call
    raises (the error is logged).
    """
    try:
        result = rest_client.get_timeseries(
            entity_id=device.id, keys=keys,
            start_ts=start_ts, end_ts=end_ts, limit=limit)
    except Exception as e:
        logging.error("Something went wrong in getTelemetry")
        logging.error(e)
        return False
    return result
|
|
|
|
def getLatestTelemetry(rest_client, device, keys):
    """Fetch the most recent value of each comma-separated key for *device*.

    Returns the client's latest-timeseries payload, or False when the call
    raises (the error is logged).
    """
    try:
        latest = rest_client.get_latest_timeseries(entity_id=device.id, keys=keys)
        return latest
    except Exception as e:
        # Bug fix: the log message previously said "getTelemetry"
        # (copy/paste from the sibling function), which misattributed errors.
        logging.error("Something went wrong in getLatestTelemetry")
        logging.error(e)
        return False
|
|
|
|
|
|
def getTime(timeRequest):
    """Translate a time-request dict into a ``(start_ts, end_ts)`` pair.

    Supported ``timeRequest["type"]`` values:
      - "last": window ending now, spanning the requested timedelta fields;
        bounds are returned as *strings* of epoch milliseconds.
      - "midnight-midnight": yesterday 00:00 -> today 00:00 in
        ``timeRequest["timezone"]``; bounds are *int* epoch milliseconds.
      - "range": passes through ``ts_start`` / ``ts_end`` unchanged.
    Any other type yields ``(0, 0)``.
    """
    request_type = timeRequest["type"]

    if request_type == "last":
        now = dt.now()
        window = td(days=timeRequest["days"],
                    seconds=timeRequest["seconds"],
                    microseconds=timeRequest["microseconds"],
                    milliseconds=timeRequest["milliseconds"],
                    minutes=timeRequest["minutes"],
                    hours=timeRequest["hours"],
                    weeks=timeRequest["weeks"])
        return (str(int(dt.timestamp(now - window) * 1000)),
                str(int(dt.timestamp(now) * 1000)))

    if request_type == "midnight-midnight":
        # Localize both midnights in the requested zone so DST is honored.
        timezone = pytz.timezone(timeRequest["timezone"])
        today = dtf.date.today()
        start = timezone.localize(
            dtf.datetime.combine(today - dtf.timedelta(days=1), dtf.time()))
        end = timezone.localize(dtf.datetime.combine(today, dtf.time()))
        return (int(start.timestamp()) * 1000, int(end.timestamp()) * 1000)

    if request_type == "range":
        return (timeRequest["ts_start"], timeRequest["ts_end"])

    return (0, 0)
|
|
|
|
|
|
def getThingsBoardData(url, username, password, targetCustomer, timeRequest):
    """Collect telemetry for every device belonging to *targetCustomer*.

    Logs in to the ThingsBoard instance at *url*, resolves the customer's
    devices, and for each device merges three fetches into one dict:
    the regular time-series window, the manual-entry/sample series (window
    extended by two hours), and the latest job-header values.

    Returns ``{device_name: telemetry_dict}``, ``{}`` when the customer has
    no devices, or ``False`` when the ThingsBoard API raises.
    """
    manual_series = ["manual_sample_value", "manual_sample_time", "manual_sample_lab", "manual_sample_datapoint", "manual_sample_location", 'manual_bag_filter_changes', 'manual_cartridge_filter_changes', 'manual_clean_water_sold_per_job', 'manual_coagulant_on_hand', 'manual_diverted_water_time', 'manual_equipment_description', 'manual_equipment_time', 'manual_h202_on_hand', 'manual_issues_concerns', 'manual_next_pigging_scheduled', 'manual_skim_oil_discharged_per_job', 'manual_standby_time', 'manual_unit_uptime', 'manual_upright_tank_issues', 'manual_vac_truck_batches', 'manual_water_events', 'manual_water_events_time', 'manual_water_to_tanks_time', "manual_well", "manual_tfs_lead", "manual_copa_lead", "manual_job_name", "manual_summary", "manual_hse_spills", "manual_quality_issues"]
    latest_series = "manual_well,manual_tfs_lead,manual_copa_lead,manual_job_name"

    # Context manager gives automatic auth-token refresh on the REST client.
    with RestClientCE(base_url=url) as rest_client:
        try:
            rest_client.login(username=username, password=password)

            # customers -> target customer's devices -> per-device telemetry
            customers = rest_client.get_customers(page_size="500", page="0")
            devices = getDevices(rest_client=rest_client, customers=customers, target_customer=targetCustomer)
            if not devices.data:
                return {}

            telemetry = {}
            for d in devices.data:
                device, keys, err = getDeviceKeys(rest_client=rest_client, devices=devices, target_device=d.name)
                start_ts, end_ts = getTime(timeRequest)

                telemetry[d.name] = getTelemetry(rest_client=rest_client, device=device, keys=','.join(keys), start_ts=start_ts, end_ts=end_ts, limit=25000)
                # Manual entries may be logged up to two hours past the window,
                # hence the extended end bound.
                manual_samples = getTelemetry(rest_client=rest_client, device=device, keys=','.join(manual_series), start_ts=start_ts, end_ts=end_ts + 2*60*60*1000, limit=25000)
                latest = getLatestTelemetry(rest_client=rest_client, device=device, keys=latest_series)

                telemetry[d.name].update(manual_samples)
                telemetry[d.name].update(latest)
            return telemetry
        except ApiException as e:
            logging.error(e)
            return False
|
|
|
|
def getMaxWidth():
    """Return the character width of the longest display label.

    Used to size spreadsheet columns so every mapped label fits.
    NOTE(review): this mapping is a near-duplicate of the one inside
    formatColumnName() but has drifted (e.g. "Coriolis Density" maps to
    "OUTLET OIL IN WATER" here but "INLET DENSITY" there) -- consider
    consolidating into one shared constant.
    """
    label_mapping = {
        "Lit 116b Level": "WASTE TANK 1",
        "Lit 116a Level": "WASTE TANK 2",
        "Fit 100 Flow Rate": "INLET FLOW RATE",
        "Fit 109b Flow Rate": "SALES FLOW RATE",
        "Outlet Turbidity Temp": "OUTLET TURBIDITY TEMP",
        "Outlet Orp Temp": "OUTLET ORP TEMP",
        "Inlet Turbidity Temp": "INLET TURBIDITY TEMP",
        "Inlet Ph Temp": "INLET PH TEMP",
        "Ait 102b H2s": "INLET H₂S",
        "At 109b H2s": "OUTLET H₂S",
        "Coriolis Density": "OUTLET OIL IN WATER",
        "Ait 102a Turbitity": "INLET TURBIDITY",
        "At 109a Turbidity": "OUTLET TURBIDITY",
        "At 109e Orp": "OUTLET ORP"
    }
    # Idiom fix: replace the manual compare-and-assign loop (which also
    # iterated unused keys) with a single max() over the label values.
    return max((len(label) for label in label_mapping.values()), default=0)
|
|
|
|
|
|
def formatColumnName(telemetryName):
    """Map a raw snake_case telemetry key to its spreadsheet display label.

    The key is first title-cased word by word ("lit_116b_level" ->
    "Lit 116b Level") and looked up in the label table; unknown keys fall
    back to the raw *telemetryName* unchanged.
    """
    label_mapping = {
        "Lit 116b Level": "WASTE TANK 1",
        "Lit 116a Level": "WASTE TANK 2",
        "Fit 100 Flow Rate": "INLET FLOW RATE",
        "Fit 109b Flow Rate": "SALES FLOW RATE",
        "Outlet Turbidity Temp": "OUTLET TURBIDITY TEMP",
        "Outlet Orp Temp": "OUTLET ORP TEMP",
        "Inlet Turbidity Temp": "INLET TURBIDITY TEMP",
        "Inlet Ph Temp": "INLET PH TEMP",
        "Ait 102b H2s": "INLET H₂S",
        "At 109b H2s": "OUTLET H₂S",
        "At 109c Oil In Water": "OUTLET OIW",
        "Ait 102a Turbitity": "INLET TURBIDITY",
        "At 109a Turbidity": "OUTLET TURBIDITY",
        "At 109e Orp": "OUTLET ORP",
        "Coriolis Density": "INLET DENSITY",
        "Fit 106b Flow Rate": "SKIM FLOW RATE",
        "Manual Sample Time": "manual_sample_time",
        "Outlet Ph": "Outlet Ph"
    }
    pretty = " ".join(word.capitalize() for word in telemetryName.split("_"))
    return label_mapping.get(pretty, telemetryName)
|
|
|
|
|
|
def formatChartName(telemetryName):
    """Turn a snake_case telemetry key into an ALL-CAPS, space-separated title."""
    words = telemetryName.split("_")
    return " ".join(word.upper() for word in words)
|
|
|
|
|
|
def process_dataframe(telemetry, keys, time, special_handling=None, latest_only=False, all_keys=False):
    """Assemble telemetry series into one DataFrame indexed by local time.

    Parameters
    ----------
    telemetry : dict mapping key -> list of {'ts': epoch_ms, 'value': ...}
    keys : telemetry keys to include as columns
    time : dict with at least 'timezone' (IANA name) for localization
    special_handling : optional {'datetime': [...], 'string': [...]} naming
        keys whose values are epoch-ms datetimes / plain strings; all other
        values are coerced to numeric (NaN on failure)
    latest_only : keep only the newest non-null value of each column
    all_keys : seed every missing key with one empty sample so all requested
        columns exist in the output (note: mutates *telemetry* in place)

    Returns a DataFrame whose index is named 'Date' and whose columns carry
    the display labels from formatColumnName(), sorted alphabetically.
    """
    df = pd.DataFrame()

    # Seed missing keys so each requested column appears even with no data.
    # Bug fix: the original branched on `key in special_handling["string"]`
    # here, but both branches assigned the identical placeholder -- and the
    # subscript raised KeyError whenever special_handling was None.
    # Collapsed to one unconditional default.
    if all_keys:
        now = dt.timestamp(dt.now()) * 1000
        for key in keys:
            telemetry.setdefault(key, [{'ts': now, 'value': ''}])

    for datapoint in telemetry.keys():
        if datapoint in keys:
            temp_df = pd.DataFrame(telemetry[datapoint])
            # Epoch ms (UTC) -> naive local time in the report's timezone.
            temp_df['ts'] = pd.to_datetime(temp_df['ts'], unit='ms').dt.tz_localize('UTC').dt.tz_convert(time["timezone"]).dt.tz_localize(None)
            temp_df.set_index('ts', inplace=True)

            if special_handling and datapoint in special_handling.get("datetime", []):
                # Values themselves are epoch-ms timestamps; localize like the index.
                temp_df["value"] = pd.to_datetime(temp_df['value'], unit='ms').dt.tz_localize('UTC').dt.tz_convert(time["timezone"]).dt.tz_localize(None)
            elif special_handling and datapoint in special_handling.get("string", []):
                temp_df["value"] = temp_df["value"].astype(str)
            else:
                temp_df["value"] = pd.to_numeric(temp_df["value"], errors="coerce")

            temp_df.rename(columns={'value': formatColumnName(datapoint)}, inplace=True)
            df = df.join(temp_df, how='outer')

    if latest_only:
        # Reduce each column to its most recent non-null sample.
        latest_values = df.apply(lambda x: x.dropna().iloc[-1] if not x.dropna().empty else None)
        df = pd.DataFrame([latest_values])

    df = df.reindex(sorted(df.columns), axis=1)
    df.rename_axis('Date', inplace=True)

    return df
|
|
|
|
# Usage
|
|
def getDataFrame(telemetry, keys, time):
    """Build the standard sensor DataFrame: plain numeric handling, all rows kept."""
    return process_dataframe(telemetry=telemetry, keys=keys, time=time)
|
|
|
|
def getManualDataFrame(telemetry, keys, time):
    """Build the manual-entry DataFrame.

    Keeps only the latest value per field, forces every requested column to
    exist, and applies per-key datetime/string handling for operator-entered
    fields.
    """
    handling = {
        "datetime": ["manual_next_pigging_scheduled"],
        "string": ["manual_equipment_description", "manual_issues_concerns", "manual_well", "manual_tfs_lead", "manual_copa_lead", "manual_job_name", "manual_summary", "manual_hse_spills", "manual_quality_issues"]
    }
    return process_dataframe(telemetry, keys, time,
                             special_handling=handling,
                             latest_only=True,
                             all_keys=True)
|
|
|
|
def getSampleDataFrame(telemetry, keys, time):
    """Build the manual-samples DataFrame.

    Forces every requested sample column to exist and applies per-key
    datetime/string handling; all sample rows are kept (no latest-only
    reduction).
    """
    handling = {
        "datetime": ["manual_sample_time"],
        "string": ["manual_sample_datapoint", "manual_sample_lab", "manual_sample_location"]
    }
    return process_dataframe(telemetry, keys, time,
                             special_handling=handling,
                             all_keys=True)
|
|
|
|
def get_last_data_row(ws):
    """Return the 1-based index of the bottom-most row with any value, 0 if none.

    Walks upward from ``ws.max_row`` so trailing empty rows are skipped.
    """
    row = ws.max_row
    while row > 0:
        if any(cell.value is not None for cell in ws[row]):
            return row
        row -= 1
    return 0
|
|
|
|
def transform_sample_telemetry(telemetry):
    """Pivot raw manual-sample telemetry into per-sample series.

    Correlates the five ``manual_sample_*`` series by timestamp and emits one
    series per ``"<lab>-<location>-<datapoint>"`` combination (lab and
    location abbreviated via fixed maps; unrecognized values become
    "Unknown").  ``manual_sample_time`` is passed through for indexing.
    Returns {} when there are no sample-time entries at all.
    """
    lab_map = {"Thunderbird Lab": "T", "ConocoPhillips Lab": "C"}
    location_map = {"Inlet": "I", "Outlet": "O"}

    # Index each raw series by timestamp for O(1) cross-series correlation.
    def _by_ts(key):
        return {entry["ts"]: entry["value"] for entry in telemetry.get(key, [])}

    times = _by_ts("manual_sample_time")
    labs = _by_ts("manual_sample_lab")
    locations = _by_ts("manual_sample_location")
    datapoints = _by_ts("manual_sample_datapoint")
    values = _by_ts("manual_sample_value")

    transformed = {}
    for ts, value in values.items():
        lab_code = lab_map.get(labs.get(ts, ""), "Unknown")
        loc_code = location_map.get(locations.get(ts, ""), "Unknown")
        datapoint = datapoints.get(ts, "Unknown")

        series_name = f"{lab_code}-{loc_code}-{datapoint}"
        transformed.setdefault(series_name, []).append({"ts": ts, "value": value})

    # Keep manual_sample_time as its own series for downstream indexing.
    transformed["manual_sample_time"] = [{"ts": ts, "value": value}
                                         for ts, value in times.items()]
    return transformed if transformed["manual_sample_time"] else {}
|
|
|
|
def getSampleKeys():
    """Enumerate every possible sample-series column name.

    Produces the full cross product of lab codes, location codes and
    datapoint names as "<lab>-<loc>-<datapoint>" strings, plus the
    "manual_sample_time" index column at the end.
    """
    labs = ['C', 'T']
    locations = ['I', 'O']
    datapoints = ['H2S', 'Density', 'ORP', 'O2', 'H2O2', 'pH', 'Turbidity', 'OIW']

    columns = [f"{lab}-{loc}-{dp}"
               for lab in labs
               for loc in locations
               for dp in datapoints]
    columns.append("manual_sample_time")
    return columns
|
|
|
|
def getSampleData(telemetry, device, keys):
    """Extract the requested sample series for one device.

    Returns a dict containing only the *keys* that exist for
    ``telemetry[device]`` and hold a truthy (non-empty) series.
    """
    device_data = telemetry[device]
    return {key: device_data[key] for key in keys if device_data.get(key, None)}
|
|
|
|
def lambda_handler(event, context):
    """AWS Lambda entry point: build and email the daily Thunderbird report.

    Pulls yesterday's midnight-to-midnight (US/Alaska) telemetry for the
    "Thunderbird Field Services" customer, writes one sheet per device into
    a copy of the bundled Excel template, uploads the workbook to S3, then
    emails it to a fixed recipient list via SES.  *event* and *context* are
    unused.  Requires ``username`` and ``password`` environment variables.
    """
    url = "https://www.enxlekkocloud.com"

    # Report window: yesterday 00:00 -> today 00:00, Alaska time.
    time = {
        "type": "midnight-midnight",
        "timezone": "US/Alaska"
    }

    # NOTE(review): getThingsBoardData returns False on ApiException, and
    # False.get(...) below would raise AttributeError -- confirm intended.
    telemetry = getThingsBoardData(url, os.environ["username"], os.environ["password"], "Thunderbird Field Services", time)

    # Only produce a report when the primary unit ("ACW #1") returned data.
    if telemetry.get("ACW #1", None):
        # Work on a /tmp copy of the template (assumed packaged with the
        # deployment -- confirm path './ACW Daily Report Template.xlsx').
        shutil.copyfile('./ACW Daily Report Template.xlsx', f"/tmp/Thunderbird_{dt.today().strftime('%Y-%m-%d')}.xlsx")
        # Open the template copy in append/overlay mode with openpyxl so the
        # pre-built report sheet and its charts survive the data writes.
        writer = pd.ExcelWriter(
            f"/tmp/Thunderbird_{dt.today().strftime('%Y-%m-%d')}.xlsx",
            engine="openpyxl",
            datetime_format="yyyy-mm-dd hh:mm:ss",
            date_format="yyyy-mm-dd",
            #engine_kwargs={'options': {'strings_to_numbers': True}}),
            mode="a",
            if_sheet_exists="overlay"
        )

        # First sheet of the template is the chart/report page.
        reportsheet = writer.book.worksheets[0]
        # Sensor series, operator-entered series, and raw sample series.
        keys = ['ait_102a_turbitity','ait_102b_h2s', 'at_109a_turbidity', 'at_109b_h2s', 'at_109c_oil_in_water', 'at_109e_orp', 'fit_100_flow_rate', 'fit_109b_flow_rate', 'lit_116b_level', 'lit_116a_level', 'outlet_turbidity_temp', 'outlet_orp_temp', 'inlet_turbidity_temp', 'inlet_ph_temp', 'coriolis_density','outlet_ph','fit_106b_flow_rate','system_run']
        manual_keys = ['manual_bag_filter_changes', 'manual_cartridge_filter_changes', 'manual_clean_water_sold_per_job', 'manual_coagulant_on_hand', 'manual_diverted_water_time', 'manual_equipment_description', 'manual_equipment_time', 'manual_h202_on_hand', 'manual_issues_concerns', 'manual_next_pigging_scheduled', 'manual_skim_oil_discharged_per_job', 'manual_standby_time', 'manual_unit_uptime', 'manual_upright_tank_issues', 'manual_vac_truck_batches', 'manual_water_events', 'manual_water_events_time', 'manual_water_to_tanks_time',"manual_well", "manual_tfs_lead", "manual_copa_lead", "manual_job_name", "manual_summary", "manual_hse_spills", "manual_quality_issues"]
        sample_keys = ['manual_sample_datapoint', 'manual_sample_lab', 'manual_sample_location', 'manual_sample_time', 'manual_sample_value']
        #Create a Sheet for each Device
        for device in telemetry.keys():
            df = getDataFrame(telemetry[device], keys, time)
            dfm = getManualDataFrame(telemetry[device], manual_keys, time)
            processed_sample_keys = getSampleKeys()
            sample_data = transform_sample_telemetry(getSampleData(telemetry=telemetry,device=device,keys=sample_keys))
            dfs = getSampleDataFrame(sample_data, processed_sample_keys, time)
            # Write the dataframe data to XlsxWriter. Turn off the default header and
            # index and skip one row to allow us to insert a user defined header.
            df.to_excel(writer, sheet_name=device, startrow=0, header=True, index=True, float_format="%.2f")
            dfm.to_excel(writer, sheet_name=device+" Manual Entry", startrow=0, header=True, index=True, float_format="%.2f")
            dfs.to_excel(writer, sheet_name=device+" Manual Samples", startrow=0, header=True, index=True, float_format="%.2f")
            # Get the xlsxwriter workbook and worksheet objects.
            # NOTE(review): `workbook` is assigned but never used below.
            workbook = writer.book
            worksheet = writer.sheets[device]
            # Force the date index column (A) into a readable datetime format.
            for row in worksheet.iter_rows(min_row=2, max_col=1):
                for cell in row:
                    cell.number_format = 'yyyy-mm-dd hh:mm:ss'

        #Getting the data sheet for ACW #1 to access date range actually available
        datasheet = writer.book.worksheets[1]
        datetime_min = datasheet["A2"].value
        last_data_row = get_last_data_row(datasheet)
        datetime_max = datasheet[f"A{last_data_row}"].value
        #Convert to excel number so the chart axis scaling accepts them
        datetime_min = to_excel(datetime_min)
        datetime_max = round(to_excel(datetime_max))
        # Clamp every chart on the report page to the data range present.
        for chart in reportsheet._charts:
            #Change the range of the chart
            #chart = reportsheet._charts[0]
            chart.x_axis.scaling.min = datetime_min
            chart.x_axis.scaling.max = datetime_max
            chart.x_axis.number_format = 'hh:mm'
        # Stamp the report date (start of the window) into the header cell.
        reportsheet["B4"].value = dt.fromtimestamp(getTime(time)[0]/1000).strftime('%m/%d/%Y')
        """
        Just a reminder of how to manipulate a single cell
        reportsheet["B5"] = "Test Well Name"
        """
        # Close the Pandas Excel writer and output the Excel file.
        writer.close()

        # Archive a copy of the workbook to S3, then email it via SES.
        # NOTE(review): both open() calls below leak file handles; consider
        # `with open(...)` blocks.
        ses_client = boto3.client('ses', region_name='us-east-1')
        s3 = boto3.resource('s3')
        BUCKET_NAME = "thingsboard-email-reports"
        s3.Object(BUCKET_NAME, f"Thunderbird_{dt.today().strftime('%Y-%m-%d')}.xlsx").put(Body=open(f"/tmp/Thunderbird_{dt.today().strftime('%Y-%m-%d')}.xlsx", 'rb'))

        # Fixed distribution list for the daily report email.
        emails = [
            "dvaught@thunderbirdfs.com",
            "rkamper@thunderbirdfs.com",
            "john.griffin@acaciaes.com",
            "Bruce@enxl.us",
            "Joshua.Fine@fineelectricalservices2018.com",
            "choice.luster@thunderbirdfs.com",
            "rvaught@thunderbirdfs.com",
            "sterling.smith@enxl.us",
            "hudson.fields@thunderbirdfs.com"
        ]
        msg = MIMEMultipart()
        msg['Subject'] = "Thunderbird Field Services"
        msg['From'] = 'alerts@henry-pump.com'
        msg['To'] = ", ".join(emails)

        # Add a text body to the message (optional)
        body_text = 'Please find the attached spreadsheet(s).'
        msg.attach(MIMEText(body_text, 'plain'))

        # Attach the workbook to the email message.
        attachment = MIMEBase('application', 'octet-stream')
        attachment.set_payload(open(f"/tmp/Thunderbird_{dt.today().strftime('%Y-%m-%d')}.xlsx", "rb").read())
        encoders.encode_base64(attachment)
        attachment.add_header('Content-Disposition', 'attachment', filename=f"Thunderbird_{dt.today().strftime('%Y-%m-%d')}.xlsx")
        msg.attach(attachment)

        # Send the email using AWS SES
        response = ses_client.send_raw_email(
            RawMessage={'Data': msg.as_string()}
        )
        logging.info(response)
    else:
        logging.info("There was no data to collect not sending report")
|
|
|