# simplemqtt/publisher.py
# (file-viewer metadata: 359 lines, 16 KiB, Python)

import json
from time import gmtime, strftime, sleep
import paho.mqtt.client as mqtt
import logging
from enum import Flag, auto
from typing import Iterable, Iterator, Any, Tuple, Dict, Callable
log = logging.getLogger(__name__)
# Usage guidelines:
# You can import this package using
#   import simplemqtt.publisher
# or, alternatively, you can import the publisher and the desired sensor flags using
#   from simplemqtt.publisher import Publisher, WeatherSensor, LightSensor
#
# Once imported, a publisher is created by specifying the topic and description:
#   publisher = Publisher("/sensors/1", "My first light sensor.")
# and connected using:
#   publisher.connect("mqtt.server")
#
# To publish information, the correct Payload needs to be created. This can be requested from the publisher:
#   payload = publisher.create_light_sensor(LightSensor.ONOFF)
# This preloads the payload with the acceptable keys, which can immediately be set:
#   payload["on"] = False
#   payload["reachable"] = True
# Once done, the payload can be pushed through the publisher:
#   publisher.publish(payload)
# Constants used to create a family of weather sensors.
class WeatherSensor(Flag):
    """ Capabilities of a weather sensor.

    Every member occupies its own bit, so capabilities can be combined
    with ``|`` and tested with ``in``. The bit values are spelled out
    explicitly; they are identical to what ``auto()`` would assign in
    this order. """
    TEMPERATURE = 1 << 0
    LIGHTLEVEL = 1 << 1
    HUMIDITY = 1 << 2
    AIRQUALITY = 1 << 3
    PRESSURE = 1 << 4
    WINDSPEED = 1 << 5
    WINDDIRECTION = 1 << 6
    UVINDEX = 1 << 7
    ICON = 1 << 8
    ICON_3H = 1 << 9
    ICON_6H = 1 << 10
    APPARENT = 1 << 11
    WINDGUSTS = 1 << 12
    CLOUDCOVER = 1 << 13
    RAIN_INTENSITY = 1 << 14
    RAIN_PROBABILITY = 1 << 15
    SOLAR_RADIATION = 1 << 16
# Constants used to create a family of light sensors.
class LightSensor(Flag):
    """ Capabilities of a light sensor.

    Every member occupies its own bit, so capabilities can be combined
    with ``|`` and tested with ``in``. The bit values are spelled out
    explicitly; they are identical to what ``auto()`` would assign in
    this order. """
    ONOFF = 1 << 0
    BRIGHTNESS = 1 << 1
    TEMPERATURE = 1 << 2
    COLOUR = 1 << 3
