new file: Code Snippets/input-test.py

new file:   Code Snippets/ivs-mqtt-client.ipynb
new file:   Code Snippets/ivs-player.html
new file:   Code Snippets/last-will-test.py
This commit is contained in:
Nico Melone
2025-07-29 16:49:01 -05:00
parent a18b5c33f8
commit f9df74cd97
4 changed files with 429 additions and 0 deletions

View File

@@ -0,0 +1,87 @@
from paho.mqtt import client as mqtt_client
import random
import time
import json
from prompt_toolkit import PromptSession, prompt
from prompt_toolkit.patch_stdout import patch_stdout
broker = '162.199.59.89'
port = 1883
topic = "python/mqtt"
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = "nico"
password = "nico"
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id=client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 1
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send {msg} to topic {topic}")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
if msg_count > 5:
break
def subscribe(client: mqtt_client.Client):
def on_message(client, userdata, msg):
try:
payload = json.loads(msg.payload.decode())
sender_id = payload.get("sender_id")
message = payload.get("message")
#print(sender_id, client_id)
if sender_id != username:
print(f"{sender_id}: {message}")
except json.JSONDecodeError:
print(f"Received non-JSON message: {msg.payload.decode()}")
client.subscribe(topic)
client.on_message = on_message
def console(client: mqtt_client.Client):
session = PromptSession()
with patch_stdout():
while True:
try:
msg = session.prompt(f"{username}: ", multiline=True)
if msg.strip().lower() == 'exit':
break
payload = json.dumps({"sender_id": username, "message": msg})
result = client.publish(topic, payload)
if result.rc != 0:
print("Failed to publish message")
except KeyboardInterrupt:
break
def run():
client = connect_mqtt()
subscribe(client)
client.loop_start()
console(client)
#client.loop_stop()
#publish(client)
#client.loop_forever()
run()

View File

