From 14f7e8bb529d23310f42f580d23e3f5e00691f9d Mon Sep 17 00:00:00 2001 From: Kim Bauters Date: Thu, 12 Apr 2018 13:51:07 +0100 Subject: [PATCH] first commit --- LICENSE | 24 ++++ README.md | 4 + __init__.py | 1 + collector.py | 164 +++++++++++++++++++++++++ publisher.py | 331 +++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 524 insertions(+) create mode 100644 LICENSE create mode 100644 README.md create mode 100644 __init__.py create mode 100644 collector.py create mode 100644 publisher.py diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..50c06e2 --- /dev/null +++ b/LICENSE @@ -0,0 +1,24 @@ +Copyright (c) 2017-2018, Kim Bauters +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of the nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..c4241c5 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +# SimpleMQTT + +The SimpleMQTT module is built on top of the [https://github.com/eclipse/paho.mqtt.python](Eclipse Paho MQTT Python client). +This is a Python 3 implementation of a (simple) AgentSpeak interpreter. It extends on the client by providing the *publisher* class, to simplify publishing to an MQTT server, and by providing the *collector* class, to simplify (mass-)collection of data from an MQTT server. \ No newline at end of file diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..8dff5e5 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +__all__ = ["publisher", "collector"] \ No newline at end of file diff --git a/collector.py b/collector.py new file mode 100644 index 0000000..20b57a3 --- /dev/null +++ b/collector.py @@ -0,0 +1,164 @@ +from datetime import datetime +import json +import logging +import paho.mqtt.client as mqtt +import time +from typing import Any, Callable, Dict, Iterator +log = logging.getLogger(__name__) + +# Usage guidelines: +# you can import this package using +# import simplemqtt.collector +# or, alternatively, you can import the collector directly using +# from simplemqtt.collector import Collector +# +# Once imported, a collector is created simply by specifying the server information: +# collector = Collector(plex.local) +# Subsequently, sensors are added using the add_sensor(...) method: +# collector.add_sensor("sensors/1/+", "living room") +# Now the collector will collect all information retrieved from the sensor and store it for easy access: +# data = collector["living room"] +# Once all the sensors you want to collect are provided, connect to the MQTT server using: +# collector.connect() + + +class Collector: + def __init__(self, host: str, port: str, tls: bool): + self._host = host + self._port = port + self._tls = tls + self._sensors = [] + self._stale_allowed = {} + self._online = {} + self._collected = {} + self._client = mqtt.Client() + log.info("New %s object created to connect to %s:%s and TLS support set to %s.", + Collector.__name__, self._host, self._port, self._tls) + + def add_sensor(self, sensor: str, key: str, *, + stale: bool = False, max_hours: int = 0, same_day: bool = False) -> None: + """ Add the topic of a sensor, and the key to access its value. + :param sensor: the topic of the sensor to listen to. + :param key: the key to access the value of the sensor. + :param stale: flag indicating whether stale results are acceptable. + :param max_hours: (when stale is allowed) the maximum hours staleness to tolerate. + :param same_day: (when stale is allowed) whether stale info is only allowed on the same day. """ + log.info("Adding sensor %s accessible through the key %s", sensor, key) + self._sensors.append((sensor, key)) + self._stale_allowed[key] = (stale, max_hours, same_day) + + def connect(self) -> None: + """ Start a non-blocking connection to the MQTT broker. This will also + ensure that callbacks for all the defined sensors are properly setup. """ + if self._tls: # Enable TLS if supported by the broker + log.info("Enabling TLS for the MQTT broker.") + self._client.tls_set() + log.info("Asynchronously connecting to the MQTT broker at %s:%s.", self._host, self._port) + self._client.connect_async(self._host, self._port) + self._client.on_connect = self._on_connect + + log.info("Creating the callbacks for each specified sensor.") + for sensor, key in self._sensors: + self._client.message_callback_add(sensor, self._create_handler(key)) + + log.info("Asynchronously starting the connection to the MQTT broker.") + self._client.loop_start() + + def wait_all_online(self, *, timeout: Iterator[int] = None, indefinitely: bool = False, interval: int = 5): + """ Wait for all sensors to come online. To avoid waiting indefinitely, a list of timeouts seconds + can be provided which will consecutively be used to wait (longer and longer) for the sensors + to come online. Once the sensors are online, or once the timeouts have expired, this method + will return with a Boolean flag to indicate whether the sensors are now indeed online. + :param timeout: an iterator of (increasing) waiting in seconds. Defaults to [1, 5, 10, 15] + :param indefinitely: a Boolean flag indicating whether we should wait indefinitely. + :param interval: the polling interval when waiting indefinitely. + :return: a Boolean flag to indicate whether the sensors are now online. """ + timeout = timeout if timeout else [1, 5, 10, 15] + + timeout_step = 0 + while not self.all_online(): + log.debug("Not all sensors are online, or some sensors do not have a value loaded. Waiting ...") + if not indefinitely: + if timeout_step < len(timeout): + time.sleep(timeout[timeout_step]) + timeout_step += 1 + else: + time.sleep(interval) + + return self.all_online() + + def all_online(self) -> bool: + """ Verify whether all the subscribed sensors are available and providing values. + :return: True if every sensor reports as online and has a value loaded. False otherwise. """ + online = all([self._online.get(key, False) for _, key in self._sensors]) + return all([online, *[key in self._collected for _, key in self._sensors]]) + + def some_online(self) -> bool: + """ Verify whether some of the subscribed sensors are available and providing values. + :return: True if at least one sensor reports as online and has a value loaded. False otherwise. """ + return any([self._online.get(key, False) and key in self._collected for _, key in self._sensors]) + + def is_online(self, key: str) -> bool: + """ Verify if the sensor with the given key is available and providing values. + :param key: the key of the sensor to check. + :return: True if the sensor reports as online and has a value loaded. False otherwise. """ + return self._online.get(key, False) and key in self._collected + + def all_reporting(self) -> bool: + """ Verify whether all the subscribed sensors are providing values. + :return: True if every sensor has a (potentially stale) value loaded. False otherwise. """ + return all([key in self._collected for _, key in self._sensors]) + + def is_reporting(self, key: str) -> bool: + """ Verify if the sensor with the given key is providing values. + :param key: the key of the sensor to check. + :return: True if the sensor has a (potentially stale) value loaded. False otherwise. """ + return key in self._collected + + def __getitem__(self, key: str) -> Any: + """ Return the item associated with the key. As desired, also stale values can be retrieved. + :param key: the key of the item to retrieve. + :return: The item associated with the provided key, or None when (too) stale or not found. """ + result = None + if key in self._online and self._online[key] is True: + result = self._collected.get(key, None) + elif key in self._collected: + stale, max_hours, same_day = self._stale_allowed.get(key, (False, 0, False)) + if stale: + last_updated = self._collected[key]["lastupdated"] + if last_updated: # verify that we definitely have the information on the last update time + datetime_key = datetime.strptime(last_updated, '%Y-%m-%dT%H-%M-%S') + datetime_now = datetime.utcnow() + difference = datetime_now - datetime_key # determine difference between now and last update + if difference.days == 0 and (difference.seconds / 3600) < max_hours: # check the conditions + if same_day is False or datetime_key.date() == datetime_now.date(): + result = self._collected[key] + return result + + def __iter__(self) -> Iterator: + """ Retrieve an iterator over the keys that are being collected. + :return: an iterator over the keys that are being collected. """ + return iter(list(self._online.keys())) + + def _on_connect(self, mqtt_client: mqtt.Client, _userdata: Any, _flags: Dict, _rc: int) -> None: + log.info("(Re)connecting with the MQTT server, subscribing to all topics with QoS=0.") + mqtt_client.subscribe([(sensor, 0) for sensor, _ in self._sensors]) + + def _process(self, message: mqtt.MQTTMessage, key: str) -> None: + topic = message.topic + payload = message.payload.decode("utf-8") + + if topic.endswith("/status"): + self._online[key] = payload == "on" + log.debug("The status of %s is %s.", topic, payload == "on") + if payload == "off" and key in self._collected: + del self._collected[key] + elif topic.endswith("/value"): + self._collected[key] = json.loads(payload) + log.debug("Changed the value of %s to %s.", topic, json.loads(payload)) + + def _create_handler(self, key: str) -> Callable[[mqtt.Client, Any, mqtt.MQTTMessage], None]: + def handler(_client: mqtt.Client, _userdata: Any, message: mqtt.MQTTMessage): + self._process(message, key) + + return handler diff --git a/publisher.py b/publisher.py new file mode 100644 index 0000000..4e7b83a --- /dev/null +++ b/publisher.py @@ -0,0 +1,331 @@ +import json +from time import gmtime, strftime, sleep +import paho.mqtt.client as mqtt +import logging +from typing import Iterable, Iterator, Any, Tuple, Dict, Callable +log = logging.getLogger(__name__) + +# Usage guidelines: +# you can import this package using +# import simplemqtt.publisher +# or, alternatively, you can import the publisher and all desired constants using +# from simplemqtt.publisher import Publisher, TEMPERATURE_SENSE, ONOFF_LIGHT +# +# Once imported, a publisher is created by specifying the topic and description: +# publisher = Publisher("/sensors/1", "My first light sensor.") +# And connected using: +# publisher.connect("mqtt.server") +# +# To publish information, the correct Payload needs to be created. This can be requested from the publisher: +# payload = publisher.create_light_sensor(ONOFF_LIGHT) +# This preloads the payload with the acceptable keys, which can immediately be set: +# payload["on"] = False +# payload["reachable"] = True +# Once done, this information can be pushed to the publisher: +# publisher.publish(payload) + +# Constants used to create a family of weather sensors. +# The exact version of the sensor is the sum of senses. +# This can be switched to enum.Flag as of Python 3.6. + +TEMPERATURE_SENSE = 1 +LIGHTLEVEL_SENSE = 2 +HUMIDITY_SENSE = 4 +AIRQUALITY_SENSE = 8 +PRESSURE_SENSE = 16 +WINDSPEED_SENSE = 32 +WINDDIRECTION_SENSE = 64 +UVINDEX_SENSE = 128 +ICON_SENSE = 256 +ICON_3H_SENSE = 512 +ICON_6H_SENSE = 1024 + +# Constants used to create a family of light sensors. +# This can be switched to enum.Flag as of Python 3.6. + +ONOFF_LIGHT = 1 +BRIGHTNESS_LIGHT = 2 +TEMPERATURE_LIGHT = 4 +COLOUR_LIGHT = 8 + + +class Payload: + """ Create an MQTT Payload by setting the desired fields using indexing + and retrieve the JSON-ified payload by converting it to a string. """ + __slots__ = ("_publisher", "type", "payload", "_client", "_topic", "_changed") + + def __init__(self, publisher: "Publisher", payload_type: str, *, + keys: Iterable=None, presets: Iterable[Tuple[str, Any]]=None): + """ Provide the type of the payload, as well as other settable keys. + :param publisher: the publisher to which to send the payload to. + :param payload_type: a string with the type of the payload. + :param keys: a list of strings, each a key that can be set. + :param presets: a list of key-value tuples to preset.""" + self._publisher = publisher + self._client = mqtt.Client() + self.type = payload_type + self.payload = dict() + self._changed = True + self.payload["type"] = payload_type + if keys: + for key in keys: + self.payload[key] = None + if presets: + for key, value in presets: + self.payload[key] = value + + def _set(self, key: str, value: Any = None) -> None: + """ Method to set the value of a given key defined on initialisation. + :param key: the key to set. + :param value: the value to associate with the key. + :raise KeyError: thrown when key was not defined on initialisation. """ + if key not in self.payload: + raise KeyError("Key '" + key + "' not available in this payload. " + "Available keys are: '" + ", ".join(self.payload.keys()) + "'.") + + # we don't want the full traceback, so we catch the error, and then rethrow + # the error afterwards to hopefully get a cleaner, easier to understand TypeError + value_problem = False + try: + json.dumps(value) + except TypeError as te: + value_problem = str(te) + + if value_problem: + raise TypeError("the value could not be turned into JSON: '" + value_problem + "'.") + + if value != self.payload[key]: + self._changed = True + self.payload[key] = value + self.payload["lastupdated"] = strftime("%Y-%m-%dT%H:%M:%S", gmtime()) + + @property + def changed(self) -> bool: + """ Flag when the contents of the payload have changed since the last time it was converted to a string. """ + return self._changed + + def update(self, d: dict) -> None: + for key, value in d.items(): + self[key] = value + + def publish(self, *, forced: bool = False) -> None: + self._publisher.publish(self, forced=forced) + + def __setitem__(self, key: str, value: Any): + """ Wrapper method for self._set(...) .""" + self._set(key, value) + + def __iter__(self) -> Iterator: + """ Retrieve an iterator over the keys of the payload. + :return: an iterator over the keys of this payload. """ + return iter(list(self.payload.keys())) + + def __str__(self) -> str: + """ When a string is requested, dump the payload as JSON. + :return: the payload in JSON format. """ + self._changed = False + return json.dumps(self.payload) + + +class Publisher: + """ An object of the class Publisher is able to connect to an MQTT broker, start + the connection, reconnect (automatically) and disconnect, and is able to + create specific types of sensors (objects from the Payload class). """ + __slots__ = ("_client", "_topic", "_description", "_retain", "_status", "_on_connect", "_signalled", "_online") + + _DESC = "/description" + _VALUE = "/value" + _STATUS = "/status" + + def __init__(self, topic: str, description: str, *, retain: bool = True): + """ Initialises a Publisher object by setting up the main topic + to which it will publish, as well as a basic description + of the sensor which should be easy to read by humans. + :param topic: the main topic to which the client will publish. + :param description: human-readable description of the sensor. + :param retain: Boolean flag to indicate whether the values posted should be retained. """ + self._client = mqtt.Client() + self._topic = topic + self._description = description + self._retain = retain + self._status = None + self._on_connect = None + self._online = False + log.info("Created a new MQTT client which will subscribe to %s.", self._topic) + + def connect(self, host: str, *, port: int = 1883, + tls: bool = False, auto_start: bool = True, + signal: bool = True) -> None: + """ Asynchronously connect to the MQTT broker and do some initial bookkeeping. + :param host: the host where the MQTT broker can be found. + :param port: the port used to access the MQTT broker. + :param tls: a Boolean flag to indicate whether the MQTT broker requires TLS. + :param auto_start: a Boolean flag to indicate whether to automatically start + the connection to the MQTT broker. If set to False, the + programmer should call start(...) to initiate the connection. + Alternatively, the programmer could call blocking_start(...) + to trigger a blocking start, which requires some bookkeeping + to be done by the programmer. + :param signal: a Boolean flag to indicate whether to automatically signal + the status of this sensor as "on" (the default). When omitted, + the programmer should call signal_status(...) to do so. """ + self._client.will_set(self._topic + Publisher._STATUS, "off", qos=1, retain=True) + if tls: # Enable TLS if supported by the broker + log.info("Enabling TLS for the MQTT broker.") + self._client.tls_set() + log.info("Asynchronously connecting to the MQTT broker at %s:%s.", host, port) + self._client.connect_async(host, port) + self._client.on_connect = self._reconnect + self._client.on_disconnect = self._disconnect + if auto_start: + log.debug("Triggering auto start.") + self.start(signal) + + def start(self, signal: bool = True) -> None: + """ Start the connection to the MQTT broker in a non-blocking way. + :param signal: a Boolean flag to indicate whether to automatically signal + the status of this sensor as "on" (the default). When omitted, + the programmer should call signal_status(...) to do so. """ + log.info("Starting the connection to the MQTT broker. in a non-blocking way.") + self._client.loop_start() + if signal: + log.info("Signalling the status of the sensor (defaults to 'on').") + self.signal_status() + log.info("Publishing the description of this sensor ('%s') as a retained message to %s.", + self._description, self._topic + Publisher._DESC) + while not self._online: + sleep(0.01) + self._client.publish(self._topic + Publisher._DESC, self._description, qos=1, retain=True) + + def blocking_start(self) -> None: + """ Start the connection to the MQTT broker in a blocking way. """ + log.info("Starting the connection to the MQTT broker in a blocking style.") + self._client.loop_forever() + + def signal_status(self, status: bool = True, *, forced: bool = False) -> None: + """ Signal a change in the status of this sensor. + :param status: the status to signal to the MQTT broker. + :param forced: whether or not to forcibly signal the status, even if + the (internal) status hasn't changed. This is particularly + useful in the case of a reconnect where a last will may + have been set, or when interfered by another sensor. """ + if forced or self._status != status: + status = self._status if forced else status + log.debug("Signalling the sensor status as %s.", status) + self._client.publish(self._topic + Publisher._STATUS, "on" if status else "off", qos=1, retain=True) + self._status = status + + def disconnect(self) -> None: + """ Disconnect from the MQTT broker. """ + log.info("Disconnecting from the MQTT broker.") + self._client.disconnect() + + def publish(self, payload: Payload, *, forced: bool = False) -> None: + """ Publish a payload for this sensor to the MQTT broker. + :param payload: the payload to be published, which should be an object + of the type Payload, a requirement that is enforced. + :param forced: whether the payload should be forcefully published. Otherwise, + the payload is only published when changes are detected. """ + if not self._online: + # raise EnvironmentError("Not connected to an MQTT server.") + log.debug("Not connected to an MQTT server. Entering waiting pattern ...") + while not self._online: + sleep(0.01) + if isinstance(payload, Payload): + if forced or payload.changed: + log.debug("Publishing '%s' to '%s'.", str(payload), self._topic + Publisher._VALUE) + self._client.publish(self._topic + Publisher._VALUE, retain=self._retain, payload=str(payload)) + else: + raise ValueError("The payload is not an object of the type Payload.") + + def _reconnect(self, mqtt_client: mqtt.Client, userdata: Any, flags: Dict, rc: int) -> None: + log.info("(Re)connected to the MQTT server.") + self._online = True + if self._status is not None: + log.debug("Forcing the sensor status to be reset to %s.", self._status) + self.signal_status(forced=True) + + if self._on_connect: + log.debug("") + self._on_connect(mqtt_client, userdata, flags, rc) + + def _disconnect(self, _mqtt_client: mqtt.Client, _userdata: Any, _rc: int) -> None: + log.info("Disconnected from the MQTT server.") + self._online = False + + @property + def on_connect(self): + """ :return: returns the callback function that has been set + to execute on connecting to the MQTT broker. """ + return self._on_connect + + @on_connect.setter + def on_connect(self, value: Callable[[mqtt.Client, Any, mqtt.MQTTMessage, int], None]) -> None: + """ :param value: the callback function to trigger when connecting to the MQTT broker. """ + self._on_connect = value + + def create_generic_sensor(self, keys: Iterable) -> Payload: + """ Create the payload to be used with a generic sensor. + :param keys: the keys to be associated with this generic sensor. + :return: a Payload object, which can be used to set the Payload + for this type of generic sensor with the given keys. """ + return Payload(self, "generic", keys=keys) + + def create_weather_sensor(self, version: int) -> Payload: + """ Create the payload to be used with a weather sensor. + :param version: the version of the weather sensor. + :return: a Payload object, which can be used to set the Payload + for this type of weather sensor with the given version. """ + """The version of a sensor is given as a bit mask.""" + if version < 1 or version > 2047: + raise ValueError("Version type too low or too high.") + + senses = [(TEMPERATURE_SENSE, "temperature"), + (LIGHTLEVEL_SENSE, "lightlevel"), + (HUMIDITY_SENSE, "humidity"), + (AIRQUALITY_SENSE, "airquality"), + (PRESSURE_SENSE, "pressure"), + (WINDSPEED_SENSE, "windspeed"), + (WINDDIRECTION_SENSE, "winddirection"), + (UVINDEX_SENSE, "uvindex"), + (ICON_SENSE, "icon"), + (ICON_3H_SENSE, "icon_3h"), + (ICON_6H_SENSE, "icon_6h")] + + keys = [] + + for sense, key in senses: + if version & sense: + keys.append(key) + + return Payload(self, "weather", keys=keys, + presets=[("version", version)]) + + def create_light_sensor(self, version: int) -> Payload: + """ Create the payload to be used with a light sensor. + :param version: the version of the light sensor. + :return: a Payload object, which can be used to set the Payload + for this type of light sensor with the given version. """ + log.info("Creating a light sensor payload.") + """The version of a sensor is given as a bit mask.""" + if version < 1 or version > 8: + raise ValueError("Version type too low or too high.") + + lights = [(ONOFF_LIGHT, ["on", "reachable"]), + (BRIGHTNESS_LIGHT, ["brightness"]), + (TEMPERATURE_LIGHT, ["ct"]), + (COLOUR_LIGHT, ["hue", "saturation", "xy"])] + + keys = [] + + for light, key in lights: + if version >= light: + keys.extend(key) + + return Payload(self, "light", keys=keys, + presets=[("version", version)]) + + def create_onoff_sensor(self) -> Payload: + """ Create the Payload object for a basic on/off sensor. + :return: a Payload object with a single "onoff" key. """ + return Payload(self, "onoff", keys=["on"])