Files
HP_InHand_IG502/Pub_Sub/hrvalvecontroller/cameratrailer/thingsboard/pub/sendSnapshot.py
2025-03-13 16:01:03 -05:00

54 lines
2.4 KiB
Python

import json, time, requests, base64
from common.Logger import logger
from quickfaas.remotebus import publish
from quickfaas.global_dict import get as get_params
from datetime import datetime as dt
from requests.adapters import HTTPAdapter, Retry
from requests.auth import HTTPDigestAuth
from requests.exceptions import ConnectionError
def convertDStoJSON(ds):
    """Flatten a sequence of ``{"key": ..., "value": ...}`` entries into a dict.

    Args:
        ds: Iterable of mappings, each providing a ``"key"`` and a ``"value"``
            item. ``None`` (or any empty iterable) yields an empty dict.

    Returns:
        dict mapping every entry's ``"key"`` to its ``"value"``. When the same
        key appears more than once, the last entry wins.

    Raises:
        KeyError: if an entry lacks ``"key"`` or ``"value"``.
    """
    # `ds or ()` lets a missing parameter set (None) behave like an empty one
    # instead of failing with a TypeError while iterating.
    return {entry["key"]: entry["value"] for entry in (ds or ())}
def getImage():
    """Grab a JPEG snapshot from the camera and publish it as telemetry.

    Reads ``camera_ip`` (stored with ``_`` in place of ``.``) and ``port`` from
    the global parameters, asks the camera to auto focus (best effort), waits
    two seconds, downloads a 640x360 snapshot, stores it in ``./snapshot.jpg``
    and publishes it base64-encoded on ``__topic__`` with
    ``camera_error`` set to ``"OK"``.

    The snapshot file is only overwritten after a successful download, so a
    failed request no longer leaves an empty ``snapshot.jpg`` behind.

    Raises:
        KeyError: if ``camera_ip`` or ``port`` is missing from the parameters.
        requests.exceptions.RequestException: if the snapshot request cannot
            connect, times out, exhausts its retries, or the camera answers
            with an HTTP error status.
        ValueError: if the camera answers with an empty body.
    """
    params = convertDStoJSON(get_params())
    camera_ip = params["camera_ip"].replace("_", ".")
    # str() so that a numeric port parameter does not break URL building.
    port = str(params["port"])
    base_url = "http://" + camera_ip + ":" + port

    # NOTE(review): credentials are hard-coded in source; consider moving them
    # into the device parameters or another protected configuration store.
    auth = HTTPDigestAuth("ASS", "amerus@1903")

    # (connect, read) timeouts in seconds: without them a camera that accepts
    # the connection but never answers would block this call forever.
    timeout = (5, 30)

    with requests.Session() as s:
        retries = Retry(total=10, backoff_factor=0.1, status_forcelist=[404, 408, 500, 502, 503, 504])
        s.mount('http://', HTTPAdapter(max_retries=retries))

        # Auto focus is best effort: a failure here must not prevent the
        # snapshot attempt, but it is logged with its cause.
        try:
            s.get(base_url + "/cgi-bin/camctrl?af=on", auth=auth, timeout=timeout)
        except Exception as e:
            logger.error("Did not Auto Focus")
            logger.error(e)

        # Presumably gives the lens time to settle before the capture.
        time.sleep(2)

        resp = s.get(base_url + "/cgi-bin/SnapshotJPEG?Resolution=640x360", auth=auth, timeout=timeout)
        # Reject HTTP error responses (e.g. 401/403) instead of publishing
        # an error page as if it were an image.
        resp.raise_for_status()
        image_bytes = resp.content

    if not image_bytes:
        raise ValueError("Camera returned an empty snapshot")

    # Keep the on-disk copy of the latest good snapshot.
    with open('./snapshot.jpg', 'wb') as handle:
        handle.write(image_bytes)

    encoded_string = base64.b64encode(image_bytes)
    # Timestamp is "now" rounded to the nearest 10 minutes, in milliseconds.
    publish(__topic__, json.dumps({"ts": (round(dt.timestamp(dt.now())/600)*600)*1000, "values":{"snapshot": encoded_string.decode("UTF-8"), "camera_error": "OK"}}), __qos__)
def sendSnapshot(message, wizard_api):
    """Capture and publish a camera snapshot, reporting failures as telemetry.

    Args:
        message: Triggering message; it is only logged at debug level.
        wizard_api: Not used by this function.

    Any exception raised by ``getImage()`` is logged and turned into a
    ``camera_error`` value published on ``__topic__``; nothing is re-raised.
    ``ConnectionError`` here is the ``requests.exceptions`` class imported at
    the top of the file, not the builtin of the same name.
    """

    def _report(text):
        # Error telemetry carries "now" rounded to the nearest 10 minutes,
        # expressed in milliseconds.
        stamp = (round(dt.timestamp(dt.now())/600)*600)*1000
        payload = {"ts": stamp, "values": {"camera_error": text}}
        publish(__topic__, json.dumps(payload), __qos__)

    logger.debug(message)
    try:
        getImage()
    except ConnectionError as conn_err:
        logger.error("Could not connect to Camera")
        logger.error(conn_err)
        _report(f"Could not connect to camera (ConnectionError), check camera connection and power\n\n{conn_err}")
    except Exception as err:
        logger.error("Could not get image")
        logger.error(err)
        _report(f"Could not connect to camera, check camera connection, power, IP Address\n\n{err}")