updated pub_sub with chunking code

This commit is contained in:
Nico Melone
2023-11-30 12:53:42 -06:00
parent 71bc9f9b39
commit d12420651a
30 changed files with 1932 additions and 931 deletions

View File

@@ -0,0 +1,33 @@
import json, time
from datetime import datetime as dt
from quickfaas.measure import recall, write
from quickfaas.remotebus import publish
from common.Logger import logger
# Helper function to split the payload into chunks
def chunk_payload(payload, chunk_size=20):
chunked_values = list(payload["values"].items())
for i in range(0, len(chunked_values), chunk_size):
yield {
"ts": payload["ts"],
"values": dict(chunked_values[i:i+chunk_size])
}
def sync():
#get new values and send
payload = {"ts": round(dt.timestamp(dt.now()))*1000, "values": {}}
topic = "v1/devices/me/telemetry"
try:
data = recall()#json.loads(recall().decode("utf-8"))
except Exception as e:
logger.error(e)
logger.debug(data)
for controller in data:
for measure in controller["measures"]:
#publish measure
payload["values"][measure["name"]] = measure["value"]
logger.debug("Sending on topic: {}".format(topic))
logger.debug("Sending value: {}".format(payload))
for chunk in chunk_payload(payload=payload):
publish(topic, json.dumps(chunk), 1)
time.sleep(2)

View File