class Payload:
    """ Create an MQTT Payload by setting the desired fields using indexing
        and retrieve the JSON-ified payload by converting it to a string.

        Only keys that already exist in the payload (the "type" key, the keys
        and presets given on construction, and "lastupdated" once it has been
        written) can be assigned. """
    __slots__ = ("_publisher", "type", "payload", "_changed")

    def __init__(self, publisher: "Publisher", payload_type: str, *,
                 keys: Iterable = None, presets: Iterable[Tuple[str, Any]] = None):
        """ Provide the type of the payload, as well as other settable keys.
            :param publisher: the publisher to which to send the payload to.
            :param payload_type: a string with the type of the payload.
            :param keys: a list of strings, each a key that can be set.
            :param presets: a list of key-value tuples to preset."""
        self._publisher = publisher
        self.type = payload_type
        # A fresh payload counts as changed so that its first publication goes through.
        self._changed = True
        self.payload = {"type": payload_type}
        if keys:
            for key in keys:
                self.payload[key] = None
        if presets:
            for key, value in presets:
                self.payload[key] = value

    def _set(self, key: str, value: Any = None) -> None:
        """ Method to set the value of a given key defined on initialisation.
            When the value differs from the stored one, it is stored, the payload
            is flagged as changed and "lastupdated" is set to the current UTC time.
            :param key: the key to set.
            :param value: the value to associate with the key.
            :raise KeyError: thrown when key was not defined on initialisation.
            :raise TypeError: thrown when the value cannot be serialised to JSON. """
        if key not in self.payload:
            raise KeyError(f"Key '{key}' not available in this payload. "
                           f"Available keys are: '{', '.join(map(str, self.payload))}'.")
        # Verify up front that the value can be converted to JSON. The original
        # traceback is suppressed ("from None") to obtain a cleaner, easier to
        # understand TypeError.
        try:
            json.dumps(value)
        except TypeError as error:
            raise TypeError("the value could not be turned into JSON: '" + str(error) + "'.") from None
        if value != self.payload[key]:
            self._changed = True
            self.payload[key] = value
            # time.gmtime() returns a struct_time, which has no isoformat();
            # format the ISO 8601 (UTC) timestamp through strftime instead.
            self.payload["lastupdated"] = strftime("%Y-%m-%dT%H:%M:%S", gmtime())

    @property
    def changed(self) -> bool:
        """ Flag when the contents of the payload have changed since the last time it was converted to a string. """
        return self._changed

    def update(self, d: dict) -> None:
        """ Set several keys at once.
            :param d: a dictionary with the key-value pairs to set.
            :raise KeyError: thrown when a key was not defined on initialisation.
            :raise TypeError: thrown when a value cannot be serialised to JSON. """
        for key, value in d.items():
            self[key] = value

    def publish(self, *, forced: bool = False) -> None:
        """ Publish this payload through the publisher given on initialisation.
            :param forced: passed on to the publisher's publish(...) method. """
        self._publisher.publish(self, forced=forced)

    def __setitem__(self, key: str, value: Any):
        """ Wrapper method for self._set(...) ."""
        self._set(key, value)

    def __iter__(self) -> Iterator:
        """ Retrieve an iterator over the keys of the payload.
            :return: an iterator over a snapshot of the keys of this payload. """
        return iter(list(self.payload))

    def __str__(self) -> str:
        """ When a string is requested, dump the payload as JSON.
            As a side effect, the changed flag is reset.
            :return: the payload in JSON format. """
        self._changed = False
        return json.dumps(self.payload)
