Stream run events with resumable SSE#12
Open
mosquito wants to merge 4 commits into
Open
Conversation
`contree run` now follows /v1/operations/{uuid}/events instead of
polling. Adds a gzip-aware incremental HTTP body wrapper, SSE frame
parser with Last-Event-Id resume across drops, and a TerminalSummary
that lets the run handler skip the final GET when the stream ends
with a completion event.
`operation wait` now caches terminal ops into the session image cache
so subsequent `show` invocations reuse the result. `show` treats
non-terminal cache hits as misses to avoid stale reads.
Client:
- GzipResponse: incremental Content-Encoding: gzip via read1+decompressobj
- iter_sse_events / decode_event_chunk parsers
- cli_version() now returns "editable" for direct-url editable installs
insomnes
reviewed
Jul 2, 2026
insomnes
reviewed
Jul 2, 2026
| ) | ||
|
|
||
| for attempt in range(attempts): | ||
| delay = RETRY_DELAYS[attempt - 1] |
Collaborator
There was a problem hiding this comment.
10 on attempt 0 (0-1=-1) is unexpected, I suppose
- Drop the retry-budget bail-out. Between every failed SSE cycle
(connect error, mid-stream drop, clean close without completion),
poll `GET /operations/{uuid}` and, if terminal, park the full op
on `TerminalSummary.fallback_op` so cmd_run can use it directly.
Only a `completion` event, a GET-detected terminal status, or
Ctrl+C ends the loop.
- Catch `BrokenPipeError` explicitly before RETRYABLE_NETWORK_ERRORS
in the stream consumer and re-raise. BrokenPipeError is a
ConnectionError subclass, so the old broad handler would treat a
closed local pipe (e.g. `contree run ... | head`) as a remote drop
and retry uselessly. cmd_run now catches it, cancels the op,
silences further stdio writes, and exits 141 (128 + SIGPIPE).
Tests cover ApiError-until-terminal, empty stream + terminal GET,
BrokenPipe propagation out of the streamer, and cmd_run's
BrokenPipe cancel-and-exit path.
RETRY_DELAYS[attempt - 1] on attempt=0 wrapped to the tail (10s) before ever being intended as a retry index. The retry-block guard now scopes delay to attempts > 0, and the 410/425 branch uses RETRY_DELAYS[min(attempt, len - 1)] so first-attempt Too Early / Gone retries sleep 1s instead of 10s.
`contree build` now consumes the same SSE event stream as `contree run`
instead of polling `/operations/{uuid}` and dumping stdout/stderr at the
end. RUN output reaches the user's terminal as the server produces it —
matching `docker build`'s live-log behavior.
The Dockerfile RUN keyword calls `_stream_events_until_close` with a
`DefaultFormatter`, always writing chunks to `sys.stdout`/`sys.stderr`
regardless of the top-level formatter used for the final build record.
When the streamer falls back to a GET (SSE never delivered a completion
event), the captured stdout/stderr from the op's metadata is replayed
via `log_streams` so the user still sees what happened. The `poll()`
helper and local `TERMINAL_STATUSES` constant are gone.
Also drops the streamer-level graduated backoff. `client.request`
already retries every SSE reconnect through its own `RETRY_DELAYS`
ladder, so the extra `_stream_backoff_sleep` was pure duplication.
Only kept is a 0.5s `TIGHT_LOOP_FLOOR` when a cycle produced no
forward progress — guards against a server that returns immediate
empty streams for an executing op.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
contree runfollows/v1/operations/{uuid}/eventsinstead of polling/v1/operations/{uuid}. Output streams live; aTerminalSummarybuilt from the SSE events lets the handler skip the final GET when the stream ends with acompletionframe.GzipResponse(incrementalContent-Encoding: gzipdecoding viaread1+zlib.decompressobj(wbits=31)), plusiter_sse_events/decode_event_chunkparsers inclient.py. Streams resume across mid-flight drops usingLast-Event-Id.operation waitnow writes terminal ops into the sessionimage_cache, andshowignores non-terminal cached entries so it never returns a stale in-progress snapshot.cli_version()returns"editable"foruv sync/ pip editable installs by consultingdirect_url.json, so version strings don't lie in dev checkouts.Review fixes (2nd commit)
Two issues raised by @insomnes:
SSE retry no longer gives up. The old streamer capped at
len(RETRY_DELAYS)attempts and returned an empty summary; the caller then did a single GET whose result could be non-terminal (EXECUTING), and downstream code treated that as final. Now_stream_events_until_closeloops indefinitely — between every failed SSE cycle (connect error, mid-stream drop, or clean close withoutcompletion) it doesGET /operations/{uuid}and, if terminal, parks the full op onTerminalSummary.fallback_opand returns. Only acompletionevent, a GET-detected terminal status,BrokenPipeError, or Ctrl+C ends the loop.BrokenPipeErroris no longer misinterpreted as a remote network drop.BrokenPipeErroris a subclass ofConnectionError, which was insideRETRYABLE_NETWORK_ERRORS— socontree run ... | headwould trigger the retry path as if the remote stream broke. Now the streamer catchesBrokenPipeErrorexplicitly and re-raises;cmd_runcatches it, sendsDELETE /operations/{uuid}to cancel the remote op, silences further stdio writes by redirecting the stdout fd to/dev/null, and exits141(128 + SIGPIPE) — the Unix convention for SIGPIPE-terminated pipeline stages.BrokenPipeError behavior — worked example
What happens on
contree run -- tar -xvpjf archive.tbz2 | head:contreespawns the op remotely and opens the SSE stream.contreewrites each chunk tosys.stdout, andheadprints it.headexits, closing the pipe.sys.stdout.buffer.write(chunk)raisesBrokenPipeError._stream_events_until_closere-raises unchanged (does not treat this as a network drop; no retry, noLast-Event-Idreconnect).cmd_runcatchesBrokenPipeError:DELETE /v1/operations/{op_uuid}— the server kills tar.os.dup2(/dev/null, sys.stdout.fileno())— silences any lingering writes during interpreter shutdown.raise SystemExit(141).Why cancel the remote op
This matches what a local
tar xvpjf archive.tbz2 | headdoes: tar's next stdout write raises SIGPIPE, and its default SIGPIPE action terminates the process — extraction stops partway.What differs from local
tar ... | headcontree run -- tar ... | headDELETE /operations/{uuid})CANCELLEDstatus → noresult_image_uuid→ session image does not advance; the writable layer is discardedSo a local
tar | headleaves you a partial (recoverable) extraction; the contree equivalent leaves nothing durable. That's a consequence of contree's all-or-nothing image commit model, not of the CLI's pipe handling.When this matters
yes | head,find / | head,dmesg | head— cancelling is fine; no meaningful side effect lost.contree run -- tar ... | head,... | make build | head,... | cargo build 2>&1 | head— the op is aborted before completing; if you want the work to finish, either drop the pipe or usecontree run -d(detach mode) and read the output later viacontree show.The CLI can't tell these two cases apart, so it errs on the side of matching shell/SIGPIPE semantics. Reversing this would require the server to commit partial images for cancelled ops, which is out of scope here.
Test plan
make lintmake typesuv run pytest(1593 passed, 4 skipped)contree run -- sleep 5; echo doneshows live output and exits cleanlycontree run --format json -- echo hireturns clean JSON with full stdout capturedcontree run -- yes | headcancels the op on the server and exits 141