Skip to content

Better event handling#256

Open
rwb27 wants to merge 3 commits into
mainfrom
publish-event
Open

Better event handling#256
rwb27 wants to merge 3 commits into
mainfrom
publish-event

Conversation

@rwb27
Copy link
Copy Markdown
Collaborator

@rwb27 rwb27 commented Feb 2, 2026

This PR makes a start on improving event handling in LabThings. It centralises event handling into a MessageBroker class, and adds a publish method to the ThingServerInterface. This removes ugly boilerplate from property and action descriptors, and creates a separation between pub/sub message handling, and the details of the websocket protocol.

Note: this PR originally contained a bunch more stuff, and was too big. I've split it up, so this is now the first in a few planned PRs.
After this is done, I plan to update the websocket protocol to support more message types and realign with standards.

This PR does not change how websockets work, just tidies up the implementation and moves code from descriptors into a coherent module.

It does introduce a few changes, which are mostly unlikely to be noticed:

  • Messages published before the event loop starts are silently ignored. This feels reasonable, as there's (currently) no way to subscribe to them before the event loop starts.
  • BaseProperty.__set__ no longer accepts an argument to suppress emitting events. That was only used internally, and was only needed because of the previous behaviour where it errored if events fired before the event loop started.
  • Corresponding set_without_emit methods are deleted, and load_settings uses set instead.
  • I've added an observable property to properties. This is true for data properties and false for functional properties. It does not yet propagate to the Thing Description.

In principle, if the observable property propagates to the TD, then a Thing implementation could manually call publish to notify observers when a functional property changes. However, if it's always know when the property changes, it's likely that a data property (possibly with an @lt.on_set function from #331) may be a better choice.

@rwb27 rwb27 added this to the v0.3.0 milestone May 7, 2026
This refactors handling of property and action observations. In particular, it:

* Introduces a MessageBroker class to handle pub/sub messaging centrally. This eliminates duplicated code from descriptors and should be much clearer.
* Adds a `publish` method to the thing server interface for easy publication of events.
* No longer errors if events are published before the event loop is active: they are silently ignored.
* Removes the option to set properties without emitting an event: this is no longer needed - it was only ever done to suppress errors.
* Separates handling of pub/sub messages from websocket protocol considerations.

This does not change the websocket protocol.

I've updated tests, but have not yet added tests for `MessageBroker`.
This adds tests for `Message` and `MessageBroker`. As a result of testing, I've tightened up a few things:
* `MessageBroker` now has a method to close all its send streams.
  This should help achieve a clean shutdown, and stops tests hanging.
* `Message` is now a `pydantic` dataclass so it validates types. We may drop this back to a regular
  dataclass in the future for performance reasons.
* mock_thing_instance now mocks a sensible `name` property so properties
  can work properly.
@barecheck
Copy link
Copy Markdown

barecheck Bot commented May 28, 2026

The sets of observers for properties and actions used to be stored in a
`_labthings_data` attribute of the `Thing`.
They now live in the `MessageBroker` so I have eliminated `LabThingsData`.
Good riddance :)
@rwb27 rwb27 marked this pull request as ready for review May 28, 2026 23:04
"""The property is not observable.
This exception is raised when `~lt.Thing.observe_property` is called with a
This exception is raised when trying to observe
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
This exception is raised when trying to observe
This exception is raised when trying to observe a

Comment on lines +97 to +109
async def publish(self, message: Message) -> None:
"""Publish a message.

This async method will relay the message to any subscriber streams.

