simplemqtt/collector.py

213 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from dataclasses import dataclass
from datetime import datetime, timezone
import json
import logging
from multiprocessing import Process
import time
from typing import Any, Callable, Dict, Iterable, Iterator, Optional

import paho.mqtt.client as mqtt
log = logging.getLogger(__name__)
# Python 3.7+
# Usage guidelines:
# you can import this package using
# import simplemqtt.collector
# or, alternatively, you can import the collector directly using
# from simplemqtt.collector import Collector
#
# Once imported, a collector is created simply by specifying the server information:
# collector = Collector("plex.local", 1883, False)
# Subsequently, sensors are added using the add_sensor(...) method:
# collector.add_sensor("sensors/1/+", "living room")
# Now the collector will collect all information retrieved from the sensor and store it for easy access:
# data = collector["living room"]
# Once all the sensors you want to collect are provided, connect to the MQTT server using:
# collector.connect()
@dataclass
class Sensor:
    """ A registered sensor: the MQTT topic to listen to, and the key under which its data is exposed. """
    topic: str  # the MQTT topic (may contain wildcards, e.g. "sensors/1/+") of the sensor
    key: str  # the user chosen identifier used to look up the sensor's data
@dataclass
class StaleState:
    """ The staleness policy of a single sensor key: whether, and for how long, the last known value
    may still be handed out once the sensor is no longer marked as online. """
    dumb: bool  # the source has no availability information: its last value is always acceptable
    stale: bool  # stale (old) values are acceptable, subject to the limits below
    same_day: bool  # (when stale is allowed) only accept stale values last updated on the current day
    max_hours: int  # (when stale is allowed) the maximum age, in hours, of an acceptable stale value

    @classmethod
    def default(cls) -> "StaleState":
        """ Create the most restrictive policy: no stale values are allowed at all.
        :return: a new instance (of the class this is called on) with every flag off and max_hours 0. """
        # use cls rather than the hard-coded class name, so subclasses get an instance of their own type
        return cls(dumb=False, stale=False, same_day=False, max_hours=0)