class Publisher:
    """ An object of the class Publisher is able to connect to an MQTT broker, start
        the connection, track (re)connects and disconnects, and is able to
        create specific types of sensors (objects from the Payload class).

        The description, status and values of the sensor are published to the
        sub-topics "/description", "/status" and "/value" of the main topic. """
    __slots__ = ("_client", "_topic", "_description", "_retain", "_status", "_on_connect", "_signalled", "_online")
    _DESC = "/description"
    _VALUE = "/value"
    _STATUS = "/status"
    # Payload key contributed by each weather sensor capability, in payload order.
    _WEATHER_KEYS = (
        (WeatherSensor.TEMPERATURE, "temperature"),
        (WeatherSensor.LIGHTLEVEL, "lightlevel"),
        (WeatherSensor.HUMIDITY, "humidity"),
        (WeatherSensor.AIRQUALITY, "airquality"),
        (WeatherSensor.PRESSURE, "pressure"),
        (WeatherSensor.WINDSPEED, "windspeed"),
        (WeatherSensor.WINDDIRECTION, "winddirection"),
        (WeatherSensor.UVINDEX, "uvindex"),
        (WeatherSensor.ICON, "icon"),
        (WeatherSensor.ICON_3H, "icon_3h"),
        (WeatherSensor.ICON_6H, "icon_6h"),
        (WeatherSensor.APPARENT, "apparent_temperature"),
        (WeatherSensor.WINDGUSTS, "windgusts"),
        (WeatherSensor.CLOUDCOVER, "cloud_cover"),
        (WeatherSensor.RAIN_INTENSITY, "rain_intensity"),
        (WeatherSensor.RAIN_PROBABILITY, "rain_probability"),
        (WeatherSensor.SOLAR_RADIATION, "solar_radiation"),
    )
    # Payload keys contributed by each light sensor capability, in payload order.
    _LIGHT_KEYS = (
        (LightSensor.ONOFF, ("on", "reachable")),
        (LightSensor.BRIGHTNESS, ("brightness",)),
        (LightSensor.TEMPERATURE, ("ct",)),
        (LightSensor.COLOUR, ("hue", "saturation", "xy")),
    )

    def __init__(self, topic: str, description: str, *, retain: bool = True):
        """ Initialises a Publisher object by setting up the main topic
            to which it will publish, as well as a basic description
            of the sensor which should be easy to read by humans.
            :param topic: the main topic to which the client will publish.
            :param description: human-readable description of the sensor.
            :param retain: Boolean flag to indicate whether the values posted should be retained, defaults to true. """
        self._client = mqtt.Client()
        self._topic = topic
        self._description = description
        self._retain = retain
        self._status = None  # last signalled status; None until signal_status(...) has run
        self._on_connect = None  # optional user callback, see the on_connect property
        self._online = False  # maintained by the _reconnect / _disconnect callbacks
        log.info("Created a new MQTT client which will publish to %s.", self._topic)

    def connect(self, host: str, *, port: int = 1883,
                tls: bool = False, auto_start: bool = True,
                signal: bool = True) -> None:
        """ Asynchronously connect to the MQTT broker and do some initial bookkeeping.
            A last will is registered that sets the status topic to "off".
            :param host: the host where the MQTT broker can be found.
            :param port: the port used to access the MQTT broker.
            :param tls: a Boolean flag to indicate whether the MQTT broker requires TLS.
            :param auto_start: a Boolean flag to indicate whether to automatically start
                               the connection to the MQTT broker. If set to False, the
                               programmer should call start(...) to initiate the connection.
                               Alternatively, the programmer could call blocking_start(...)
                               to trigger a blocking start, which requires some bookkeeping
                               to be done by the programmer.
            :param signal: a Boolean flag to indicate whether to automatically signal
                           the status of this sensor as "on" (the default). When omitted,
                           the programmer should call signal_status(...) to do so. """
        self._client.will_set(self._topic + Publisher._STATUS, "off", qos=1, retain=True)
        if tls:  # Enable TLS if supported by the broker
            log.info("Enabling TLS for the MQTT broker.")
            self._client.tls_set()
        # Install the callbacks before the connection is initiated.
        self._client.on_connect = self._reconnect
        self._client.on_disconnect = self._disconnect
        log.info("Asynchronously connecting to the MQTT broker at %s:%s.", host, port)
        self._client.connect_async(host, port)
        if auto_start:
            log.debug("Triggering auto start.")
            self.start(signal)

    def start(self, signal: bool = True) -> None:
        """ Start the connection to the MQTT broker in a non-blocking way.
            The network loop itself runs in the background, but this method only
            returns once the connection is up and the description has been handed
            to the client for publication.
            :param signal: a Boolean flag to indicate whether to automatically signal
                           the status of this sensor as "on" (the default). When omitted,
                           the programmer should call signal_status(...) to do so. """
        log.info("Starting the connection to the MQTT broker. in a non-blocking way.")
        self._client.loop_start()
        if signal:
            log.info("Signalling the status of the sensor (defaults to 'on').")
            self.signal_status()
        log.info("Publishing the description of this sensor ('%s') as a retained message to %s.",
                 self._description, self._topic + Publisher._DESC)
        # Wait until the on_connect callback has flagged the publisher as online.
        while not self._online:
            sleep(0.01)
        self._client.publish(self._topic + Publisher._DESC, self._description, qos=1, retain=True)

    def blocking_start(self) -> None:
        """ Start the connection to the MQTT broker in a blocking way. """
        log.info("Starting the connection to the MQTT broker in a blocking style.")
        self._client.loop_forever()

    def signal_status(self, status: bool = True, *, forced: bool = False) -> None:
        """ Signal a change in the status of this sensor.
            :param status: the status to signal to the MQTT broker.
            :param forced: whether or not to forcibly signal the status, even if
                           the (internal) status hasn't changed. This is particularly
                           useful in the case of a reconnect where a last will may
                           have been set, or when interfered by another sensor.
                           NOTE: when forced, the stored (internal) status is re-sent
                           and the status argument is ignored. """
        if forced or self._status != status:
            if forced:
                status = self._status
            log.debug("Signalling the sensor status as %s.", status)
            self._client.publish(self._topic + Publisher._STATUS, "on" if status else "off", qos=1, retain=True)
            self._status = status

    def disconnect(self) -> None:
        """ Disconnect from the MQTT broker. """
        log.info("Disconnecting from the MQTT broker.")
        self._client.disconnect()

    def publish(self, payload: Payload, *, forced: bool = False) -> None:
        """ Publish a payload for this sensor to the MQTT broker.
            When the publisher is offline, this method blocks until the connection
            has been (re)established.
            :param payload: the payload to be published, which should be an object
                            of the type Payload, a requirement that is enforced.
            :param forced: whether the payload should be forcefully published. Otherwise,
                           the payload is only published when changes are detected.
            :raise ValueError: thrown when the payload is not a Payload object. """
        # Reject invalid payloads right away instead of after waiting for a connection.
        if not isinstance(payload, Payload):
            raise ValueError("The payload is not an object of the type Payload.")
        if not self._online:
            log.debug("Not connected to an MQTT server. Entering waiting pattern ...")
            while not self._online:
                sleep(0.01)
        if forced or payload.changed:
            # Serialise once; converting to a string also resets payload.changed.
            message = str(payload)
            destination = self._topic + Publisher._VALUE
            log.debug("Publishing '%s' to '%s'.", message, destination)
            self._client.publish(destination, payload=message, retain=self._retain)

    def _reconnect(self, mqtt_client: mqtt.Client, userdata: Any, flags: Dict, rc: int) -> None:
        """ Callback installed as the client's on_connect handler.
            Flags the publisher as online when the broker accepted the connection,
            re-signals the last known status and forwards the call to the
            user-defined on_connect callback, if any.
            :param mqtt_client: the client that triggered the callback.
            :param userdata: the user data passed along by the client.
            :param flags: the response flags passed along by the client.
            :param rc: the connection result; 0 is treated as success. """
        # A non-zero result means the connection attempt was not successful, so
        # the publisher must not be flagged as online.
        self._online = rc == 0
        if self._online:
            log.info("(Re)connected to the MQTT server.")
            if self._status is not None:
                log.debug("Forcing the sensor status to be reset to %s.", self._status)
                self.signal_status(forced=True)
        else:
            log.warning("The MQTT server did not accept the connection (result code %s).", rc)
        if self._on_connect:
            log.debug("Invoking the user-defined on_connect callback.")
            self._on_connect(mqtt_client, userdata, flags, rc)

    def _disconnect(self, _mqtt_client: mqtt.Client, _userdata: Any, _rc: int) -> None:
        """ Callback installed as the client's on_disconnect handler; flags the publisher as offline. """
        log.info("Disconnected from the MQTT server.")
        self._online = False

    @property
    def on_connect(self):
        """ :return: returns the callback function that has been set
                     to execute on connecting to the MQTT broker. """
        return self._on_connect

    @on_connect.setter
    def on_connect(self, value: Callable[[mqtt.Client, Any, Dict, int], None]) -> None:
        """ :param value: the callback function to trigger when connecting to the MQTT broker.
                          It is called with the client, the user data, the flags and the
                          result code received by the internal on_connect handler. """
        self._on_connect = value

    # MARK: - functions to quickly create the desired payload
    def create_generic_sensor(self, keys: Iterable) -> Payload:
        """ Create the payload to be used with a generic sensor.
            :param keys: the keys to be associated with this generic sensor.
            :return: a Payload object, which can be used to set the Payload
                     for this type of generic sensor with the given keys. """
        return Payload(self, "generic", keys=keys)

    def create_weather_sensor(self, version: WeatherSensor) -> Payload:
        """ Create the payload to be used with a weather sensor.
            :param version: the type of weather sensor to use; a combination
                            of WeatherSensor flags.
            :return: a Payload object, which can be used to set the Payload
                     for this type of weather sensor with the given version. """
        log.info("Creating a weather sensor payload.")
        keys = [key for flag, key in Publisher._WEATHER_KEYS if flag in version]
        return Payload(self, "weather", keys=keys,
                       presets=[("version", version.value)])

    def create_light_sensor(self, version: LightSensor) -> Payload:
        """ Create the payload to be used with a light sensor.
            :param version: the version of the light sensor; a combination
                            of LightSensor flags.
            :return: a Payload object, which can be used to set the Payload
                     for this type of light sensor with the given version. """
        log.info("Creating a light sensor payload.")
        keys = []
        for flag, flag_keys in Publisher._LIGHT_KEYS:
            if flag in version:
                keys.extend(flag_keys)
        return Payload(self, "light", keys=keys,
                       presets=[("version", version.value)])

    def create_onoff_sensor(self) -> Payload:
        """ Create the Payload object for a basic on/off sensor.
            :return: a Payload object with a single "on" key. """
        return Payload(self, "onoff", keys=["on"])

    def create_trigger_sensor(self) -> Payload:
        """ Create the Payload object for a basic trigger sensor.
            :return: a Payload object with a single "triggered" key. """
        return Payload(self, "trigger", keys=["triggered"])