Stream run events with resumable SSE #12

Open
mosquito wants to merge 4 commits into master from feature/run-event-streaming

@mosquito mosquito commented Jul 2, 2026

Collaborator

Summary

  • contree run follows /v1/operations/{uuid}/events instead of polling /v1/operations/{uuid}. Output streams live; a TerminalSummary built from the SSE events lets the handler skip the final GET when the stream ends with a completion frame.
  • Adds GzipResponse (incremental Content-Encoding: gzip decoding via read1 + zlib.decompressobj(wbits=31)), plus iter_sse_events / decode_event_chunk parsers in client.py. Streams resume across mid-flight drops using Last-Event-Id.
  • operation wait now writes terminal ops into the session image_cache, and show ignores non-terminal cached entries so it never returns a stale in-progress snapshot.
  • cli_version() returns "editable" for uv sync / pip editable installs by consulting direct_url.json, so version strings don't lie in dev checkouts.
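
The incremental gzip decoding mentioned above can be sketched as follows. This is a minimal illustration, not the PR's GzipResponse class: the function name iter_gunzip and the chunk size are assumptions, and only the read1 + zlib.decompressobj(wbits=31) combination comes from the description.

```python
import gzip
import io
import zlib


def iter_gunzip(raw, chunk_size=64):
    """Yield decompressed bytes as compressed chunks arrive from `raw`.

    `raw` is any binary reader with read1(); the name and chunk size are
    illustrative assumptions, not taken from the PR.
    """
    # wbits=31 (16 + 15) makes zlib expect a gzip header and trailer.
    decoder = zlib.decompressobj(wbits=31)
    while True:
        # read1() performs at most one underlying read, so a live stream
        # is not stalled waiting for a full buffer.
        chunk = raw.read1(chunk_size)
        if not chunk:
            break
        data = decoder.decompress(chunk)
        if data:
            yield data
    tail = decoder.flush()
    if tail:
        yield tail


payload = b"data: hello\n\n" * 50
body = io.BufferedReader(io.BytesIO(gzip.compress(payload)))
decoded = b"".join(iter_gunzip(body))
```

Decoding chunk by chunk is what lets output appear while the response is still open; a one-shot gzip.decompress would need the whole body first.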

Review fixes (2nd commit)

Two issues raised by @insomnes:

  1. SSE retry no longer gives up. The old streamer capped at len(RETRY_DELAYS) attempts and returned an empty summary; the caller then did a single GET whose result could be non-terminal (EXECUTING), and downstream code treated that as final. Now _stream_events_until_close loops indefinitely — between every failed SSE cycle (connect error, mid-stream drop, or clean close without completion) it does GET /operations/{uuid} and, if terminal, parks the full op on TerminalSummary.fallback_op and returns. Only a completion event, a GET-detected terminal status, BrokenPipeError, or Ctrl+C ends the loop.

  2. BrokenPipeError is no longer misinterpreted as a remote network drop. BrokenPipeError is a subclass of ConnectionError, which was inside RETRYABLE_NETWORK_ERRORS — so contree run ... | head would trigger the retry path as if the remote stream broke. Now the streamer catches BrokenPipeError explicitly and re-raises; cmd_run catches it, sends DELETE /operations/{uuid} to cancel the remote op, silences further stdio writes by redirecting the stdout fd to /dev/null, and exits 141 (128 + SIGPIPE) — the Unix convention for SIGPIPE-terminated pipeline stages.
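
The handler-ordering issue in fix 2 can be reproduced in isolation. The sketch below assumes a RETRYABLE_NETWORK_ERRORS tuple containing ConnectionError (its real contents are not shown in this PR) and returns labels instead of re-raising, purely to make the ordering visible.

```python
# Assumed contents; the PR only says ConnectionError is part of the tuple.
RETRYABLE_NETWORK_ERRORS = (ConnectionError, TimeoutError)


def classify(exc):
    """Return how a stream consumer might treat `exc` (illustrative only)."""
    try:
        raise exc
    except BrokenPipeError:
        # Must be listed first: BrokenPipeError is a ConnectionError
        # subclass, so the broader handler below would otherwise catch it.
        return "local-pipe-closed"
    except RETRYABLE_NETWORK_ERRORS:
        return "retry"


pipe_result = classify(BrokenPipeError())
reset_result = classify(ConnectionResetError())
```

With the two except clauses swapped, a closed local pipe would be labelled "retry", which is the behaviour the review flagged.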

BrokenPipeError behavior — worked example

What happens on contree run -- tar -xvpjf archive.tbz2 | head:

  1. contree spawns the op remotely and opens the SSE stream.
  2. The server streams tar's verbose output; contree writes each chunk to sys.stdout, and head prints it.
  3. After ~10 lines, head exits, closing the pipe.
  4. Next sys.stdout.buffer.write(chunk) raises BrokenPipeError.
  5. _stream_events_until_close re-raises unchanged (does not treat this as a network drop; no retry, no Last-Event-Id reconnect).
  6. cmd_run catches BrokenPipeError:
    • DELETE /v1/operations/{op_uuid} — the server kills tar.
    • os.dup2(/dev/null, sys.stdout.fileno()) — silences any lingering writes during interpreter shutdown.
    • raise SystemExit(141).
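
Step 6 can be sketched as a small helper. This is a hedged, Unix-only illustration: handle_broken_pipe and cancel_op are hypothetical names, and the real cmd_run raises SystemExit rather than returning the status.

```python
import os
import signal

# 128 + SIGPIPE (13) = 141 on Linux and macOS.
EXIT_SIGPIPE = 128 + signal.SIGPIPE


def handle_broken_pipe(cancel_op, stdout_fd):
    """Cancel the remote op, silence `stdout_fd`, and return the exit status.

    `cancel_op` is a hypothetical callable standing in for
    DELETE /v1/operations/{op_uuid}.
    """
    try:
        cancel_op()
    finally:
        devnull = os.open(os.devnull, os.O_WRONLY)
        try:
            # Point the broken descriptor at /dev/null so flushes during
            # interpreter shutdown cannot raise BrokenPipeError again.
            os.dup2(devnull, stdout_fd)
        finally:
            os.close(devnull)
    return EXIT_SIGPIPE
```

A caller would typically pass sys.stdout.fileno() as stdout_fd and hand the returned value to SystemExit.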

Why cancel the remote op

This matches what a local tar xvpjf archive.tbz2 | head does: tar's next stdout write raises SIGPIPE, and its default SIGPIPE action terminates the process — extraction stops partway.

What differs from local

| | Local: tar ... \| head | Contree: contree run -- tar ... \| head |
|---|---|---|
| Producer sees pipe close | Yes (kernel-delivered SIGPIPE) | No; the pipe break happens on the CLI host, and the CLI propagates it by cancelling the op |
| Producer stops | Yes | Yes (via DELETE /operations/{uuid}) |
| Partial state | Whatever tar fully wrote before dying stays on the local filesystem | Op ends in CANCELLED status → no result_image_uuid → session image does not advance; the writable layer is discarded |

So a local tar | head leaves you a partial (recoverable) extraction; the contree equivalent leaves nothing durable. That's a consequence of contree's all-or-nothing image commit model, not of the CLI's pipe handling.

When this matters

  • yes | head, find / | head, dmesg | head — cancelling is fine; no meaningful side effect lost.
  • contree run -- tar ... | head, ... | make build | head, ... | cargo build 2>&1 | head — the op is aborted before completing; if you want the work to finish, either drop the pipe or use contree run -d (detach mode) and read the output later via contree show.

The CLI can't tell these two cases apart, so it errs on the side of matching shell/SIGPIPE semantics. Reversing this would require the server to commit partial images for cancelled ops, which is out of scope here.

Test plan

  • make lint
  • make types
  • uv run pytest (1593 passed, 4 skipped)
  • Manual: contree run -- sleep 5; echo done shows live output and exits cleanly
  • Manual: kill the stream mid-run (drop network), confirm it resumes without duplicating output
  • Manual: contree run --format json -- echo hi returns clean JSON with full stdout captured
  • Manual: contree run -- yes | head cancels the op on the server and exits 141

`contree run` now follows /v1/operations/{uuid}/events instead of
polling. Adds a gzip-aware incremental HTTP body wrapper, SSE frame
parser with Last-Event-Id resume across drops, and a TerminalSummary
that lets the run handler skip the final GET when the stream ends
with a completion event.
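
For readers unfamiliar with the wire format, here is a minimal sketch of SSE frame parsing and of how the last seen id feeds a resume request. It is not the PR's iter_sse_events / decode_event_chunk code; parse_sse, SseEvent, and the "stdout" event name are assumptions, while the `completion` event and the Last-Event-Id header come from the text above.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass
class SseEvent:
    event: str
    data: str
    last_event_id: Optional[str]


def parse_sse(lines: Iterable[str]) -> Iterator[SseEvent]:
    """Parse text lines of an SSE stream into events (simplified)."""
    event, data, last_id = "message", [], None
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line dispatches the buffered event if it has data.
            if data:
                yield SseEvent(event, "\n".join(data), last_id)
            event, data = "message", []
            continue
        if line.startswith(":"):
            continue  # comment or keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if field == "event":
            event = value
        elif field == "data":
            data.append(value)
        elif field == "id":
            last_id = value  # persists across events until replaced


frames = [
    "id: 7\n", "event: stdout\n", "data: hello\n", "\n",
    ": ping\n",
    "id: 8\n", "event: completion\n", "data: {}\n", "\n",
]
events = list(parse_sse(frames))
# After a drop, the client would reconnect with the last id it saw.
resume_headers = {"Last-Event-Id": events[-1].last_event_id}
```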

`operation wait` now caches terminal ops into the session image cache
so subsequent `show` invocations reuse the result. `show` treats
non-terminal cache hits as misses to avoid stale reads.

Client:
- GzipResponse: incremental Content-Encoding: gzip via read1+decompressobj
- iter_sse_events / decode_event_chunk parsers
- cli_version() now returns "editable" for direct-url editable installs
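
The editable-install check can be approximated with importlib.metadata and the direct_url.json record that installers write for direct-URL installs. The sketch below is an assumption about the approach, not the PR's cli_version(): is_editable, describe_version, and the "unknown" fallback are hypothetical.

```python
import json
from importlib import metadata
from typing import Optional


def is_editable(direct_url_text: Optional[str]) -> bool:
    """Return True if a direct_url.json payload marks an editable install."""
    if not direct_url_text:
        return False
    try:
        info = json.loads(direct_url_text)
    except ValueError:
        return False
    if not isinstance(info, dict):
        return False
    dir_info = info.get("dir_info")
    return isinstance(dir_info, dict) and dir_info.get("editable") is True


def describe_version(dist_name: str) -> str:
    """Return "editable", the installed version, or "unknown" (assumed fallback)."""
    try:
        dist = metadata.distribution(dist_name)
    except metadata.PackageNotFoundError:
        return "unknown"
    # read_text() returns None when the file is absent from the metadata dir.
    if is_editable(dist.read_text("direct_url.json")):
        return "editable"
    return dist.version
```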
Comment thread contree_cli/client.py Outdated
)

for attempt in range(attempts):
    delay = RETRY_DELAYS[attempt - 1]

Collaborator

A 10s delay on attempt 0 (index 0 - 1 = -1, which wraps to the last entry) is unexpected, I suppose

mosquito added 3 commits July 2, 2026 17:35
- Drop the retry-budget bail-out.  Between every failed SSE cycle
  (connect error, mid-stream drop, clean close without completion),
  poll `GET /operations/{uuid}` and, if terminal, park the full op
  on `TerminalSummary.fallback_op` so cmd_run can use it directly.
  Only a `completion` event, a GET-detected terminal status, or
  Ctrl+C ends the loop.

- Catch `BrokenPipeError` explicitly before RETRYABLE_NETWORK_ERRORS
  in the stream consumer and re-raise.  BrokenPipeError is a
  ConnectionError subclass, so the old broad handler would treat a
  closed local pipe (e.g. `contree run ... | head`) as a remote drop
  and retry uselessly.  cmd_run now catches it, cancels the op,
  silences further stdio writes, and exits 141 (128 + SIGPIPE).

Tests cover ApiError-until-terminal, empty stream + terminal GET,
BrokenPipe propagation out of the streamer, and cmd_run's
BrokenPipe cancel-and-exit path.
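
The loop described in this commit can be sketched as below. It is a simplified model under stated assumptions: follow, Summary, open_stream, get_op, and the SUCCESS / FAILED status names are hypothetical (only EXECUTING and CANCELLED appear in this PR), and Last-Event-Id resume and sleeping between cycles are omitted.

```python
from dataclasses import dataclass
from typing import Optional

# Assumed status names, apart from CANCELLED which the PR mentions.
TERMINAL = {"SUCCESS", "FAILED", "CANCELLED"}


@dataclass
class Summary:
    completed: bool = False
    fallback_op: Optional[dict] = None


def follow(open_stream, get_op, on_chunk):
    """Consume SSE cycles until a completion event or a terminal GET."""
    summary = Summary()
    while True:
        try:
            for kind, payload in open_stream():
                if kind == "completion":
                    summary.completed = True
                    return summary
                on_chunk(payload)
        except BrokenPipeError:
            raise  # local pipe closed: never retried
        except ConnectionError:
            pass  # remote drop: fall through to the status check
        # Reached after a drop or a clean close without completion.
        op = get_op()
        if op["status"] in TERMINAL:
            summary.fallback_op = op
            return summary


# Simulated run: one mid-stream drop, then a clean close without completion.
attempts = iter([[("stdout", "a"), ConnectionResetError()], [("stdout", "b")]])
statuses = iter([{"status": "EXECUTING"}, {"status": "CANCELLED"}])


def open_stream():
    for item in next(attempts):
        if isinstance(item, Exception):
            raise item
        yield item


chunks = []
result = follow(open_stream, lambda: next(statuses), chunks.append)
```

The non-terminal EXECUTING response no longer ends the loop; only the second GET, which reports a terminal status, does.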

On attempt=0, RETRY_DELAYS[attempt - 1] evaluated to RETRY_DELAYS[-1]
and wrapped to the tail (10s), although no retry delay was intended
for the first attempt. The retry-block guard now scopes delay to
attempts > 0, and the 410/425 branch uses
RETRY_DELAYS[min(attempt, len - 1)] so first-attempt Too Early / Gone
retries sleep 1s instead of 10s.

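The wrap-around is ordinary Python negative indexing. The sketch below uses an assumed RETRY_DELAYS ladder; the commit message only implies that the first entry is 1s and the last is 10s, and both helper names are hypothetical.

```python
# Assumed ladder: only the first (1s) and last (10s) values are implied above.
RETRY_DELAYS = (1, 2, 5, 10)


def buggy_delay(attempt):
    # attempt=0 gives index -1, which selects the last entry.
    return RETRY_DELAYS[attempt - 1]


def clamped_delay(attempt):
    # Clamp to the final rung so attempts beyond the ladder reuse it.
    return RETRY_DELAYS[min(attempt, len(RETRY_DELAYS) - 1)]


first_buggy = buggy_delay(0)
first_clamped = clamped_delay(0)
```
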
`contree build` now consumes the same SSE event stream as `contree run`
instead of polling `/operations/{uuid}` and dumping stdout/stderr at the
end. RUN output reaches the user's terminal as the server produces it —
matching `docker build`'s live-log behavior.

The Dockerfile RUN keyword calls `_stream_events_until_close` with a
`DefaultFormatter`, always writing chunks to `sys.stdout`/`sys.stderr`
regardless of the top-level formatter used for the final build record.
When the streamer falls back to a GET (SSE never delivered a completion
event), the captured stdout/stderr from the op's metadata is replayed
via `log_streams` so the user still sees what happened. The `poll()`
helper and local `TERMINAL_STATUSES` constant are gone.

Also drops the streamer-level graduated backoff. `client.request`
already retries every SSE reconnect through its own `RETRY_DELAYS`
ladder, so the extra `_stream_backoff_sleep` was pure duplication.
The only delay kept is a 0.5s `TIGHT_LOOP_FLOOR`, applied when a cycle
produced no forward progress; it guards against a server that returns
immediate empty streams for an executing op.