diff --git a/collector.py b/collector.py index 20b57a3..9a0ef53 100644 --- a/collector.py +++ b/collector.py @@ -3,6 +3,7 @@ import json import logging import paho.mqtt.client as mqtt import time +from multiprocessing import Process from typing import Any, Callable, Dict, Iterator log = logging.getLogger(__name__) @@ -32,20 +33,25 @@ class Collector: self._online = {} self._collected = {} self._client = mqtt.Client() + self._previous_process = None + self._current_process = None + self.callback = self._callback log.info("New %s object created to connect to %s:%s and TLS support set to %s.", Collector.__name__, self._host, self._port, self._tls) def add_sensor(self, sensor: str, key: str, *, - stale: bool = False, max_hours: int = 0, same_day: bool = False) -> None: + stale: bool = False, max_hours: int = 0, same_day: bool = False, + dumb: bool = False) -> None: """ Add the topic of a sensor, and the key to access its value. :param sensor: the topic of the sensor to listen to. :param key: the key to access the value of the sensor. :param stale: flag indicating whether stale results are acceptable. :param max_hours: (when stale is allowed) the maximum hours staleness to tolerate. - :param same_day: (when stale is allowed) whether stale info is only allowed on the same day. """ + :param same_day: (when stale is allowed) whether stale info is only allowed on the same day. + :param dumb: whether this is a dumb source which does not have any availability information: enforces unlimited stale""" log.info("Adding sensor %s accessible through the key %s", sensor, key) self._sensors.append((sensor, key)) - self._stale_allowed[key] = (stale, max_hours, same_day) + self._stale_allowed[key] = (stale, max_hours, same_day, dumb) def connect(self) -> None: """ Start a non-blocking connection to the MQTT broker. This will also @@ -60,6 +66,7 @@ class Collector: log.info("Creating the callbacks for each specified sensor.") for sensor, key in self._sensors: self._client.message_callback_add(sensor, self._create_handler(key)) + self._online[key] = True log.info("Asynchronously starting the connection to the MQTT broker.") self._client.loop_start() @@ -123,8 +130,11 @@ class Collector: if key in self._online and self._online[key] is True: result = self._collected.get(key, None) elif key in self._collected: - stale, max_hours, same_day = self._stale_allowed.get(key, (False, 0, False)) - if stale: + stale, max_hours, same_day, dumb = self._stale_allowed.get(key, (False, 0, False, False)) + if dumb: + # no need to check the liveliness of the sensor, just give the result + result = self._collected[key] + elif stale: last_updated = self._collected[key]["lastupdated"] if last_updated: # verify that we definitely have the information on the last update time datetime_key = datetime.strptime(last_updated, '%Y-%m-%dT%H-%M-%S') @@ -146,7 +156,8 @@ class Collector: def _process(self, message: mqtt.MQTTMessage, key: str) -> None: topic = message.topic - payload = message.payload.decode("utf-8") + # hack the JSON: convert single quotes into double quotes as needed + payload = message.payload.decode("utf-8").replace("'", '"') if topic.endswith("/status"): self._online[key] = payload == "on" @@ -156,9 +167,17 @@ class Collector: elif topic.endswith("/value"): self._collected[key] = json.loads(payload) log.debug("Changed the value of %s to %s.", topic, json.loads(payload)) + if self._previous_process: + self._previous_process.terminate() + self._previous_process = self._current_process + self._current_process = Process(target=self.callback) + self._current_process.start() def _create_handler(self, key: str) -> Callable[[mqtt.Client, Any, mqtt.MQTTMessage], None]: def handler(_client: mqtt.Client, _userdata: Any, message: mqtt.MQTTMessage): self._process(message, key) return handler + + def _callback(self): + pass diff --git a/publisher.py b/publisher.py index e207863..1c2229c 100644 --- a/publisher.py +++ b/publisher.py @@ -39,6 +39,13 @@ UVINDEX_SENSE = 128 ICON_SENSE = 256 ICON_3H_SENSE = 512 ICON_6H_SENSE = 1024 +APPARENT_SENSE = 2048 +WINDGUSTS_SENSE = 4096 +CLOUDCOVER_SENSE = 8192 +RAIN_INTENSITY_SENSE = 16384 +RAIN_PROBABILITY_SENSE = 32768 +SOLAR_RADIATION_SENSE = 65536 + # Constants used to create a family of light sensors. # This can be switched to enum.Flag as of Python 3.6. @@ -277,7 +284,7 @@ class Publisher: :return: a Payload object, which can be used to set the Payload for this type of weather sensor with the given version. """ """The version of a sensor is given as a bit mask.""" - if version < 1 or version > 2047: + if version < 1 or version > 131071: raise ValueError("Version type too low or too high.") senses = [(TEMPERATURE_SENSE, "temperature"), @@ -290,7 +297,14 @@ class Publisher: (UVINDEX_SENSE, "uvindex"), (ICON_SENSE, "icon"), (ICON_3H_SENSE, "icon_3h"), - (ICON_6H_SENSE, "icon_6h")] + (ICON_6H_SENSE, "icon_6h"), + (APPARENT_SENSE, "apparent_temperature"), + (WINDGUSTS_SENSE, "windgusts"), + (CLOUDCOVER_SENSE, "cloud_cover"), + (RAIN_INTENSITY_SENSE, "rain_intensity"), + (RAIN_PROBABILITY_SENSE, "rain_probability"), + (SOLAR_RADIATION_SENSE, "solar_radiation") + ] keys = []