from dataclasses import dataclass from datetime import datetime import json import logging import paho.mqtt.client as mqtt import time from multiprocessing import Process from typing import Any, Callable, Dict, Iterator log = logging.getLogger(__name__) # Python 3.7+ # Usage guidelines: # you can import this package using # import simplemqtt.collector # or, alternatively, you can import the collector directly using # from simplemqtt.collector import Collector # # Once imported, a collector is created simply by specifying the server information: # collector = Collector(plex.local) # Subsequently, sensors are added using the add_sensor(...) method: # collector.add_sensor("sensors/1/+", "living room") # Now the collector will collect all information retrieved from the sensor and store it for easy access: # data = collector["living room"] # Once all the sensors you want to collect are provided, connect to the MQTT server using: # collector.connect() @dataclass class Sensor: topic: str key: str @dataclass class StaleState: dumb: bool stale: bool same_day: bool max_hours: int @classmethod def default(cls): return StaleState(False, False, False, 0) class Collector: def __init__(self, host: str, port: str, tls: bool): """ Create a new collector and initialise all defaults values. :param host: the MQTT host address :param port: the MQTT port number :param tls: whether or not to use TLS to connect to the MQTT host """ self._client = mqtt.Client() # save the information passed along on the MQTT server self._host = host self._port = port self._tls = tls # store defaults for the other internal information self._sensors = [] # keep track of each sensor's topic and user identified key as (topic, key) self._stale_allowed = {} # mark whether a given key is allowed to go stale and its parameters self._online = {} # mark whether a given key is registered as online self._collected = {} # keep track of collected information on a topic # bookkeeping to deal with callbacks on value changes self._previous_process = None self._current_process = None self.callback = self._callback # log the status of the collector log.info("New %s object created to connect to %s:%s and TLS support set to %s.", Collector.__name__, self._host, self._port, self._tls) def add_sensor(self, sensor: str, key: str, *, stale: bool = False, max_hours: int = 0, same_day: bool = False, dumb: bool = False) -> None: """ Add the topic of a sensor, and the key to access its value. :param sensor: the topic of the sensor to listen to. :param key: the key to access the value of the sensor. :param stale: flag – indicates whether stale (old) results are acceptable. :param max_hours: (when stale is allowed) the maximum hours staleness to tolerate. :param same_day: flag – (when stale is allowed) whether stale info is only allowed on the same day. :param dumb: flag - a dumb source does not have any availability information – enforces unlimited stale""" log.info("Adding sensor %s accessible through the key %s", sensor, key) self._sensors.append(Sensor(topic=sensor, key=key)) self._stale_allowed[key] = StaleState(dumb, stale, same_day, max_hours) def connect(self) -> None: """ Start a non-blocking connection to the MQTT broker. Also ensure that callbacks for all the defined sensors are properly set up. """ if self._tls: # Enable TLS if supported by the broker log.info("Enabling TLS for the MQTT broker.") self._client.tls_set() log.info("Asynchronously connecting to the MQTT broker at %s:%s.", self._host, self._port) self._client.connect_async(self._host, self._port) self._client.on_connect = self._on_connect log.info("Creating the callbacks for each specified sensor.") for sensor in self._sensors: self._client.message_callback_add(sensor.topic, self._create_handler(sensor.key)) self._online[sensor.key] = True log.info("Asynchronously starting the connection to the MQTT broker.") self._client.loop_start() def wait_all_online(self, *, timeout: Iterator[int] = None, indefinitely: bool = False, interval: int = 5): """ Wait for all sensors to come online. To avoid waiting indefinitely, a list of timeouts seconds can be provided which will consecutively be used to wait for the sensors to come online. Once the sensors are online, or once the timeouts have expired, this method returns a Boolean flag to indicate whether the sensors are now indeed online. :param timeout: an iterator of (increasing) waiting in seconds. Defaults to [1, 5, 10, 15] :param indefinitely: a Boolean flag indicating whether we should wait indefinitely. :param interval: the polling interval when waiting indefinitely. :return: a Boolean flag to indicate whether the sensors are now online. """ timeout = timeout if timeout else [1, 5, 10, 15] timeout_step = 0 while not self.all_online(): log.debug("Not all sensors are online, or some sensors do not have a value loaded. Waiting ...") if not indefinitely: if timeout_step < len(timeout): time.sleep(timeout[timeout_step]) timeout_step += 1 else: time.sleep(interval) return self.all_online() def all_online(self) -> bool: """ Verify whether all the subscribed sensors are available and providing values. :return: True if every sensor reports as online and has a value loaded. False otherwise. """ online = all([self._online.get(sensor.key, False) for sensor in self._sensors]) return all([online, *[sensor.key in self._collected for sensor in self._sensors]]) def some_online(self) -> bool: """ Verify whether some of the subscribed sensors are available and providing values. :return: True if at least one sensor reports as online and has a value loaded. False otherwise. """ return any([self._online.get(sensor.key, False) and sensor.key in self._collected for sensor in self._sensors]) def is_online(self, key: str) -> bool: """ Verify if the sensor with the given key is available and providing values. :param key: the key of the sensor to check. :return: True if the sensor reports as online and has a value loaded. False otherwise. """ return self._online.get(key, False) and key in self._collected def all_reporting(self) -> bool: """ Verify whether all the subscribed sensors are providing values. :return: True if every sensor has a (potentially stale) value loaded. False otherwise. """ return all([sensor.key in self._collected for sensor in self._sensors]) def is_reporting(self, key: str) -> bool: """ Verify if the sensor with the given key is providing values. :param key: the key of the sensor to check. :return: True if the sensor has a (potentially stale) value loaded. False otherwise. """ return key in self._collected def __getitem__(self, key: str) -> Any: """ Return the item associated with the key. As desired, also stale values can be retrieved. :param key: the key of the item to retrieve. :return: The item associated with the provided key, or None when (too) stale or not found. """ result = None if key in self._online and self._online[key] is True: # return the collected value if the sensor is online result = self._collected.get(key, None) elif key in self._collected: # otherwise ... stale_state = self._stale_allowed.get(key, StaleState.default()) if stale_state.dumb: # no need to check the liveliness of the sensor, just give the result result = self._collected[key] elif stale_state.stale: last_updated = self._collected[key]["lastupdated"] if last_updated: # verify that we definitely have the information on the last update time datetime_key = datetime.fromisoformat(last_updated) datetime_now = datetime.utcnow() difference = datetime_now - datetime_key # determine difference between now and last update if difference.days == 0 and (difference.seconds / 3600) < stale_state.max_hours: if stale_state.same_day is False or datetime_key.date() == datetime_now.date(): result = self._collected[key] return result def __iter__(self) -> Iterator: """ Retrieve an iterator over the keys that are being collected. :return: an iterator over the keys that are being collected. """ return iter(list(self._online.keys())) def _on_connect(self, mqtt_client: mqtt.Client, _userdata: Any, _flags: Dict, _rc: int) -> None: log.info("(Re)connecting with the MQTT server, subscribing to all topics with QoS=0.") mqtt_client.subscribe([(sensor.topic, 0) for sensor in self._sensors]) def _process(self, message: mqtt.MQTTMessage, key: str) -> None: topic = message.topic payload = message.payload.decode("utf-8") if topic.endswith("/status"): self._online[key] = payload == "on" log.debug("The status of %s is %s.", topic, payload == "on") if payload == "off" and key in self._collected: del self._collected[key] elif topic.endswith("/value"): self._collected[key] = json.loads(payload) log.debug("Changed the value of %s to %s.", topic, json.loads(payload)) if self._previous_process: self._previous_process.terminate() self._previous_process = self._current_process self._current_process = Process(target=self.callback) self._current_process.start() def _create_handler(self, key: str) -> Callable[[mqtt.Client, Any, mqtt.MQTTMessage], None]: def handler(_client: mqtt.Client, _userdata: Any, message: mqtt.MQTTMessage): self._process(message, key) return handler def _callback(self): pass