class Collector:
    """ Collect the most recent values published by a set of MQTT sensors.
    Sensors are registered using add_sensor(...), after which connect() subscribes to their topics.
    Messages on a topic ending in "/status" update the online flag of the sensor; messages on a topic
    ending in "/value" are parsed as JSON and stored. The stored value is retrieved using collector[key].
    Assign a callable to the `callback` attribute to have it run (in a separate process) on each new value. """

    def __init__(self, host: str, port: int, tls: bool):
        """ Create a new collector and initialise all default values.
        :param host: the MQTT host address
        :param port: the MQTT port number
        :param tls: whether or not to use TLS to connect to the MQTT host """
        self._client = mqtt.Client()
        # save the information passed along on the MQTT server
        self._host = host
        # NOTE(review): this was annotated as `str`, but the value is handed to connect_async(...) as the
        # broker port, which presumably has to be an integer - confirm against the callers.
        self._port = port
        self._tls = tls
        # store defaults for the other internal information
        self._sensors = []  # keep track of each registered Sensor (topic and user identified key)
        self._stale_allowed = {}  # key -> StaleState: whether a key is allowed to go stale and its parameters
        self._online = {}  # key -> bool: whether a given key is registered as online
        self._collected = {}  # key -> the last value (decoded JSON) collected for that key
        # bookkeeping to deal with callbacks on value changes
        self._previous_process = None
        self._current_process = None
        self.callback = self._callback
        # log the status of the collector
        log.info("New %s object created to connect to %s:%s and TLS support set to %s.",
                 Collector.__name__, self._host, self._port, self._tls)

    def add_sensor(self, sensor: str, key: str, *,
                   stale: bool = False, max_hours: int = 0, same_day: bool = False,
                   dumb: bool = False) -> None:
        """ Add the topic of a sensor, and the key to access its value.
        :param sensor: the topic of the sensor to listen to.
        :param key: the key to access the value of the sensor.
        :param stale: flag indicates whether stale (old) results are acceptable.
        :param max_hours: (when stale is allowed) the maximum hours staleness to tolerate.
        :param same_day: flag (when stale is allowed) whether stale info is only allowed on the same day.
        :param dumb: flag - a dumb source does not have any availability information; its last value is
                     always acceptable, regardless of the other staleness settings. """
        log.info("Adding sensor %s accessible through the key %s", sensor, key)
        self._sensors.append(Sensor(topic=sensor, key=key))
        self._stale_allowed[key] = StaleState(dumb, stale, same_day, max_hours)

    def connect(self) -> None:
        """ Start a non-blocking connection to the MQTT broker.
        Also ensure that callbacks for all the defined sensors are properly set up. """
        if self._tls:  # Enable TLS if supported by the broker
            log.info("Enabling TLS for the MQTT broker.")
            self._client.tls_set()
        log.info("Asynchronously connecting to the MQTT broker at %s:%s.", self._host, self._port)
        self._client.connect_async(self._host, self._port)
        self._client.on_connect = self._on_connect
        log.info("Creating the callbacks for each specified sensor.")
        for sensor in self._sensors:
            self._client.message_callback_add(sensor.topic, self._create_handler(sensor.key))
            # a sensor is assumed to be online until a "/status" message says otherwise
            self._online[sensor.key] = True
        log.info("Asynchronously starting the connection to the MQTT broker.")
        self._client.loop_start()

    def wait_all_online(self, *, timeout: Optional[Iterable[int]] = None, indefinitely: bool = False,
                        interval: int = 5) -> bool:
        """ Wait for all sensors to come online. To avoid waiting indefinitely, a list of timeouts in seconds
        can be provided which will consecutively be used to wait for the sensors to come online.
        Once the sensors are online, or once the timeouts have expired, this method returns
        a Boolean flag to indicate whether the sensors are now indeed online.
        :param timeout: an iterable of (increasing) waiting times in seconds. Defaults to [1, 5, 10, 15]
        :param indefinitely: a Boolean flag indicating whether we should wait indefinitely.
        :param interval: the polling interval (in seconds) when waiting indefinitely.
        :return: a Boolean flag to indicate whether the sensors are now online. """
        # consume the waiting times lazily, so any iterable (list, tuple, generator, ...) is accepted
        steps = iter(timeout if timeout else [1, 5, 10, 15])
        while not self.all_online():
            log.debug("Not all sensors are online, or some sensors do not have a value loaded. Waiting ...")
            if indefinitely:
                time.sleep(interval)
                continue
            delay = next(steps, None)
            if delay is None:
                # every timeout has been used: give up instead of spinning without ever sleeping
                break
            time.sleep(delay)
        return self.all_online()

    def all_online(self) -> bool:
        """ Verify whether all the subscribed sensors are available and providing values.
        :return: True if every sensor reports as online and has a value loaded. False otherwise. """
        return all(self._online.get(sensor.key, False) and sensor.key in self._collected
                   for sensor in self._sensors)

    def some_online(self) -> bool:
        """ Verify whether some of the subscribed sensors are available and providing values.
        :return: True if at least one sensor reports as online and has a value loaded. False otherwise. """
        return any(self._online.get(sensor.key, False) and sensor.key in self._collected
                   for sensor in self._sensors)

    def is_online(self, key: str) -> bool:
        """ Verify if the sensor with the given key is available and providing values.
        :param key: the key of the sensor to check.
        :return: True if the sensor reports as online and has a value loaded. False otherwise. """
        return self._online.get(key, False) and key in self._collected

    def all_reporting(self) -> bool:
        """ Verify whether all the subscribed sensors are providing values.
        :return: True if every sensor has a (potentially stale) value loaded. False otherwise. """
        return all(sensor.key in self._collected for sensor in self._sensors)

    def is_reporting(self, key: str) -> bool:
        """ Verify if the sensor with the given key is providing values.
        :param key: the key of the sensor to check.
        :return: True if the sensor has a (potentially stale) value loaded. False otherwise. """
        return key in self._collected

    def __getitem__(self, key: str) -> Any:
        """ Return the item associated with the key. As desired, also stale values can be retrieved.
        :param key: the key of the item to retrieve.
        :return: The item associated with the provided key, or None when (too) stale or not found. """
        if self._online.get(key) is True:
            # return the collected value (if any) when the sensor is online
            return self._collected.get(key, None)
        if key not in self._collected:
            return None
        # the sensor is not online: only hand out the last value when its staleness policy allows it
        value = self._collected[key]
        stale_state = self._stale_allowed.get(key, StaleState.default())
        if stale_state.dumb:
            # no need to check the liveliness of the sensor, just give the result
            return value
        if stale_state.stale and self._is_fresh_enough(value, stale_state):
            return value
        return None

    @staticmethod
    def _is_fresh_enough(value: Any, stale_state: StaleState) -> bool:
        """ Decide whether a stale value is recent enough according to the given staleness policy.
        :param value: the collected value; its age is read from the ISO formatted "lastupdated" field.
        :param stale_state: the staleness policy (max_hours and same_day) to verify against.
        :return: True only when the age is known, not negative, below max_hours and - when same_day
                 is set - the value was last updated on the current day. """
        # the collected value is arbitrary JSON: it is not necessarily an object with a "lastupdated" field
        last_updated = value.get("lastupdated") if isinstance(value, dict) else None
        if not last_updated:  # without the last update time the age cannot be determined
            return False
        try:
            datetime_key = datetime.fromisoformat(last_updated)
        except (TypeError, ValueError):
            log.warning("Unable to interpret the last update time %r, treating the value as too stale.",
                        last_updated)
            return False
        if datetime_key.utcoffset() is None:
            # a naive timestamp is compared against the current (naive) UTC time
            datetime_now = datetime.now(timezone.utc).replace(tzinfo=None)
        else:
            # an aware timestamp is compared against the current time in its own timezone
            datetime_now = datetime.now(datetime_key.tzinfo)
        # compare the full age, so limits beyond 24 hours are honoured; timestamps in the future are rejected
        age_in_seconds = (datetime_now - datetime_key).total_seconds()
        if not 0 <= age_in_seconds < stale_state.max_hours * 3600:
            return False
        return not stale_state.same_day or datetime_key.date() == datetime_now.date()

    def __iter__(self) -> Iterator:
        """ Retrieve an iterator over the keys that are being collected.
        :return: an iterator over the keys that are being collected. """
        # iterate over a snapshot of the keys: the dictionary itself is updated when messages arrive
        return iter(list(self._online))

    def _on_connect(self, mqtt_client: mqtt.Client, _userdata: Any, _flags: Dict, _rc: int) -> None:
        """ Callback for the MQTT client: (re)subscribe to the topic of every registered sensor. """
        log.info("(Re)connecting with the MQTT server, subscribing to all topics with QoS=0.")
        mqtt_client.subscribe([(sensor.topic, 0) for sensor in self._sensors])

    def _process(self, message: mqtt.MQTTMessage, key: str) -> None:
        """ Process a message received for the sensor with the given key.
        A topic ending in "/status" updates the online flag (online only when the payload is "on") and
        drops the collected value when the payload is "off". A topic ending in "/value" stores the JSON
        decoded payload and runs the callback in a new process. Any other topic is ignored.
        :param message: the MQTT message that was received.
        :param key: the key of the sensor the message belongs to. """
        topic = message.topic
        try:
            payload = message.payload.decode("utf-8")
        except UnicodeDecodeError:
            log.warning("Ignoring the message on %s: the payload is not valid UTF-8.", topic)
            return
        if topic.endswith("/status"):
            is_on = payload == "on"
            self._online[key] = is_on
            log.debug("The status of %s is %s.", topic, is_on)
            if payload == "off":
                self._collected.pop(key, None)
        elif topic.endswith("/value"):
            try:
                value = json.loads(payload)  # decode once, and reuse the result below
            except json.JSONDecodeError:
                # a single malformed message should not raise inside the MQTT message callback
                log.warning("Ignoring the message on %s: the payload is not valid JSON.", topic)
                return
            self._collected[key] = value
            log.debug("Changed the value of %s to %s.", topic, value)
            # at most the two most recent callback processes are kept: the older one is terminated
            if self._previous_process:
                self._previous_process.terminate()
            self._previous_process = self._current_process
            self._current_process = Process(target=self.callback)
            self._current_process.start()

    def _create_handler(self, key: str) -> Callable[[mqtt.Client, Any, mqtt.MQTTMessage], None]:
        """ Create the message callback for a sensor, binding the sensor's key to _process(...).
        :param key: the key of the sensor the handler is created for.
        :return: a callback with the (client, userdata, message) signature used by the MQTT client. """
        def handler(_client: mqtt.Client, _userdata: Any, message: mqtt.MQTTMessage):
            self._process(message, key)
        return handler

    def _callback(self):
        """ The default callback on value changes: do nothing. """
        pass