@@ -0,0 +1,249 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 4,
"id": "c384425c",
"metadata": {},
"outputs": [],
"source": [
"import paho.mqtt.client as mqtt\n",
"import subprocess\n",
"import json\n",
"import requests\n",
"from requests.auth import HTTPBasicAuth"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "69ef65e8",
"metadata": {},
"outputs": [],
"source": [
"# Define the MQTT callback functions\n",
"def on_connect(client, userdata, flags, rc):\n",
" print(\"Connected with result code\", rc)\n",
" client.subscribe(\"v1/devices/me/rpc/request/+\")\n",
"\n",
"def on_message(client, userdata, msg):\n",
" #print(f\"Message received on topic {msg.topic}: {msg.payload.decode()}\")\n",
" #print(type(msg.payload.decode()))\n",
" payload = json.loads(msg.payload.decode())\n",
" \"\"\"\n",
" ffmpeg_cmd = [\n",
" 'ffmpeg',\n",
" '-f', 'video4linux2',\n",
" '-video_size', '640x480',\n",
" '-framerate', '30',\n",
" '-i', '/dev/video0',\n",
" '-c:v', 'libx264',\n",
" '-b:v', '6000K',\n",
" '-maxrate', '6000K',\n",
" '-bufsize', '6000K',\n",
" '-pix_fmt', 'yuv420p',\n",
" '-r', '30',\n",
" '-s', '640x480',\n",
" '-g', '120',\n",
" '-profile:v', 'main',\n",
" '-preset', 'veryfast',\n",
" '-acodec', 'aac',\n",
" '-ab', '160k',\n",
" '-ar', '44100',\n",
" '-flvflags', 'no_duration_filesize',\n",
" '-t', '600', # <-- 10 minutes in seconds\n",
" '-f', 'flv',\n",
" 'rtmps://82b189904d9f.global-contribute.live-video.net/app/sk_us-east-1_rn5oFWv918EZ_jh1Mu2QafsUusPkYLNsh6U0YY4Gu1L'\n",
" ]\n",
"\n",
" ffmpeg_cmd = [\n",
" 'ffmpeg',\n",
" '-rtsp_transport', 'tcp',\n",
" '-i', 'rtsp://192.168.1.17/stream1',\n",
" '-c:v', 'libx264',\n",
" '-b:v', '6000K',\n",
" '-maxrate', '6000K',\n",
" '-bufsize', '6000K',\n",
" '-pix_fmt', 'yuv420p',\n",
" '-r', '30',\n",
" '-s', '640x480',\n",
" '-g', '120',\n",
" '-profile:v', 'main',\n",
" '-preset', 'veryfast',\n",
" '-acodec', 'aac',\n",
" '-ab', '160k',\n",
" '-ar', '44100',\n",
" '-flvflags', 'no_duration_filesize',\n",
" '-t', '60',\n",
" '-f', 'flv',\n",
" 'rtmps://82b189904d9f.global-contribute.live-video.net/app/sk_us-east-1_rn5oFWv918EZ_jh1Mu2QafsUusPkYLNsh6U0YY4Gu1L'\n",
" ]\n",
" \"\"\"\n",
" if payload[\"method\"] == \"zoom-in\":\n",
" \"\"\"Zoom in\"\"\"\n",
" try:\n",
" response = requests.get(\"http://192.168.1.17:3917/cgi-bin/com/ptz.cgi?rzoom=25\", auth=HTTPBasicAuth(\"ASS\", \"amerus@1903\"))\n",
" except Exception as e:\n",
" print(f\"Error in zoom-in: {e}\")\n",
" elif payload[\"method\"] == \"zoom-out\":\n",
" \"\"\"Zoom out\"\"\"\n",
" try:\n",
" response = requests.get(\"http://192.168.1.17:3917/cgi-bin/com/ptz.cgi?rzoom=-25\", auth=HTTPBasicAuth(\"ASS\", \"amerus@1903\"))\n",
" except Exception as e:\n",
" print(f\"Error in zoom-out: {e}\")\n",
" elif payload[\"method\"] == \"zoom-min\":\n",
" \"\"\"Zoom Min\"\"\"\n",
" try:\n",
" response = requests.get(\"http://192.168.1.17:3917/cgi-bin/com/ptz.cgi?zoom=0\", auth=HTTPBasicAuth(\"ASS\", \"amerus@1903\"))\n",
" except Exception as e:\n",
" print(f\"Error in zoom-min: {e}\")\n",
" elif payload[\"method\"] == \"zoom-max\":\n",
" \"\"\"Zoom Max\"\"\"\n",
" try:\n",
" response = requests.get(\"http://192.168.1.17:3917/cgi-bin/com/ptz.cgi?zoom=2844\", auth=HTTPBasicAuth(\"ASS\", \"amerus@1903\"))\n",
" except Exception as e:\n",
" print(f\"Error in zoom-max: {e}\")\n",
" elif payload[\"method\"] == \"move-home\":\n",
" \"\"\"Move Home\"\"\"\n",
" try:\n",
" response = requests.get(\"http://192.168.1.17:3917/cgi-bin/com/ptz.cgi?move=home\", auth=HTTPBasicAuth(\"ASS\", \"amerus@1903\"))\n",
" except Exception as e:\n",
" print(f\"Error in move-home: {e}\")\n",
" elif payload[\"method\"] == \"move-left\":\n",
" \"\"\"Move Left\"\"\"\n",
" try:\n",
" response = requests.get(\"http://192.168.1.17:3917/cgi-bin/com/ptz.cgi?move=left\", auth=HTTPBasicAuth(\"ASS\", \"amerus@1903\"))\n",
" except Exception as e:\n",
" print(f\"Error in move-left: {e}\")\n",
" elif payload[\"method\"] == \"move-upleft\":\n",
" \"\"\"Move Up Left\"\"\"\n",
" try:\n",
" response = requests.get(\"http://192.168.1.17:3917/cgi-bin/com/ptz.cgi?move=upleft\", auth=HTTPBasicAuth(\"ASS\", \"amerus@1903\"))\n",
" except Exception as e:\n",
" print(f\"Error in move-upleft: {e}\")\n",
" elif payload[\"method\"] == \"move-up\":\n",
" \"\"\"Move Up\"\"\"\n",
" try:\n",
" response = requests.get(\"http://192.168.1.17:3917/cgi-bin/com/ptz.cgi?move=up\", auth=HTTPBasicAuth(\"ASS\", \"amerus@1903\"))\n",
" except Exception as e:\n",
" print(f\"Error in move-up: {e}\")\n",
" elif payload[\"method\"] == \"move-upright\":\n",
" \"\"\"Move Up Right\"\"\"\n",
" try:\n",
" response = requests.get(\"http://192.168.1.17:3917/cgi-bin/com/ptz.cgi?move=upright\", auth=HTTPBasicAuth(\"ASS\", \"amerus@1903\"))\n",
" except Exception as e:\n",
" print(f\"Error in move-upright: {e}\")\n",
" elif payload[\"method\"] == \"move-right\":\n",
" \"\"\"Move Right\"\"\"\n",
" try:\n",
" response = requests.get(\"http://192.168.1.17:3917/cgi-bin/com/ptz.cgi?move=right\", auth=HTTPBasicAuth(\"ASS\", \"amerus@1903\"))\n",
" except Exception as e:\n",
" print(f\"Error in move-right: {e}\")\n",
" elif payload[\"method\"] == \"move-downright\":\n",
" \"\"\"Move Down Right\"\"\"\n",
" try:\n",
" response = requests.get(\"http://192.168.1.17:3917/cgi-bin/com/ptz.cgi?move=downright\", auth=HTTPBasicAuth(\"ASS\", \"amerus@1903\"))\n",
" except Exception as e:\n",
" print(f\"Error in move-downright: {e}\")\n",
" elif payload[\"method\"] == \"move-down\":\n",
" \"\"\"Move Down\"\"\"\n",
" try:\n",
" response = requests.get(\"http://192.168.1.17:3917/cgi-bin/com/ptz.cgi?move=down\", auth=HTTPBasicAuth(\"ASS\", \"amerus@1903\"))\n",
" except Exception as e:\n",
" print(f\"Error in move-down: {e}\")\n",
" elif payload[\"method\"] == \"move-downleft\":\n",
" \"\"\"Move Down Left\"\"\"\n",
" try:\n",
" response = requests.get(\"http://192.168.1.17:3917/cgi-bin/com/ptz.cgi?move=downleft\", auth=HTTPBasicAuth(\"ASS\", \"amerus@1903\"))\n",
" except Exception as e:\n",
" print(f\"Error in move-downleft: {e}\")\n",
" elif payload[\"method\"] == \"startStream\":\n",
" ffmpeg_cmd = [\n",
" 'ffmpeg',\n",
" '-i', 'rtsp://192.168.1.17/stream4',\n",
" '-tune', 'zerolatency',\n",
" '-c:v', 'copy',\n",
" '-t', '120',\n",
" '-f', 'flv',\n",
" 'rtmps://82b189904d9f.global-contribute.live-video.net/app/sk_us-east-1_rn5oFWv918EZ_jh1Mu2QafsUusPkYLNsh6U0YY4Gu1L'\n",
" ]\n",
" subprocess.Popen(ffmpeg_cmd)\n"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "54f5d18a",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/var/folders/dd/glmkqm595_n53prmxzd7qh980000gn/T/ipykernel_60479/1857724342.py:2: DeprecationWarning: Callback API version 1 is deprecated, update to latest version\n",
" client = mqtt.Client()\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Connected with result code 0\n"
]
},
{
"ename": "KeyboardInterrupt",
"evalue": "",
"output_type": "error",
"traceback": [
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
"\u001b[31mKeyboardInterrupt\u001b[39m Traceback (most recent call last)",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[6]\u001b[39m\u001b[32m, line 16\u001b[39m\n\u001b[32m 13\u001b[39m client.connect(broker_address, broker_port, \u001b[32m60\u001b[39m)\n\u001b[32m 15\u001b[39m \u001b[38;5;66;03m# Start the loop to process callbacks and reconnect if needed\u001b[39;00m\n\u001b[32m---> \u001b[39m\u001b[32m16\u001b[39m \u001b[43mclient\u001b[49m\u001b[43m.\u001b[49m\u001b[43mloop_forever\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/paho-mqtt/lib/python3.13/site-packages/paho/mqtt/client.py:2297\u001b[39m, in \u001b[36mClient.loop_forever\u001b[39m\u001b[34m(self, timeout, retry_first_connection)\u001b[39m\n\u001b[32m 2295\u001b[39m rc = MQTTErrorCode.MQTT_ERR_SUCCESS\n\u001b[32m 2296\u001b[39m \u001b[38;5;28;01mwhile\u001b[39;00m rc == MQTTErrorCode.MQTT_ERR_SUCCESS:\n\u001b[32m-> \u001b[39m\u001b[32m2297\u001b[39m rc = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_loop\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtimeout\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 2298\u001b[39m \u001b[38;5;66;03m# We don't need to worry about locking here, because we've\u001b[39;00m\n\u001b[32m 2299\u001b[39m \u001b[38;5;66;03m# either called loop_forever() when in single threaded mode, or\u001b[39;00m\n\u001b[32m 2300\u001b[39m \u001b[38;5;66;03m# in multi threaded mode when loop_stop() has been called and\u001b[39;00m\n\u001b[32m 2301\u001b[39m \u001b[38;5;66;03m# so no other threads can access _out_packet or _messages.\u001b[39;00m\n\u001b[32m 2302\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m (\u001b[38;5;28mself\u001b[39m._thread_terminate \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m\n\u001b[32m 2303\u001b[39m \u001b[38;5;129;01mand\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(\u001b[38;5;28mself\u001b[39m._out_packet) == \u001b[32m0\u001b[39m\n\u001b[32m 2304\u001b[39m \u001b[38;5;129;01mand\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(\u001b[38;5;28mself\u001b[39m._out_messages) == \u001b[32m0\u001b[39m):\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/paho-mqtt/lib/python3.13/site-packages/paho/mqtt/client.py:1663\u001b[39m, in \u001b[36mClient._loop\u001b[39m\u001b[34m(self, timeout)\u001b[39m\n\u001b[32m 1660\u001b[39m rlist = [\u001b[38;5;28mself\u001b[39m._sock, \u001b[38;5;28mself\u001b[39m._sockpairR]\n\u001b[32m 1662\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m-> \u001b[39m\u001b[32m1663\u001b[39m socklist = \u001b[43mselect\u001b[49m\u001b[43m.\u001b[49m\u001b[43mselect\u001b[49m\u001b[43m(\u001b[49m\u001b[43mrlist\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mwlist\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m[\u001b[49m\u001b[43m]\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtimeout\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 1664\u001b[39m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mTypeError\u001b[39;00m:\n\u001b[32m 1665\u001b[39m \u001b[38;5;66;03m# Socket isn't correct type, in likelihood connection is lost\u001b[39;00m\n\u001b[32m 1666\u001b[39m \u001b[38;5;66;03m# ... or we called disconnect(). In that case the socket will\u001b[39;00m\n\u001b[32m (...)\u001b[39m\u001b[32m 1669\u001b[39m \u001b[38;5;66;03m# rc != MQTT_ERR_SUCCESS and we don't want state to change from\u001b[39;00m\n\u001b[32m 1670\u001b[39m \u001b[38;5;66;03m# mqtt_cs_disconnecting.\u001b[39;00m\n\u001b[32m 1671\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m._state \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;129;01min\u001b[39;00m (_ConnectionState.MQTT_CS_DISCONNECTING, _ConnectionState.MQTT_CS_DISCONNECTED):\n",
"\u001b[31mKeyboardInterrupt\u001b[39m: "
]
}
],
"source": [
"# Create an MQTT client instance\n",
"client = mqtt.Client()\n",
"# Create Auth\n",
"client.username_pw_set(\"e5xv3wfi1oa44ty2ydv2\", \"\")\n",
"# Assign callbacks\n",
"client.on_connect = on_connect\n",
"client.on_message = on_message\n",
"\n",
"# Connect to a broker\n",
"broker_address = \"hp.henrypump.cloud\" # Public broker for testing\n",
"broker_port = 1883\n",
"\n",
"client.connect(broker_address, broker_port, 60)\n",
"\n",
"# Start the loop to process callbacks and reconnect if needed\n",
"client.loop_forever()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "paho-mqtt",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.5"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -0,0 +1,58 @@
<!doctype html>
<html lang="en">
<head>
<style>
body {
max-width: 100%;
}
.demo {
display: flex;
justify-content: center;
}
.video-container {
min-width: 75%;
max-width: 75%;
}
#video-player {
min-width: 100%;
max-width: 100%;
}
.src-container-direct {
margin-bottom: 15px;
text-align: center;
}
.src-input {
padding: 10px 5px;
width: 50%;
}
.src-submit {
height: 37px;
border-color: #ddd;
margin: 1px 5px
}
</style>
</head>
<body>
<div class="demo">
<div class="video-container">
<form class="src-container-direct">
<input class="src-input" placeholder="Enter IVS .m3u8" />
<button class="src-submit" type="submit">Load</button>
</form>
<video id="video-player" playsinline controls></video>
<div class='version'></div>
</div>
</div>
<script src="https://player.live-video.net/1.42.0/amazon-ivs-player.min.js"></script>
<script src="./cloud-player.js"></script>
</body>
</html>

View File

@@ -0,0 +1,35 @@
from paho.mqtt import client as mqtt
import time
import random
import os
import signal
import json
BROKER = 'hp.henrypump.cloud' # or your broker IP/domain
PORT = 1883
TOPIC = "v1/devices/me/attributes"
WILL_MSG = json.dumps({"isActive": False})
CLIENT_A_ID = f'client-a-{random.randint(0, 1000)}'
def start_client_a_with_will():
client = mqtt.Client(client_id=CLIENT_A_ID, clean_session=True)
client.will_set(TOPIC, WILL_MSG, qos=1, retain=False)
client.username_pw_set("e5xv3wfi1oa44ty2ydv2")
client.connect(BROKER, PORT)
print("[Client A] Connected with last will set.")
client.publish("v1/devices/me/attributes", json.dumps({"isActive": True}))
client.loop_start()
return client
def simulate_crash(client):
print("[Client A] Simulating crash (kill connection)...")
client._sock.close() # simulate ungraceful disconnect
client.loop_stop()
if __name__ == "__main__":
client_a = start_client_a_with_will()
time.sleep(2) # wait for connections to establish
raise Exception