connected manual samples to graphs

This commit is contained in:
Nico Melone
2025-02-28 18:43:39 -06:00
parent ce584f87aa
commit 0825f71023
7 changed files with 378 additions and 71 deletions

File diff suppressed because one or more lines are too long

View File

@@ -5,7 +5,7 @@ codeuri = "/Users/nico/Documents/GitHub/ThingsBoard/EKKO Reports/thunderbirdfs-d
runtime = "python3.9"
architecture = "x86_64"
handler = "thunderbirdfsreport.lambda_handler"
source_hash = "96f09690c748fbb53cc41f4396ed0d83dfb501a3cb2710f29c4149645fc9c9fb"
source_hash = "34ee8699205f989b1c9503c33660f39caa456af608fb961c68f13ce0fdacf18c"
manifest_hash = ""
packagetype = "Zip"
functions = ["ThunderbirdFSReport"]

View File

@@ -10,7 +10,7 @@ from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email import encoders
from email.mime.base import MIMEBase
from itertools import product
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(module)s - %(lineno)d - %(message)s',
@@ -50,7 +50,14 @@ def getTelemetry(rest_client, device, keys, start_ts, end_ts,limit):
logging.error("Something went wrong in getTelemetry")
logging.error(e)
return False
def getLatestTelemetry(rest_client, device, keys):
try:
return rest_client.get_latest_timeseries(entity_id=device.id, keys=keys) #entity_type=entity_type,
except Exception as e:
logging.error("Something went wrong in getTelemetry")
logging.error(e)
return False
def getTime(timeRequest):
@@ -91,12 +98,33 @@ def getThingsBoardData(url, username, password, targetCustomer, timeRequest):
start_ts, end_ts = getTime(timeRequest)
#print(keys)
telemetry[d.name] = getTelemetry(rest_client=rest_client, device=device, keys=','.join(keys), start_ts=start_ts, end_ts=end_ts, limit=25000)
latest = getLatestTelemetry(rest_client=rest_client, device=device, keys="manual_well,manual_tfs_lead,manual_copa_lead,manual_job_name")
telemetry[d.name].update(latest)
return telemetry
except ApiException as e:
logging.error(e)
return False
def getThingsBoardData(url, username, password, targetCustomer, timeRequest):
# Creating the REST client object with context manager to get auto token refresh
with RestClientCE(base_url=url) as rest_client:
try:
# Auth with credentials
rest_client.login(username=username, password=password)
# Get customers > get devices under a target customer > get keys for devices > get data for devices
customers = rest_client.get_customers(page_size="100", page="0")
devices = getDevices(rest_client=rest_client, customers=customers, target_customer=targetCustomer)
telemetry = {}
for d in devices.data:
#print(d.name)
device, keys, err = getDeviceKeys(rest_client=rest_client, devices=devices, target_device=d.name)
start_ts, end_ts = getTime(timeRequest)
#print(keys)
telemetry[d.name] = getTelemetry(rest_client=rest_client, device=device, keys=','.join(keys), start_ts=start_ts, end_ts=end_ts, limit=25000)
return telemetry
except ApiException as e:
logging.error(e)
return False
def getMaxWidth():
label_mapping = {
@@ -136,27 +164,34 @@ def formatColumnName(telemetryName):
"Inlet Ph Temp": "INLET PH TEMP",
"Ait 102b H2s": "INLET H₂S",
"At 109b H2s": "OUTLET H₂S",
"At 109c Oil In Water": "OUTLET DENSITY",
"At 109c Oil In Water": "OUTLET OIW",
"Ait 102a Turbitity": "INLET TURBIDITY",
"At 109a Turbidity": "OUTLET TURBIDITY",
"At 109e Orp": "OUTLET ORP",
"Ait 102d Oil In Water": "INLET DENSITY"
"Ait 102d Oil In Water": "INLET DENSITY",
"Fit 106b Flow Rate": "SKIM FLOW RATE",
"Manual Sample Time": "manual_sample_time",
"Outlet Ph": "Outlet Ph"
}
return label_mapping.get(name, name)
return label_mapping.get(name, telemetryName)
def formatChartName(telemetryName):
return " ".join([x.upper() for x in telemetryName.split("_")])
def process_dataframe(telemetry, keys, time, special_handling=None, latest_only=False):
def process_dataframe(telemetry, keys, time, special_handling=None, latest_only=False, all_keys=False):
df = pd.DataFrame()
# If latest_only is True, ensure missing keys are initialized
if latest_only:
if all_keys:
now = dt.timestamp(dt.now())
for key in keys:
if key not in telemetry:
telemetry[key] = [{'ts': dt.timestamp(dt.now()), 'value': '0'}]
if key in special_handling["string"]:
telemetry[key] = [{'ts': now, 'value': ''}]
else:
telemetry[key] = [{'ts': now, 'value': ''}]
for datapoint in telemetry.keys():
if datapoint in keys:
@@ -192,9 +227,10 @@ def getManualDataFrame(telemetry, keys, time):
telemetry, keys, time,
special_handling={
"datetime": ["manual_next_pigging_scheduled"],
"string": ["manual_equipment_description", "manual_issues_concerns"]
"string": ["manual_equipment_description", "manual_issues_concerns", "manual_well", "manual_tfs_lead", "manual_copa_lead", "manual_job_name", "manual_summary", "manual_hse_spills", "manual_quality_issues"]
},
latest_only=True
latest_only=True,
all_keys = True
)
def getSampleDataFrame(telemetry, keys, time):
@@ -203,11 +239,10 @@ def getSampleDataFrame(telemetry, keys, time):
special_handling={
"datetime": ["manual_sample_time"],
"string": ["manual_sample_datapoint", "manual_sample_lab", "manual_sample_location"]
}
},
all_keys = True
)
def get_last_data_row(ws):
# Start from the bottom row and work up to find the last row with data
for row in range(ws.max_row, 0, -1):
@@ -215,6 +250,60 @@ def get_last_data_row(ws):
return row
return 0 # If no data is found, return 0
def transform_sample_telemetry(telemetry):
transformed_telemetry = {}
# Define mapping for Lab and Location
lab_map = {"Thunderbird Lab": "T", "ConocoPhillips Lab": "C"}
location_map = {"Inlet": "I", "Outlet": "O"}
# Extract relevant sample keys
times = {entry["ts"]: entry["value"] for entry in telemetry.get("manual_sample_time", [])}
labs = {entry["ts"]: entry["value"] for entry in telemetry.get("manual_sample_lab", [])}
locations = {entry["ts"]: entry["value"] for entry in telemetry.get("manual_sample_location", [])}
datapoints = {entry["ts"]: entry["value"] for entry in telemetry.get("manual_sample_datapoint", [])}
values = {entry["ts"]: entry["value"] for entry in telemetry.get("manual_sample_value", [])}
# Iterate through available timestamps
for ts in values.keys():
lab = lab_map.get(labs.get(ts, ""), "Unknown")
location = location_map.get(locations.get(ts, ""), "Unknown")
datapoint = datapoints.get(ts, "Unknown")
new_name = f"{lab}-{location}-{datapoint}"
# Initialize if not exists
if new_name not in transformed_telemetry:
transformed_telemetry[new_name] = []
transformed_telemetry[new_name].append({"ts": ts, "value": values[ts]})
# Include manual_sample_time separately for indexing
transformed_telemetry["manual_sample_time"] = [{"ts": ts, "value": times[ts]} for ts in times]
if transformed_telemetry["manual_sample_time"]:
return transformed_telemetry
else:
return {}
def getSampleKeys():
# Ensure all possible column combinations exist
unique_labs = ['C', 'T']
unique_locations = ['I', 'O']
unique_datapoints = ['H2S', 'Density', 'ORP', 'O2', 'H2O2', 'pH', 'Turbidity', 'OIW']
all_possible_columns = [
f"{lab}-{loc}-{dp}" for lab, loc, dp in product(unique_labs, unique_locations, unique_datapoints)
]
all_possible_columns.append("manual_sample_time")
return all_possible_columns
def getSampleData(telemetry, device, keys):
sample_data = {}
for key in keys:
temp = telemetry[device].get(key,None)
if temp:
sample_data[key] = temp
return sample_data
def lambda_handler(event, context):
url = "https://www.enxlekkocloud.com"
@@ -239,14 +328,16 @@ def lambda_handler(event, context):
)
reportsheet = writer.book.worksheets[0]
keys = ['ait_102a_turbitity','ait_102b_h2s', 'at_109a_turbidity', 'at_109b_h2s', 'at_109c_oil_in_water', 'at_109e_orp', 'fit_100_flow_rate', 'fit_109b_flow_rate', 'lit_116b_level', 'lit_116a_level', 'outlet_turbidity_temp', 'outlet_orp_temp', 'inlet_turbidity_temp', 'inlet_ph_temp', 'ait_102d_oil_in_water','outlet_ph']
manual_keys = ['manual_bag_filter_changes', 'manual_cartridge_filter_changes', 'manual_clean_water_sold_per_job', 'manual_coagulant_on_hand', 'manual_diverted_water_time', 'manual_equipment_description', 'manual_equipment_time', 'manual_h202_on_hand', 'manual_issues_concerns', 'manual_next_pigging_scheduled', 'manual_skim_oil_discharged_per_job', 'manual_standby_time', 'manual_unit_uptime', 'manual_upright_tank_issues', 'manual_vac_truck_batches', 'manual_water_events', 'manual_water_events_time', 'manual_water_to_tanks_time']
keys = ['ait_102a_turbitity','ait_102b_h2s', 'at_109a_turbidity', 'at_109b_h2s', 'at_109c_oil_in_water', 'at_109e_orp', 'fit_100_flow_rate', 'fit_109b_flow_rate', 'lit_116b_level', 'lit_116a_level', 'outlet_turbidity_temp', 'outlet_orp_temp', 'inlet_turbidity_temp', 'inlet_ph_temp', 'ait_102d_oil_in_water','outlet_ph','fit_106b_flow_rate']
manual_keys = ['manual_bag_filter_changes', 'manual_cartridge_filter_changes', 'manual_clean_water_sold_per_job', 'manual_coagulant_on_hand', 'manual_diverted_water_time', 'manual_equipment_description', 'manual_equipment_time', 'manual_h202_on_hand', 'manual_issues_concerns', 'manual_next_pigging_scheduled', 'manual_skim_oil_discharged_per_job', 'manual_standby_time', 'manual_unit_uptime', 'manual_upright_tank_issues', 'manual_vac_truck_batches', 'manual_water_events', 'manual_water_events_time', 'manual_water_to_tanks_time',"manual_well", "manual_tfs_lead", "manual_copa_lead", "manual_job_name", "manual_summary", "manual_hse_spills", "manual_quality_issues"]
sample_keys = ['manual_sample_datapoint', 'manual_sample_lab', 'manual_sample_location', 'manual_sample_time', 'manual_sample_value']
#Create a Sheet for each Device
for device in telemetry.keys():
df = getDataFrame(telemetry[device], keys, time)
dfm = getManualDataFrame(telemetry[device], manual_keys, time)
dfs = getSampleDataFrame(telemetry[device], sample_keys, time)
processed_sample_keys = getSampleKeys()
sample_data = transform_sample_telemetry(getSampleData(telemetry=telemetry,device=device,keys=sample_keys))
dfs = getSampleDataFrame(sample_data, processed_sample_keys, time)
# Write the dataframe data to XlsxWriter. Turn off the default header and
# index and skip one row to allow us to insert a user defined header.
df.to_excel(writer, sheet_name=device, startrow=0, header=True, index=True, float_format="%.2f")

