From f89cd46e96765014dcf33d17c31f8f0b32cce26b Mon Sep 17 00:00:00 2001 From: Patrick McDonagh Date: Tue, 22 May 2018 13:11:58 -0500 Subject: [PATCH] Initial Commit --- .gitignore | 2 ++ app.yaml | 10 ++++++++++ firebase_interface.py | 32 ++++++++++++++++++++++++++++++++ mqtt_reader.py | 37 +++++++++++++++++++++++++++++++++++++ requirements.txt | 14 ++++++++++++++ 5 files changed, 95 insertions(+) create mode 100644 .gitignore create mode 100644 app.yaml create mode 100644 firebase_interface.py create mode 100644 mqtt_reader.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6f8b672 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +env/* +*.pyc diff --git a/app.yaml b/app.yaml new file mode 100644 index 0000000..3ff1ced --- /dev/null +++ b/app.yaml @@ -0,0 +1,10 @@ +runtime: python +entrypoint: python mqtt_reader.py +env: flex + +manual_scaling: + instances: 1 +resources: + cpu: 1 + memory_gb: 0.5 + disk_size_gb: 10 diff --git a/firebase_interface.py b/firebase_interface.py new file mode 100644 index 0000000..6e46676 --- /dev/null +++ b/firebase_interface.py @@ -0,0 +1,32 @@ +"""Sample of wrapper for meshify data to firebase.""" + +import time +from firebase import firebase + +BASE_URL = 'https://pocloud-ff2c9.firebaseio.com' +fbdb = firebase.FirebaseApplication(BASE_URL, None) + +def set_firebase_channel(deviceId, device_type, channel, value, timestamp): + """Sets the value of a firebase channel.""" + if timestamp == 0: + timestamp = time.time() + channel_value = { + 'name': channel, + 'value': value, + 'timestamp': timestamp + } + + channels_url = "/devices/{}/{}/channels/{}".format(deviceId, device_type, channel) + chan_result = fbdb.patch(channels_url, channel_value, + connection=None, + params={'print': 'pretty'}, + headers={'X_FANCY_HEADER': 'VERY FANCY'}) + print(chan_result) + + + history_url = "/devices/{}/{}/history/{}".format(deviceId, device_type, channel) + result = fbdb.post(history_url, channel_value, + connection=None, + params={'print': 'pretty'}, + headers={'X_FANCY_HEADER': 'VERY FANCY'}) + print(result) diff --git a/mqtt_reader.py b/mqtt_reader.py new file mode 100644 index 0000000..fa9b0db --- /dev/null +++ b/mqtt_reader.py @@ -0,0 +1,37 @@ +import paho.mqtt.client as paho +from firebase_interface import set_firebase_channel +import json + +# The callback for when the client receives a CONNACK response from the server. +def on_connect(client, userdata, flags, rc): + print("Connected with result code "+str(rc)) + + # Subscribing in on_connect() means that if we lose the connection and + # reconnect then subscriptions will be renewed. + client.subscribe("meshify/db/194/#") + +# The callback for when a PUBLISH message is received from the server. +def on_message(client, userdata, msg): + print(msg.topic+" "+str(msg.payload)) + payload = json.loads(msg.payload) + topic = msg.topic.split("/") + device_id = topic[3] + device_type = topic[4].split("_")[0] + channel_name = topic[5] + value = payload[0]['value'] + timestamp = payload[0]['timestamp'] + set_firebase_channel(device_id, device_type, channel_name, value, timestamp) + +if __name__ == '__main__': + client = paho.Client() + client.username_pw_set("admin", "columbus") + client.on_connect = on_connect + client.on_message = on_message + + client.connect("mqtt.meshify.com", 1884, 120) + + # Blocking call that processes network traffic, dispatches callbacks and + # handles reconnecting. + # Other loop*() functions are available that give a threaded interface and a + # manual interface. + client.loop_forever() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..85eaf8c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,14 @@ +cachetools==2.1.0 +certifi==2018.4.16 +chardet==3.0.4 +idna==2.6 +paho-mqtt==1.3.1 +protobuf==3.5.2.post1 +pyasn1==0.4.2 +pyasn1-modules==0.2.1 +python-firebase==1.2 +pytz==2018.4 +requests==2.18.4 +rsa==3.4.2 +six==1.11.0 +urllib3==1.22