perf(streaming): skip build_events and use O(1) buffers on get_final_message drain path#1663

Open
buddywhitman wants to merge 1 commit into anthropics:main from buddywhitman:fix/streaming-drain-performance

@buddywhitman

Fixes #1649.

Summary

When a caller uses get_final_message() or until_done() without iterating events, every SSE event still pays the full cost of build_events(): constructing TextEvent / InputJsonEvent / etc. objects that are immediately discarded. On a batch run with a p99 of ~4k deltas per response, this is 100–300 ms of pure overhead per call.

Additionally, accumulate_event has two O(n²) shapes:

  • content.text += delta — attribute-target assignment; CPython can't apply its in-place string optimization, so each delta reallocates the buffer
  • from_json(json_buf, partial_mode=True) — re-parses the growing JSON fragment on every input_json_delta
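The two shapes above can be illustrated with a small standalone sketch. It does not use the SDK: `Block`, `accumulate_concat`, `accumulate_join`, and `chars_scanned_reparse` are hypothetical names, and the character count is only a rough proxy for parse work, assuming a parser that rescans the whole buffer on every call.

```python
from typing import Iterable, List


class Block:
    """Hypothetical stand-in for a content block with a mutable text attribute."""

    def __init__(self) -> None:
        self.text = ""


def accumulate_concat(deltas: Iterable[str]) -> str:
    # Pattern described in the PR: += on an attribute target, one delta at a time.
    block = Block()
    for delta in deltas:
        block.text += delta
    return block.text


def accumulate_join(deltas: Iterable[str]) -> str:
    # Drain-path alternative: append to a list, join once at the end.
    parts: List[str] = []
    for delta in deltas:
        parts.append(delta)
    return "".join(parts)


def chars_scanned_reparse(fragments: Iterable[str]) -> int:
    # Rough cost model (an assumption): re-parsing after every fragment
    # scans the whole buffer accumulated so far.
    scanned = 0
    size = 0
    for fragment in fragments:
        size += len(fragment)
        scanned += size
    return scanned


deltas = ["0123456789"] * 4000  # ~4k deltas, matching the p99 quoted above
same_result = accumulate_concat(deltas) == accumulate_join(deltas)
reparse_cost = chars_scanned_reparse(deltas)   # grows quadratically with delta count
single_parse_cost = sum(len(d) for d in deltas)  # grows linearly
```

Both accumulators return the same string; only the amount of copying differs. Under this cost model, 4,000 ten-character fragments cost about 80 million scanned characters when re-parsed per delta, versus 40,000 for a single final parse.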

Changes (src/anthropic/lib/streaming/_messages.py)

  1. _emit_events flag: MessageStream and AsyncMessageStream now carry a _emit_events: bool = True flag. until_done() (and transitively get_final_message()) sets it to False before consuming, causing __stream__ to skip the build_events() call and its yields entirely.

  2. accumulate_event(emit_events=False) — when draining:

    • Accumulates text deltas into a __text_buf list; joins once at content_block_stop → O(n) instead of O(n²)
    • Skips per-delta from_json for input_json_delta; does one final parse at content_block_stop → O(n) instead of O(n²)
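A minimal sketch of the drain-path accumulation described in item 2, written against plain dicts rather than the SDK's models. `DrainAccumulator` and its attributes are hypothetical, the event dicts only approximate the streaming event fields, and `json.loads` stands in for the final parse; this is not the code in the PR.

```python
import json
from typing import Any, Dict, List


class DrainAccumulator:
    """Hypothetical helper: buffers deltas per block and finalizes at block stop."""

    def __init__(self) -> None:
        self.blocks: Dict[int, Dict[str, Any]] = {}
        self._text_parts: Dict[int, List[str]] = {}
        self._json_parts: Dict[int, List[str]] = {}

    def on_event(self, event: Dict[str, Any]) -> None:
        kind = event["type"]
        if kind == "content_block_start":
            index = event["index"]
            self.blocks[index] = dict(event["content_block"])
            self._text_parts[index] = []
            self._json_parts[index] = []
        elif kind == "content_block_delta":
            index = event["index"]
            delta = event["delta"]
            if delta["type"] == "text_delta":
                # Amortized O(1) append instead of string concatenation.
                self._text_parts[index].append(delta["text"])
            elif delta["type"] == "input_json_delta":
                # No parse here; just remember the fragment.
                self._json_parts[index].append(delta["partial_json"])
        elif kind == "content_block_stop":
            index = event["index"]
            block = self.blocks[index]
            text_parts = self._text_parts.pop(index)
            json_parts = self._json_parts.pop(index)
            if block.get("type") == "text":
                block["text"] = "".join(text_parts)  # single join
            elif block.get("type") == "tool_use":
                raw = "".join(json_parts)
                block["input"] = json.loads(raw) if raw else {}  # single parse


events = [
    {"type": "content_block_start", "index": 0,
     "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "Hel"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "lo"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "content_block_start", "index": 1,
     "content_block": {"type": "tool_use", "id": "toolu_example",
                       "name": "get_weather", "input": {}}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": '{"city": "Par'}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": 'is"}'}},
    {"type": "content_block_stop", "index": 1},
]

acc = DrainAccumulator()
for event in events:
    acc.on_event(event)
```

The trade-off is that intermediate state is not usable mid-block: the text and parsed input only become valid at content_block_stop, which is acceptable when nothing observes per-delta snapshots.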

The iteration path (for chunk in stream:) is unaffected: emit_events stays True so TextEvent.snapshot and InputJsonEvent.snapshot remain correct on every yield.
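The split between the two paths can be sketched with a toy generator-backed stream. `ToyStream`, `events_built`, and the dict-shaped events are hypothetical and far simpler than MessageStream; the sketch only shows how a flag checked inside the generator lets until_done() drain without building or yielding events, while plain iteration still sees a snapshot on every yield.

```python
from typing import Any, Dict, Iterable, Iterator, List


class ToyStream:
    """Hypothetical stand-in for a message stream; not the SDK class."""

    def __init__(self, raw_deltas: Iterable[str]) -> None:
        self._raw = iter(raw_deltas)
        self._emit_events = True  # default: the iteration path builds events
        self.events_built = 0
        self.final_text = ""
        self._iterator = self._stream()

    def _stream(self) -> Iterator[Dict[str, Any]]:
        parts: List[str] = []
        snapshot = ""
        for delta in self._raw:
            parts.append(delta)
            if self._emit_events:
                # Event construction only happens when someone is iterating.
                snapshot += delta
                self.events_built += 1
                yield {"type": "text", "text": delta, "snapshot": snapshot}
        self.final_text = "".join(parts)

    def __iter__(self) -> Iterator[Dict[str, Any]]:
        return self._iterator

    def until_done(self) -> None:
        # With the flag off the generator never yields, so this loop body
        # does not run and the generator finishes on the first next() call.
        self._emit_events = False
        for _ in self._iterator:
            pass

    def get_final_message(self) -> str:
        self.until_done()
        return self.final_text


drained = ToyStream(["a", "b", "c"])
drained_text = drained.get_final_message()

iterated = ToyStream(["a", "b", "c"])
snapshots = [event["snapshot"] for event in iterated]
```

In this toy, the drained stream builds zero events, and the iterated stream still produces a growing snapshot per delta.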

Tests

tests/lib/streaming/: 17 passed (was 13; 4 new TestDrainPath tests added).
The new tests assert that get_final_message() without prior iteration returns correct text content and parsed tool-input for both sync and async streams.

…message drain path

When a caller only wants the final Message (via get_final_message() or
until_done()) every event that flows through __stream__ pays the full
cost of build_events() — constructing TextEvent / InputJsonEvent /
CitationEvent objects that are immediately discarded.  On a real
batch run with a p99 of ~4 k deltas per response this is ~100–300 ms
of pure event-building overhead per call.

Additionally, the on-each-delta patterns in accumulate_event have two
O(n²) shapes that show up once responses grow large:
- `content.text += delta` is an attribute-target assignment, so CPython
  cannot apply its in-place string optimisation; every delta reallocates
  the whole string.
- `from_json(json_buf, partial_mode=True)` re-parses the entire growing
  JSON fragment on every input_json_delta.

Fix both for the drain path:

1. MessageStream._emit_events flag (default True).  until_done() sets it
   to False before consuming, which causes __stream__ to skip the
   build_events() call and its yields entirely — the generator drains the
   raw stream without pausing, so consume_sync_iterator sees a single
   StopIteration.  get_final_message() picks this up transitively through
   until_done().  Same change for AsyncMessageStream.

2. accumulate_event(emit_events=False) switches to list-based text
   accumulation (deferred join at content_block_stop) and skips the
   per-delta from_json parse for input_json_delta, doing a single final
   parse at content_block_stop instead.

The iteration path (for chunk in stream:) is unaffected — emit_events
stays True and the existing per-delta behaviour is preserved so that
TextEvent.snapshot and InputJsonEvent.snapshot remain correct on every
yield.

Fixes anthropics#1649.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@buddywhitman buddywhitman requested a review from a team as a code owner June 8, 2026 10:47
Development

Successfully merging this pull request may close these issues.

Streaming response consumption validates full event union per event on the loop, even for get_final_message()

2 participants