simplemqtt/collector.py
2018-04-12 13:51:07 +01:00

165 lines
8.5 KiB
Python

from datetime import datetime, timezone
import json
import logging
import time
from typing import Any, Callable, Dict, Iterable, Iterator, Optional

import paho.mqtt.client as mqtt
# Module-level logger, named after this module; used by the Collector class below.
log = logging.getLogger(__name__)
# Usage guidelines:
# You can import this module using
#     import simplemqtt.collector
# or, alternatively, you can import the collector class directly using
#     from simplemqtt.collector import Collector
#
# Once imported, a collector is created by specifying the broker's host, its port and whether to use TLS:
#     collector = Collector("plex.local", 1883, False)
# Subsequently, sensors are added using the add_sensor(...) method:
#     collector.add_sensor("sensors/1/+", "living room")
# Once connected (see below), the collector collects all information received from the sensor and
# stores it for easy access under the key given to add_sensor(...):
#     data = collector["living room"]
# Once all the sensors you want to collect have been added, connect to the MQTT broker using:
#     collector.connect()
class Collector:
    """ Collect the latest status and value published by a set of MQTT sensors.
    Each sensor is registered with a topic (filter) and a key. Messages on topics ending in "/status"
    update whether the sensor is online (payload "on"); messages on topics ending in "/value" carry a
    JSON payload that is stored under the sensor's key and can be read back with collector[key]. """

    # Format of the "lastupdated" field that sensors include in their JSON value payload.
    _TIMESTAMP_FORMAT = '%Y-%m-%dT%H-%M-%S'
    # Waiting steps (in seconds) used by wait_all_online when no timeouts are given.
    _DEFAULT_TIMEOUTS = (1, 5, 10, 15)

    def __init__(self, host: str, port: int = 1883, tls: bool = False):
        """ Create a collector for the given MQTT broker. No connection is made until connect() is called.
        :param host: the host name or address of the MQTT broker.
        :param port: the port of the MQTT broker. Defaults to 1883, the standard unencrypted MQTT port.
        :param tls: flag indicating whether TLS should be enabled for the connection. """
        self._host = host
        self._port = port
        self._tls = tls
        self._sensors = []        # (topic, key) pairs, in registration order
        self._stale_allowed = {}  # key -> (stale, max_hours, same_day)
        self._online = {}         # key -> True when the last status payload was "on"
        self._collected = {}      # key -> last decoded JSON value
        self._client = mqtt.Client()
        log.info("New %s object created to connect to %s:%s and TLS support set to %s.",
                 Collector.__name__, self._host, self._port, self._tls)

    def add_sensor(self, sensor: str, key: str, *,
                   stale: bool = False, max_hours: int = 0, same_day: bool = False) -> None:
        """ Add the topic of a sensor, and the key to access its value.
        :param sensor: the topic of the sensor to listen to.
        :param key: the key to access the value of the sensor.
        :param stale: flag indicating whether stale results are acceptable.
        :param max_hours: (when stale is allowed) the maximum hours staleness to tolerate. With the
                          default of 0 no stale value is ever young enough to be returned.
        :param same_day: (when stale is allowed) whether stale info is only allowed on the same day. """
        log.info("Adding sensor %s accessible through the key %s", sensor, key)
        self._sensors.append((sensor, key))
        self._stale_allowed[key] = (stale, max_hours, same_day)

    def connect(self) -> None:
        """ Start a non-blocking connection to the MQTT broker. This will also
        ensure that callbacks for all the defined sensors are properly setup. """
        if self._tls:  # Enable TLS if supported by the broker
            log.info("Enabling TLS for the MQTT broker.")
            self._client.tls_set()
        log.info("Asynchronously connecting to the MQTT broker at %s:%s.", self._host, self._port)
        self._client.connect_async(self._host, self._port)
        self._client.on_connect = self._on_connect
        log.info("Creating the callbacks for each specified sensor.")
        for sensor, key in self._sensors:
            self._client.message_callback_add(sensor, self._create_handler(key))
        log.info("Asynchronously starting the connection to the MQTT broker.")
        self._client.loop_start()

    def wait_all_online(self, *, timeout: Optional[Iterable[int]] = None, indefinitely: bool = False,
                        interval: int = 5) -> bool:
        """ Wait for all sensors to come online. To avoid waiting indefinitely, a list of timeouts seconds
        can be provided which will consecutively be used to wait (longer and longer) for the sensors
        to come online. Once the sensors are online, or once the timeouts have expired, this method
        will return with a Boolean flag to indicate whether the sensors are now indeed online.
        :param timeout: an iterable of (increasing) waiting in seconds. Defaults to [1, 5, 10, 15],
                        also when an empty iterable is given. Ignored when waiting indefinitely.
        :param indefinitely: a Boolean flag indicating whether we should wait indefinitely.
        :param interval: the polling interval when waiting indefinitely.
        :return: a Boolean flag to indicate whether the sensors are now online. """
        waiting = "Not all sensors are online, or some sensors do not have a value loaded. Waiting ..."
        if indefinitely:
            while not self.all_online():
                log.debug(waiting)
                time.sleep(interval)
        else:
            # Materialise the steps so that any iterable (list, tuple, generator, ...) is accepted.
            steps = list(timeout) if timeout is not None else []
            for delay in steps or self._DEFAULT_TIMEOUTS:
                if self.all_online():
                    break
                log.debug(waiting)
                time.sleep(delay)
            # Falling out of the loop means the timeouts have expired: report the state as it is now
            # instead of spinning until the sensors show up.
        return self.all_online()

    def all_online(self) -> bool:
        """ Verify whether all the subscribed sensors are available and providing values.
        :return: True if every sensor reports as online and has a value loaded. False otherwise.
                 Also True when no sensors have been added. """
        return all(self.is_online(key) for _, key in self._sensors)

    def some_online(self) -> bool:
        """ Verify whether some of the subscribed sensors are available and providing values.
        :return: True if at least one sensor reports as online and has a value loaded. False otherwise. """
        return any(self.is_online(key) for _, key in self._sensors)

    def is_online(self, key: str) -> bool:
        """ Verify if the sensor with the given key is available and providing values.
        :param key: the key of the sensor to check.
        :return: True if the sensor reports as online and has a value loaded. False otherwise. """
        return bool(self._online.get(key, False)) and key in self._collected

    def all_reporting(self) -> bool:
        """ Verify whether all the subscribed sensors are providing values.
        :return: True if every sensor has a (potentially stale) value loaded. False otherwise. """
        return all(key in self._collected for _, key in self._sensors)

    def is_reporting(self, key: str) -> bool:
        """ Verify if the sensor with the given key is providing values.
        :param key: the key of the sensor to check.
        :return: True if the sensor has a (potentially stale) value loaded. False otherwise. """
        return key in self._collected

    def __getitem__(self, key: str) -> Any:
        """ Return the item associated with the key. As desired, also stale values can be retrieved.
        A value of a sensor that is not online is only returned when stale values were allowed for the
        key in add_sensor(...), the value is a JSON object with a parseable "lastupdated" field, and that
        timestamp is less than max_hours old (and from today, in UTC, when same_day was requested).
        :param key: the key of the item to retrieve.
        :return: The item associated with the provided key, or None when (too) stale or not found. """
        if self._online.get(key) is True:
            return self._collected.get(key)
        if key not in self._collected:
            return None
        stale, max_hours, same_day = self._stale_allowed.get(key, (False, 0, False))
        if not stale:
            return None
        value = self._collected[key]
        # Any JSON document is accepted as a value, so it might not be an object with a timestamp.
        last_updated = value.get("lastupdated") if isinstance(value, dict) else None
        if not last_updated:  # without the last update time the staleness cannot be verified
            return None
        try:
            updated_at = datetime.strptime(last_updated, self._TIMESTAMP_FORMAT)
        except (TypeError, ValueError):
            log.warning("Cannot interpret the last update time %r of %s.", last_updated, key)
            return None
        # The timestamps are compared as naive UTC times.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        # Compare the full age rather than only the seconds within a day, so limits above 24 hours work.
        age_in_seconds = (now - updated_at).total_seconds()
        if not 0 <= age_in_seconds < max_hours * 3600:
            return None
        if same_day and updated_at.date() != now.date():
            return None
        return value

    def __iter__(self) -> Iterator[str]:
        """ Retrieve an iterator over the keys of the sensors for which a status has been received.
        :return: an iterator over a snapshot of those keys. """
        # Iterate over a copy, as the dictionary is updated whenever a status message is processed.
        return iter(list(self._online))

    def _on_connect(self, mqtt_client: "mqtt.Client", _userdata: Any, _flags: Dict, _rc: int) -> None:
        """ Callback invoked by the MQTT client on every (re)connect: subscribe to all sensor topics.
        :param mqtt_client: the client that (re)connected.
        :param _userdata: the user data of the client (unused).
        :param _flags: the response flags sent by the broker (unused).
        :param _rc: the connection result; 0 means the connection was accepted. """
        if _rc != 0:
            log.warning("The MQTT server refused the connection (result code %s), not subscribing.", _rc)
            return
        if not self._sensors:  # subscribing to an empty list of topics is rejected by the client
            log.info("(Re)connected with the MQTT server, but there are no sensors to subscribe to.")
            return
        log.info("(Re)connecting with the MQTT server, subscribing to all topics with QoS=0.")
        mqtt_client.subscribe([(sensor, 0) for sensor, _ in self._sensors])

    def _process(self, message: "mqtt.MQTTMessage", key: str) -> None:
        """ Update the state of the sensor with the given key based on a received message.
        Messages that cannot be decoded are logged and ignored; other topics are ignored silently.
        :param message: the received message; only its topic and payload are used.
        :param key: the key of the sensor the message belongs to. """
        topic = message.topic
        try:
            payload = message.payload.decode("utf-8")
        except UnicodeDecodeError:
            log.warning("Ignoring the message on %s, its payload is not valid UTF-8.", topic)
            return
        if topic.endswith("/status"):
            online = payload == "on"
            self._online[key] = online
            log.debug("The status of %s is %s.", topic, online)
            if payload == "off":
                # NOTE(review): dropping the value here means no stale value remains available once a
                # sensor reports "off" -- confirm this is intended for sensors added with stale=True.
                self._collected.pop(key, None)
        elif topic.endswith("/value"):
            try:
                value = json.loads(payload)
            except ValueError:  # json.JSONDecodeError is a ValueError
                log.warning("Ignoring the message on %s, its payload is not valid JSON.", topic)
                return
            self._collected[key] = value
            log.debug("Changed the value of %s to %s.", topic, value)

    def _create_handler(self, key: str) -> Callable[["mqtt.Client", Any, "mqtt.MQTTMessage"], None]:
        """ Create the message callback that processes the messages of the sensor with the given key.
        :param key: the key of the sensor whose messages the callback will process.
        :return: a callback with the signature the MQTT client expects for message callbacks. """
        def handler(_client: "mqtt.Client", _userdata: Any, message: "mqtt.MQTTMessage") -> None:
            self._process(message, key)
        return handler