perf(streaming): skip build_events and use O(1) buffers on get_final_message drain path#1663
Open
buddywhitman wants to merge 1 commit into
Open
Conversation
…message drain path When a caller only wants the final Message (via get_final_message() or until_done()) every event that flows through __stream__ pays the full cost of build_events() — constructing TextEvent / InputJsonEvent / CitationEvent objects that are immediately discarded. On a real batch run with a p99 of ~4 k deltas per response this is ~100–300 ms of pure event-building overhead per call. Additionally, the on-each-delta patterns in accumulate_event have two O(n²) shapes that show up once responses grow large: - `content.text += delta` is an attribute-target assignment, so CPython cannot apply its in-place string optimisation; every delta reallocates the whole string. - `from_json(json_buf, partial_mode=True)` re-parses the entire growing JSON fragment on every input_json_delta. Fix both for the drain path: 1. MessageStream._emit_events flag (default True). until_done() sets it to False before consuming, which causes __stream__ to skip the build_events() call and its yields entirely — the generator drains the raw stream without pausing, so consume_sync_iterator sees a single StopIteration. get_final_message() picks this up transitively through until_done(). Same change for AsyncMessageStream. 2. accumulate_event(emit_events=False) switches to list-based text accumulation (deferred join at content_block_stop) and skips the per-delta from_json parse for input_json_delta, doing a single final parse at content_block_stop instead. The iteration path (for chunk in stream:) is unaffected — emit_events stays True and the existing per-delta behaviour is preserved so that TextEvent.snapshot and InputJsonEvent.snapshot remain correct on every yield. Fixes anthropics#1649. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #1649.
Summary
When a caller uses
get_final_message()oruntil_done()without iterating events, every SSE event still pays the full cost ofbuild_events()- constructingTextEvent/InputJsonEvent/ etc. objects that are immediately discarded. On responses with p99 ~4 k deltas this is 100–300 ms of pure overhead per call.Additionally,
accumulate_eventhas two O(n²) shapes:content.text += delta— attribute-target assignment; CPython can't apply its in-place string optimization, so each delta reallocates the bufferfrom_json(json_buf, partial_mode=True)— re-parses the growing JSON fragment on everyinput_json_deltaChanges (
src/anthropic/lib/streaming/_messages.py)_emit_eventsflag —MessageStreamandAsyncMessageStreamnow carry a_emit_events: bool = Trueflag.until_done()(and transitivelyget_final_message()) sets it toFalsebefore consuming, causing__stream__to skip thebuild_events()call and its yields entirely.accumulate_event(emit_events=False)— when draining:__text_buflist; joins once atcontent_block_stop→ O(n) instead of O(n²)from_jsonforinput_json_delta; does one final parse atcontent_block_stop→ O(n) instead of O(n²)The iteration path (
for chunk in stream:) is unaffected —emit_eventsstaysTruesoTextEvent.snapshotandInputJsonEvent.snapshotremain correct on every yield.Tests
tests/lib/streaming/— 17 passed (was 13; 4 newTestDrainPathtests added).The new tests assert that
get_final_message()without prior iteration returns correct text content and parsed tool-input for both sync and async streams.