{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import pytz\n", "from datetime import datetime\n", "import json, csv, time\n", "import paho.mqtt.client as mqtt\n", "from datetime import datetime, timedelta" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Set the MQTT broker connection details\n", "MQTT_BROKER = \"hp.henrypump.cloud\"\n", "MQTT_PORT = 1883\n", "\n", "# Create an MQTT client instance\n", "client = mqtt.Client(client_id=\"faskens-bp-compressor\")\n", "client.username_pw_set(\"faskensmqtt\", \"faskensmqtt@1903\" )\n", "client.connect(MQTT_BROKER, MQTT_PORT)" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "\n", "# Define a function to convert datetime to timestamp and round to nearest 10 minutes\n", "def convert_datetime_to_timestamp(dt_str):\n", " cst_tz = pytz.timezone('America/Chicago')\n", " dt_cst = cst_tz.localize(datetime.strptime(dt_str, '%Y-%m-%d/%H:%M:%S'))\n", " \n", " # Calculate the remainder of minutes in the hour\n", " minute_remainder = dt_cst.minute % 10\n", " \n", " if minute_remainder >= 5:\n", " # Round up to the next 10-minute interval\n", " dt_cst += timedelta(minutes=10 - minute_remainder)\n", " else:\n", " # Round down to the previous 10-minute interval\n", " dt_cst -= timedelta(minutes=minute_remainder)\n", " \n", " # Ensure the resulting datetime is valid (no 23:60:00 or similar issues)\n", " while dt_cst.minute % 10 != 0:\n", " if dt_cst.minute > 50:\n", " dt_cst += timedelta(hours=1)\n", " dt_cst -= timedelta(minutes=dt_cst.minute % 60)\n", " else:\n", " dt_cst -= timedelta(minutes=dt_cst.minute % 10)\n", " \n", " return int(dt_cst.timestamp()*1000)\n", "\n", "# Transform the data and send it to the MQTT broker in chunks of 20\n", "transformed_data = []\n", "with open('/Users/nico/Downloads/history_data_default.csv', 'r') as csvfile:\n", " reader = csv.DictReader(csvfile)\n", " for row in reader:\n", " dt = row['Time']\n", " value = json.loads(row[' Value'])[\"plcpond\"][\"air_comp_val\"]\n", " if value[0] == 1:\n", " transformed_data.append({\"ts\": convert_datetime_to_timestamp(dt), \"values\": {\"air_comp_val\": value[1]}})\n", "\n", "\n", "\n", "chunk_size = 20\n", "for i in range(0, len(transformed_data), chunk_size):\n", " chunk = transformed_data[i:i + chunk_size]\n", " for x in chunk:\n", " client.publish(\"v1/devices/me/telemetry\", json.dumps(x))\n", " time.sleep(1)\n", "\n" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "\n", "client.disconnect()" ] } ], "metadata": { "kernelspec": { "display_name": "aws", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.5" } }, "nbformat": 4, "nbformat_minor": 2 }