View File

@@ -10,7 +10,7 @@ from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email import encoders
from email.mime.base import MIMEBase
from itertools import product
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(module)s - %(lineno)d - %(message)s',
@@ -50,7 +50,14 @@ def getTelemetry(rest_client, device, keys, start_ts, end_ts,limit):
logging.error("Something went wrong in getTelemetry")
logging.error(e)
return False
def getLatestTelemetry(rest_client, device, keys):
try:
return rest_client.get_latest_timeseries(entity_id=device.id, keys=keys) #entity_type=entity_type,
except Exception as e:
logging.error("Something went wrong in getTelemetry")
logging.error(e)
return False
def getTime(timeRequest):
@@ -91,12 +98,33 @@ def getThingsBoardData(url, username, password, targetCustomer, timeRequest):
start_ts, end_ts = getTime(timeRequest)
#print(keys)
telemetry[d.name] = getTelemetry(rest_client=rest_client, device=device, keys=','.join(keys), start_ts=start_ts, end_ts=end_ts, limit=25000)
latest = getLatestTelemetry(rest_client=rest_client, device=device, keys="manual_well,manual_tfs_lead,manual_copa_lead,manual_job_name")
telemetry[d.name].update(latest)
return telemetry
except ApiException as e:
logging.error(e)
return False
def getThingsBoardData(url, username, password, targetCustomer, timeRequest):
# Creating the REST client object with context manager to get auto token refresh
with RestClientCE(base_url=url) as rest_client:
try:
# Auth with credentials
rest_client.login(username=username, password=password)
# Get customers > get devices under a target customer > get keys for devices > get data for devices
customers = rest_client.get_customers(page_size="100", page="0")
devices = getDevices(rest_client=rest_client, customers=customers, target_customer=targetCustomer)
telemetry = {}
for d in devices.data:
#print(d.name)
device, keys, err = getDeviceKeys(rest_client=rest_client, devices=devices, target_device=d.name)
start_ts, end_ts = getTime(timeRequest)
#print(keys)
telemetry[d.name] = getTelemetry(rest_client=rest_client, device=device, keys=','.join(keys), start_ts=start_ts, end_ts=end_ts, limit=25000)
return telemetry
except ApiException as e:
logging.error(e)
return False
def getMaxWidth():
label_mapping = {
@@ -136,27 +164,34 @@ def formatColumnName(telemetryName):
"Inlet Ph Temp": "INLET PH TEMP",
"Ait 102b H2s": "INLET H₂S",
"At 109b H2s": "OUTLET H₂S",
"At 109c Oil In Water": "OUTLET DENSITY",
"At 109c Oil In Water": "OUTLET OIW",
"Ait 102a Turbitity": "INLET TURBIDITY",
"At 109a Turbidity": "OUTLET TURBIDITY",
"At 109e Orp": "OUTLET ORP",
"Ait 102d Oil In Water": "INLET DENSITY"
"Ait 102d Oil In Water": "INLET DENSITY",
"Fit 106b Flow Rate": "SKIM FLOW RATE",
"Manual Sample Time": "manual_sample_time",
"Outlet Ph": "Outlet Ph"
}
return label_mapping.get(name, name)
return label_mapping.get(name, telemetryName)
def formatChartName(telemetryName):
return " ".join([x.upper() for x in telemetryName.split("_")])
def process_dataframe(telemetry, keys, time, special_handling=None, latest_only=False):
def process_dataframe(telemetry, keys, time, special_handling=None, latest_only=False, all_keys=False):
df = pd.DataFrame()
# If latest_only is True, ensure missing keys are initialized
if latest_only:
if all_keys:
now = dt.timestamp(dt.now())
for key in keys:
if key not in telemetry:
telemetry[key] = [{'ts': dt.timestamp(dt.now()), 'value': '0'}]
if key in special_handling["string"]:
telemetry[key] = [{'ts': now, 'value': ''}]
else:
telemetry[key] = [{'ts': now, 'value': ''}]
for datapoint in telemetry.keys():
if datapoint in keys:
@@ -192,9 +227,10 @@ def getManualDataFrame(telemetry, keys, time):
telemetry, keys, time,
special_handling={
"datetime": ["manual_next_pigging_scheduled"],
"string": ["manual_equipment_description", "manual_issues_concerns"]
"string": ["manual_equipment_description", "manual_issues_concerns", "manual_well", "manual_tfs_lead", "manual_copa_lead", "manual_job_name", "manual_summary", "manual_hse_spills", "manual_quality_issues"]
},
latest_only=True
latest_only=True,
all_keys = True
)
def getSampleDataFrame(telemetry, keys, time):
@@ -203,11 +239,10 @@ def getSampleDataFrame(telemetry, keys, time):
special_handling={
"datetime": ["manual_sample_time"],
"string": ["manual_sample_datapoint", "manual_sample_lab", "manual_sample_location"]
}
},
all_keys = True
)
def get_last_data_row(ws):
# Start from the bottom row and work up to find the last row with data
for row in range(ws.max_row, 0, -1):
@@ -215,6 +250,60 @@ def get_last_data_row(ws):
return row
return 0 # If no data is found, return 0
def transform_sample_telemetry(telemetry):
transformed_telemetry = {}
# Define mapping for Lab and Location
lab_map = {"Thunderbird Lab": "T", "ConocoPhillips Lab": "C"}
location_map = {"Inlet": "I", "Outlet": "O"}
# Extract relevant sample keys
times = {entry["ts"]: entry["value"] for entry in telemetry.get("manual_sample_time", [])}
labs = {entry["ts"]: entry["value"] for entry in telemetry.get("manual_sample_lab", [])}
locations = {entry["ts"]: entry["value"] for entry in telemetry.get("manual_sample_location", [])}
datapoints = {entry["ts"]: entry["value"] for entry in telemetry.get("manual_sample_datapoint", [])}
values = {entry["ts"]: entry["value"] for entry in telemetry.get("manual_sample_value", [])}
# Iterate through available timestamps
for ts in values.keys():
lab = lab_map.get(labs.get(ts, ""), "Unknown")
location = location_map.get(locations.get(ts, ""), "Unknown")
datapoint = datapoints.get(ts, "Unknown")
new_name = f"{lab}-{location}-{datapoint}"
# Initialize if not exists
if new_name not in transformed_telemetry:
transformed_telemetry[new_name] = []
transformed_telemetry[new_name].append({"ts": ts, "value": values[ts]})
# Include manual_sample_time separately for indexing
transformed_telemetry["manual_sample_time"] = [{"ts": ts, "value": times[ts]} for ts in times]
if transformed_telemetry["manual_sample_time"]:
return transformed_telemetry
else:
return {}
def getSampleKeys():
# Ensure all possible column combinations exist
unique_labs = ['C', 'T']
unique_locations = ['I', 'O']
unique_datapoints = ['H2S', 'Density', 'ORP', 'O2', 'H2O2', 'pH', 'Turbidity', 'OIW']
all_possible_columns = [
f"{lab}-{loc}-{dp}" for lab, loc, dp in product(unique_labs, unique_locations, unique_datapoints)
]
all_possible_columns.append("manual_sample_time")
return all_possible_columns
def getSampleData(telemetry, device, keys):
sample_data = {}
for key in keys:
temp = telemetry[device].get(key,None)
if temp:
sample_data[key] = temp
return sample_data
def lambda_handler(event, context):
url = "https://www.enxlekkocloud.com"
@@ -239,14 +328,16 @@ def lambda_handler(event, context):
)
reportsheet = writer.book.worksheets[0]
keys = ['ait_102a_turbitity','ait_102b_h2s', 'at_109a_turbidity', 'at_109b_h2s', 'at_109c_oil_in_water', 'at_109e_orp', 'fit_100_flow_rate', 'fit_109b_flow_rate', 'lit_116b_level', 'lit_116a_level', 'outlet_turbidity_temp', 'outlet_orp_temp', 'inlet_turbidity_temp', 'inlet_ph_temp', 'ait_102d_oil_in_water','outlet_ph']
manual_keys = ['manual_bag_filter_changes', 'manual_cartridge_filter_changes', 'manual_clean_water_sold_per_job', 'manual_coagulant_on_hand', 'manual_diverted_water_time', 'manual_equipment_description', 'manual_equipment_time', 'manual_h202_on_hand', 'manual_issues_concerns', 'manual_next_pigging_scheduled', 'manual_skim_oil_discharged_per_job', 'manual_standby_time', 'manual_unit_uptime', 'manual_upright_tank_issues', 'manual_vac_truck_batches', 'manual_water_events', 'manual_water_events_time', 'manual_water_to_tanks_time']
keys = ['ait_102a_turbitity','ait_102b_h2s', 'at_109a_turbidity', 'at_109b_h2s', 'at_109c_oil_in_water', 'at_109e_orp', 'fit_100_flow_rate', 'fit_109b_flow_rate', 'lit_116b_level', 'lit_116a_level', 'outlet_turbidity_temp', 'outlet_orp_temp', 'inlet_turbidity_temp', 'inlet_ph_temp', 'ait_102d_oil_in_water','outlet_ph','fit_106b_flow_rate']
manual_keys = ['manual_bag_filter_changes', 'manual_cartridge_filter_changes', 'manual_clean_water_sold_per_job', 'manual_coagulant_on_hand', 'manual_diverted_water_time', 'manual_equipment_description', 'manual_equipment_time', 'manual_h202_on_hand', 'manual_issues_concerns', 'manual_next_pigging_scheduled', 'manual_skim_oil_discharged_per_job', 'manual_standby_time', 'manual_unit_uptime', 'manual_upright_tank_issues', 'manual_vac_truck_batches', 'manual_water_events', 'manual_water_events_time', 'manual_water_to_tanks_time',"manual_well", "manual_tfs_lead", "manual_copa_lead", "manual_job_name", "manual_summary", "manual_hse_spills", "manual_quality_issues"]
sample_keys = ['manual_sample_datapoint', 'manual_sample_lab', 'manual_sample_location', 'manual_sample_time', 'manual_sample_value']
#Create a Sheet for each Device
for device in telemetry.keys():
df = getDataFrame(telemetry[device], keys, time)
dfm = getManualDataFrame(telemetry[device], manual_keys, time)
dfs = getSampleDataFrame(telemetry[device], sample_keys, time)
processed_sample_keys = getSampleKeys()
sample_data = transform_sample_telemetry(getSampleData(telemetry=telemetry,device=device,keys=sample_keys))
dfs = getSampleDataFrame(sample_data, processed_sample_keys, time)
# Write the dataframe data to XlsxWriter. Turn off the default header and
# index and skip one row to allow us to insert a user defined header.
df.to_excel(writer, sheet_name=device, startrow=0, header=True, index=True, float_format="%.2f")