Better event handling#256
Conversation
This refactors handling of property and action observations. In particular, it: * Introduces a MessageBroker class to handle pub/sub messaging centrally. This eliminates duplicated code from descriptors and should be much clearer. * Adds a `publish` method to the thing server interface for easy publication of events. * No longer errors if events are published before the event loop is active: they are silently ignored. * Removes the option to set properties without emitting an event: this is no longer needed - it was only ever done to suppress errors. * Separates handling of pub/sub messages from websocket protocol considerations. This does not change the websocket protocol. I've updated tests, but have not yet added tests for `MessageBroker`.
This adds tests for `Message` and `MessageBroker`. As a result of testing, I've tightened up a few things: * `MessageBroker` now has a method to close all its send streams. This should help achieve a clean shutdown, and stops tests hanging. * `Message` is now a `pydantic` dataclass so it validates types. We may drop this back to a regular dataclass in the future for performance reasons. * mock_thing_instance now mocks a sensible `name` property so properties can work properly.
Barecheck - Code coverage reportTotal: 97.05%Your code coverage diff: 0.05% ▴ Uncovered files and lines |
The sets of observers for properties and actions used to be stored in a `_labthings_data` attribute of the `Thing`. They now live in the `MessageBroker` so I have eliminated `LabThingsData`. Good riddance :)
| """The property is not observable. | ||
| This exception is raised when `~lt.Thing.observe_property` is called with a | ||
| This exception is raised when trying to observe |
There was a problem hiding this comment.
| This exception is raised when trying to observe | |
| This exception is raised when trying to observe a |
| async def publish(self, message: Message) -> None: | ||
| """Publish a message. | ||
|
|
||
| This async method will relay the message to any subscriber streams. | ||
|
|
||
| :param message: the message to send. | ||
| """ | ||
| try: | ||
| subscriptions = self._subscriptions[message.thing][message.affordance] | ||
| except KeyError: | ||
| return # No subscribers for this thing. | ||
| for stream in subscriptions: | ||
| await stream.send(message) |
There was a problem hiding this comment.
Do we need to be worried about any garbage collection processes in LabThings? I ask because here we iterate over subscriptions (a WeakSet) while executing await stream.send(message). Yielding to the event loop means garbage collection can occur in the background. If a stream loses its strong references and is garbage-collected while the loop is suspended, Python will throw a RuntimeError: Set changed size during iteration. If we do garbage collection in other processes, we should iterate over a shallow copy instead.
Also, if stream.send(message) encounters a closed or broken socket (e.g., anyio.BrokenResourceError or anyio.ClosedResourceError), it will raise an exception and halt the entire publish loop. This prevents the remaining streams in the queue from receiving the message.
| async def publish(self, message: Message) -> None: | |
| """Publish a message. | |
| This async method will relay the message to any subscriber streams. | |
| :param message: the message to send. | |
| """ | |
| try: | |
| subscriptions = self._subscriptions[message.thing][message.affordance] | |
| except KeyError: | |
| return # No subscribers for this thing. | |
| for stream in subscriptions: | |
| await stream.send(message) | |
| async def publish(self, message: Message) -> None: | |
| """Publish a message. | |
| This async method will relay the message to any subscriber streams. | |
| :param message: the message to send. | |
| """ | |
| try: | |
| subscriptions = self._subscriptions[message.thing][message.affordance] | |
| except KeyError: | |
| return # No subscribers for this thing. | |
| # Iterate over a list copy to safely survive async context switches | |
| # Using `list` to create strong references | |
| for stream in list(subscriptions): | |
| try: | |
| await stream.send(message) | |
| except (anyio.BrokenResourceError, anyio.ClosedResourceError): | |
| # Optionally log the error; WeakSet will eventually clean up the dead stream | |
| pass |
There was a problem hiding this comment.
If this is added, a test will need to be added to cover this
| :param thing: The name of the Thing generating the event. | ||
| :param affordance: The name of the affordance generating the event. | ||
| :param message: The message to send. | ||
| """ | ||
|
|
||
| thing: str | ||
| affordance: str | ||
| message_type: Literal["property", "action", "event"] | ||
| payload: Any |
There was a problem hiding this comment.
Params don't match the description
| :param stream: A stream to send the messages to. | ||
| :raises TypeError: if the `thing` argument is not a string. | ||
| """ | ||
| if not isinstance(thing, str): |
There was a problem hiding this comment.
Why don't we check the type of affordance too?
There was a problem hiding this comment.
If this is added, a test will need to be added to cover this
There was a problem hiding this comment.
As a rule, MessageBroker isn't exposed to the user, so it should only be called internally and problems ought to be picked up by mypy. I've tried to avoid adding explicit validation/type checks unless there's a need, and I think affordance is unlikely to be a problem.
I noticed the argument name thing might lead to people passing in a thing instance, and there wouldn't be an error if they did so (because MessageBroker only uses it as a dict key). I thought to do this because of the likelihood of someone, most probably me, passing an instance instead of a name. I didn't think there was a massive risk of someone doing the same thing for affordance.
I could just remove the check - it should also be flagged by mypy. On the other hand, I think it could well catch a bug in the future so is worth leaving in.
Maybe I should also change the argument names to be explicitly thing_name and affordance_name - but that felt overly verbose.
I'm happy with the way it is, but equally happy adding another isinstance check and test.
| :raises KeyError: if there is no such subscription. | ||
| :raises TypeError: if the `thing` argument is not a string. | ||
| """ | ||
| if not isinstance(thing, str): |
There was a problem hiding this comment.
Why don't we check the type of affordance too?
There was a problem hiding this comment.
If this is added, a test will need to be added to cover this
| if thing not in self._subscriptions: | ||
| self._subscriptions[thing] = {} | ||
| if affordance not in self._subscriptions[thing]: | ||
| self._subscriptions[thing][affordance] = WeakSet() | ||
| self._subscriptions[thing][affordance].add(stream) |
There was a problem hiding this comment.
Cleaner to use python's setdefault method for dictionaries: https://www.w3schools.com/PYTHON/ref_dictionary_setdefault.asp
| if thing not in self._subscriptions: | |
| self._subscriptions[thing] = {} | |
| if affordance not in self._subscriptions[thing]: | |
| self._subscriptions[thing][affordance] = WeakSet() | |
| self._subscriptions[thing][affordance].add(stream) | |
| affordances = self._subscriptions.setdefault(thing, {}) | |
| streams = affordances.setdefault(affordance, WeakSet()) | |
| streams.add(stream) |
There was a problem hiding this comment.
Thanks, that is cleaner :)
| try: | ||
| self._subscriptions[thing][affordance].discard(stream) | ||
| except KeyError as e: | ||
| raise e |
There was a problem hiding this comment.
The discard method doesn't raise KeyErrors if an item isn't there: https://www.w3schools.com/python/ref_set_discard.asp
The remove method does raise an error
| if item.message_type == "action": | ||
| msg = { | ||
| "messageType": "actionStatus", | ||
| "data": {"action name": item.affordance, "status": item.payload}, | ||
| } | ||
| elif item.message_type == "property": |
There was a problem hiding this comment.
What if message_type is "event"? (defined in message_broker.py)
| await broker.publish(message_a2) | ||
| await broker.publish(message_a2) | ||
| # It's important to close streams or the test hangs. | ||
| await broker.close_streams() |
There was a problem hiding this comment.
Can we add a test that checks this is working on its own? Something like:
async def test_close_streams():
"""Verify that close_streams actually closes the subscribed streams."""
broker = MessageBroker()
send_stream, recv_stream = anyio.create_memory_object_stream[Message]()
broker.subscribe("thing_a", "prop", send_stream)
await broker.close_streams()
# Attempting to send on a closed stream should raise an error
with pytest.raises(anyio.ClosedResourceError):
await send_stream.send(Message("thing_a", "prop", "property", "test"))
This PR makes a start on improving event handling in LabThings. It centralises event handling into a
MessageBrokerclass, and adds apublishmethod to theThingServerInterface. This removes ugly boilerplate from property and action descriptors, and creates a separation between pub/sub message handling, and the details of the websocket protocol.Note: this PR originally contained a bunch more stuff, and was too big. I've split it up, so this is now the first in a few planned PRs.
After this is done, I plan to update the websocket protocol to support more message types and realign with standards.
This PR does not change how websockets work, just tidies up the implementation and moves code from descriptors into a coherent module.
It does introduce a few changes, which are mostly unlikely to be noticed:
BaseProperty.__set__no longer accepts an argument to suppress emitting events. That was only used internally, and was only needed because of the previous behaviour where it errored if events fired before the event loop started.set_without_emitmethods are deleted, andload_settingsusessetinstead.observableproperty to properties. This is true for data properties and false for functional properties. It does not yet propagate to the Thing Description.In principle, if the
observableproperty propagates to the TD, then aThingimplementation could manually callpublishto notify observers when a functional property changes. However, if it's always know when the property changes, it's likely that a data property (possibly with an@lt.on_setfunction from #331) may be a better choice.