Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/tests/observable_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,14 @@ def test_time_buffer_read_until_closed(self):

def test_time_buffer_aggregated_read_until_closed(self):
observable = self.create_observable()
buffered_observable = observable.time_buffered(100, lambda chunks: ['|'.join(chunks)])
# Use 30ms period (not 100ms) to avoid a timing race: the helper
# _test_read_until_closed sends late messages after 100ms sleep.
# With period=100ms both the flush thread and the async thread fire at
# ~t=100ms; the flush thread can close the buffered observable before
# read_until_closed subscribes (at t≈140ms), resulting in empty data.
# With period=30ms flush cycles land at t=30/60/90/120ms; the subscriber
# is set up at t≈49ms and the late-message flush happens at t≈120ms.
buffered_observable = observable.time_buffered(30, lambda chunks: ['|'.join(chunks)])

data, _, late_messages = self._test_read_until_closed(
observable,
Expand Down