165 lines
8.5 KiB
Python
165 lines
8.5 KiB
Python
from datetime import datetime
|
|
import json
|
|
import logging
|
|
import paho.mqtt.client as mqtt
|
|
import time
|
|
from typing import Any, Callable, Dict, Iterator
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Usage guidelines:
|
|
# you can import this package using
|
|
# import simplemqtt.collector
|
|
# or, alternatively, you can import the collector directly using
|
|
# from simplemqtt.collector import Collector
|
|
#
|
|
# Once imported, a collector is created simply by specifying the server information:
|
|
# collector = Collector(plex.local)
|
|
# Subsequently, sensors are added using the add_sensor(...) method:
|
|
# collector.add_sensor("sensors/1/+", "living room")
|
|
# Now the collector will collect all information retrieved from the sensor and store it for easy access:
|
|
# data = collector["living room"]
|
|
# Once all the sensors you want to collect are provided, connect to the MQTT server using:
|
|
# collector.connect()
|
|
|
|
|
|
class Collector:
|
|
def __init__(self, host: str, port: str, tls: bool):
|
|
self._host = host
|
|
self._port = port
|
|
self._tls = tls
|
|
self._sensors = []
|
|
self._stale_allowed = {}
|
|
self._online = {}
|
|
self._collected = {}
|
|
self._client = mqtt.Client()
|
|
log.info("New %s object created to connect to %s:%s and TLS support set to %s.",
|
|
Collector.__name__, self._host, self._port, self._tls)
|
|
|
|
def add_sensor(self, sensor: str, key: str, *,
|
|
stale: bool = False, max_hours: int = 0, same_day: bool = False) -> None:
|
|
""" Add the topic of a sensor, and the key to access its value.
|
|
:param sensor: the topic of the sensor to listen to.
|
|
:param key: the key to access the value of the sensor.
|
|
:param stale: flag indicating whether stale results are acceptable.
|
|
:param max_hours: (when stale is allowed) the maximum hours staleness to tolerate.
|
|
:param same_day: (when stale is allowed) whether stale info is only allowed on the same day. """
|
|
log.info("Adding sensor %s accessible through the key %s", sensor, key)
|
|
self._sensors.append((sensor, key))
|
|
self._stale_allowed[key] = (stale, max_hours, same_day)
|
|
|
|
def connect(self) -> None:
|
|
""" Start a non-blocking connection to the MQTT broker. This will also
|
|
ensure that callbacks for all the defined sensors are properly setup. """
|
|
if self._tls: # Enable TLS if supported by the broker
|
|
log.info("Enabling TLS for the MQTT broker.")
|
|
self._client.tls_set()
|
|
log.info("Asynchronously connecting to the MQTT broker at %s:%s.", self._host, self._port)
|
|
self._client.connect_async(self._host, self._port)
|
|
self._client.on_connect = self._on_connect
|
|
|
|
log.info("Creating the callbacks for each specified sensor.")
|
|
for sensor, key in self._sensors:
|
|
self._client.message_callback_add(sensor, self._create_handler(key))
|
|
|
|
log.info("Asynchronously starting the connection to the MQTT broker.")
|
|
self._client.loop_start()
|
|
|
|
def wait_all_online(self, *, timeout: Iterator[int] = None, indefinitely: bool = False, interval: int = 5):
|
|
""" Wait for all sensors to come online. To avoid waiting indefinitely, a list of timeouts seconds
|
|
can be provided which will consecutively be used to wait (longer and longer) for the sensors
|
|
to come online. Once the sensors are online, or once the timeouts have expired, this method
|
|
will return with a Boolean flag to indicate whether the sensors are now indeed online.
|
|
:param timeout: an iterator of (increasing) waiting in seconds. Defaults to [1, 5, 10, 15]
|
|
:param indefinitely: a Boolean flag indicating whether we should wait indefinitely.
|
|
:param interval: the polling interval when waiting indefinitely.
|
|
:return: a Boolean flag to indicate whether the sensors are now online. """
|
|
timeout = timeout if timeout else [1, 5, 10, 15]
|
|
|
|
timeout_step = 0
|
|
while not self.all_online():
|
|
log.debug("Not all sensors are online, or some sensors do not have a value loaded. Waiting ...")
|
|
if not indefinitely:
|
|
if timeout_step < len(timeout):
|
|
time.sleep(timeout[timeout_step])
|
|
timeout_step += 1
|
|
else:
|
|
time.sleep(interval)
|
|
|
|
return self.all_online()
|
|
|
|
def all_online(self) -> bool:
|
|
""" Verify whether all the subscribed sensors are available and providing values.
|
|
:return: True if every sensor reports as online and has a value loaded. False otherwise. """
|
|
online = all([self._online.get(key, False) for _, key in self._sensors])
|
|
return all([online, *[key in self._collected for _, key in self._sensors]])
|
|
|
|
def some_online(self) -> bool:
|
|
""" Verify whether some of the subscribed sensors are available and providing values.
|
|
:return: True if at least one sensor reports as online and has a value loaded. False otherwise. """
|
|
return any([self._online.get(key, False) and key in self._collected for _, key in self._sensors])
|
|
|
|
def is_online(self, key: str) -> bool:
|
|
""" Verify if the sensor with the given key is available and providing values.
|
|
:param key: the key of the sensor to check.
|
|
:return: True if the sensor reports as online and has a value loaded. False otherwise. """
|
|
return self._online.get(key, False) and key in self._collected
|
|
|
|
def all_reporting(self) -> bool:
|
|
""" Verify whether all the subscribed sensors are providing values.
|
|
:return: True if every sensor has a (potentially stale) value loaded. False otherwise. """
|
|
return all([key in self._collected for _, key in self._sensors])
|
|
|
|
def is_reporting(self, key: str) -> bool:
|
|
""" Verify if the sensor with the given key is providing values.
|
|
:param key: the key of the sensor to check.
|
|
:return: True if the sensor has a (potentially stale) value loaded. False otherwise. """
|
|
return key in self._collected
|
|
|
|
def __getitem__(self, key: str) -> Any:
|
|
""" Return the item associated with the key. As desired, also stale values can be retrieved.
|
|
:param key: the key of the item to retrieve.
|
|
:return: The item associated with the provided key, or None when (too) stale or not found. """
|
|
result = None
|
|
if key in self._online and self._online[key] is True:
|
|
result = self._collected.get(key, None)
|
|
elif key in self._collected:
|
|
stale, max_hours, same_day = self._stale_allowed.get(key, (False, 0, False))
|
|
if stale:
|
|
last_updated = self._collected[key]["lastupdated"]
|
|
if last_updated: # verify that we definitely have the information on the last update time
|
|
datetime_key = datetime.strptime(last_updated, '%Y-%m-%dT%H-%M-%S')
|
|
datetime_now = datetime.utcnow()
|
|
difference = datetime_now - datetime_key # determine difference between now and last update
|
|
if difference.days == 0 and (difference.seconds / 3600) < max_hours: # check the conditions
|
|
if same_day is False or datetime_key.date() == datetime_now.date():
|
|
result = self._collected[key]
|
|
return result
|
|
|
|
def __iter__(self) -> Iterator:
|
|
""" Retrieve an iterator over the keys that are being collected.
|
|
:return: an iterator over the keys that are being collected. """
|
|
return iter(list(self._online.keys()))
|
|
|
|
def _on_connect(self, mqtt_client: mqtt.Client, _userdata: Any, _flags: Dict, _rc: int) -> None:
|
|
log.info("(Re)connecting with the MQTT server, subscribing to all topics with QoS=0.")
|
|
mqtt_client.subscribe([(sensor, 0) for sensor, _ in self._sensors])
|
|
|
|
def _process(self, message: mqtt.MQTTMessage, key: str) -> None:
|
|
topic = message.topic
|
|
payload = message.payload.decode("utf-8")
|
|
|
|
if topic.endswith("/status"):
|
|
self._online[key] = payload == "on"
|
|
log.debug("The status of %s is %s.", topic, payload == "on")
|
|
if payload == "off" and key in self._collected:
|
|
del self._collected[key]
|
|
elif topic.endswith("/value"):
|
|
self._collected[key] = json.loads(payload)
|
|
log.debug("Changed the value of %s to %s.", topic, json.loads(payload))
|
|
|
|
def _create_handler(self, key: str) -> Callable[[mqtt.Client, Any, mqtt.MQTTMessage], None]:
|
|
def handler(_client: mqtt.Client, _userdata: Any, message: mqtt.MQTTMessage):
|
|
self._process(message, key)
|
|
|
|
return handler
|