351 lines
16 KiB
Python
351 lines
16 KiB
Python
import json
|
|
from time import gmtime, strftime, sleep
|
|
import paho.mqtt.client as mqtt
|
|
import logging
|
|
from typing import Iterable, Iterator, Any, Tuple, Dict, Callable
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Usage guidelines:
|
|
# you can import this package using
|
|
# import simplemqtt.publisher
|
|
# or, alternatively, you can import the publisher and all desired constants using
|
|
# from simplemqtt.publisher import Publisher, TEMPERATURE_SENSE, ONOFF_LIGHT
|
|
#
|
|
# Once imported, a publisher is created by specifying the topic and description:
|
|
# publisher = Publisher("/sensors/1", "My first light sensor.")
|
|
# And connected using:
|
|
# publisher.connect("mqtt.server")
|
|
#
|
|
# To publish information, the correct Payload needs to be created. This can be requested from the publisher:
|
|
# payload = publisher.create_light_sensor(ONOFF_LIGHT)
|
|
# This preloads the payload with the acceptable keys, which can immediately be set:
|
|
# payload["on"] = False
|
|
# payload["reachable"] = True
|
|
# Once done, this information can be pushed to the publisher:
|
|
# publisher.publish(payload)
|
|
|
|
# Constants used to create a family of weather sensors.
|
|
# The exact version of the sensor is the sum of senses.
|
|
# This can be switched to enum.Flag as of Python 3.6.
|
|
|
|
TEMPERATURE_SENSE = 1
|
|
LIGHTLEVEL_SENSE = 2
|
|
HUMIDITY_SENSE = 4
|
|
AIRQUALITY_SENSE = 8
|
|
PRESSURE_SENSE = 16
|
|
WINDSPEED_SENSE = 32
|
|
WINDDIRECTION_SENSE = 64
|
|
UVINDEX_SENSE = 128
|
|
ICON_SENSE = 256
|
|
ICON_3H_SENSE = 512
|
|
ICON_6H_SENSE = 1024
|
|
APPARENT_SENSE = 2048
|
|
WINDGUSTS_SENSE = 4096
|
|
CLOUDCOVER_SENSE = 8192
|
|
RAIN_INTENSITY_SENSE = 16384
|
|
RAIN_PROBABILITY_SENSE = 32768
|
|
SOLAR_RADIATION_SENSE = 65536
|
|
|
|
|
|
# Constants used to create a family of light sensors.
|
|
# This can be switched to enum.Flag as of Python 3.6.
|
|
|
|
ONOFF_LIGHT = 1
|
|
BRIGHTNESS_LIGHT = 2
|
|
TEMPERATURE_LIGHT = 4
|
|
COLOUR_LIGHT = 8
|
|
|
|
|
|
class Payload:
|
|
""" Create an MQTT Payload by setting the desired fields using indexing
|
|
and retrieve the JSON-ified payload by converting it to a string. """
|
|
__slots__ = ("_publisher", "type", "payload", "_client", "_topic", "_changed")
|
|
|
|
def __init__(self, publisher: "Publisher", payload_type: str, *,
|
|
keys: Iterable=None, presets: Iterable[Tuple[str, Any]]=None):
|
|
""" Provide the type of the payload, as well as other settable keys.
|
|
:param publisher: the publisher to which to send the payload to.
|
|
:param payload_type: a string with the type of the payload.
|
|
:param keys: a list of strings, each a key that can be set.
|
|
:param presets: a list of key-value tuples to preset."""
|
|
self._publisher = publisher
|
|
self._client = mqtt.Client()
|
|
self.type = payload_type
|
|
self.payload = dict()
|
|
self._changed = True
|
|
self.payload["type"] = payload_type
|
|
if keys:
|
|
for key in keys:
|
|
self.payload[key] = None
|
|
if presets:
|
|
for key, value in presets:
|
|
self.payload[key] = value
|
|
|
|
def _set(self, key: str, value: Any = None) -> None:
|
|
""" Method to set the value of a given key defined on initialisation.
|
|
:param key: the key to set.
|
|
:param value: the value to associate with the key.
|
|
:raise KeyError: thrown when key was not defined on initialisation. """
|
|
if key not in self.payload:
|
|
raise KeyError("Key '" + key + "' not available in this payload. "
|
|
"Available keys are: '" + ", ".join(self.payload.keys()) + "'.")
|
|
|
|
# we don't want the full traceback, so we catch the error, and then rethrow
|
|
# the error afterwards to hopefully get a cleaner, easier to understand TypeError
|
|
value_problem = False
|
|
try:
|
|
json.dumps(value)
|
|
except TypeError as te:
|
|
value_problem = str(te)
|
|
|
|
if value_problem:
|
|
raise TypeError("the value could not be turned into JSON: '" + value_problem + "'.")
|
|
|
|
if value != self.payload[key]:
|
|
self._changed = True
|
|
self.payload[key] = value
|
|
self.payload["lastupdated"] = strftime("%Y-%m-%dT%H:%M:%S", gmtime())
|
|
|
|
@property
|
|
def changed(self) -> bool:
|
|
""" Flag when the contents of the payload have changed since the last time it was converted to a string. """
|
|
return self._changed
|
|
|
|
def update(self, d: dict) -> None:
|
|
for key, value in d.items():
|
|
self[key] = value
|
|
|
|
def publish(self, *, forced: bool = False) -> None:
|
|
self._publisher.publish(self, forced=forced)
|
|
|
|
def __setitem__(self, key: str, value: Any):
|
|
""" Wrapper method for self._set(...) ."""
|
|
self._set(key, value)
|
|
|
|
def __iter__(self) -> Iterator:
|
|
""" Retrieve an iterator over the keys of the payload.
|
|
:return: an iterator over the keys of this payload. """
|
|
return iter(list(self.payload.keys()))
|
|
|
|
def __str__(self) -> str:
|
|
""" When a string is requested, dump the payload as JSON.
|
|
:return: the payload in JSON format. """
|
|
self._changed = False
|
|
return json.dumps(self.payload)
|
|
|
|
|
|
class Publisher:
|
|
""" An object of the class Publisher is able to connect to an MQTT broker, start
|
|
the connection, reconnect (automatically) and disconnect, and is able to
|
|
create specific types of sensors (objects from the Payload class). """
|
|
__slots__ = ("_client", "_topic", "_description", "_retain", "_status", "_on_connect", "_signalled", "_online")
|
|
|
|
_DESC = "/description"
|
|
_VALUE = "/value"
|
|
_STATUS = "/status"
|
|
|
|
def __init__(self, topic: str, description: str, *, retain: bool = True):
|
|
""" Initialises a Publisher object by setting up the main topic
|
|
to which it will publish, as well as a basic description
|
|
of the sensor which should be easy to read by humans.
|
|
:param topic: the main topic to which the client will publish.
|
|
:param description: human-readable description of the sensor.
|
|
:param retain: Boolean flag to indicate whether the values posted should be retained. """
|
|
self._client = mqtt.Client()
|
|
self._topic = topic
|
|
self._description = description
|
|
self._retain = retain
|
|
self._status = None
|
|
self._on_connect = None
|
|
self._online = False
|
|
log.info("Created a new MQTT client which will subscribe to %s.", self._topic)
|
|
|
|
def connect(self, host: str, *, port: int = 1883,
|
|
tls: bool = False, auto_start: bool = True,
|
|
signal: bool = True) -> None:
|
|
""" Asynchronously connect to the MQTT broker and do some initial bookkeeping.
|
|
:param host: the host where the MQTT broker can be found.
|
|
:param port: the port used to access the MQTT broker.
|
|
:param tls: a Boolean flag to indicate whether the MQTT broker requires TLS.
|
|
:param auto_start: a Boolean flag to indicate whether to automatically start
|
|
the connection to the MQTT broker. If set to False, the
|
|
programmer should call start(...) to initiate the connection.
|
|
Alternatively, the programmer could call blocking_start(...)
|
|
to trigger a blocking start, which requires some bookkeeping
|
|
to be done by the programmer.
|
|
:param signal: a Boolean flag to indicate whether to automatically signal
|
|
the status of this sensor as "on" (the default). When omitted,
|
|
the programmer should call signal_status(...) to do so. """
|
|
self._client.will_set(self._topic + Publisher._STATUS, "off", qos=1, retain=True)
|
|
if tls: # Enable TLS if supported by the broker
|
|
log.info("Enabling TLS for the MQTT broker.")
|
|
self._client.tls_set()
|
|
log.info("Asynchronously connecting to the MQTT broker at %s:%s.", host, port)
|
|
self._client.connect_async(host, port)
|
|
self._client.on_connect = self._reconnect
|
|
self._client.on_disconnect = self._disconnect
|
|
if auto_start:
|
|
log.debug("Triggering auto start.")
|
|
self.start(signal)
|
|
|
|
def start(self, signal: bool = True) -> None:
|
|
""" Start the connection to the MQTT broker in a non-blocking way.
|
|
:param signal: a Boolean flag to indicate whether to automatically signal
|
|
the status of this sensor as "on" (the default). When omitted,
|
|
the programmer should call signal_status(...) to do so. """
|
|
log.info("Starting the connection to the MQTT broker. in a non-blocking way.")
|
|
self._client.loop_start()
|
|
if signal:
|
|
log.info("Signalling the status of the sensor (defaults to 'on').")
|
|
self.signal_status()
|
|
log.info("Publishing the description of this sensor ('%s') as a retained message to %s.",
|
|
self._description, self._topic + Publisher._DESC)
|
|
while not self._online:
|
|
sleep(0.01)
|
|
self._client.publish(self._topic + Publisher._DESC, self._description, qos=1, retain=True)
|
|
|
|
def blocking_start(self) -> None:
|
|
""" Start the connection to the MQTT broker in a blocking way. """
|
|
log.info("Starting the connection to the MQTT broker in a blocking style.")
|
|
self._client.loop_forever()
|
|
|
|
def signal_status(self, status: bool = True, *, forced: bool = False) -> None:
|
|
""" Signal a change in the status of this sensor.
|
|
:param status: the status to signal to the MQTT broker.
|
|
:param forced: whether or not to forcibly signal the status, even if
|
|
the (internal) status hasn't changed. This is particularly
|
|
useful in the case of a reconnect where a last will may
|
|
have been set, or when interfered by another sensor. """
|
|
if forced or self._status != status:
|
|
status = self._status if forced else status
|
|
log.debug("Signalling the sensor status as %s.", status)
|
|
self._client.publish(self._topic + Publisher._STATUS, "on" if status else "off", qos=1, retain=True)
|
|
self._status = status
|
|
|
|
def disconnect(self) -> None:
|
|
""" Disconnect from the MQTT broker. """
|
|
log.info("Disconnecting from the MQTT broker.")
|
|
self._client.disconnect()
|
|
|
|
def publish(self, payload: Payload, *, forced: bool = False) -> None:
|
|
""" Publish a payload for this sensor to the MQTT broker.
|
|
:param payload: the payload to be published, which should be an object
|
|
of the type Payload, a requirement that is enforced.
|
|
:param forced: whether the payload should be forcefully published. Otherwise,
|
|
the payload is only published when changes are detected. """
|
|
if not self._online:
|
|
# raise EnvironmentError("Not connected to an MQTT server.")
|
|
log.debug("Not connected to an MQTT server. Entering waiting pattern ...")
|
|
while not self._online:
|
|
sleep(0.01)
|
|
if isinstance(payload, Payload):
|
|
if forced or payload.changed:
|
|
log.debug("Publishing '%s' to '%s'.", str(payload), self._topic + Publisher._VALUE)
|
|
self._client.publish(self._topic + Publisher._VALUE, retain=self._retain, payload=str(payload))
|
|
else:
|
|
raise ValueError("The payload is not an object of the type Payload.")
|
|
|
|
def _reconnect(self, mqtt_client: mqtt.Client, userdata: Any, flags: Dict, rc: int) -> None:
|
|
log.info("(Re)connected to the MQTT server.")
|
|
self._online = True
|
|
if self._status is not None:
|
|
log.debug("Forcing the sensor status to be reset to %s.", self._status)
|
|
self.signal_status(forced=True)
|
|
|
|
if self._on_connect:
|
|
log.debug("")
|
|
self._on_connect(mqtt_client, userdata, flags, rc)
|
|
|
|
def _disconnect(self, _mqtt_client: mqtt.Client, _userdata: Any, _rc: int) -> None:
|
|
log.info("Disconnected from the MQTT server.")
|
|
self._online = False
|
|
|
|
@property
|
|
def on_connect(self):
|
|
""" :return: returns the callback function that has been set
|
|
to execute on connecting to the MQTT broker. """
|
|
return self._on_connect
|
|
|
|
@on_connect.setter
|
|
def on_connect(self, value: Callable[[mqtt.Client, Any, mqtt.MQTTMessage, int], None]) -> None:
|
|
""" :param value: the callback function to trigger when connecting to the MQTT broker. """
|
|
self._on_connect = value
|
|
|
|
def create_generic_sensor(self, keys: Iterable) -> Payload:
|
|
""" Create the payload to be used with a generic sensor.
|
|
:param keys: the keys to be associated with this generic sensor.
|
|
:return: a Payload object, which can be used to set the Payload
|
|
for this type of generic sensor with the given keys. """
|
|
return Payload(self, "generic", keys=keys)
|
|
|
|
def create_weather_sensor(self, version: int) -> Payload:
|
|
""" Create the payload to be used with a weather sensor.
|
|
:param version: the version of the weather sensor.
|
|
:return: a Payload object, which can be used to set the Payload
|
|
for this type of weather sensor with the given version. """
|
|
"""The version of a sensor is given as a bit mask."""
|
|
if version < 1 or version > 131071:
|
|
raise ValueError("Version type too low or too high.")
|
|
|
|
senses = [(TEMPERATURE_SENSE, "temperature"),
|
|
(LIGHTLEVEL_SENSE, "lightlevel"),
|
|
(HUMIDITY_SENSE, "humidity"),
|
|
(AIRQUALITY_SENSE, "airquality"),
|
|
(PRESSURE_SENSE, "pressure"),
|
|
(WINDSPEED_SENSE, "windspeed"),
|
|
(WINDDIRECTION_SENSE, "winddirection"),
|
|
(UVINDEX_SENSE, "uvindex"),
|
|
(ICON_SENSE, "icon"),
|
|
(ICON_3H_SENSE, "icon_3h"),
|
|
(ICON_6H_SENSE, "icon_6h"),
|
|
(APPARENT_SENSE, "apparent_temperature"),
|
|
(WINDGUSTS_SENSE, "windgusts"),
|
|
(CLOUDCOVER_SENSE, "cloud_cover"),
|
|
(RAIN_INTENSITY_SENSE, "rain_intensity"),
|
|
(RAIN_PROBABILITY_SENSE, "rain_probability"),
|
|
(SOLAR_RADIATION_SENSE, "solar_radiation")
|
|
]
|
|
|
|
keys = []
|
|
|
|
for sense, key in senses:
|
|
if version & sense:
|
|
keys.append(key)
|
|
|
|
return Payload(self, "weather", keys=keys,
|
|
presets=[("version", version)])
|
|
|
|
def create_light_sensor(self, version: int) -> Payload:
|
|
""" Create the payload to be used with a light sensor.
|
|
:param version: the version of the light sensor.
|
|
:return: a Payload object, which can be used to set the Payload
|
|
for this type of light sensor with the given version. """
|
|
log.info("Creating a light sensor payload.")
|
|
"""The version of a sensor is given as a bit mask."""
|
|
if version < 1 or version > 8:
|
|
raise ValueError("Version type too low or too high.")
|
|
|
|
lights = [(ONOFF_LIGHT, ["on", "reachable"]),
|
|
(BRIGHTNESS_LIGHT, ["brightness"]),
|
|
(TEMPERATURE_LIGHT, ["ct"]),
|
|
(COLOUR_LIGHT, ["hue", "saturation", "xy"])]
|
|
|
|
keys = []
|
|
|
|
for light, key in lights:
|
|
if version >= light:
|
|
keys.extend(key)
|
|
|
|
return Payload(self, "light", keys=keys,
|
|
presets=[("version", version)])
|
|
|
|
def create_onoff_sensor(self) -> Payload:
|
|
""" Create the Payload object for a basic on/off sensor.
|
|
:return: a Payload object with a single "on" key. """
|
|
return Payload(self, "onoff", keys=["on"])
|
|
|
|
def create_trigger_sensor(self) -> Payload:
|
|
""" Create the Payload object for a basic trigger sensor.
|
|
:return: a Payload object with a single "triggered" key. """
|
|
return Payload(self, "trigger", keys=["triggered"])
|