Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bench/src/experiment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,12 @@ export async function runExperiment(cfg: ExperimentConfig): Promise<ExperimentRe
task: task.prompt,
ctx: { sandboxClient: cfg.sandboxClient, hooks: runtime.hooks },
maxIterations: rounds,
// Batch eval turns are long + quiet (clone/build/test) → a live SSE
// idle-drops on prod AND staging. SANDBOX_STREAMING=poll fire-and-detaches
// + status-polls the terminal result so the cell completes. Default 'sse'.
...(process.env.SANDBOX_STREAMING === 'poll'
? { lineage: { streaming: 'poll' as const } }
: {}),
})
const iter0 = result.iterations[0]
const infraError = iter0?.error !== undefined && iter0.output === undefined
Expand Down
23 changes: 21 additions & 2 deletions src/runtime/run-loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { probeSandboxCapabilities } from './sandbox-capabilities'
import { extractLlmCallEvent } from './sandbox-events'
import {
createSandboxLineage,
promptEvents,
type SandboxLineage,
type SandboxLineageHandle,
} from './sandbox-lineage'
Expand Down Expand Up @@ -143,6 +144,9 @@ export async function runLoop<Task, Output, Decision>(
if (!Number.isFinite(maxConcurrency) || maxConcurrency <= 0) {
throw new ValidationError('runLoop: maxConcurrency must be > 0')
}
// Default fresh-box path streaming mode (read regardless of lineage activation,
// which gates on sessionContinuity/forkFanout — the bench uses neither).
const sandboxStreaming = options.lineage?.streaming ?? 'sse'
if (!options.ctx?.sandboxClient || typeof options.ctx.sandboxClient.create !== 'function') {
throw new ValidationError('runLoop: ctx.sandboxClient.create is required')
}
Expand Down Expand Up @@ -290,6 +294,7 @@ export async function runLoop<Task, Output, Decision>(
output: options.output,
validator: options.validator,
maxConcurrency,
streaming: sandboxStreaming,
signal: controller.signal,
ctx: options.ctx,
runId,
Expand Down Expand Up @@ -397,7 +402,10 @@ async function setUpLineage<Task, Output, Decision>(
}
const capabilities = await probeSandboxCapabilities(options.ctx.sandboxClient)
return {
lineage: createSandboxLineage(options.ctx.sandboxClient, capabilities, { maxConcurrency }),
lineage: createSandboxLineage(options.ctx.sandboxClient, capabilities, {
maxConcurrency,
streaming: lineageOpts.streaming,
}),
options: lineageOpts,
handles: new Map(),
canPrune: typeof options.driver.describePlan !== 'function',
Expand Down Expand Up @@ -560,6 +568,10 @@ interface RunBatchArgs<Task, Output> {
/** The loop's lineage state; iterations record their handle here for the next
* round to descend from. Set iff `lineagePlan` is. */
lineageState?: LineageState
/** Sandbox streaming mode for the default fresh-box path. 'poll' fire-and-
* detaches + status-polls the terminal result (drop-resilient for long batch
* turns); 'sse' streams live (default). */
streaming: 'sse' | 'poll'
}

async function runBatch<Task, Output>(args: RunBatchArgs<Task, Output>) {
Expand Down Expand Up @@ -644,7 +656,14 @@ async function executeIteration<Task, Output>(args: ExecuteIterationArgs<Task, O
stream = acquired.events
} else {
box = await createSandboxForSpec(args.ctx.sandboxClient, spec, args.signal)
stream = box.streamPrompt(spec.taskToPrompt(args.item.task), { signal: args.signal })
const prompt = spec.taskToPrompt(args.item.task)
// 'poll' (opt-in) fire-and-detaches + status-polls the terminal result so a
// long, quiet turn never holds a drop-prone live SSE; 'sse' (default)
// streams live — byte-identical to the prior path.
stream =
args.streaming === 'poll'
? promptEvents('poll', box, prompt, `${args.runId}-i${args.item.index}`, args.signal)
: box.streamPrompt(prompt, { signal: args.signal })
}
const placement = describeSandboxPlacement(args.ctx.sandboxClient, box)
await emitTrace(args.ctx.traceEmitter, {
Expand Down
61 changes: 56 additions & 5 deletions src/runtime/sandbox-lineage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,54 @@ import {
const TEARDOWN_TIMEOUT_MS = 15_000
const DEFAULT_FORK_CONCURRENCY = 4

/**
* One turn's event stream, in the lineage's chosen streaming mode.
*
* - `'sse'` (default): live `streamPrompt` — low latency, full per-token trace.
* Best for interactive chat.
* - `'poll'`: fire-and-detach via `dispatchPrompt`, await the terminal result by
* status-polling (NOT a held SSE), then yield the answer as one synthesized
* event. A long, quiet in-box turn (clone + build + test) never holds a live
* stream a proxy idle-timeout can drop mid-execution — the failure mode that
* made batch eval runs lose their stream on both prod and staging. Lower trace
* fidelity (one terminal event, not per-token), so it is opt-in for batch.
*
* Both yield the same `SandboxEvent` vocabulary, so callers are agnostic.
*/
async function* pollPromptEvents(
box: SandboxInstance,
prompt: string,
sessionId: string,
signal: AbortSignal,
): AsyncIterable<SandboxEvent> {
if (signal.aborted) throwAbort()
await box.dispatchPrompt(prompt, { sessionId, signal })
const result = await box.session(sessionId).result()
if (signal.aborted) throwAbort()
yield {
type: 'result',
id: sessionId,
data: {
finalText: result.response ?? '',
success: result.success,
...(result.error ? { error: result.error } : {}),
...(result.usage ? { usage: result.usage } : {}),
},
}
}

export function promptEvents(
streaming: 'sse' | 'poll',
box: SandboxInstance,
prompt: string,
sessionId: string,
signal: AbortSignal,
): AsyncIterable<SandboxEvent> {
return streaming === 'poll'
? pollPromptEvents(box, prompt, sessionId, signal)
: box.streamPrompt(prompt, { sessionId, signal })
}

/**
* A live box plus the session that threads its iterations together. Handed back
* by `start`/`fork`, passed into `continue`/`fork` to descend from. Opaque to
Expand Down Expand Up @@ -136,11 +184,14 @@ export interface SandboxLineage {
export function createSandboxLineage(
client: LoopSandboxClient,
capabilities: SandboxCapabilities,
options: { maxConcurrency?: number } = {},
options: { maxConcurrency?: number; streaming?: 'sse' | 'poll' } = {},
): SandboxLineage {
if (!client || typeof client.create !== 'function') {
throw new ValidationError('createSandboxLineage: client.create is required')
}
// 'sse' (default) preserves the byte-identical live-stream behavior; 'poll' is
// the drop-resilient fire-and-detach path for long, quiet batch turns.
const streaming = options.streaming ?? 'sse'
// Bounds the burst of box creation inside `fork` so an N-way fanout doesn't
// provision N boxes simultaneously regardless of the loop's concurrency cap.
const forkConcurrency = Math.max(
Expand All @@ -164,7 +215,7 @@ export function createSandboxLineage(
async start(spec, prompt, signal) {
const box = await acquireFresh(spec, signal)
const sessionId = mintSessionId()
const events = box.streamPrompt(prompt, { sessionId, signal })
const events = promptEvents(streaming, box, prompt, sessionId, signal)
return { handle: { box, sessionId }, events }
},

Expand All @@ -175,7 +226,7 @@ export function createSandboxLineage(
await assertSessionLive(handle.box, handle.sessionId)
// Same box, same session id — the server continues the conversation; we do
// NOT re-acquire and do NOT re-inject prior context as prompt text.
return handle.box.streamPrompt(prompt, { sessionId: handle.sessionId, signal })
return promptEvents(streaming, handle.box, prompt, handle.sessionId, signal)
},

async fork(parent, prompts, specs, signal) {
Expand Down Expand Up @@ -204,14 +255,14 @@ export function createSandboxLineage(
const sessionId = mintSessionId()
return {
handle: { box, sessionId },
events: box.streamPrompt(prompt, { sessionId, signal }),
events: promptEvents(streaming, box, prompt, sessionId, signal),
}
}
const box = await acquireFresh(spec, signal)
const sessionId = mintSessionId()
return {
handle: { box, sessionId },
events: box.streamPrompt(prompt, { sessionId, signal }),
events: promptEvents(streaming, box, prompt, sessionId, signal),
}
})
},
Expand Down
12 changes: 12 additions & 0 deletions src/runtime/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,18 @@ export interface LoopLineageOptions {
* different-per-branch profiles use the unforked fanout path.
*/
forkFanout?: boolean
/**
* Per-turn sandbox streaming mode. Default `'sse'` (live `streamPrompt` —
* low-latency, full per-token trace; best for interactive chat). `'poll'`
* fire-and-detaches via `dispatchPrompt` and awaits the terminal result by
* status-polling, so a long, quiet in-box turn (clone + build + test) never
* holds a live stream a proxy idle-timeout can drop mid-execution. Lower trace
* fidelity (one terminal event), so it is opt-in — intended for BATCH eval
* runs, which don't need live streaming and were losing long turns to the
* idle-drop. Applies to the default fresh-box path too, not only when
* `sessionContinuity`/`forkFanout` are on.
*/
streaming?: 'sse' | 'poll'
}

/** @experimental */
Expand Down
57 changes: 57 additions & 0 deletions tests/loops/sandbox-lineage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,63 @@ describe('runLoop lineage — sessionContinuity OFF (the independence invariant)
})
})

describe('runLoop — streaming: poll (drop-resilient batch path)', () => {
it('fire-and-detaches via dispatchPrompt + drains the terminal result, never holding a live stream', async () => {
const calls = { stream: 0, dispatch: 0, result: 0 }
const client = {
async create(): Promise<SandboxInstance> {
return {
id: 'poll-box',
async *streamPrompt(): AsyncGenerator<SandboxEvent> {
calls.stream += 1 // MUST NOT run in poll mode — that is the whole point
yield { type: 'result', data: { finalText: 'SSE' } }
},
async dispatchPrompt(_m: string, o?: { sessionId?: string }) {
calls.dispatch += 1
return {
sessionId: o?.sessionId ?? 'minted',
status: 'running' as const,
alreadyExisted: false,
}
},
session(id: string) {
return {
async status() {
return { id, status: 'completed' as const }
},
async result() {
calls.result += 1
return { success: true, response: 'POLLED', durationMs: 1 }
},
}
},
async delete() {},
} as unknown as SandboxInstance
},
}
const pollOutput: OutputAdapter<string> = {
parse: (events) =>
String((events.at(-1)?.data as { finalText?: string } | undefined)?.finalText ?? ''),
}
const moves: TopologyMove<Task>[] = [{ kind: 'refine', task: { goal: 'g' } }, { kind: 'stop' }]
let i = 0
const planner: TopologyPlanner<Task, string> = () => moves[i++]!

await runLoop<Task, string, 'continue' | 'done'>({
driver: createDynamicDriver<Task, string>({ planner }),
agentRun: spec('w'),
output: pollOutput,
task: { goal: 'g' },
ctx: { sandboxClient: client as never },
lineage: { streaming: 'poll' },
})

expect(calls.dispatch).toBe(1) // fire-and-detach used
expect(calls.result).toBe(1) // terminal result drained by status-poll
expect(calls.stream).toBe(0) // a live SSE was NEVER held — the drop is impossible
})
})

describe('runLoop lineage — sessionContinuity ON', () => {
it('a refine continues the parent on the SAME box with the SAME session id', async () => {
const { client, streamCalls, created } = createFakeClient({ criuAvailable: false })
Expand Down
6 changes: 3 additions & 3 deletions tests/profiles/coder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,15 @@ describe('coderProfile output adapter', () => {
const events: SandboxEvent[] = [
{
type: 'message.part.updated',
data: { part: { type: 'reasoning' }, delta: '```json\n' + decoy + '\n``` thinking...' },
data: { part: { type: 'reasoning' }, delta: `\`\`\`json\n${decoy}\n\`\`\` thinking...` },
},
{
type: 'message.part.updated',
data: { part: { type: 'text' }, delta: 'earlier ```json\n' + decoy + '\n``` then ' },
data: { part: { type: 'text' }, delta: `earlier \`\`\`json\n${decoy}\n\`\`\` then ` },
},
{
type: 'message.part.updated',
data: { part: { type: 'text' }, delta: 'final ```json\n' + real + '\n```' },
data: { part: { type: 'text' }, delta: `final \`\`\`json\n${real}\n\`\`\`` },
},
] as SandboxEvent[]
const out = preset.output.parse(events)
Expand Down
Loading