:param message: the message to send.
"""
try:
subscriptions = self._subscriptions[message.thing][message.affordance]
except KeyError:
return # No subscribers for this thing.
for stream in subscriptions:
await stream.send(message)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to be worried about any garbage collection processes in LabThings? I ask because here we iterate over subscriptions (a WeakSet) while executing await stream.send(message). Yielding to the event loop means garbage collection can occur in the background. If a stream loses its strong references and is garbage-collected while the loop is suspended, Python will throw a RuntimeError: Set changed size during iteration. If we do garbage collection in other processes, we should iterate over a shallow copy instead.

Also, if stream.send(message) encounters a closed or broken socket (e.g., anyio.BrokenResourceError or anyio.ClosedResourceError), it will raise an exception and halt the entire publish loop. This prevents the remaining streams in the queue from receiving the message.

Suggested change
async def publish(self, message: Message) -> None:
"""Publish a message.
This async method will relay the message to any subscriber streams.
:param message: the message to send.
"""
try:
subscriptions = self._subscriptions[message.thing][message.affordance]
except KeyError:
return # No subscribers for this thing.
for stream in subscriptions:
await stream.send(message)
async def publish(self, message: Message) -> None:
"""Publish a message.
This async method will relay the message to any subscriber streams.
:param message: the message to send.
"""
try:
subscriptions = self._subscriptions[message.thing][message.affordance]
except KeyError:
return # No subscribers for this thing.
# Iterate over a list copy to safely survive async context switches
# Using `list` to create strong references
for stream in list(subscriptions):
try:
await stream.send(message)
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
# Optionally log the error; WeakSet will eventually clean up the dead stream
pass

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is added, a test will need to be added to cover this

Comment on lines +25 to +33
:param thing: The name of the Thing generating the event.
:param affordance: The name of the affordance generating the event.
:param message: The message to send.
"""

thing: str
affordance: str
message_type: Literal["property", "action", "event"]
payload: Any
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Params don't match the description

:param stream: A stream to send the messages to.
:raises TypeError: if the `thing` argument is not a string.
"""
if not isinstance(thing, str):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we check the type of affordance too?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is added, a test will need to be added to cover this

Copy link
Copy Markdown
Collaborator Author

@rwb27 rwb27 Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a rule, MessageBroker isn't exposed to the user, so it should only be called internally and problems ought to be picked up by mypy. I've tried to avoid adding explicit validation/type checks unless there's a need, and I think affordance is unlikely to be a problem.

I noticed the argument name thing might lead to people passing in a thing instance, and there wouldn't be an error if they did so (because MessageBroker only uses it as a dict key). I thought to do this because of the likelihood of someone, most probably me, passing an instance instead of a name. I didn't think there was a massive risk of someone doing the same thing for affordance.

I could just remove the check - it should also be flagged by mypy. On the other hand, I think it could well catch a bug in the future so is worth leaving in.

Maybe I should also change the argument names to be explicitly thing_name and affordance_name - but that felt overly verbose.

I'm happy with the way it is, but equally happy adding another isinstance check and test.

:raises KeyError: if there is no such subscription.
:raises TypeError: if the `thing` argument is not a string.
"""
if not isinstance(thing, str):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we check the type of affordance too?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is added, a test will need to be added to cover this

Comment on lines +73 to +77
if thing not in self._subscriptions:
self._subscriptions[thing] = {}
if affordance not in self._subscriptions[thing]:
self._subscriptions[thing][affordance] = WeakSet()
self._subscriptions[thing][affordance].add(stream)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaner to use python's setdefault method for dictionaries: https://www.w3schools.com/PYTHON/ref_dictionary_setdefault.asp

Suggested change
if thing not in self._subscriptions:
self._subscriptions[thing] = {}
if affordance not in self._subscriptions[thing]:
self._subscriptions[thing][affordance] = WeakSet()
self._subscriptions[thing][affordance].add(stream)
affordances = self._subscriptions.setdefault(thing, {})
streams = affordances.setdefault(affordance, WeakSet())
streams.add(stream)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that is cleaner :)

Comment on lines +92 to +95
try:
self._subscriptions[thing][affordance].discard(stream)
except KeyError as e:
raise e
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The discard method doesn't raise KeyErrors if an item isn't there: https://www.w3schools.com/python/ref_set_discard.asp

The remove method does raise an error

Comment on lines +96 to +101
if item.message_type == "action":
msg = {
"messageType": "actionStatus",
"data": {"action name": item.affordance, "status": item.payload},
}
elif item.message_type == "property":
Copy link
Copy Markdown
Contributor

@bprobert97 bprobert97 Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if message_type is "event"? (defined in message_broker.py)

await broker.publish(message_a2)
await broker.publish(message_a2)
# It's important to close streams or the test hangs.
await broker.close_streams()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test that checks this is working on its own? Something like:

async def test_close_streams():
    """Verify that close_streams actually closes the subscribed streams."""
    broker = MessageBroker()
    send_stream, recv_stream = anyio.create_memory_object_stream[Message]()
    
    broker.subscribe("thing_a", "prop", send_stream)
    await broker.close_streams()
    
    # Attempting to send on a closed stream should raise an error
    with pytest.raises(anyio.ClosedResourceError):
        await send_stream.send(Message("thing_a", "prop", "property", "test"))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants