From 3c22582a760ab819b1f769a3d26df7a8d1a00af8 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 9 Jun 2026 12:54:48 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Add resumable ELF backfill benchmark","authority":"XY-817"} --- Makefile.toml | 9 + README.md | 4 +- apps/elf-eval/src/bin/live_baseline_elf.rs | 601 ++++++++++++++++-- docker-compose.baseline.yml | 7 + .../benchmarking/live_baseline_benchmark.md | 28 +- scripts/live-baseline-benchmark.sh | 53 +- scripts/live-baseline-report-to-md.sh | 28 + 7 files changed, 667 insertions(+), 63 deletions(-) diff --git a/Makefile.toml b/Makefile.toml index ab3c4762..e6987085 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -297,6 +297,7 @@ args = [ # | task | type | cwd | # | -------------------------- | ------- | --- | # | baseline-live-docker | command | | +# | baseline-backfill-docker | command | | # | baseline-live-report | command | | # | baseline-live-docker-clean | command | | # | baseline-production-synthetic | command | | @@ -310,6 +311,14 @@ args = [ "set -euo pipefail; head=\"$(git rev-parse HEAD)\"; if [ -n \"$(git status --porcelain)\" ]; then head=\"$head+dirty\"; fi; export ELF_BASELINE_ELF_HEAD=\"$head\"; docker compose -f docker-compose.baseline.yml run --build --rm baseline-runner", ] +[tasks.baseline-backfill-docker] +workspace = false +command = "bash" +args = [ + "-lc", + "set -euo pipefail; head=\"$(git rev-parse HEAD)\"; if [ -n \"$(git status --porcelain)\" ]; then head=\"$head+dirty\"; fi; export ELF_BASELINE_ELF_HEAD=\"$head\"; export ELF_BASELINE_PROJECTS=\"${ELF_BASELINE_PROJECTS:-ELF}\"; export ELF_BASELINE_PROFILE=\"${ELF_BASELINE_PROFILE:-backfill}\"; export ELF_BASELINE_BACKFILL_DOCS=\"${ELF_BASELINE_BACKFILL_DOCS:-2000}\"; export ELF_BASELINE_ELF_TIMEOUT_SECONDS=\"${ELF_BASELINE_ELF_TIMEOUT_SECONDS:-3600}\"; export ELF_BASELINE_MAX_ELF_SECONDS=\"${ELF_BASELINE_MAX_ELF_SECONDS:-3600}\"; docker compose -f docker-compose.baseline.yml run --build --rm baseline-runner", 
+] + [tasks.baseline-live-report] workspace = false command = "bash" diff --git a/README.md b/README.md index 5f4b7af4..e77f3344 100644 --- a/README.md +++ b/README.md @@ -128,8 +128,8 @@ embeddings. smoke. OpenViking was `incomplete` because its local embedding dependency could not complete in the Docker runner. - The benchmark runner and report publisher are checked in and Docker-isolated: - `cargo make baseline-live-docker`, `cargo make baseline-live-report`, and - `cargo make baseline-live-docker-clean`. + `cargo make baseline-live-docker`, `cargo make baseline-backfill-docker`, + `cargo make baseline-live-report`, and `cargo make baseline-live-docker-clean`. Detailed evidence and interpretation: diff --git a/apps/elf-eval/src/bin/live_baseline_elf.rs b/apps/elf-eval/src/bin/live_baseline_elf.rs index b0857036..09bbe255 100644 --- a/apps/elf-eval/src/bin/live_baseline_elf.rs +++ b/apps/elf-eval/src/bin/live_baseline_elf.rs @@ -11,6 +11,7 @@ use std::{ time::{Duration, Instant}, }; +use blake3::Hasher; use clap::Parser; use color_eyre::{Report, eyre}; use serde::{Deserialize, Serialize}; @@ -22,7 +23,8 @@ use elf_chunking::ChunkingConfig; use elf_config::{Config, EmbeddingProviderConfig, LlmProviderConfig, ProviderConfig}; use elf_service::{ AddNoteInput, AddNoteRequest, BoxFuture, DeleteRequest, ElfService, EmbeddingProvider, - ExtractorProvider, PayloadLevel, Providers, RerankProvider, SearchRequest, UpdateRequest, + ExtractorProvider, NoteOp, PayloadLevel, Providers, RerankProvider, SearchRequest, + UpdateRequest, }; use elf_storage::{db::Db, qdrant::QdrantStore}; use elf_testkit::TestDatabase; @@ -32,6 +34,7 @@ const TENANT_ID: &str = "elf-live-baseline"; const PROJECT_ID: &str = "shared-corpus"; const AGENT_ID: &str = "elf-bench-agent"; const SCOPE: &str = "agent_private"; +const BACKFILL_CHECKPOINT_SCHEMA: &str = "elf.live_baseline.backfill_checkpoint/v1"; #[derive(Debug, Parser)] #[command(version = elf_cli::VERSION, rename_all = "kebab", styles = 
elf_cli::styles())] @@ -100,6 +103,78 @@ struct CorpusNote { source_doc: String, } +#[derive(Debug)] +struct BackfillOutcome { + report: BackfillReport, + note_ids: Vec, +} + +#[derive(Debug)] +struct ExistingBackfillNote { + note_id: Uuid, + source_hash: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +struct BackfillCheckpoint { + schema: String, + corpus_hash: String, + completed: BTreeMap, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +struct BackfillCheckpointEntry { + note_id: Uuid, + key: String, + source_hash: String, + op: String, +} + +#[derive(Debug, Serialize)] +struct BackfillReport { + checkpoint_path: String, + corpus_hash: String, + source_count: usize, + completed_count: usize, + batch_size: usize, + worker_concurrency: usize, + elapsed_seconds: f64, + attempted_writes: usize, + skipped_completed: usize, + duplicate_source_notes: Vec, + resume: BackfillResumeReport, + attempts: Vec, +} + +#[derive(Debug, Serialize)] +struct BackfillResumeReport { + enabled: bool, + interrupted: bool, + interrupt_after: Option, + resume_attempts: usize, + completed_before_resume: usize, + completed_after_resume: usize, +} + +#[derive(Debug, Serialize)] +struct BackfillAttemptEvidence { + attempt: usize, + resumed: bool, + interrupt_after: Option, + skipped_completed: usize, + attempted_writes: usize, + completed_writes: usize, + checkpoint_completed: usize, + interrupted: bool, +} + +#[derive(Debug, Serialize)] +struct DuplicateSourceNote { + source_doc: String, + count: i64, + note_ids: Vec, +} + #[derive(Debug)] struct BaselineRuntime { config_path: PathBuf, @@ -113,6 +188,7 @@ struct BaselineRuntime { struct WorkerRunEvidence { label: String, expected_note_count: usize, + concurrency: usize, iterations: usize, before: BTreeMap, after: BTreeMap, @@ -164,6 +240,7 @@ struct ElfBaselineReport { reason: String, head: String, embedding: EmbeddingRuntimeReport, + backfill: BackfillReport, indexing: IndexingReport, summary: QuerySummary, 
check_summary: CheckSummary, @@ -583,6 +660,182 @@ fn worker_indexing_check(evidence: WorkerRunEvidence) -> CheckResult { } } +fn resumable_backfill_check(report: &BackfillReport) -> CheckResult { + let resume_pass = !report.resume.enabled + || (report.resume.interrupted + && report.resume.resume_attempts >= 2 + && report.skipped_completed > 0); + let pass = report.completed_count == report.source_count + && report.duplicate_source_notes.is_empty() + && resume_pass; + + CheckResult { + name: "resumable_backfill_no_duplicates", + status: if pass { "pass" } else { "fail" }, + reason: if pass { + "Checkpointed backfill resumed from durable progress and did not duplicate source documents." + .to_string() + } else { + "Checkpointed backfill did not complete cleanly, did not prove resume, or duplicated source documents." + .to_string() + }, + evidence: serde_json::json!(report), + } +} + +fn backfill_batch_size() -> usize { + parse_env_usize("ELF_BASELINE_BACKFILL_BATCH_SIZE").unwrap_or(32).max(1) +} + +fn worker_concurrency() -> usize { + let default = match env::var("ELF_BASELINE_PROFILE").as_deref() { + Ok("backfill" | "large") => 4, + Ok("stress") => 4, + Ok("scale" | "full") => 2, + _ => 1, + }; + + parse_env_usize("ELF_BASELINE_WORKER_CONCURRENCY").unwrap_or(default).clamp(1, 32) +} + +fn backfill_resume_probe_enabled() -> bool { + env::var("ELF_BASELINE_BACKFILL_RESUME_PROBE") + .map(|value| value != "0" && !value.eq_ignore_ascii_case("false")) + .unwrap_or(true) +} + +fn backfill_interrupt_after(source_count: usize) -> Option { + if !backfill_resume_probe_enabled() || source_count <= 1 { + return None; + } + + let configured = parse_env_usize("ELF_BASELINE_BACKFILL_INTERRUPT_AFTER"); + let default = (source_count / 2).max(1); + + Some(configured.unwrap_or(default).clamp(1, source_count.saturating_sub(1))) +} + +fn backfill_checkpoint_path(out: &Path) -> PathBuf { + env_string(&["ELF_BASELINE_BACKFILL_CHECKPOINT"]) + .map(PathBuf::from) + .unwrap_or_else(|| 
out.with_file_name("elf-backfill-checkpoint.json")) +} + +fn empty_backfill_checkpoint(corpus_hash: &str) -> BackfillCheckpoint { + BackfillCheckpoint { + schema: BACKFILL_CHECKPOINT_SCHEMA.to_string(), + corpus_hash: corpus_hash.to_string(), + completed: BTreeMap::new(), + } +} + +fn load_backfill_checkpoint( + path: &Path, + corpus_hash: &str, +) -> color_eyre::Result { + if !path.exists() { + return Ok(empty_backfill_checkpoint(corpus_hash)); + } + + let raw = fs::read_to_string(path)?; + let checkpoint = serde_json::from_str::(&raw)?; + + if checkpoint.schema == BACKFILL_CHECKPOINT_SCHEMA && checkpoint.corpus_hash == corpus_hash { + Ok(checkpoint) + } else { + Ok(empty_backfill_checkpoint(corpus_hash)) + } +} + +fn write_backfill_checkpoint( + path: &Path, + checkpoint: &BackfillCheckpoint, +) -> color_eyre::Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + + let raw = serde_json::to_string_pretty(checkpoint)?; + let tmp_path = path.with_extension("json.tmp"); + + fs::write(&tmp_path, raw)?; + fs::rename(tmp_path, path)?; + + Ok(()) +} + +fn source_hash(note: &CorpusNote) -> String { + let mut hasher = Hasher::new(); + + hasher.update(note.source_doc.as_bytes()); + hasher.update(b"\0"); + hasher.update(note.key.as_bytes()); + hasher.update(b"\0"); + hasher.update(note.text.as_bytes()); + + hasher.finalize().to_hex().to_string() +} + +fn corpus_hash(notes: &[CorpusNote]) -> String { + let mut hasher = Hasher::new(); + + for note in notes { + hasher.update(note.source_doc.as_bytes()); + hasher.update(b"\0"); + hasher.update(source_hash(note).as_bytes()); + hasher.update(b"\0"); + } + + hasher.finalize().to_hex().to_string() +} + +fn checkpoint_entry_valid( + note: &CorpusNote, + entry: &BackfillCheckpointEntry, + existing: &BTreeMap, +) -> bool { + let expected_hash = source_hash(note); + + if entry.source_hash != expected_hash { + return false; + } + + existing.get(¬e.source_doc).is_some_and(|stored| { + stored.note_id == 
entry.note_id + && stored.source_hash.as_deref() == Some(expected_hash.as_str()) + }) +} + +fn note_input(note: &CorpusNote) -> AddNoteInput { + let hash = source_hash(note); + + AddNoteInput { + r#type: "fact".to_string(), + key: Some(note.key.clone()), + text: note.text.clone(), + structured: None, + importance: 0.9, + confidence: 0.95, + ttl_days: None, + source_ref: serde_json::json!({ + "source": "ELF live baseline corpus", + "title": note.title, + "document": note.source_doc, + "source_hash": hash, + }), + write_policy: None, + } +} + +fn note_op_string(op: NoteOp) -> color_eyre::Result { + let value = serde_json::to_value(op)?; + + value + .as_str() + .map(ToString::to_string) + .ok_or_else(|| eyre::eyre!("Serialized note op was not a string.")) +} + fn concurrent_note_count() -> usize { if let Ok(value) = env::var("ELF_BASELINE_CONCURRENT_NOTES") && let Ok(parsed) = value.parse::() @@ -591,6 +844,7 @@ fn concurrent_note_count() -> usize { } match env::var("ELF_BASELINE_PROFILE").as_deref() { + Ok("backfill" | "large") => 32, Ok("stress") => 32, Ok("scale" | "full") => 16, _ => 4, @@ -642,6 +896,7 @@ fn concurrent_marker(index: usize) -> String { fn soak_config() -> SoakConfig { let profile = env::var("ELF_BASELINE_PROFILE").ok(); let (default_seconds, default_rounds) = match profile.as_deref() { + Some("backfill" | "large") => (60, 6), Some("stress") => (60, 6), Some("scale" | "full") => (15, 3), _ => (0, 0), @@ -986,6 +1241,273 @@ fn git_head() -> color_eyre::Result { Ok(String::from_utf8(output.stdout)?.trim().to_string()) } +async fn load_existing_backfill_notes( + service: &ElfService, +) -> color_eyre::Result> { + let rows = sqlx::query_as::<_, (Uuid, String, Option)>( + "\ +SELECT note_id, source_ref->>'document' AS source_doc, source_ref->>'source_hash' AS source_hash +FROM memory_notes +WHERE tenant_id = $1 + AND project_id = $2 + AND agent_id = $3 + AND scope = $4 + AND status = 'active' + AND source_ref->>'source' = 'ELF live baseline corpus' + 
AND source_ref->>'document' IS NOT NULL +ORDER BY updated_at DESC", + ) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(SCOPE) + .fetch_all(&service.db.pool) + .await?; + let mut out = BTreeMap::new(); + + for (note_id, source_doc, hash) in rows { + out.entry(source_doc).or_insert(ExistingBackfillNote { note_id, source_hash: hash }); + } + + Ok(out) +} + +async fn duplicate_source_notes( + service: &ElfService, +) -> color_eyre::Result> { + let rows = sqlx::query_as::<_, (String, i64, Vec)>( + "\ +SELECT + source_ref->>'document' AS source_doc, + COUNT(*)::bigint AS count, + array_agg(note_id ORDER BY note_id)::uuid[] AS note_ids +FROM memory_notes +WHERE tenant_id = $1 + AND project_id = $2 + AND agent_id = $3 + AND scope = $4 + AND status = 'active' + AND source_ref->>'source' = 'ELF live baseline corpus' + AND source_ref->>'document' IS NOT NULL +GROUP BY source_ref->>'document' +HAVING COUNT(*) > 1 +ORDER BY source_doc", + ) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(SCOPE) + .fetch_all(&service.db.pool) + .await?; + + Ok(rows + .into_iter() + .map(|(source_doc, count, note_ids)| DuplicateSourceNote { source_doc, count, note_ids }) + .collect()) +} + +async fn run_resumable_backfill( + service: &ElfService, + notes: &[CorpusNote], + checkpoint_path: &Path, +) -> color_eyre::Result { + let started_at = Instant::now(); + let corpus_hash = corpus_hash(notes); + let batch_size = backfill_batch_size(); + let interrupt_after = backfill_interrupt_after(notes.len()); + let first_attempt = run_backfill_attempt( + service, + notes, + checkpoint_path, + &corpus_hash, + batch_size, + 1, + interrupt_after, + ) + .await?; + let interrupted = first_attempt.interrupted; + let completed_before_resume = first_attempt.checkpoint_completed; + let mut attempts = Vec::new(); + + attempts.push(first_attempt); + + if interrupted { + attempts.push( + run_backfill_attempt( + service, + notes, + checkpoint_path, + &corpus_hash, + batch_size, + 2, + 
None, + ) + .await?, + ); + } + + let checkpoint = load_backfill_checkpoint(checkpoint_path, &corpus_hash)?; + let existing = load_existing_backfill_notes(service).await?; + let mut note_ids = Vec::with_capacity(notes.len()); + + for note in notes { + let Some(entry) = checkpoint.completed.get(¬e.source_doc) else { + return Err(eyre::eyre!( + "Backfill checkpoint missing completed source {}.", + note.source_doc + )); + }; + + if !checkpoint_entry_valid(note, entry, &existing) { + return Err(eyre::eyre!( + "Backfill checkpoint entry for {} does not match Postgres state.", + note.source_doc + )); + } + + note_ids.push(entry.note_id); + } + + let duplicate_source_notes = duplicate_source_notes(service).await?; + let attempted_writes = attempts.iter().map(|attempt| attempt.attempted_writes).sum(); + let skipped_completed = attempts.iter().map(|attempt| attempt.skipped_completed).sum(); + let completed_after_resume = checkpoint.completed.len(); + let report = BackfillReport { + checkpoint_path: checkpoint_path.display().to_string(), + corpus_hash, + source_count: notes.len(), + completed_count: note_ids.len(), + batch_size, + worker_concurrency: worker_concurrency(), + elapsed_seconds: started_at.elapsed().as_secs_f64(), + attempted_writes, + skipped_completed, + duplicate_source_notes, + resume: BackfillResumeReport { + enabled: interrupt_after.is_some(), + interrupted, + interrupt_after, + resume_attempts: attempts.len(), + completed_before_resume, + completed_after_resume, + }, + attempts, + }; + + Ok(BackfillOutcome { report, note_ids }) +} + +async fn run_backfill_attempt( + service: &ElfService, + notes: &[CorpusNote], + checkpoint_path: &Path, + corpus_hash: &str, + batch_size: usize, + attempt: usize, + interrupt_after: Option, +) -> color_eyre::Result { + let mut checkpoint = load_backfill_checkpoint(checkpoint_path, corpus_hash)?; + let existing = load_existing_backfill_notes(service).await?; + let notes_by_source = + notes.iter().map(|note| 
(note.source_doc.as_str(), note)).collect::>(); + let checkpoint_len_before_prune = checkpoint.completed.len(); + + checkpoint.completed.retain(|source_doc, entry| { + notes_by_source + .get(source_doc.as_str()) + .is_some_and(|note| checkpoint_entry_valid(note, entry, &existing)) + }); + + if checkpoint.completed.len() != checkpoint_len_before_prune { + write_backfill_checkpoint(checkpoint_path, &checkpoint)?; + } + + let mut pending = Vec::new(); + let mut skipped_completed = 0_usize; + + for note in notes { + if checkpoint.completed.contains_key(¬e.source_doc) { + skipped_completed += 1; + } else { + pending.push(note); + } + } + + let max_writes = interrupt_after.unwrap_or(usize::MAX); + let mut attempted_writes = 0_usize; + let mut completed_writes = 0_usize; + let mut cursor = 0_usize; + + while cursor < pending.len() && attempted_writes < max_writes { + let remaining_budget = max_writes.saturating_sub(attempted_writes); + let take = batch_size.min(remaining_budget).min(pending.len() - cursor); + let batch = &pending[cursor..cursor + take]; + let response = service + .add_note(AddNoteRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + scope: SCOPE.to_string(), + notes: batch.iter().map(|note| note_input(note)).collect(), + }) + .await?; + + if response.results.len() != batch.len() { + return Err(eyre::eyre!( + "Backfill add_note returned {} results for {} inputs.", + response.results.len(), + batch.len() + )); + } + + for (note, result) in batch.iter().zip(response.results) { + let op = note_op_string(result.op)?; + + if op == "REJECTED" { + return Err(eyre::eyre!( + "Backfill note {} was rejected: {:?}.", + note.source_doc, + result.reason_code + )); + } + + let note_id = result.note_id.ok_or_else(|| { + eyre::eyre!("Backfill note {} did not return a note_id.", note.source_doc) + })?; + + checkpoint.completed.insert( + note.source_doc.clone(), + BackfillCheckpointEntry { + note_id, + key: 
note.key.clone(), + source_hash: source_hash(note), + op, + }, + ); + + completed_writes += 1; + } + + attempted_writes += batch.len(); + cursor += batch.len(); + + write_backfill_checkpoint(checkpoint_path, &checkpoint)?; + } + + let interrupted = cursor < pending.len(); + + Ok(BackfillAttemptEvidence { + attempt, + resumed: skipped_completed > 0, + interrupt_after, + skipped_completed, + attempted_writes, + completed_writes, + checkpoint_completed: checkpoint.completed.len(), + interrupted, + }) +} + #[tokio::main] async fn main() -> color_eyre::Result<()> { color_eyre::install()?; @@ -1019,7 +1541,9 @@ async fn run(args: Args) -> color_eyre::Result { }; let service = Arc::new(build_service(&runtime).await?); let notes = load_corpus_notes(&args.corpus)?; - let note_ids = add_notes(&service, ¬es).await?; + let backfill_checkpoint_path = backfill_checkpoint_path(&args.out); + let backfill = run_resumable_backfill(&service, ¬es, &backfill_checkpoint_path).await?; + let note_ids = backfill.note_ids; let initial_worker = run_worker_until_indexed(&runtime, &service, ¬e_ids, "corpus_upsert").await?; let rebuild = service.rebuild_qdrant().await?; @@ -1031,7 +1555,11 @@ async fn run(args: Args) -> color_eyre::Result { let latency_ms_mean = latency_ms_total / query_results.len().max(1) as f64; let retrieval_status = if fail_count == 0 { "retrieval_pass" } else { "retrieval_wrong_result" }; - let mut checks = vec![retrieval_check(&query_results), worker_indexing_check(initial_worker)]; + let mut checks = vec![ + resumable_backfill_check(&backfill.report), + retrieval_check(&query_results), + worker_indexing_check(initial_worker), + ]; checks.extend(run_lifecycle_checks(&runtime, &service, ¬es, ¬e_ids).await?); checks.push(run_concurrent_write_check(&runtime, Arc::clone(&service)).await?); @@ -1061,6 +1589,7 @@ async fn run(args: Args) -> color_eyre::Result { reason, head: git_head().unwrap_or_else(|_| "unknown".to_string()), embedding: 
embedding_runtime_report(&service.cfg), + backfill: backfill.report, indexing: IndexingReport { note_count: notes.len(), rebuild_rebuilt_count: rebuild.rebuilt_count, @@ -1138,51 +1667,19 @@ async fn build_worker_state(runtime: &BaselineRuntime) -> color_eyre::Result color_eyre::Result> { - let request = AddNoteRequest { - tenant_id: TENANT_ID.to_string(), - project_id: PROJECT_ID.to_string(), - agent_id: AGENT_ID.to_string(), - scope: SCOPE.to_string(), - notes: notes - .iter() - .map(|note| AddNoteInput { - r#type: "fact".to_string(), - key: Some(note.key.clone()), - text: note.text.clone(), - structured: None, - importance: 0.9, - confidence: 0.95, - ttl_days: None, - source_ref: serde_json::json!({ - "source": "ELF live baseline corpus", - "title": note.title, - "document": note.source_doc, - }), - write_policy: None, - }) - .collect(), - }; - let response = service.add_note(request).await?; - let mut ids = Vec::with_capacity(response.results.len()); - - for result in response.results { - let note_id = - result.note_id.ok_or_else(|| eyre::eyre!("ELF add_note did not return a note_id."))?; - - ids.push(note_id); - } - - Ok(ids) -} - async fn run_worker_until_indexed( runtime: &BaselineRuntime, service: &ElfService, note_ids: &[Uuid], label: &str, ) -> color_eyre::Result { - let state = build_worker_state(runtime).await?; + let concurrency = worker_concurrency(); + let mut states = Vec::with_capacity(concurrency); + + for _ in 0..concurrency { + states.push(Arc::new(build_worker_state(runtime).await?)); + } + let before = outbox_status_counts(service, note_ids).await?; let max_iterations = worker_max_iterations(note_ids.len()); let mut iterations = 0_usize; @@ -1197,6 +1694,7 @@ async fn run_worker_until_indexed( return Ok(WorkerRunEvidence { label: label.to_string(), expected_note_count: note_ids.len(), + concurrency, iterations, before, after, @@ -1206,9 +1704,23 @@ async fn run_worker_until_indexed( }); } - worker::process_once(&state).await?; + let mut set = 
JoinSet::new(); + + for state in &states { + let state = Arc::clone(state); + + set.spawn(async move { + worker::process_once(&state) + .await + .map_err(|err| eyre::eyre!("Worker process_once failed: {err}")) + }); + } + + while let Some(joined) = set.join_next().await { + joined??; + } - iterations += 1; + iterations = iterations.saturating_add(concurrency); } let after = outbox_status_counts(service, note_ids).await?; @@ -1218,6 +1730,7 @@ async fn run_worker_until_indexed( Ok(WorkerRunEvidence { label: label.to_string(), expected_note_count: note_ids.len(), + concurrency, iterations, before, after, diff --git a/docker-compose.baseline.yml b/docker-compose.baseline.yml index ac7e9762..efdf1fd5 100644 --- a/docker-compose.baseline.yml +++ b/docker-compose.baseline.yml @@ -53,6 +53,12 @@ services: ELF_BASELINE_ELF_EMBEDDING_PATH: ${ELF_BASELINE_ELF_EMBEDDING_PATH:-} ELF_BASELINE_ELF_EMBEDDING_PROVIDER_ID: ${ELF_BASELINE_ELF_EMBEDDING_PROVIDER_ID:-} ELF_BASELINE_ELF_EMBEDDING_TIMEOUT_MS: ${ELF_BASELINE_ELF_EMBEDDING_TIMEOUT_MS:-} + ELF_BASELINE_ELF_TIMEOUT_SECONDS: ${ELF_BASELINE_ELF_TIMEOUT_SECONDS:-} + ELF_BASELINE_BACKFILL_BATCH_SIZE: ${ELF_BASELINE_BACKFILL_BATCH_SIZE:-} + ELF_BASELINE_BACKFILL_CHECKPOINT: ${ELF_BASELINE_BACKFILL_CHECKPOINT:-} + ELF_BASELINE_BACKFILL_DOCS: ${ELF_BASELINE_BACKFILL_DOCS:-2000} + ELF_BASELINE_BACKFILL_INTERRUPT_AFTER: ${ELF_BASELINE_BACKFILL_INTERRUPT_AFTER:-} + ELF_BASELINE_BACKFILL_RESUME_PROBE: ${ELF_BASELINE_BACKFILL_RESUME_PROBE:-} ELF_BASELINE_MAX_ELF_RSS_KB: ${ELF_BASELINE_MAX_ELF_RSS_KB:-1500000} ELF_BASELINE_MAX_ELF_SECONDS: ${ELF_BASELINE_MAX_ELF_SECONDS:-600} ELF_BASELINE_PROFILE: ${ELF_BASELINE_PROFILE:-smoke} @@ -64,6 +70,7 @@ services: ELF_BASELINE_SOAK_SECONDS: ${ELF_BASELINE_SOAK_SECONDS:-} ELF_BASELINE_STRESS_DOCS: ${ELF_BASELINE_STRESS_DOCS:-480} ELF_BASELINE_TOP_K: ${ELF_BASELINE_TOP_K:-10} + ELF_BASELINE_WORKER_CONCURRENCY: ${ELF_BASELINE_WORKER_CONCURRENCY:-} QWEN_API_KEY: ${QWEN_API_KEY:-} 
QWEN_EMBEDDING_API_BASE: ${QWEN_EMBEDDING_API_BASE:-} QWEN_EMBEDDING_DIMENSIONS: ${QWEN_EMBEDDING_DIMENSIONS:-} diff --git a/docs/guide/benchmarking/live_baseline_benchmark.md b/docs/guide/benchmarking/live_baseline_benchmark.md index c229eff6..7e891f44 100644 --- a/docs/guide/benchmarking/live_baseline_benchmark.md +++ b/docs/guide/benchmarking/live_baseline_benchmark.md @@ -48,6 +48,8 @@ Corpus profiles: `apps/elf-eval/fixtures/production_corpus/synthetic_coding_agent_manifest.json`. - `production-private`: local private/sanitized production corpus manifest supplied by `ELF_BASELINE_PRODUCTION_CORPUS_MANIFEST`. +- `backfill`: 2000 documents by default, 16 query cases, alternate phrasings for + every needle query, and ELF-only resumable backfill evidence. Use `ELF_BASELINE_SCALE_DOCS` and `ELF_BASELINE_STRESS_DOCS` to raise or lower the generated corpus sizes. @@ -56,6 +58,8 @@ Use `ELF_BASELINE_PRODUCTION_CORPUS_MANIFEST` to supply a local manifest that fo manifest path is absent, the file is missing, a referenced `local_path` is missing, or a query references an unknown evidence ID. It does not fall back to the checked-in synthetic fixture. +Use `ELF_BASELINE_BACKFILL_DOCS` to set the generated corpus size for the backfill +profile; values such as `10000` are supported for operator-controlled stress runs. Use `ELF_BASELINE_CONCURRENT_NOTES`, `ELF_BASELINE_MAX_ELF_SECONDS`, and `ELF_BASELINE_MAX_ELF_RSS_KB` to tune ELF's concurrent-write and resource-envelope checks. @@ -78,6 +82,12 @@ explicit timeout env var is set. For Qwen3 production embedding runs, use `Qwen3-Embedding-8B` with `EMBEDDING_DIMENSIONS=4096`. The aggregate report records ELF's embedding mode, provider id, model, dimensions, timeout, API base, and path; it never records the API key. 
+For ELF backfill runs, the runner writes a durable checkpoint file under the report +directory by default, intentionally interrupts the first pass unless +`ELF_BASELINE_BACKFILL_RESUME_PROBE` is `0` or `false`, then resumes from the checkpoint. Tune +`ELF_BASELINE_BACKFILL_BATCH_SIZE`, `ELF_BASELINE_BACKFILL_INTERRUPT_AFTER`, +`ELF_BASELINE_BACKFILL_CHECKPOINT`, and `ELF_BASELINE_WORKER_CONCURRENCY` when +measuring import and indexing throughput. Current external same-corpus adapters: @@ -101,11 +111,11 @@ Current external same-corpus adapters: Current deeper checks: - ELF: same-corpus retrieval through worker-produced chunks, async worker indexing - completion, service update replacement through the worker, service delete - suppression through the worker, cold-start search recovery after constructing a - fresh service over the same Postgres and Qdrant stores, concurrent write/search E2E, - configurable repeated write/search soak stability, and a configurable local resource - envelope. + completion, resumable checkpointed backfill without duplicate source notes, service + update replacement through the worker, service delete suppression through the worker, + cold-start search recovery after constructing a fresh service over the same Postgres + and Qdrant stores, concurrent write/search E2E, configurable repeated write/search + soak stability, and a configurable local resource envelope. - qmd, memsearch, and mem0: same-corpus retrieval, update replacement, delete suppression, and cold-start search recovery through their local public API or CLI surfaces. 
@@ -142,6 +152,8 @@ To run the scale profile: ELF_BASELINE_PROFILE=scale cargo make baseline-live-docker ELF_BASELINE_PROFILE=scale ELF_BASELINE_SCALE_DOCS=240 cargo make baseline-live-docker ELF_BASELINE_PROFILE=stress cargo make baseline-live-docker +ELF_BASELINE_PROJECTS=ELF ELF_BASELINE_PROFILE=backfill cargo make baseline-live-docker +cargo make baseline-backfill-docker ``` To iterate on one or more project adapters without rerunning the full matrix: @@ -183,8 +195,10 @@ synthetic or private production-corpus results. Each project record includes an `embedding` summary so deterministic local and production-provider runs are not confused. ELF query records include task, expected evidence IDs, allowed alternate evidence IDs, top evidence ID, wrong-result count, and per-query latency. Each project -record also includes `checks` and `check_summary`; the aggregate `full_check_summary` -is the adoption-relevant multi-check count. +record also includes a `backfill` field: ELF reports source count, completed count, +batch size, worker concurrency, resume state, duplicate source notes, and elapsed seconds; +other projects report `null`. Each project record also includes `checks` and +`check_summary`; the aggregate `full_check_summary` is the adoption-relevant multi-check count. Production-ready claims must cite a concrete report path. 
A claim based only on generated public `smoke`, `scale`, or `stress` profiles is not enough for personal diff --git a/scripts/live-baseline-benchmark.sh b/scripts/live-baseline-benchmark.sh index 1b5a6e0a..fa71bfb9 100755 --- a/scripts/live-baseline-benchmark.sh +++ b/scripts/live-baseline-benchmark.sh @@ -14,6 +14,7 @@ PROJECT_FILTER="${ELF_BASELINE_PROJECTS:-all}" CORPUS_PROFILE="${ELF_BASELINE_PROFILE:-smoke}" SCALE_DOC_COUNT="${ELF_BASELINE_SCALE_DOCS:-120}" STRESS_DOC_COUNT="${ELF_BASELINE_STRESS_DOCS:-480}" +BACKFILL_DOC_COUNT="${ELF_BASELINE_BACKFILL_DOCS:-2000}" QUERY_TOP_K="${ELF_BASELINE_TOP_K:-10}" CURRENT_PROJECT_STARTED_AT="" PRODUCTION_SYNTHETIC_MANIFEST="${ROOT_DIR}/apps/elf-eval/fixtures/production_corpus/synthetic_coding_agent_manifest.json" @@ -21,6 +22,25 @@ CORPUS_TRACK="generated_public" CORPUS_PATH_DESCRIPTION="generated in Docker under /bench/corpus" CORPUS_MANIFEST_ID="" +elf_timeout_seconds() { + if [[ -n "${ELF_BASELINE_ELF_TIMEOUT_SECONDS:-}" ]]; then + echo "${ELF_BASELINE_ELF_TIMEOUT_SECONDS}" + return + fi + + case "${CORPUS_PROFILE}" in + backfill | large) + echo 3600 + ;; + stress) + echo 1800 + ;; + *) + echo 1200 + ;; + esac +} + if [[ ! -f "/.dockerenv" && "${ELF_BASELINE_ALLOW_HOST:-0}" != "1" ]]; then echo "Refusing to run live baseline benchmark outside Docker. Use cargo make baseline-live-docker." 
>&2 exit 1 @@ -34,16 +54,17 @@ for cmd in bash cargo git jq node npm python3 rg timeout; do done generate_corpus() { - python3 - "${CORPUS_PROFILE}" "${SCALE_DOC_COUNT}" "${STRESS_DOC_COUNT}" "${CORPUS_DIR}" "${REPORT_DIR}/queries.json" <<'PY' + python3 - "${CORPUS_PROFILE}" "${SCALE_DOC_COUNT}" "${STRESS_DOC_COUNT}" "${BACKFILL_DOC_COUNT}" "${CORPUS_DIR}" "${REPORT_DIR}/queries.json" <<'PY' import json import sys from pathlib import Path -profile, scale_doc_count_raw, stress_doc_count_raw, corpus_dir_raw, queries_path_raw = sys.argv[1:] +profile, scale_doc_count_raw, stress_doc_count_raw, backfill_doc_count_raw, corpus_dir_raw, queries_path_raw = sys.argv[1:] corpus_dir = Path(corpus_dir_raw) queries_path = Path(queries_path_raw) scale_doc_count = int(scale_doc_count_raw) stress_doc_count = int(stress_doc_count_raw) +backfill_doc_count = int(backfill_doc_count_raw) anchors = [ { @@ -120,10 +141,13 @@ elif profile in {"scale", "full"}: elif profile == "stress": docs = list(anchors) target_count = max(stress_doc_count, len(anchors)) +elif profile in {"backfill", "large"}: + docs = list(anchors) + target_count = max(backfill_doc_count, len(anchors)) else: raise SystemExit(f"unsupported ELF_BASELINE_PROFILE={profile!r}") -if profile in {"scale", "full", "stress"}: +if profile in {"scale", "full", "stress", "backfill", "large"}: topics = [ "scheduler dry run budget window", "operator dashboard cache refresh", @@ -173,7 +197,7 @@ for doc in query_docs: "allowed_alternate_evidence_ids": [], } ) - if profile == "stress": + if profile in {"stress", "backfill", "large"}: queries.append( { "id": f"q-{base_id}-alt", @@ -507,6 +531,7 @@ json_record() { embedding: ($checks[0].embedding // null), query_summary: ($checks[0].query_summary // null), queries: ($checks[0].queries // null), + backfill: ($checks[0].backfill // null), check_summary: $checks[0].check_summary, checks: $checks[0].checks }' >>"${RECORDS}" @@ -533,6 +558,7 @@ json_record() { elapsed_seconds: 
$elapsed_seconds, query_summary: null, queries: null, + backfill: null, check_summary: { total: 1, pass: (if $retrieval_status == "retrieval_pass" then 1 else 0 end), @@ -695,10 +721,10 @@ project_elf() { head="$(git -C "${ROOT_DIR}" rev-parse HEAD 2>>"${log_path}" || echo "unknown")" fi - if run_cmd "${project}: same-corpus retrieval" 1200 "${log_path}" \ + if run_cmd "${project}: same-corpus retrieval" "$(elf_timeout_seconds)" "${log_path}" \ "cd '${ROOT_DIR}' && cargo run -p elf-eval --bin live_baseline_elf -- --config config/local/elf.docker.toml --corpus '${CORPUS_DIR}' --queries '${REPORT_DIR}/queries.json' --out '${result_path}'"; then if [[ -s "${result_path}" ]] && jq -e '.checks and .check_summary' "${result_path}" >/dev/null 2>&1; then - jq '{embedding, query_summary: .summary, queries, check_summary, checks}' "${result_path}" >"${REPORT_DIR}/${project}-checks.json" + jq '{embedding, query_summary: .summary, queries, backfill, check_summary, checks}' "${result_path}" >"${REPORT_DIR}/${project}-checks.json" fi if [[ -s "${result_path}" ]] && jq -e --argjson document_count "${DOCUMENT_COUNT}" --argjson query_count "${QUERY_COUNT}" ' .schema == "elf.live_baseline.elf_result/v1" and @@ -707,13 +733,20 @@ project_elf() { .summary.fail == 0 and .check_summary.fail == 0 and .check_summary.incomplete == 0 and + .backfill.source_count == $document_count and + .backfill.completed_count == $document_count and + (.backfill.duplicate_source_notes | length) == 0 and + ( + .backfill.resume.enabled == false or + (.backfill.resume.interrupted == true and .backfill.resume.resume_attempts >= 2) + ) and .indexing.note_count == $document_count and .indexing.rebuild_rebuilt_count >= $document_count and .indexing.rebuild_error_count == 0 ' "${result_path}" >/dev/null; then json_record "${project}" "${repo}" "${head}" "pass" "retrieval_pass" \ "$(jq -r '.reason' "${result_path}")" \ - "${project}.log" "add_note; worker outbox indexing; rebuild_qdrant; search_raw; concurrent 
writes; soak stability" + "${project}.log" "checkpointed add_note backfill; bounded worker outbox indexing; rebuild_qdrant; search_raw; concurrent writes; soak stability" return fi @@ -721,19 +754,19 @@ project_elf() { json_record "${project}" "${repo}" "${head}" "$(jq -r '.status // "fail"' "${result_path}")" \ "$(jq -r '.retrieval_status // "retrieval_failed"' "${result_path}")" \ "$(jq -r '.reason // "ELF result did not satisfy live baseline pass criteria"' "${result_path}")" \ - "${project}.log" "add_note; worker outbox indexing; rebuild_qdrant; search_raw; concurrent writes; soak stability" + "${project}.log" "checkpointed add_note backfill; bounded worker outbox indexing; rebuild_qdrant; search_raw; concurrent writes; soak stability" return fi json_record "${project}" "${repo}" "${head}" "fail" "runtime_failed" \ "ELF command completed but did not write a valid live-baseline result; inspect ELF.log for the runtime error" \ - "${project}.log" "add_note; worker outbox indexing; rebuild_qdrant; search_raw; concurrent writes; soak stability" + "${project}.log" "checkpointed add_note backfill; bounded worker outbox indexing; rebuild_qdrant; search_raw; concurrent writes; soak stability" return fi json_record "${project}" "${repo}" "${head}" "fail" "runtime_failed" \ "ELF same-corpus retrieval command failed in Docker" \ - "${project}.log" "add_note; worker outbox indexing; rebuild_qdrant; search_raw; concurrent writes; soak stability" + "${project}.log" "checkpointed add_note backfill; bounded worker outbox indexing; rebuild_qdrant; search_raw; concurrent writes; soak stability" } project_agentmemory() { diff --git a/scripts/live-baseline-report-to-md.sh b/scripts/live-baseline-report-to-md.sh index bdb54ed8..9242e8ca 100755 --- a/scripts/live-baseline-report-to-md.sh +++ b/scripts/live-baseline-report-to-md.sh @@ -115,6 +115,34 @@ render_report() { "" else empty end ), + ( + [.projects[] | select(.backfill != null)] as $backfilled + | if ($backfilled | length) > 
0 then + "## Backfill", + "", + "| Project | Sources | Completed | Batch | Workers | Resume | Duplicates | Backfill Elapsed |", + "| --- | --- | --- | --- | --- | --- | --- | --- |", + ( + $backfilled[] + | "| " + (.project | md) + + " | `" + (.backfill.source_count | tostring) + "`" + + " | `" + (.backfill.completed_count | tostring) + "`" + + " | `" + (.backfill.batch_size | tostring) + "`" + + " | `" + (.backfill.worker_concurrency | tostring) + "`" + + " | `" + ( + if .backfill.resume.enabled then + "resumed after " + (.backfill.resume.completed_before_resume | tostring) + + "/" + (.backfill.resume.completed_after_resume | tostring) + else + "disabled" + end + ) + "`" + + " | `" + ((.backfill.duplicate_source_notes | length) | tostring) + "`" + + " | `" + (.backfill.elapsed_seconds | tostring) + "s` |" + ), + "" + else empty end + ), "## Result Semantics", "", "- `pass`: every encoded check for the selected project and profile passed.",