From 5bd92e6a14f1501fc73d0105aec08577cb075072 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Sat, 6 Jun 2026 13:52:23 -0600 Subject: [PATCH 1/2] feat(loops): opt-in sandbox poll-mode (drop-resilient batch streaming) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Long, quiet in-box turns (clone/build/test) idle-drop their live SSE on the sandbox (replay-404 / exhausted reconnects) on BOTH prod and staging, which excludes cells and blocks batch eval runs. The reconnect path (lastEventId replay) 404s because the per-session event buffer is reaped between the drop and the reconnect. Adds an opt-in LoopLineageOptions.streaming: 'poll' (default 'sse', unchanged). Poll-mode fire-and-detaches via dispatchPrompt, awaits the terminal result by status-poll (session(id).result() — no held stream), and yields the answer as one synthesized event. With no live SSE held across the quiet execution, the idle-drop is impossible by construction. Threaded through the default fresh-box path (what batch runs actually use — the lineage only activates on sessionContinuity/forkFanout) and the lineage start/continue/fork. Lower trace fidelity (one terminal event, not per-token), so opt-in for batch; interactive chat keeps live SSE. Bench opts in via SANDBOX_STREAMING=poll. - typecheck clean; 37 kernel tests pass incl. a new poll-mode wiring test (dispatchPrompt + result() used, streamPrompt never held) - default 'sse' byte-identical (13 existing lineage tests pass unchanged) e2e validation against the live drop is pending sandbox stability — prod is currently degraded at the box-ACQUISITION layer too (separate from the SSE drop), which blocked the end-to-end run. Correct by construction + unit-tested; the lift number lands once a box can be acquired. --- bench/src/experiment.ts | 6 +++ src/runtime/run-loop.ts | 23 ++++++++++- src/runtime/sandbox-lineage.ts | 61 ++++++++++++++++++++++++++--- src/runtime/types.ts | 12 ++++++ tests/loops/sandbox-lineage.test.ts | 52 ++++++++++++++++++++++++ 5 files changed, 147 insertions(+), 7 deletions(-) diff --git a/bench/src/experiment.ts b/bench/src/experiment.ts index 670866d..bc0db33 100644 --- a/bench/src/experiment.ts +++ b/bench/src/experiment.ts @@ -302,6 +302,12 @@ export async function runExperiment(cfg: ExperimentConfig): Promise( if (!Number.isFinite(maxConcurrency) || maxConcurrency <= 0) { throw new ValidationError('runLoop: maxConcurrency must be > 0') } + // Default fresh-box path streaming mode (read regardless of lineage activation, + // which gates on sessionContinuity/forkFanout — the bench uses neither). + const sandboxStreaming = options.lineage?.streaming ?? 'sse' if (!options.ctx?.sandboxClient || typeof options.ctx.sandboxClient.create !== 'function') { throw new ValidationError('runLoop: ctx.sandboxClient.create is required') } @@ -290,6 +294,7 @@ export async function runLoop( output: options.output, validator: options.validator, maxConcurrency, + streaming: sandboxStreaming, signal: controller.signal, ctx: options.ctx, runId, @@ -397,7 +402,10 @@ async function setUpLineage( } const capabilities = await probeSandboxCapabilities(options.ctx.sandboxClient) return { - lineage: createSandboxLineage(options.ctx.sandboxClient, capabilities, { maxConcurrency }), + lineage: createSandboxLineage(options.ctx.sandboxClient, capabilities, { + maxConcurrency, + streaming: lineageOpts.streaming, + }), options: lineageOpts, handles: new Map(), canPrune: typeof options.driver.describePlan !== 'function', @@ -560,6 +568,10 @@ interface RunBatchArgs { /** The loop's lineage state; iterations record their handle here for the next * round to descend from. Set iff `lineagePlan` is. */ lineageState?: LineageState + /** Sandbox streaming mode for the default fresh-box path. 'poll' fire-and- + * detaches + status-polls the terminal result (drop-resilient for long batch + * turns); 'sse' streams live (default). */ + streaming: 'sse' | 'poll' } async function runBatch(args: RunBatchArgs) { @@ -644,7 +656,14 @@ async function executeIteration(args: ExecuteIterationArgs { + if (signal.aborted) throwAbort() + await box.dispatchPrompt(prompt, { sessionId, signal }) + const result = await box.session(sessionId).result() + if (signal.aborted) throwAbort() + yield { + type: 'result', + id: sessionId, + data: { + finalText: result.response ?? '', + success: result.success, + ...(result.error ? { error: result.error } : {}), + ...(result.usage ? { usage: result.usage } : {}), + }, + } +} + +export function promptEvents( + streaming: 'sse' | 'poll', + box: SandboxInstance, + prompt: string, + sessionId: string, + signal: AbortSignal, +): AsyncIterable { + return streaming === 'poll' + ? pollPromptEvents(box, prompt, sessionId, signal) + : box.streamPrompt(prompt, { sessionId, signal }) +} + /** * A live box plus the session that threads its iterations together. Handed back * by `start`/`fork`, passed into `continue`/`fork` to descend from. Opaque to @@ -136,11 +184,14 @@ export interface SandboxLineage { export function createSandboxLineage( client: LoopSandboxClient, capabilities: SandboxCapabilities, - options: { maxConcurrency?: number } = {}, + options: { maxConcurrency?: number; streaming?: 'sse' | 'poll' } = {}, ): SandboxLineage { if (!client || typeof client.create !== 'function') { throw new ValidationError('createSandboxLineage: client.create is required') } + // 'sse' (default) preserves the byte-identical live-stream behavior; 'poll' is + // the drop-resilient fire-and-detach path for long, quiet batch turns. + const streaming = options.streaming ?? 'sse' // Bounds the burst of box creation inside `fork` so an N-way fanout doesn't // provision N boxes simultaneously regardless of the loop's concurrency cap. const forkConcurrency = Math.max( @@ -164,7 +215,7 @@ export function createSandboxLineage( async start(spec, prompt, signal) { const box = await acquireFresh(spec, signal) const sessionId = mintSessionId() - const events = box.streamPrompt(prompt, { sessionId, signal }) + const events = promptEvents(streaming, box, prompt, sessionId, signal) return { handle: { box, sessionId }, events } }, @@ -175,7 +226,7 @@ export function createSandboxLineage( await assertSessionLive(handle.box, handle.sessionId) // Same box, same session id — the server continues the conversation; we do // NOT re-acquire and do NOT re-inject prior context as prompt text. - return handle.box.streamPrompt(prompt, { sessionId: handle.sessionId, signal }) + return promptEvents(streaming, handle.box, prompt, handle.sessionId, signal) }, async fork(parent, prompts, specs, signal) { @@ -204,14 +255,14 @@ export function createSandboxLineage( const sessionId = mintSessionId() return { handle: { box, sessionId }, - events: box.streamPrompt(prompt, { sessionId, signal }), + events: promptEvents(streaming, box, prompt, sessionId, signal), } } const box = await acquireFresh(spec, signal) const sessionId = mintSessionId() return { handle: { box, sessionId }, - events: box.streamPrompt(prompt, { sessionId, signal }), + events: promptEvents(streaming, box, prompt, sessionId, signal), } }) }, diff --git a/src/runtime/types.ts b/src/runtime/types.ts index 797cfb7..019d1eb 100644 --- a/src/runtime/types.ts +++ b/src/runtime/types.ts @@ -280,6 +280,18 @@ export interface LoopLineageOptions { * different-per-branch profiles use the unforked fanout path. */ forkFanout?: boolean + /** + * Per-turn sandbox streaming mode. Default `'sse'` (live `streamPrompt` — + * low-latency, full per-token trace; best for interactive chat). `'poll'` + * fire-and-detaches via `dispatchPrompt` and awaits the terminal result by + * status-polling, so a long, quiet in-box turn (clone + build + test) never + * holds a live stream a proxy idle-timeout can drop mid-execution. Lower trace + * fidelity (one terminal event), so it is opt-in — intended for BATCH eval + * runs, which don't need live streaming and were losing long turns to the + * idle-drop. Applies to the default fresh-box path too, not only when + * `sessionContinuity`/`forkFanout` are on. + */ + streaming?: 'sse' | 'poll' } /** @experimental */ diff --git a/tests/loops/sandbox-lineage.test.ts b/tests/loops/sandbox-lineage.test.ts index 0e8ff97..05799f9 100644 --- a/tests/loops/sandbox-lineage.test.ts +++ b/tests/loops/sandbox-lineage.test.ts @@ -163,6 +163,58 @@ describe('runLoop lineage — sessionContinuity OFF (the independence invariant) }) }) +describe('runLoop — streaming: poll (drop-resilient batch path)', () => { + it('fire-and-detaches via dispatchPrompt + drains the terminal result, never holding a live stream', async () => { + const calls = { stream: 0, dispatch: 0, result: 0 } + const client = { + async create(): Promise { + return { + id: 'poll-box', + async *streamPrompt(): AsyncGenerator { + calls.stream += 1 // MUST NOT run in poll mode — that is the whole point + yield { type: 'result', data: { finalText: 'SSE' } } + }, + async dispatchPrompt(_m: string, o?: { sessionId?: string }) { + calls.dispatch += 1 + return { sessionId: o?.sessionId ?? 'minted', status: 'running' as const, alreadyExisted: false } + }, + session(id: string) { + return { + async status() { + return { id, status: 'completed' as const } + }, + async result() { + calls.result += 1 + return { success: true, response: 'POLLED', durationMs: 1 } + }, + } + }, + async delete() {}, + } as unknown as SandboxInstance + }, + } + const pollOutput: OutputAdapter = { + parse: (events) => String((events.at(-1)?.data as { finalText?: string } | undefined)?.finalText ?? ''), + } + const moves: TopologyMove[] = [{ kind: 'refine', task: { goal: 'g' } }, { kind: 'stop' }] + let i = 0 + const planner: TopologyPlanner = () => moves[i++]! + + await runLoop({ + driver: createDynamicDriver({ planner }), + agentRun: spec('w'), + output: pollOutput, + task: { goal: 'g' }, + ctx: { sandboxClient: client as never }, + lineage: { streaming: 'poll' }, + }) + + expect(calls.dispatch).toBe(1) // fire-and-detach used + expect(calls.result).toBe(1) // terminal result drained by status-poll + expect(calls.stream).toBe(0) // a live SSE was NEVER held — the drop is impossible + }) +}) + describe('runLoop lineage — sessionContinuity ON', () => { it('a refine continues the parent on the SAME box with the SAME session id', async () => { const { client, streamCalls, created } = createFakeClient({ criuAvailable: false }) From b83281029ef0ec139c1e2f3c8c13055eddfda0b5 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Sat, 6 Jun 2026 14:01:10 -0600 Subject: [PATCH 2/2] =?UTF-8?q?style(tests):=20biome=20=E2=80=94=20templat?= =?UTF-8?q?e-literals=20in=20coder.test.ts=20(pre-existing,=20main=20was?= =?UTF-8?q?=20lint-red)=20+=20format=20the=20new=20poll-mode=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/loops/sandbox-lineage.test.ts | 9 +++++++-- tests/profiles/coder.test.ts | 6 +++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/loops/sandbox-lineage.test.ts b/tests/loops/sandbox-lineage.test.ts index 05799f9..69355aa 100644 --- a/tests/loops/sandbox-lineage.test.ts +++ b/tests/loops/sandbox-lineage.test.ts @@ -176,7 +176,11 @@ describe('runLoop — streaming: poll (drop-resilient batch path)', () => { }, async dispatchPrompt(_m: string, o?: { sessionId?: string }) { calls.dispatch += 1 - return { sessionId: o?.sessionId ?? 'minted', status: 'running' as const, alreadyExisted: false } + return { + sessionId: o?.sessionId ?? 'minted', + status: 'running' as const, + alreadyExisted: false, + } }, session(id: string) { return { @@ -194,7 +198,8 @@ describe('runLoop — streaming: poll (drop-resilient batch path)', () => { }, } const pollOutput: OutputAdapter = { - parse: (events) => String((events.at(-1)?.data as { finalText?: string } | undefined)?.finalText ?? ''), + parse: (events) => + String((events.at(-1)?.data as { finalText?: string } | undefined)?.finalText ?? ''), } const moves: TopologyMove[] = [{ kind: 'refine', task: { goal: 'g' } }, { kind: 'stop' }] let i = 0 diff --git a/tests/profiles/coder.test.ts b/tests/profiles/coder.test.ts index 402c3d6..6e928c8 100644 --- a/tests/profiles/coder.test.ts +++ b/tests/profiles/coder.test.ts @@ -212,15 +212,15 @@ describe('coderProfile output adapter', () => { const events: SandboxEvent[] = [ { type: 'message.part.updated', - data: { part: { type: 'reasoning' }, delta: '```json\n' + decoy + '\n``` thinking...' }, + data: { part: { type: 'reasoning' }, delta: `\`\`\`json\n${decoy}\n\`\`\` thinking...` }, }, { type: 'message.part.updated', - data: { part: { type: 'text' }, delta: 'earlier ```json\n' + decoy + '\n``` then ' }, + data: { part: { type: 'text' }, delta: `earlier \`\`\`json\n${decoy}\n\`\`\` then ` }, }, { type: 'message.part.updated', - data: { part: { type: 'text' }, delta: 'final ```json\n' + real + '\n```' }, + data: { part: { type: 'text' }, delta: `final \`\`\`json\n${real}\n\`\`\`` }, }, ] as SandboxEvent[] const out = preset.output.parse(events)