@@ -0,0 +1,179 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import json, time"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"payload = {\n",
" \"Controller A\": [\n",
" {\n",
" \"ts\": 1700149800000,\n",
" \"values\": {\n",
" \"Temperature\": 22,\n",
" \"Pressure\": 101.3,\n",
" \"one\": 12,\n",
" \"two\": 2,\n",
" \"three\": 3,\n",
" \"4\": 4,\n",
" \"5\": 5,\n",
" \"6\": 6,\n",
" \"7\": 7,\n",
" \"Temperature1\": 22,\n",
" \"Pressure1\": 101.3,\n",
" \"one1\": 12,\n",
" \"two1\": 2,\n",
" \"three1\": 3,\n",
" \"41\": 4,\n",
" \"51\": 5,\n",
" \"61\": 6,\n",
" \"71\": 7,\n",
" \"Temperature2\": 22,\n",
" \"Pressure2\": 101.3,\n",
" \"one2\": 12,\n",
" \"two2\": 2,\n",
" \"three2\": 3,\n",
" \"42\": 4,\n",
" \"52\": 5,\n",
" \"62\": 6,\n",
" \"72\": 7,\n",
" \"Temperature3\": 22,\n",
" \"Pressure3\": 101.3,\n",
" \"one3\": 12,\n",
" \"two3\": 2,\n",
" \"three3\": 3,\n",
" \"43\": 4,\n",
" \"53\": 5,\n",
" \"63\": 6,\n",
" \"73\": 7\n",
" }\n",
" }\n",
" ],\n",
" \"Controller B\": [\n",
" {\n",
" \"ts\": 1700149800000,\n",
" \"values\": {\n",
" \"Humidity\": 45\n",
" }\n",
" }\n",
" ]\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"attributes_payload = {\n",
" \"Controller A\": {\n",
" \"latestReportTime\": 1700149800000\n",
" },\n",
" \"Controller B\": {\n",
" \"latestReportTime\": 1700149800000\n",
" }\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"def chunk_payload(payload, chunk_size=20, is_attributes_payload=False):\n",
" if is_attributes_payload:\n",
" # For attributes payload, chunk the controllers\n",
" controllers = list(payload.items())\n",
" for i in range(0, len(controllers), chunk_size):\n",
" yield dict(controllers[i:i + chunk_size])\n",
" else:\n",
" # For data payload, chunk the values within each controller\n",
" for controller, data in payload.items():\n",
" for entry in data:\n",
" ts = entry['ts']\n",
" values = entry['values']\n",
" chunked_values = list(values.items())\n",
" for i in range(0, len(chunked_values), chunk_size):\n",
" yield {\n",
" \"controller\": controller,\n",
" \"ts\": ts,\n",
" \"values\": dict(chunked_values[i:i + chunk_size])\n",
" }\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"__topic__ {\"controller\": \"Controller A\", \"ts\": 1700149800000, \"values\": {\"Temperature\": 22, \"Pressure\": 101.3, \"one\": 12, \"two\": 2, \"three\": 3, \"4\": 4, \"5\": 5, \"6\": 6, \"7\": 7, \"Temperature1\": 22, \"Pressure1\": 101.3, \"one1\": 12, \"two1\": 2, \"three1\": 3, \"41\": 4, \"51\": 5, \"61\": 6, \"71\": 7, \"Temperature2\": 22, \"Pressure2\": 101.3}} __qos__\n",
"__topic__ {\"controller\": \"Controller A\", \"ts\": 1700149800000, \"values\": {\"one2\": 12, \"two2\": 2, \"three2\": 3, \"42\": 4, \"52\": 5, \"62\": 6, \"72\": 7, \"Temperature3\": 22, \"Pressure3\": 101.3, \"one3\": 12, \"two3\": 2, \"three3\": 3, \"43\": 4, \"53\": 5, \"63\": 6, \"73\": 7}} __qos__\n",
"__topic__ {\"controller\": \"Controller B\", \"ts\": 1700149800000, \"values\": {\"Humidity\": 45}} __qos__\n"
]
}
],
"source": [
"for chunk in chunk_payload(payload=payload):\n",
" print(\"__topic__\", json.dumps(chunk), \"__qos__\")\n",
" time.sleep(2)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"__topic__ {\"Controller A\": {\"latestReportTime\": 1700149800000}, \"Controller B\": {\"latestReportTime\": 1700149800000}} __qos__\n"
]
}
],
"source": [
"for chunk in chunk_payload(payload=attributes_payload, is_attributes_payload=True):\n",
" print(\"__topic__\", json.dumps(chunk), \"__qos__\")\n",
" time.sleep(2)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "tbDataCollector",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -0,0 +1,105 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"import phonenumbers\n",
"from phonenumbers import carrier"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['+14322228152',\n",
" '+13254509104',\n",
" '+14323012230',\n",
" '+14326314847',\n",
" '+14326311606',\n",
" '+14326311503',\n",
" '+14326409671']"
]
},
"execution_count": 40,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"numbers = \"+14322228152,+13254509104,+14323012230,+14326314847,+14326311606,+14326311503,+14326409671\"\n",
"numbers = numbers.split(\",\")\n",
"numbers"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Country Code: 1 National Number: 4322228152\n",
"Is Possible: True\n",
"Is Valid: True\n",
"Country Code: 1 National Number: 3254509104\n",
"Is Possible: True\n",
"Is Valid: True\n",
"Country Code: 1 National Number: 4323012230\n",
"Is Possible: True\n",
"Is Valid: True\n",
"Country Code: 1 National Number: 4326314847\n",
"Is Possible: True\n",
"Is Valid: True\n",
"Country Code: 1 National Number: 4326311606\n",
"Is Possible: True\n",
"Is Valid: True\n",
"Country Code: 1 National Number: 4326311503\n",
"Is Possible: True\n",
"Is Valid: True\n",
"Country Code: 1 National Number: 4326409671\n",
"Is Possible: True\n",
"Is Valid: True\n"
]
}
],
"source": [
"for number in numbers:\n",
" n = phonenumbers.parse(number, None)\n",
" print(n)\n",
" print(f\"Is Possible: {phonenumbers.is_possible_number(n)}\")\n",
" print(f\"Is Valid: {phonenumbers.is_valid_number(n)}\")\n",
" #print(f\"Carrier: {carrier.name_for_number(n,\"en\")}\")\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "phoneValidator",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long