133 lines
4.9 KiB
Python
133 lines
4.9 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
mqtt_counter_publish.py
|
||
-----------------------
|
||
|
||
* Uses `paho.mqtt.client` to connect to a broker.
|
||
* Publishes a list of *static* payloads to a topic.
|
||
* Measures bytes sent/received by the **process** with `psutil`.
|
||
* Prints a neat summary at the end.
|
||
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import time
|
||
import random
|
||
import psutil
|
||
import paho.mqtt.client as mqtt
|
||
|
||
# ------------------------------------------------------------------
|
||
# Helper: return write_bytes / read_bytes of the current process
|
||
# ------------------------------------------------------------------
|
||
def get_io_counters():
|
||
"""Return (write_bytes, read_bytes) for the current process."""
|
||
proc = psutil.Process(os.getpid())
|
||
io = proc.io_counters() # .write_bytes, .read_bytes
|
||
return io.write_bytes, io.read_bytes #io.write_chars, io.read_chars #io.write_bytes, io.read_bytes
|
||
|
||
def print_io_delta(start, end):
|
||
"""Print the difference between two (write, read) tuples."""
|
||
sent = end[0] - start[0]
|
||
recv = end[1] - start[1]
|
||
print(f"\nΔ sent: {sent:,} bytes")
|
||
print(f"Δ received: {recv:,} bytes\n")
|
||
|
||
# ------------------------------------------------------------------
|
||
# MQTT callbacks (minimal – we don't care about incoming messages)
|
||
# ------------------------------------------------------------------
|
||
def on_connect(client, userdata, flags, rc):
|
||
if rc == 0:
|
||
print("✅ Connected to broker")
|
||
else:
|
||
print(f"❌ Connect failed (rc={rc})")
|
||
|
||
# ------------------------------------------------------------------
|
||
# Publish static payloads
|
||
# ------------------------------------------------------------------
|
||
def publish_static_payloads(client, topic, payloads, delay=0.1):
|
||
"""
|
||
Publish each payload in `payloads` to `topic`.
|
||
`delay` (seconds) can be used to throttle the rate.
|
||
"""
|
||
for i, payload in enumerate(payloads, 1):
|
||
result = client.publish(topic, json.dumps(payload))
|
||
# Wait for the network loop to actually send it
|
||
client.loop(timeout=0.1)
|
||
if result.rc != mqtt.MQTT_ERR_SUCCESS:
|
||
print(f"⚠️ Publish {i} failed with rc={result.rc}")
|
||
else:
|
||
print(f"→ Sent msg #{i} ({len(json.dumps(payload))} bytes)")
|
||
time.sleep(delay)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Main routine
|
||
# ------------------------------------------------------------------
|
||
def main():
|
||
broker = "hp.henrypump.cloud" # change if needed
|
||
port = 1883 # default MQTT
|
||
topic = "v1/devices/me/telemetry"
|
||
|
||
# Static payloads – feel free to replace with your own data
|
||
payloads = []
|
||
now = time.time() * 1000
|
||
for x in range(25):
|
||
values = {}
|
||
for y in range(20):
|
||
values[f"my_telemetry_{x+y}"] = random.random()
|
||
payloads.append({"ts": now, "values": values})
|
||
|
||
payloads = [{"test1k": "A" * 1024}]
|
||
# ------------------------------------------------------------------
|
||
# Create MQTT client, connect
|
||
# ------------------------------------------------------------------
|
||
client = mqtt.Client()
|
||
client.username_pw_set("e5xv3wfi1oa44ty2ydv2")
|
||
client.on_connect = on_connect
|
||
client.connect(broker, port, keepalive=60)
|
||
|
||
# -------------------------------------------------------------
|
||
# 1️⃣ Capture baseline I/O counters **before** any traffic
|
||
# -------------------------------------------------------------
|
||
baseline = get_io_counters()
|
||
print("\n📊 Baseline I/O counters:")
|
||
print(f" Written: {baseline[0]:,} bytes")
|
||
print(f" Read: {baseline[1]:,} bytes")
|
||
|
||
# -------------------------------------------------------------
|
||
# 2️⃣ Start network loop in a background thread
|
||
# -------------------------------------------------------------
|
||
client.loop_start()
|
||
|
||
# -------------------------------------------------------------
|
||
# 3️⃣ Publish the static payloads
|
||
# -------------------------------------------------------------
|
||
print("\n📤 Publishing static payloads …")
|
||
for _ in range(1):
|
||
publish_static_payloads(client, topic, payloads, delay=0.2)
|
||
|
||
# Give the broker a moment to ack / reply (if any)
|
||
time.sleep(10)
|
||
|
||
# -------------------------------------------------------------
|
||
# 4️⃣ Capture I/O counters **after** traffic
|
||
# -------------------------------------------------------------
|
||
final = get_io_counters()
|
||
print("\n📊 Final I/O counters:")
|
||
print(f" Written: {final[0]:,} bytes")
|
||
print(f" Read: {final[1]:,} bytes")
|
||
|
||
# -------------------------------------------------------------
|
||
# 5️⃣ Report the delta
|
||
# -------------------------------------------------------------
|
||
print_io_delta(baseline, final)
|
||
|
||
# -------------------------------------------------------------
|
||
# Clean‑up
|
||
# -------------------------------------------------------------
|
||
client.loop_stop()
|
||
client.disconnect()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main() |