Files
HP_InHand_IG502/Pub_Sub/cameratrailer_mb/thingsboard/pub/sendSnapshot.py
2025-09-18 16:59:08 -05:00

114 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import json
import logging
import time
from datetime import datetime as dt
import requests
from requests.adapters import HTTPAdapter, Retry
from requests.auth import HTTPDigestAuth
from requests.exceptions import ConnectionError
from common.Logger import logger # your custom logger
from quickfaas.global_dict import get as get_params
from quickfaas.remotebus import publish
def convertDStoJSON(ds):
    """Flatten an iterable of ``{"key": ..., "value": ...}`` records into one dict.

    Each record contributes one entry mapping its ``"key"`` to its ``"value"``.
    When the same key occurs more than once, the last occurrence wins.
    """
    flattened = {}
    for entry in ds:
        # Look up the key first, then the value, one record at a time.
        name = entry["key"]
        flattened[name] = entry["value"]
    return flattened
def _payload_too_big(payload, max_bytes=65535):
"""Return True if the UTF8 encoded payload is larger than max_bytes."""
return len(payload.encode("utf-8")) > max_bytes
def getImage():
    """Fetch a JPEG snapshot from the camera and publish it as base64 telemetry.

    Reads ``camera_ip`` and ``port`` from the parameters returned by
    ``get_params()`` (underscores in the IP are replaced with dots), downloads
    the ``stream3`` still image using HTTP digest auth, saves a copy to
    ``./snapshot.jpg`` and publishes a JSON payload holding the base64-encoded
    image, a formatted timestamp and ``"camera_error": "OK"``.

    The payload is not published (a warning is logged instead) when its
    UTF-8 size exceeds 65535 bytes.

    Raises:
        KeyError: if ``camera_ip`` or ``port`` is missing from the parameters.
        requests.exceptions.RequestException: on connection failure, timeout,
            or an HTTP error status reported by ``raise_for_status()``.
        OSError: if ``./snapshot.jpg`` cannot be written.
    """
    max_payload_bytes = 65535

    # Grab configuration
    params = convertDStoJSON(get_params())
    camera_ip = params["camera_ip"].replace("_", ".")
    port = params["port"]

    # Build the URL and auth
    url = f"http://{camera_ip}:{port}/cgi-bin/jpg/image.cgi?stream=stream3"
    # NOTE(review): credentials are hard-coded in source; consider moving them
    # to device configuration. Left unchanged here to preserve behaviour.
    auth = HTTPDigestAuth("ASS", "amerus@1903")

    # Retry policy (10 attempts, backoff factor 0.1)
    retry_strategy = Retry(
        total=10,
        backoff_factor=0.1,
        status_forcelist=[404, 408, 500, 502, 503, 504],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)

    chunks = []
    with requests.Session() as session:
        session.mount("http://", adapter)
        # requests applies no timeout by default; without one, an unresponsive
        # camera would block this handler forever. (connect, read) in seconds.
        resp = session.get(url, auth=auth, stream=True, timeout=(5, 15))
        resp.raise_for_status()
        # Keep a copy on disk (as before) while collecting the bytes in memory,
        # so the file does not have to be re-read just to encode it.
        with open("./snapshot.jpg", "wb") as fh:
            for block in resp.iter_content(1024):
                if not block:
                    break
                fh.write(block)
                chunks.append(block)

    encoded_string = base64.b64encode(b"".join(chunks)).decode("utf-8")

    # Build the payload; the timestamp is rounded to the nearest 600 seconds
    # and expressed in milliseconds.
    timestamp_ms = int(round(dt.now().timestamp() / 600) * 600 * 1000)
    snapshot_dt = dt.fromtimestamp(timestamp_ms / 1000).strftime("%Y %m %d %H:%M")
    payload = json.dumps(
        {
            "ts": timestamp_ms,
            "values": {
                "snapshot": encoded_string,
                "snapshotDT": snapshot_dt,
                "camera_error": "OK",
            },
        },
        separators=(",", ":"),  # tighter JSON saves a few bytes
    )
    payload_size = len(payload.encode("utf-8"))  # measured once, reused for logging

    # Guard against oversized payloads
    if _payload_too_big(payload, max_payload_bytes):
        logger.warning(
            "Payload size (%d bytes) exceeds %d bytes - message not sent.",
            payload_size,
            max_payload_bytes,
        )
        return  # do not publish

    # __topic__ and __qos__ are not defined in the visible part of this file;
    # presumably they are injected by the hosting runtime - TODO confirm.
    publish(__topic__, payload, __qos__)
    logger.info("Published snapshot of size %d bytes", payload_size)
def sendSnapshot(message, wizard_api):
    """Run one snapshot cycle and report any failure as a camera error.

    Logs *message* at debug level, then calls ``getImage()``. A
    ``ConnectionError`` (the class imported at file level from
    ``requests.exceptions``) and any other ``Exception`` are each logged and
    turned into an error text handed to ``_publish_error``; nothing is
    re-raised from those handlers. *wizard_api* is accepted but not used.
    """
    logger.debug(message)
    try:
        getImage()
    except ConnectionError as conn_err:
        # Connection-level failure: report it with the dedicated wording.
        logger.error("Could not connect to camera")
        logger.exception(conn_err)
        _publish_error(
            "Could not connect to camera (ConnectionError), check camera connection and power\n\n"
            + str(conn_err)
        )
    except Exception as exc:
        # Anything else raised while fetching or publishing the image.
        logger.error("Could not get image")
        logger.exception(exc)
        _publish_error(
            "Could not connect to camera, check camera connection, power, IP Address\n\n"
            + str(exc)
        )
def _publish_error(error_msg):
    """Publish *error_msg* to the broker as the ``camera_error`` value.

    The message is wrapped in a compact JSON document whose ``ts`` field is
    the current time rounded to the nearest 600 seconds, in milliseconds.
    """
    now_seconds = dt.now().timestamp()
    rounded_ms = int(round(now_seconds / 600) * 600 * 1000)
    telemetry = {
        "ts": rounded_ms,
        "values": {"camera_error": error_msg},
    }
    body = json.dumps(telemetry, separators=(",", ":"))
    publish(__topic__, body, __qos__)