diff --git a/collector.py b/collector.py index 9a0ef53..61798c7 100644 --- a/collector.py +++ b/collector.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass from datetime import datetime import json import logging @@ -7,6 +8,7 @@ from multiprocessing import Process from typing import Any, Callable, Dict, Iterator log = logging.getLogger(__name__) +# Python 3.7+ # Usage guidelines: # you can import this package using # import simplemqtt.collector @@ -23,19 +25,45 @@ log = logging.getLogger(__name__) # collector.connect() +@dataclass +class Sensor: + topic: str + key: str + + +@dataclass +class StaleState: + dumb: bool + stale: bool + same_day: bool + max_hours: int + + @classmethod + def default(cls): + return StaleState(False, False, False, 0) + + class Collector: def __init__(self, host: str, port: str, tls: bool): + """ Create a new collector and initialise all defaults values. + :param host: the MQTT host address + :param port: the MQTT port number + :param tls: whether or not to use TLS to connect to the MQTT host """ + self._client = mqtt.Client() + # save the information passed along on the MQTT server self._host = host self._port = port self._tls = tls - self._sensors = [] - self._stale_allowed = {} - self._online = {} - self._collected = {} - self._client = mqtt.Client() + # store defaults for the other internal information + self._sensors = [] # keep track of each sensor's topic and user identified key as (topic, key) + self._stale_allowed = {} # mark whether a given key is allowed to go stale and its parameters + self._online = {} # mark whether a given key is registered as online + self._collected = {} # keep track of collected information on a topic + # bookkeeping to deal with callbacks on value changes self._previous_process = None self._current_process = None self.callback = self._callback + # log the status of the collector log.info("New %s object created to connect to %s:%s and TLS support set to %s.", Collector.__name__, self._host, self._port, self._tls) @@ -45,17 +73,17 @@ class Collector: """ Add the topic of a sensor, and the key to access its value. :param sensor: the topic of the sensor to listen to. :param key: the key to access the value of the sensor. - :param stale: flag indicating whether stale results are acceptable. + :param stale: flag – indicates whether stale (old) results are acceptable. :param max_hours: (when stale is allowed) the maximum hours staleness to tolerate. - :param same_day: (when stale is allowed) whether stale info is only allowed on the same day. - :param dumb: whether this is a dumb source which does not have any availability information: enforces unlimited stale""" + :param same_day: flag – (when stale is allowed) whether stale info is only allowed on the same day. + :param dumb: flag - a dumb source does not have any availability information – enforces unlimited stale""" log.info("Adding sensor %s accessible through the key %s", sensor, key) - self._sensors.append((sensor, key)) - self._stale_allowed[key] = (stale, max_hours, same_day, dumb) + self._sensors.append(Sensor(topic=sensor, key=key)) + self._stale_allowed[key] = StaleState(dumb, stale, same_day, max_hours) def connect(self) -> None: - """ Start a non-blocking connection to the MQTT broker. This will also - ensure that callbacks for all the defined sensors are properly setup. """ + """ Start a non-blocking connection to the MQTT broker. + Also ensure that callbacks for all the defined sensors are properly set up. """ if self._tls: # Enable TLS if supported by the broker log.info("Enabling TLS for the MQTT broker.") self._client.tls_set() @@ -64,18 +92,18 @@ class Collector: self._client.on_connect = self._on_connect log.info("Creating the callbacks for each specified sensor.") - for sensor, key in self._sensors: - self._client.message_callback_add(sensor, self._create_handler(key)) - self._online[key] = True + for sensor in self._sensors: + self._client.message_callback_add(sensor.topic, self._create_handler(sensor.key)) + self._online[sensor.key] = True log.info("Asynchronously starting the connection to the MQTT broker.") self._client.loop_start() def wait_all_online(self, *, timeout: Iterator[int] = None, indefinitely: bool = False, interval: int = 5): """ Wait for all sensors to come online. To avoid waiting indefinitely, a list of timeouts seconds - can be provided which will consecutively be used to wait (longer and longer) for the sensors - to come online. Once the sensors are online, or once the timeouts have expired, this method - will return with a Boolean flag to indicate whether the sensors are now indeed online. + can be provided which will consecutively be used to wait for the sensors to come online. + Once the sensors are online, or once the timeouts have expired, this method returns + a Boolean flag to indicate whether the sensors are now indeed online. :param timeout: an iterator of (increasing) waiting in seconds. Defaults to [1, 5, 10, 15] :param indefinitely: a Boolean flag indicating whether we should wait indefinitely. :param interval: the polling interval when waiting indefinitely. @@ -97,13 +125,13 @@ class Collector: def all_online(self) -> bool: """ Verify whether all the subscribed sensors are available and providing values. :return: True if every sensor reports as online and has a value loaded. False otherwise. """ - online = all([self._online.get(key, False) for _, key in self._sensors]) - return all([online, *[key in self._collected for _, key in self._sensors]]) + online = all([self._online.get(sensor.key, False) for sensor in self._sensors]) + return all([online, *[sensor.key in self._collected for sensor in self._sensors]]) def some_online(self) -> bool: """ Verify whether some of the subscribed sensors are available and providing values. :return: True if at least one sensor reports as online and has a value loaded. False otherwise. """ - return any([self._online.get(key, False) and key in self._collected for _, key in self._sensors]) + return any([self._online.get(sensor.key, False) and sensor.key in self._collected for sensor in self._sensors]) def is_online(self, key: str) -> bool: """ Verify if the sensor with the given key is available and providing values. @@ -114,7 +142,7 @@ class Collector: def all_reporting(self) -> bool: """ Verify whether all the subscribed sensors are providing values. :return: True if every sensor has a (potentially stale) value loaded. False otherwise. """ - return all([key in self._collected for _, key in self._sensors]) + return all([sensor.key in self._collected for sensor in self._sensors]) def is_reporting(self, key: str) -> bool: """ Verify if the sensor with the given key is providing values. @@ -128,20 +156,22 @@ class Collector: :return: The item associated with the provided key, or None when (too) stale or not found. """ result = None if key in self._online and self._online[key] is True: + # return the collected value if the sensor is online result = self._collected.get(key, None) elif key in self._collected: - stale, max_hours, same_day, dumb = self._stale_allowed.get(key, (False, 0, False, False)) - if dumb: + # otherwise ... + stale_state = self._stale_allowed.get(key, StaleState.default()) + if stale_state.dumb: # no need to check the liveliness of the sensor, just give the result result = self._collected[key] - elif stale: + elif stale_state.stale: last_updated = self._collected[key]["lastupdated"] if last_updated: # verify that we definitely have the information on the last update time - datetime_key = datetime.strptime(last_updated, '%Y-%m-%dT%H-%M-%S') + datetime_key = datetime.fromisoformat(last_updated) datetime_now = datetime.utcnow() difference = datetime_now - datetime_key # determine difference between now and last update - if difference.days == 0 and (difference.seconds / 3600) < max_hours: # check the conditions - if same_day is False or datetime_key.date() == datetime_now.date(): + if difference.days == 0 and (difference.seconds / 3600) < stale_state.max_hours: + if stale_state.same_day is False or datetime_key.date() == datetime_now.date(): result = self._collected[key] return result @@ -152,12 +182,11 @@ class Collector: def _on_connect(self, mqtt_client: mqtt.Client, _userdata: Any, _flags: Dict, _rc: int) -> None: log.info("(Re)connecting with the MQTT server, subscribing to all topics with QoS=0.") - mqtt_client.subscribe([(sensor, 0) for sensor, _ in self._sensors]) + mqtt_client.subscribe([(sensor.topic, 0) for sensor in self._sensors]) def _process(self, message: mqtt.MQTTMessage, key: str) -> None: topic = message.topic - # hack the JSON: convert single quotes into double quotes as needed - payload = message.payload.decode("utf-8").replace("'", '"') + payload = message.payload.decode("utf-8") if topic.endswith("/status"): self._online[key] = payload == "on" diff --git a/publisher.py b/publisher.py index 1c2229c..121d868 100644 --- a/publisher.py +++ b/publisher.py @@ -2,7 +2,9 @@ import json from time import gmtime, strftime, sleep import paho.mqtt.client as mqtt import logging +from enum import Flag, auto from typing import Iterable, Iterator, Any, Tuple, Dict, Callable + log = logging.getLogger(__name__) # Usage guidelines: @@ -24,36 +26,34 @@ log = logging.getLogger(__name__) # Once done, this information can be pushed to the publisher: # publisher.publish(payload) -# Constants used to create a family of weather sensors. -# The exact version of the sensor is the sum of senses. -# This can be switched to enum.Flag as of Python 3.6. -TEMPERATURE_SENSE = 1 -LIGHTLEVEL_SENSE = 2 -HUMIDITY_SENSE = 4 -AIRQUALITY_SENSE = 8 -PRESSURE_SENSE = 16 -WINDSPEED_SENSE = 32 -WINDDIRECTION_SENSE = 64 -UVINDEX_SENSE = 128 -ICON_SENSE = 256 -ICON_3H_SENSE = 512 -ICON_6H_SENSE = 1024 -APPARENT_SENSE = 2048 -WINDGUSTS_SENSE = 4096 -CLOUDCOVER_SENSE = 8192 -RAIN_INTENSITY_SENSE = 16384 -RAIN_PROBABILITY_SENSE = 32768 -SOLAR_RADIATION_SENSE = 65536 +# Constants used to create a family of weather sensors. +class WeatherSensor(Flag): + TEMPERATURE = auto() + LIGHTLEVEL = auto() + HUMIDITY = auto() + AIRQUALITY = auto() + PRESSURE = auto() + WINDSPEED = auto() + WINDDIRECTION = auto() + UVINDEX = auto() + ICON = auto() + ICON_3H = auto() + ICON_6H = auto() + APPARENT = auto() + WINDGUSTS = auto() + CLOUDCOVER = auto() + RAIN_INTENSITY = auto() + RAIN_PROBABILITY = auto() + SOLAR_RADIATION = auto() # Constants used to create a family of light sensors. -# This can be switched to enum.Flag as of Python 3.6. - -ONOFF_LIGHT = 1 -BRIGHTNESS_LIGHT = 2 -TEMPERATURE_LIGHT = 4 -COLOUR_LIGHT = 8 +class LightSensor(Flag): + ONOFF = auto() + BRIGHTNESS = auto() + TEMPERATURE = auto() + COLOUR = auto() class Payload: @@ -62,7 +62,7 @@ class Payload: __slots__ = ("_publisher", "type", "payload", "_client", "_topic", "_changed") def __init__(self, publisher: "Publisher", payload_type: str, *, - keys: Iterable=None, presets: Iterable[Tuple[str, Any]]=None): + keys: Iterable = None, presets: Iterable[Tuple[str, Any]] = None): """ Provide the type of the payload, as well as other settable keys. :param publisher: the publisher to which to send the payload to. :param payload_type: a string with the type of the payload. @@ -88,8 +88,9 @@ class Payload: :raise KeyError: thrown when key was not defined on initialisation. """ if key not in self.payload: raise KeyError("Key '" + key + "' not available in this payload. " - "Available keys are: '" + ", ".join(self.payload.keys()) + "'.") + "Available keys are: '" + ", ".join(self.payload.keys()) + "'.") + # try and convert the value to JSON. # we don't want the full traceback, so we catch the error, and then rethrow # the error afterwards to hopefully get a cleaner, easier to understand TypeError value_problem = False @@ -104,7 +105,7 @@ class Payload: if value != self.payload[key]: self._changed = True self.payload[key] = value - self.payload["lastupdated"] = strftime("%Y-%m-%dT%H:%M:%S", gmtime()) + self.payload["lastupdated"] = gmtime().isoformat() @property def changed(self) -> bool: @@ -150,7 +151,7 @@ class Publisher: of the sensor which should be easy to read by humans. :param topic: the main topic to which the client will publish. :param description: human-readable description of the sensor. - :param retain: Boolean flag to indicate whether the values posted should be retained. """ + :param retain: Boolean flag to indicate whether the values posted should be retained, defaults to true. """ self._client = mqtt.Client() self._topic = topic self._description = description @@ -271,6 +272,8 @@ class Publisher: """ :param value: the callback function to trigger when connecting to the MQTT broker. """ self._on_connect = value + # MARK: - functions to quickly create the desired payload + def create_generic_sensor(self, keys: Iterable) -> Payload: """ Create the payload to be used with a generic sensor. :param keys: the keys to be associated with this generic sensor. @@ -278,72 +281,77 @@ class Publisher: for this type of generic sensor with the given keys. """ return Payload(self, "generic", keys=keys) - def create_weather_sensor(self, version: int) -> Payload: + def create_weather_sensor(self, version: WeatherSensor) -> Payload: """ Create the payload to be used with a weather sensor. - :param version: the version of the weather sensor. + :param version: the type of weather sensor to use. :return: a Payload object, which can be used to set the Payload for this type of weather sensor with the given version. """ - """The version of a sensor is given as a bit mask.""" - if version < 1 or version > 131071: - raise ValueError("Version type too low or too high.") - - senses = [(TEMPERATURE_SENSE, "temperature"), - (LIGHTLEVEL_SENSE, "lightlevel"), - (HUMIDITY_SENSE, "humidity"), - (AIRQUALITY_SENSE, "airquality"), - (PRESSURE_SENSE, "pressure"), - (WINDSPEED_SENSE, "windspeed"), - (WINDDIRECTION_SENSE, "winddirection"), - (UVINDEX_SENSE, "uvindex"), - (ICON_SENSE, "icon"), - (ICON_3H_SENSE, "icon_3h"), - (ICON_6H_SENSE, "icon_6h"), - (APPARENT_SENSE, "apparent_temperature"), - (WINDGUSTS_SENSE, "windgusts"), - (CLOUDCOVER_SENSE, "cloud_cover"), - (RAIN_INTENSITY_SENSE, "rain_intensity"), - (RAIN_PROBABILITY_SENSE, "rain_probability"), - (SOLAR_RADIATION_SENSE, "solar_radiation") - ] + log.info("Creating a weather sensor payload.") keys = [] - - for sense, key in senses: - if version & sense: - keys.append(key) + if WeatherSensor.TEMPERATURE in version: + keys.append("temperature") + if WeatherSensor.LIGHTLEVEL in version: + keys.append("lightlevel") + if WeatherSensor.HUMIDITY in version: + keys.append("humidity") + if WeatherSensor.AIRQUALITY in version: + keys.append("airquality") + if WeatherSensor.PRESSURE in version: + keys.append("pressure") + if WeatherSensor.WINDSPEED in version: + keys.append("windspeed") + if WeatherSensor.WINDDIRECTION in version: + keys.append("winddirection") + if WeatherSensor.UVINDEX in version: + keys.append("uvindex") + if WeatherSensor.ICON in version: + keys.append("icon") + if WeatherSensor.ICON_3H in version: + keys.append("icon_3h") + if WeatherSensor.ICON_6H in version: + keys.append("icon_6h") + if WeatherSensor.APPARENT in version: + keys.append("apparent_temperature") + if WeatherSensor.WINDGUSTS in version: + keys.append("windgusts") + if WeatherSensor.CLOUDCOVER in version: + keys.append("cloud_cover") + if WeatherSensor.RAIN_INTENSITY in version: + keys.append("rain_intensity") + if WeatherSensor.RAIN_PROBABILITY in version: + keys.append("rain_probability") + if WeatherSensor.SOLAR_RADIATION_SENSE in version: + keys.append("solar_radiation") return Payload(self, "weather", keys=keys, - presets=[("version", version)]) + presets=[("version", version.value)]) - def create_light_sensor(self, version: int) -> Payload: + def create_light_sensor(self, version: LightSensor) -> Payload: """ Create the payload to be used with a light sensor. :param version: the version of the light sensor. :return: a Payload object, which can be used to set the Payload for this type of light sensor with the given version. """ log.info("Creating a light sensor payload.") - """The version of a sensor is given as a bit mask.""" - if version < 1 or version > 8: - raise ValueError("Version type too low or too high.") - - lights = [(ONOFF_LIGHT, ["on", "reachable"]), - (BRIGHTNESS_LIGHT, ["brightness"]), - (TEMPERATURE_LIGHT, ["ct"]), - (COLOUR_LIGHT, ["hue", "saturation", "xy"])] keys = [] - - for light, key in lights: - if version >= light: - keys.extend(key) + if LightSensor.ONOFF in version: + keys.extend(["on", "reachable"]) + if LightSensor.BRIGHTNESS_LIGHT in version: + keys.extend(["brightness"]) + if LightSensor.TEMPERATURE_LIGHT in version: + keys.extend(["ct"]) + if LightSensor.COLOUR_LIGHT in version: + keys.extend(["hue", "saturation", "xy"]) return Payload(self, "light", keys=keys, - presets=[("version", version)]) + presets=[("version", version.value)]) def create_onoff_sensor(self) -> Payload: """ Create the Payload object for a basic on/off sensor. :return: a Payload object with a single "on" key. """ return Payload(self, "onoff", keys=["on"]) - + def create_trigger_sensor(self) -> Payload: """ Create the Payload object for a basic trigger sensor. :return: a Payload object with a single "triggered" key. """