diff --git a/.claude/board/EPIPHANIES.md b/.claude/board/EPIPHANIES.md index 57bc411b..d64c1dab 100644 --- a/.claude/board/EPIPHANIES.md +++ b/.claude/board/EPIPHANIES.md @@ -1,3 +1,23 @@ +## 2026-07-02 — E-1BRC-GRIDLAKE-SWEETSPOT-1: the 64×64 gridlake SoA is the measured sweet spot — the batch pipeline at tile scale equals the best streamed topology while carrying the double-WAL +**Status:** FINDING (measured, onebrc-probe lane J t7; closes the operator's four follow-up questions and the t4→t7 kanban-update arc) + +Lane J parameterized lane I with the operator's questions as knobs (grid × sink-lanes × registry). t7 @4 cores, same-session refs G(1)=46.3 / H=40.5 / F=70.1: **J(gridlake 4096, 1 lane, no registry) = 46.2–46.3 Mrows/s — equal to the best streamed ownership topology, above the lazy-owner orchestrator, with the strongest witness (double-WAL both ends, 312 msgs).** The four answers: (1) t6's ~20 vs H's 39.4 decomposes into registry RESIDENCY (knob-isolated: ON halves steady state net of spawn — t6's CONJECTURE promoted to FINDING) + the L2-busting 64K-cell table/memo working set; (2) the cache WAS wrong — matching the batch SoA to the 64×64 gridlake tile (4096 cells ≈ 80 KB integer; the literal 4096×BF16=16 KB pair is ndarray #227's proven VDPBF16PS tier) recovers it completely; (3) 1 sink lane suffices, 8 free, 64 over-lanes — lane count scales with per-batch APPLY work, never with data or address space; (4) orchestration-with-occupancy is NOT the sweet spot when ownership can live as the index-aligned guarantee table — H's lazy mechanisms stay right only when fine-grained ownership must be actors. Composed recipe: **64×64 gridlake batch SoA + codebook CAM addressing + 1–8 sink lane pairs + whole-table double-cast + flush cache; ownership = index-aligned guarantee table, no standing per-cell registry; BF16 planes per #227 when tile-GEMM lands.** Tables: crates/onebrc-probe/README.md §5.7. 
+## 2026-07-02 — E-1BRC-BATCH-PIPELINE-1: the operator's batch-pipeline spec measured — 312 messages total, double-WAL on both ends, flush cache interleaves; the remaining cost is residency, not architecture +**Status:** FINDING (measured, onebrc-probe lane I t6; completes the t4→t5→t6 kanban-update arc; operator spec implemented verbatim) + +The spec: all 65536 mailboxes UPFRONT; two fixed aligned indices (mailbox idx == SoA row idx — ownership = the `row_owner[i]==i` binding + write-on-behalf, never a message path); codebook-minted identity → direct CAM addressing (no probe/compare in the hot loop; ~400 global mint locks per run, total); whole-table DOUBLE-CASTS (one Arc per 64K-row batch to BOTH the mailbox-ownership-guarantee sink and the Lance row-address sink); flush cache so flushing and reindex-next interleave. t6 measured: **312 messages total** (156 batches × 2 ends — vs ~63K flat t4a, ~2.6K orchestrated t5: message count tracks BATCHES, independent of occupancy AND address-space size); flush_cache peak 2–3 tables/worker (the worker never waits for a flush); ownership==lance==156 journals + 156 version ticks asserted (the double-WAL: replayable from either end — the full W1b batch-writer shape, stronger than G/H's ownership-only witness); 64K spawn = 1.1–2.7 s one-time standing infrastructure (17–40 µs/actor); steady state ~20–22 Mrows/s ≈ ½ of the 1-owner streamed G(1) 43 — attribution CONJECTURE: resident-64K-actor memory footprint on a 4-core container + two serialized sinks. RULING: **the batch pipeline wins the messaging war outright and carries the strongest witness story; the standing 64K registry is affordable infrastructure; the remaining optimization surface is residency footprint, not architecture.** Composition across the arc: H's lazy activation (registry need not pre-exist) and I's whole-table double-cast + flush cache (when it does) are complementary — the shared invariant is that producers NEVER address fine-grained owners directly. 
Tables: crates/onebrc-probe/README.md §5.6. +## 2026-07-02 — E-1BRC-ORCHESTRATION-SWEETSPOT-1: the sweet spot is the orchestration tier itself — lazy activation + ahead-firing batching flatten the ownership curve (23× recovery at the 64K end) +**Status:** FINDING (measured, onebrc-probe lane H t5; completes the E-1BRC-KANBAN-UPDATE-1 / E-1BRC-OWNER-GRANULARITY-1 arc; operator: "the 65536 mailboxes had no Orchestration at all — find the sweet spot") + +t4a's 20× cliff was the FLAT topology: 64K eager spawns, ~63K owner-addressed casts, producers addressing owners directly. Lane H interposes the planner/kanban-executor domain's own two mechanisms — **lazy activation** (router tier spawns an owner only on first traffic: live mailboxes track OCCUPANCY ~413, never the 64K address space) and the **ahead-firing batch writer** (routers buffer per-owner entries, fire batched Applys at batch_k) — over lane G's UNCHANGED one-mailbox-per-SoA substrate, witness discipline intact (owner journals == router casts). t5 medians @4 cores: H(16) 42.2 / H(256) 36.8 / H(4096) 40.2 / **H(65536) 39.4 vs same-session flat 1.7 — 23× recovery, within ~9% of the best coarse topology (G(1) 43.2; F reference 81.7)**. The ruling that closes the arc: **orchestration FLATTENS the granularity curve — ownership granularity becomes a semantic choice (per-tile addressability, per-owner WAL), not a performance gamble. Fine-grained mailbox-as-owner is viable IF AND ONLY IF producers never address owners directly: the router/delegation tier is a LOAD-BEARING part of the kanban-update architecture, and flat fan-out to fine owners is the measured 20× anti-pattern.** graph-flow (rs-graph-llm) remains the OUTER loop by design — task-granularity persisted-cursor orchestration; per-morsel it would measure storage latency, and its in-container build is blocked by the pre-existing burn 403 (W3b). Tables: crates/onebrc-probe/README.md §5.5. 
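The two lane H mechanisms described in the entry above (lazy activation and ahead-firing batching) can be sketched without any actor runtime. This is a minimal sketch under stated assumptions: `Router`, `Owner`, `route`, `flush`, and `journal_total` are hypothetical names for illustration, plain structs stand in for ractor mailboxes, and only `batch_k` and the "owner journals == router casts" check are taken from the text.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an owner mailbox: it only counts what it receives.
#[derive(Default)]
pub struct Owner {
    pub batches_received: usize, // plays the role of the owner's journal
    pub entries_applied: usize,
}

/// Router tier: producers hand entries to the router, never to an owner.
pub struct Router {
    batch_k: usize,
    owners: HashMap<u32, Owner>,     // created lazily, on first traffic
    pending: HashMap<u32, Vec<i64>>, // per-owner buffered entries
    casts: usize,                    // batched deliveries fired so far
}

impl Router {
    pub fn new(batch_k: usize) -> Self {
        Router { batch_k, owners: HashMap::new(), pending: HashMap::new(), casts: 0 }
    }

    /// Buffer one entry; fire a batched delivery once `batch_k` entries are queued.
    pub fn route(&mut self, owner_id: u32, entry: i64) {
        // Lazy activation: the owner exists only once traffic arrives for it.
        self.owners.entry(owner_id).or_default();
        let buf = self.pending.entry(owner_id).or_default();
        buf.push(entry);
        if buf.len() >= self.batch_k {
            let batch = std::mem::take(buf);
            self.fire(owner_id, batch);
        }
    }

    fn fire(&mut self, owner_id: u32, batch: Vec<i64>) {
        let owner = self.owners.entry(owner_id).or_default();
        owner.batches_received += 1;
        owner.entries_applied += batch.len();
        self.casts += 1;
    }

    /// Deliver whatever is still buffered at end of run.
    pub fn flush(&mut self) {
        let pending = std::mem::take(&mut self.pending);
        for (owner_id, batch) in pending {
            if !batch.is_empty() {
                self.fire(owner_id, batch);
            }
        }
    }

    /// Sum of owner-side journals; the witness check compares this to `casts`.
    pub fn journal_total(&self) -> usize {
        self.owners.values().map(|o| o.batches_received).sum()
    }
}
```

Routing 100,000 entries over 413 occupied ids out of a 65536-wide address space with `Router::new(64)` leaves 413 live owners, which is the occupancy-tracking behaviour the entry describes. The sketch does not model scheduling or mailbox cost, so it says nothing about the measured throughput.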
+## 2026-07-02 — E-1BRC-OWNER-GRANULARITY-1: one mailbox per SoA (operator correction) — the ownership curve is a plateau then a 20× cliff; Morton tile GROUPING is what makes mailbox-as-owner viable +**Status:** FINDING (measured, onebrc-probe lane G t4a; corrects the framing of E-1BRC-KANBAN-UPDATE-1 — the numbers there stand, the topology language was inverted) + +Operator: "I thought we spawn one ractor mailbox per SoA?" — ratified: that IS the canon, and lane G's "sharding the 64K SoA" framing was an ownership inversion (the code's owners were always independent, but each allocated a full 64K-slot table, making the fine-grained end unrunnable). Reworked: each owner's actor State is its OWN `OwnerSoa` sized to its tile span — one mailbox = one SoA, verbatim — unlocking the full granularity sweep including the literal 64K-concurrent-SoAs end. Medians @4 cores: G(1) 43.4 / G(16) 30.3 / G(256) 35.9 / G(4096) 18.3 / **G(65536 = one mailbox per tile) 2.1 Mrows/s — a 20× collapse** vs one owner (64K spawns paid in-run; cast fragmentation ~150→~63K messages as each morsel's ~413 stations scatter to ~413 owners; 64K mailbox tasks on 4 cores). The completed ruling: **the ownership-granularity curve is a plateau (1–256 owners, ~30–43 Mrows/s, topology noise-dominated) then a cliff; one-mailbox-per-semantic-cell is architecturally clean and measurably catastrophic at OLAP arrival rates. Morton tile GROUPING is not an optimization detail — it is the mechanism that makes mailbox-as-owner viable: the mailbox is the OWNER boundary, the tile is the ADDRESS boundary, and they must never be conflated 1:1 under load.** Owners' memory now ∝ span (the collapse is scheduling+messaging, not memory). Tables: crates/onebrc-probe/README.md §5.4a. 
+## 2026-07-02 — E-1BRC-KANBAN-UPDATE-1: the kanban-update write path measured — 0.54× at morsel granularity, the tax is all boundary, and ownership must not shard below contention +**Status:** FINDING (measured, onebrc-probe lane G t4, same recipe corpus as E-1BRC-ADDRESSING-1; tables `crates/onebrc-probe/README.md` §5.4; operator-requested follow-up "compare morton and the kanban vs without / 64k concurrent SoA vs Morton tile ... when using kanban update") + +Lane G holds lane F's Morton-tile 64K SoA as OWNED state behind shard mailbox actors: workers pre-reduce 64K-row morsels (#227's morsel size; ndarray rebased onto master to sit on its merged Morton/morsel probe), cast dirty entries prefix-routed to owners, every applied batch witnessed with a KanbanMove (journal==casts asserted). t4 medians @4 cores: **F 79.5 (private merge, no witness) / G 43.0 @1 shard / 39.9 @4 / 36.0 @16 (one thrash collapse to 11.7); workers=3 strictly worse.** Three rulings for the architecture: (1) **kanban update costs ~0.54×** at morsel granularity and the tax decomposes entirely into boundary costs (Arc corpus copy, blocking+async oversubscription, per-morsel messaging) — the witness itself is ~free (lane E) — buying live bounded-staleness state, witnessed replayable writes, single-writer safety, bounded worker memory; (2) **do not shard ownership below contention** — at ~400 groups ONE mailbox absorbs all apply work and every extra shard is pure scheduling overhead; shard count scales with owner WORK, never with rows; (3) **the Morton prefix ROUTE is free as a mechanism** (G@4 within ~7% of G@1 before thrash) — tile-sharding stays the right tool, its trigger is owner-side contention (high cardinality / heavy per-entry work). W2d consequence: private-merge when the product is one final answer; pay the ~2× only when the product IS the live/witnessed/owned state — and the 550 ms Libet budget is untouched either way. 
## 2026-07-02 — E-1BRC-ADDRESSING-1: addressing-is-aggregation measured — route-and-write is 3× the classic map; the Morton dress costs ~10% **Status:** FINDING (measured, onebrc-probe t0–t3, recipe corpus rows=10000000 seed=42 sha256=f1853caa…5691, 4-core container; tables in `crates/onebrc-probe/README.md` §5–5.3) diff --git a/.claude/v3/INTEGRATION-PLAN.md b/.claude/v3/INTEGRATION-PLAN.md index ed659ec2..44dd238e 100644 --- a/.claude/v3/INTEGRATION-PLAN.md +++ b/.claude/v3/INTEGRATION-PLAN.md @@ -591,3 +591,120 @@ where the win lives (B was 1.06×). All six lanes A–F + R now measured on one regenerable recipe corpus. Board: E-1BRC-ADDRESSING-1. The probe is COMPLETE; follow-ups (100M container-scale run, high-cardinality corpus, SWAR parse, mmap) are priced and parked in README §1/§5.3. + +#### Addendum-13 status update (2026-07-02, t4 — lane G, operator follow-up) + +Operator: "compare morton and the kanban vs without — if 64k concurrent +SoA vs Morton tile can help us understand the pros and cons of our +architecture when using kanban update." Lane G SHIPPED (feature +`lane-g`): the lane-F Morton-tile 64K SoA as OWNED state behind shard +mailbox actors — prefix-routed morsel casts (64K rows, #227's morsel +size, clear-by-undo extraction), every applied batch witnessed with a +KanbanMove, journal==casts asserted. ndarray checkout rebased onto +master (#227 merged — its Morton scatter/morsel probe is the sibling +reference). t4 medians: F 79.5 / G(1 shard) 43.0 / G(4) 39.9 / +G(16) 36.0 (one thrash collapse 11.7) / G(workers=3) strictly worse. +**Ledger: kanban update = 0.54× at morsel granularity, and the tax is +all boundary (corpus copy + oversubscription + messaging), not the +witness (lane E: journal ~free). It buys live bounded-staleness state, +witnessed replayable writes, single-writer safety, bounded worker +memory. 
Do NOT shard ownership below contention — at ~400 groups one +mailbox absorbs everything; shards scale with owner WORK, never with +rows; the Morton prefix route itself is free (G(4)≈G(1)).** Tables + +full readings: crates/onebrc-probe/README.md §5.4. Board follow-up +appended to E-1BRC-ADDRESSING-1 thread as E-1BRC-KANBAN-UPDATE-1. + +#### Addendum-13 status update (2026-07-02, t4a — topology corrected, curve completed) + +Operator correction ratified: **one ractor mailbox per SoA** (canon). +Lane G reworked — each owner's State is its OWN `OwnerSoa` sized to its +tile span (no full-64K tables per owner, no "sharded one SoA" framing); +flush grouping made sort-based (no dense per-shard vecs at 64K owners); +parity test extended to 4096 mailboxes. Full ownership-granularity curve +@4 workers, medians: G(1) 43.4 / G(16) 30.3 / G(256) 35.9 / G(4096) 18.3 +/ **G(65536, one mailbox per tile) 2.1 — a 20× collapse** (spawn ×64K + +cast fragmentation ~150→~63K + 64K tasks on 4 cores). **Ruling: the +ownership plateau spans 1–256 owners; Morton tile GROUPING is what makes +mailbox-as-owner viable — mailbox = OWNER boundary, tile = ADDRESS +boundary, never conflate 1:1 under load.** README §5.4a; board +E-1BRC-KANBAN-UPDATE-1 correction appended as E-1BRC-OWNER-GRANULARITY-1. + +#### Addendum-13 status update (2026-07-02, t5 — orchestration sweet spot, operator follow-up) + +Operator: "the 65536 mailboxes had no Orchestration at all — find the +sweet spot with rs-graph-llm or lance-graph-planner + kanban update." +Lane H SHIPPED (feature `lane-h`): router tier with LAZY owner +activation (live mailboxes track occupancy ~413, never the 64K address +space) + AHEAD-FIRING batched delivery (batch_k=64) over lane G's +unchanged one-mailbox-per-SoA substrate; witness discipline preserved +(owner journals == router casts asserted). 
graph-flow stays the OUTER +loop (task-granularity cursor; burn-submodule 403 blocks in-container +builds anyway) — the in-loop mechanisms are the planner/kanban-executor +domain's own. t5 medians @4 cores: H(16) 42.2 / H(256) 36.8 / H(4096) +40.2 / **H(65536) 39.4 vs flat 1.7 same-session — 23× recovery, within +~9% of G(1)=43.2; F=81.7.** RULING: orchestration FLATTENS the +granularity curve — the sweet spot is not a shard count, it is the +orchestration tier itself; fine-grained mailbox-as-owner is viable iff +producers never address owners directly (the ahead-firing batch-writer +is load-bearing, not an optimization; flat fan-out = the measured 20× +anti-pattern). README §5.5; board E-1BRC-ORCHESTRATION-SWEETSPOT-1. + +#### Addendum-13 status update (2026-07-02, t6 — lane I, operator batch-pipeline spec) + +Operator spec implemented verbatim as lane I (feature `lane-i`): all +65536 mailboxes UPFRONT (standing ownership registry; spawn measured +separately: 1.1–2.7 s, 17–40 µs/actor); two fixed aligned indices +(mailbox idx == SoA row idx — ownership guarantee is the +`row_owner[i]==i` binding + write-on-behalf, never a message path); +codebook-minted identity → direct CAM addressing (no probe in the hot +loop; worker-local memo, ~400 global mint locks total); whole-table +DOUBLE-CASTS (one Arc per 64K-row batch to BOTH the ownership-guarantee +sink and the Lance row-address sink — 312 messages total vs 63K flat / +2.6K orchestrated); flush cache interleaving flush and refill (peak 2–3 +tables/worker, worker never waits). Both ends journal every batch +(ownership==lance==156 asserted) + one DatasetVersion tick per batch — +the full double-WAL the W1b batch writer needs. t6: steady state ~20–22 +Mrows/s (≈½ of G(1) 43 — residency-footprint attribution CONJECTURE); +total incl. spawn 3.2–6.1. 
RULING: the batch pipeline wins the +messaging war outright (messages ∝ batches, independent of occupancy +AND address space); the standing 64K registry is affordable +infrastructure; the remaining surface is residency, not architecture. +README §5.6; board E-1BRC-BATCH-PIPELINE-1. + +#### Addendum-13 status update (2026-07-02, t7 — lane J knob matrix, PROBE ARC COMPLETE) + +Lane J (feature `lane-j`) parameterizes lane I with the operator's four +follow-up questions as knobs: grid (4096 gridlake vs 65536), sink lanes +(1/8/64), registry (on/off). t7 @4 cores, same-session refs G(1)=46.3 / +H=40.5 / F=70.1: **J(gridlake 4096, 1 lane, no registry) = 46.2–46.3 — +the measured sweet spot: equals the best streamed topology while +carrying the double-WAL.** Registry ON halves steady state net of spawn +(t6 residency CONJECTURE → FINDING); grid 65536 → 40 (L2-busting +table+memo); lanes 1≈8, 64 over-lanes (apply work is O(dirty) — +lanes scale with APPLY work, never data). The composed recipe: 64×64 +gridlake batch SoA + codebook CAM + 1–8 lane pairs + whole-table +double-cast + flush cache; ownership as the index-aligned guarantee +table, NOT a standing per-cell actor registry; BF16 planes per ndarray +#227's proven VDPBF16PS tier as the tile-GEMM continuation. README +§5.7; board E-1BRC-GRIDLAKE-SWEETSPOT-1. 
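The "steady state net of spawn" figures and the 312-message count quoted in these addenda are simple arithmetic, sketched below. The function names are hypothetical, and stop time is ignored because it is not reported here, so the result is only an approximation of the FINDINGS formula `rows/(elapsed−spawn−stop)`.

```rust
/// Steady-state throughput in Mrows/s once the one-time spawn cost is removed.
/// `total_mrows_per_s` is the whole-run figure; stop time is ignored (assumption).
pub fn steady_mrows_per_s(rows: u64, total_mrows_per_s: f64, spawn_ms: f64) -> f64 {
    let elapsed_s = rows as f64 / (total_mrows_per_s * 1e6);
    let steady_s = elapsed_s - spawn_ms / 1e3;
    rows as f64 / steady_s / 1e6
}

/// Whole-table double-cast: one message per batch to each of the two ends.
pub fn double_cast_messages(batches: u64) -> u64 {
    batches * 2
}
```

With the t7 registry=ON pass recorded in FINDINGS (10M rows, 14.1 Mrows/s total, 274 ms spawn), `steady_mrows_per_s(10_000_000, 14.1, 274.0)` lands near the ≈ 23 quoted there, and `double_cast_messages(156)` gives the 312 messages of t6.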
+ +#### Addendum-13 status update (2026-07-02, consolidation — findings/commentary split, 8 presets, simd_ops wiring) + +Operator-requested consolidation SHIPPED: (1) `crates/onebrc-probe/FINDINGS.md` +— the AGNOSTIC record (environment, methods, all t0–t7 tables, all 11 +invariants WITH their code, reproduction commands; zero interpretation); +(2) `crates/onebrc-probe/COMMENTARY.md` — this session's prime stored +SEPARATELY (readings, rulings executed, composed recipe, flagged +uncertainty, suggested lab sweeps) so another session can analyze the +findings from its own angle; (3) `src/presets.rs` (feature `presets`) — +the 8 batching methods frozen as named presets (map-private-merge / +grid-private-merge / stream-single-owner / orchestrated-lazy-owners / +batch-64k-registry / gridlake / gridlake-8-lanes / batch-64k-no-registry) +sharing one signature + one parity harness (`all_presets_agree_with_lane_a` +— every preset byte-identical to lane A); (4) honest answer to the simd +question: lane B had used ONLY `U8x32::cmpeq_mask`; NOW also routes the +stride walk through `ndarray::simd::array_chunks` (simd_ops.rs, the +non-overlapping walker; `array_windows` is the overlapping GEMM sibling, +deliberately unused); `simd_soa.rs::SoaBytes` remains an OPEN follow-up +(natural carrier for vectorized sink sweeps + batch tables). Note: probe +target/debug purged mid-round (disk full at 100%); gates re-run green. diff --git a/crates/onebrc-probe/COMMENTARY.md b/crates/onebrc-probe/COMMENTARY.md new file mode 100644 index 00000000..5da606e9 --- /dev/null +++ b/crates/onebrc-probe/COMMENTARY.md @@ -0,0 +1,100 @@ +# COMMENTARY — one session's interpretation of the 1BRC probe + +> Read `FINDINGS.md` FIRST — it is the agnostic record (environment, +> methods, numbers, invariants-with-code) and it is deliberately free +> of what follows. 
THIS file is the prime of the 2026-07-02 session +> that ran the probe: its architecture readings, the operator rulings +> it executed, and its doctrine mappings. A later session analyzing +> the findings from another angle should treat everything below as +> ONE interpretation to test, not as ground truth. Board homes of +> these readings: `EPIPHANIES.md` entries `E-1BRC-ADDRESSING-1`, +> `E-1BRC-KANBAN-UPDATE-1`, `E-1BRC-OWNER-GRANULARITY-1`, +> `E-1BRC-ORCHESTRATION-SWEETSPOT-1`, `E-1BRC-BATCH-PIPELINE-1`, +> `E-1BRC-GRIDLAKE-SWEETSPOT-1`. + +## The arc (operator-driven, five follow-ups) + +1. "Compare morton and the kanban vs without" → lanes F/R/G. +2. "One ractor mailbox per SoA" → lane G topology corrected (each + owner's actor State IS its own table; "sharding one SoA" was an + ownership inversion in prose, and full-size per-owner tables made + the fine end unrunnable). +3. "The 65536 mailboxes had no Orchestration — find the sweet spot" → + lane H (lazy activation + ahead-firing batch writer). +4. The batch-pipeline spec (64K upfront, aligned indices, codebook, + double-cast, flush cache) → lane I. +5. "8 or 64 lanes? gridlake 64×64 = 4096×BF16?" → lane J knob matrix. + +## Readings (interpretation — test me) + +- **Route-and-write beats look-up-and-compare ~3×** (R vs C), and the + semantic (Morton tile) address costs ~10% over plain radix at this + LOW cardinality (F vs R). The prefix-local tile payoff is a + high-cardinality claim the 400-key corpus cannot test. +- **The witness is free; the boundary is not.** Journaling + (KanbanMove appends, version ticks) never showed up in any + measurement (lane E: ~66 µs/card; sinks: invisible). What costs is + the actor-model BOUNDARY: the one-time corpus copy, task + oversubscription, and message fragmentation. +- **Producers must never address fine-grained owners directly.** The + 20× cliff at 65536 flat owners is fan-out, not ownership. 
Two cures + measured: lazy activation + ahead-firing batching (lane H, 23× + recovery), or whole-table batch double-cast (lanes I/J, messages ∝ + batches — independent of BOTH occupancy and address-space size). +- **Ownership does not need actors when it can be an index-aligned + guarantee table.** Lane I/J's `row_owner[i] == i` binding + + write-on-behalf at the sink preserved single-writer semantics with + zero per-owner messaging. The standing 64K actor registry, by + contrast, halves steady-state throughput by pure residency (knob- + isolated in t7) — actors are for CONTENTION, not for address space. +- **Cache-match the batch unit: the 64×64 gridlake (4096 cells) is + the measured sweet spot.** 46.2 Mrows/s — equal to the best + streamed topology, above the orchestrator, with the strongest + witness (double-WAL both ends). The 64K-cell batch unit loses ~15% + purely to L2 pressure (~2.5 MB/worker working set). The literal + 4096×BF16=16 KB plane pair is ndarray #227's proven `VDPBF16PS` + tier (~448 Mrows/s single-thread in its own probe) — the natural + continuation once tile-GEMM enters the batch path. +- **Sink lanes scale with per-batch APPLY work, never with data**: + 1 suffices at O(400-dirty-rows) apply; 8 free; 64 over-lanes. +- **SIMD provenance note:** the delimiter scan (lane B) routes through + `ndarray::simd::U8x32::cmpeq_mask` with the stride walk on + `ndarray::simd::array_chunks` (the NON-overlapping walker; + `array_windows` is the overlapping GEMM-style sibling and is + deliberately not used — delimiter scanning never re-reads bytes). + `simd_soa.rs`'s `SoaBytes` (Arc-backed typed iteration) is NOT yet + wired — it is the natural carrier for vectorizing the sinks' dirty + sweeps and the batch tables themselves; open follow-up. 
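The "ownership as an index-aligned guarantee table" reading above can be sketched as a sink that applies a whole batch of dirty rows on the owners' behalf. This is a simplified sketch: `row_owner`, `MailboxId`, and `OwnershipSink` are names that appear in the findings, but the fields, the `Cell` type, and `apply_batch` are assumptions made for illustration, and the check is a plain `assert_eq!` rather than the `debug_assert_eq!` shown in INV-6.

```rust
type MailboxId = u32;

/// Hypothetical per-row accumulator (a real row would also carry min/max).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Cell {
    pub sum: i64,
    pub count: u64,
}

pub struct OwnershipSink {
    row_owner: Vec<MailboxId>, // guarantee table: row i is owned by mailbox i
    rows: Vec<Option<Cell>>,
    journal: usize,            // one witness record per applied batch
}

impl OwnershipSink {
    pub fn new(slots: usize) -> Self {
        OwnershipSink {
            row_owner: (0..slots).map(|i| i as MailboxId).collect(),
            rows: vec![None; slots],
            journal: 0,
        }
    }

    /// Write-on-behalf: one message carries every dirty row of a batch,
    /// and the binding is checked per row instead of routing per owner.
    pub fn apply_batch(&mut self, dirty: &[(usize, Cell)]) {
        for &(slot, cell) in dirty {
            assert_eq!(self.row_owner[slot], slot as MailboxId);
            match self.rows[slot].as_mut() {
                Some(acc) => {
                    acc.sum += cell.sum;
                    acc.count += cell.count;
                }
                None => self.rows[slot] = Some(cell),
            }
        }
        self.journal += 1;
    }
}
```

The design point is that single-writer semantics come from the sink being the only writer, while the table records which mailbox each row is bound to. No per-owner message is sent in this sketch, which is the property the reading attributes to lanes I/J.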
+ +## The composed recipe (this session's synthesis) + +64×64 gridlake batch SoA + codebook CAM addressing (mint once, direct +index after) + 1–8 sink lane pairs + whole-table `Arc` double-cast + +refcount-gated flush cache; ownership as the index-aligned guarantee +table; the router/lazy-activation tier (lane H) only when fine-grained +ownership must live as actors; BF16 planes per ndarray #227 when +tile-GEMM lands. + +## Flagged uncertainty (kept honest) + +- All numbers: 4-core container, 400-key cardinality, 10M rows, + no mmap, scalar parse. Run-to-run drift up to ~10% between rounds + (same-session references quoted in FINDINGS wherever compared). +- The residency attribution at t6 was CONJECTURE until t7's registry + knob isolated it — it is now a FINDING, but the *mechanism* + decomposition (scheduler vs memory-footprint vs allocator) was NOT + isolated further. +- High-cardinality behavior (where prefix-local tiles and per-tile + bucketed sweeps should earn their keep) is entirely unmeasured. +- 100M container-scale runs, SWAR parse, mmap: priced and parked in + `README.md` §1 / §5.3. + +## Suggested lab sweeps for a next session + +The 8 presets (`src/presets.rs`, feature `presets`) share one +signature and one parity harness. Obvious axes: preset × workers +(1/2/4) × corpus cardinality (regenerate with more syllable +combinations) × rows (10M/100M) × batch_rows (lane J's `1<<16` default +vs smaller) × morsel size. The archival recipe convention (README §2) +makes every cell reproducible; publish results as a new dated section +in FINDINGS (append, don't rewrite history). 
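The refcount-gated flush cache named in the composed recipe above can be sketched with `Arc::try_unwrap`, following the shape of the INV-8 excerpt in FINDINGS. `FlushCache`, `next_table`, `reset`, and `pool` appear there; `BatchTable`, `freeze`, and the `allocated` counter are hypothetical additions for this sketch, and the real lane I/J code may differ.

```rust
use std::collections::VecDeque;
use std::sync::Arc;

/// Hypothetical batch table: a flat vector of cells.
pub struct BatchTable {
    pub cells: Vec<i64>,
}

impl BatchTable {
    fn new(n: usize) -> Self {
        BatchTable { cells: vec![0; n] }
    }

    fn reset(&mut self) {
        self.cells.iter_mut().for_each(|c| *c = 0);
    }
}

pub struct FlushCache {
    pool: VecDeque<Arc<BatchTable>>, // frozen tables, oldest first
    cells: usize,
    allocated: usize,                // fresh allocations so far (illustrative)
}

impl FlushCache {
    pub fn new(cells: usize) -> Self {
        FlushCache { pool: VecDeque::new(), cells, allocated: 0 }
    }

    /// Reuse the oldest table only if every sink has dropped its `Arc`;
    /// otherwise leave it in flight and allocate, so the worker never waits.
    pub fn next_table(&mut self) -> BatchTable {
        if let Some(front) = self.pool.pop_front() {
            match Arc::try_unwrap(front) {
                Ok(mut table) => {
                    table.reset();
                    return table; // refcount was 1: flush done
                }
                Err(still_in_flight) => self.pool.push_front(still_in_flight),
            }
        }
        self.allocated += 1;
        BatchTable::new(self.cells)
    }

    /// Freeze a filled table; the pool keeps one handle, the caller casts clones.
    pub fn freeze(&mut self, table: BatchTable) -> Arc<BatchTable> {
        let shared = Arc::new(table);
        self.pool.push_back(Arc::clone(&shared));
        shared
    }
}
```

A worker would call `freeze`, clone the returned handle to each sink, and drop its own copy. A table becomes reusable only once both sinks have dropped theirs, so a batch that is still being flushed is never handed back for refilling.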
diff --git a/crates/onebrc-probe/Cargo.toml b/crates/onebrc-probe/Cargo.toml
index 324b5d93..f34dc3c3 100644
--- a/crates/onebrc-probe/Cargo.toml
+++ b/crates/onebrc-probe/Cargo.toml
@@ -29,6 +29,25 @@ lane-d = ["dep:ractor", "dep:tokio"]
 # the V3 kanban execution machinery (lance-graph-supervisor's KanbanActor +
 # drivers, lance-graph-contract's kanban/scheduler/soa_view contract types).
 lane-e = ["dep:lance-graph-supervisor", "dep:lance-graph-contract", "dep:ractor", "dep:tokio"]
+# Lane G (kanban-update write path): the Morton-tile 64K SoA as owned state
+# behind shard mailbox actors, morsel casts witnessed with KanbanMoves —
+# ractor/tokio + the contract's kanban types (no supervisor needed: the
+# shard owner IS the probe's actor).
+lane-g = ["dep:lance-graph-contract", "dep:ractor", "dep:tokio"]
+# Lane H (orchestrated fine-grained ownership): router tier with lazy
+# owner activation + ahead-firing batched delivery over lane G's
+# one-mailbox-per-SoA substrate — same dep set as lane-g.
+lane-h = ["lane-g"]
+# Lane I (substrate batch pipeline): 65536 upfront mailboxes, codebook
+# CAM addressing, whole-table double-casts to ownership + lance sinks,
+# flush-cache interleaving - same dep set as lane-g.
+lane-i = ["dep:lance-graph-contract", "dep:ractor", "dep:tokio"]
+# Lane J (parameterized batch pipeline): lane I's shape with grid /
+# sink-lanes / registry knobs — needs lane-i's RowOwner.
+lane-j = ["lane-i"]
+# All 8 batching-method presets (src/presets.rs) — the lab-sweep surface;
+# see FINDINGS.md (agnostic record) + COMMENTARY.md (interpretation).
+presets = ["lane-g", "lane-h", "lane-i", "lane-j"]

 [dependencies]
 # AdaWorldAPI fork (workspace P0 rule: fork over crates.io, always — see
diff --git a/crates/onebrc-probe/FINDINGS.md b/crates/onebrc-probe/FINDINGS.md
new file mode 100644
index 00000000..aba9fa21
--- /dev/null
+++ b/crates/onebrc-probe/FINDINGS.md
@@ -0,0 +1,258 @@
+# FINDINGS — 1BRC substrate probe (agnostic record)
+
+> This file records WHAT was measured: environment, methods, numbers,
+> and the asserted invariants with their code. It deliberately contains
+> NO interpretation, rulings, or architecture doctrine — that lives in
+> `COMMENTARY.md` so a later session can read these facts from its own
+> angle without inheriting this session's prime.
+
+## 1. Environment
+
+- Container: 4 cores (`nproc`=4), Linux, AVX-512-capable CPU
+  (`avx2`+`avx512f`+`avx512bw` in `/proc/cpuinfo`).
+- Probe pinned to `target-cpu=x86-64-v3` (`.cargo/config.toml`) —
+  `U8x32` ops compile to real AVX2 intrinsics; matches lance-graph CI.
+- Release builds. Rust stable.
+- Corpus recipe (reproducible; see §2 of `README.md`):
+  `rows=10000000 seed=42
+  sha256=f1853caa30a765883aa655be1c304d956ad8b03e19b3557df2af431d9a955691`
+  — 142 MB, 400 distinct group keys ("stations"), hash re-verified
+  byte-identical at every regeneration.
+- Workload: group-by aggregate (min/max/sum/count per key, integer
+  tenths; output rendered to `BTreeMap` for
+  byte-for-byte cross-method equality).
+- Caveats that bound every number: 4-core container (oversubscription
+  effects are real), 400-key cardinality (low), 10M rows (smoke scale;
+  the 100M container-scale target was not run), `std::fs::read` (no
+  mmap), scalar parse (no SWAR).
+
+## 2. Methods measured
+
+| Lane | Method (mechanism only) |
+|---|---|
+| A | single thread, byte scan, `BTreeMap` accumulate |
+| B | same as A but delimiter scan via `ndarray::simd::U8x32::cmpeq_mask`, 32-byte non-overlapping stride (`ndarray::simd::array_chunks`) |
+| C | N scoped threads, newline-aligned chunks, per-worker `BTreeMap`, commutative merge at end |
+| D | as C, worker = one ractor actor per chunk (ask-pattern), corpus copied once into `Arc>` |
+| E | as C/D, plus one fresh kanban-lifecycle actor per batch driven Planning→CognitiveWork→Evaluation→Commit around the work |
+| F | as C, accumulator = flat 65536-cell SoA table, cell = Morton nibble-interleave of two hash bytes, open-addressed, name-verified |
+| R | as F, cell = plain low 16 hash bits (no interleave) — control |
+| G | workers pre-reduce 64K-row morsels into private F-style tables, stream dirty entries to `shards` owner actors (each owning its OWN table over a contiguous cell range); every applied batch appends one witness record |
+| H | as G, plus a router tier: LAZY owner spawn on first traffic + per-owner buffering with ahead-firing batched delivery (`batch_k=64`) |
+| I | batch pipeline: 65536 standing per-cell mailboxes spawned upfront (message-free in steady state), identity minted once into a codebook cell (direct indexed writes after), workers fill whole 65536-cell batch tables, freeze into `Arc`, cast ONCE WHOLE to an ownership sink AND a persistence (row-address/version) sink, table pool recycled by refcount ("flush cache") |
+| J | I parameterized: `grid` (4096 or 65536 cells), `sink_lanes` (1/8/64 row-range lane pairs), `registry` (on/off) |
+
+8 frozen configurations of the above are runnable as presets
+(`src/presets.rs`, CLI `run p` not wired — call
+`presets::run_preset`); each is asserted byte-identical to lane A by
+`all_presets_agree_with_lane_a`.
+
+## 3. Measurements
+
+All throughputs in Mrows/s on the §1 recipe corpus. "Median" over the
+listed passes.
Same-session reference values are quoted where a table
+mixes sessions (run-to-run drift between bench rounds was up to ~10%
+on this container; comparisons inside one table row-block are
+same-session).
+
+### t0/t1/t2 — baselines, SIMD scan, actors, kanban cards (4 workers)
+
+| Method | passes | median |
+|---|---|---|
+| A | 7.11 / 7.01 / 7.16 (three rounds) | ~7.1 |
+| B | 7.06, 7.46 | 7.3 |
+| C | 26.4 / 27.6 / 28.3 (three rounds) | ~27.5 |
+| D | 21.6, 22.4 | 22.1 |
+| E batches=4 | 22.8, 23.0 | 22.9 |
+| E batches=64 | 22.0, 22.5 | 22.2 |
+| E batches=256 | 21.8, 22.1 | 22.0 |
+
+Per-card overhead from E(256)−E(4) elapsed delta: ≈ 66 µs per card
+(actor spawn + 3 lifecycle RPCs + join + queue pull).
+
+### t3 — flat SoA accumulators (4 workers, 5 passes)
+
+| Method | passes | median |
+|---|---|---|
+| C | 28.3, 27.8, 28.9, 28.1, 28.8 | 28.3 |
+| F (Morton cell) | 76.2, 80.8, 56.9, 77.4, 81.2 | 77.4 |
+| R (radix cell) | 86.2, 86.4, 86.3, 86.8, 85.1 | 86.3 |
+| F single-thread | 21.5 | — |
+| R single-thread | 23.3 | — |
+
+### t4/t4a — streamed ownership, granularity sweep (4 workers, 3 passes)
+
+| Owners (each owns its own table) | median |
+|---|---|
+| F reference (no ownership) | ~79.5 (t4) / ~76 (t4a round) |
+| 1 | 43.0 (t4) / 43.4 (t4a) |
+| 4 | 39.9 |
+| 16 | 30.3 (noisy 29.2–41.9) |
+| 256 | 35.9 |
+| 4096 | 18.3 |
+| 65536 | 2.1 (one pass 1.45; same-session later: 1.7) |
+
+At 65536 owners: ~63K owner-addressed messages; 64K actor spawns
+inside the run.
+
+### t5 — orchestrated (lazy + ahead-firing), 4 workers, 3 passes
+
+| Nominal owners | flat (t4a) | orchestrated | same-session refs |
+|---|---|---|---|
+| 16 | 30.3 | 42.2 | |
+| 256 | 35.9 | 36.8 | |
+| 4096 | 18.3 | 40.2 | |
+| 65536 | 2.1 (1.7 same-session) | 39.4 | G(1)=43.2, F=81.7 |
+
+### t6 — batch pipeline, 64K grid + standing registry (4 workers, 3 passes)
+
+| Pass | total | spawn_ms | steady (= rows/(elapsed−spawn−stop)) |
+|---|---|---|---|
+| 1 | 3.19 | 2667.9 | ~22 |
+| 2 | 6.05 | 1135.2 | ~20 |
+| 3 | 3.86 | 2107.4 | ~21 |
+
+Fixed per-run: batches=156; ownership journal = persistence journal =
+156; 156 version ticks; rows_addressed=62,400; flush-cache peak 2–3
+tables/worker; codebook_len=400; total messages=312.
+
+### t7 — knob matrix (4 workers; same-session refs G(1)=46.3, H=40.5, F=70.1)
+
+| Config | passes | median |
+|---|---|---|
+| grid=4096, lanes=1, registry=off | 42.0, 46.2, 46.3 | 46.2 |
+| grid=65536, lanes=1, registry=off | 19.9(cold), 37.9 / 39.8, 41.5 | ~40 |
+| grid=4096, lanes=8, registry=off | 43.8, 45.1, 44.1 | 44.1 |
+| grid=4096, lanes=64, registry=off | 37.6, 39.2, 42.2 | 39.2 |
+| grid=65536, lanes=1, registry=ON | 2.6 (spawn 2655 ms), 14.1 (spawn 274 ms) | — |
+
+Registry=ON steady state net of spawn (pass 2): ≈ 23 vs ≈ 38–46
+without — the registry-residency delta is isolated by this knob (the
+pipeline is byte-identical either way).
+
+## 4. Invariants, with their code
+
+Each invariant is enforced in-code (assert/test), not by convention.
+Line numbers drift; anchors are function/test names.
+
+**INV-1 — Commutative, associative merge (multi-writer combination).**
+`src/lib.rs`, `Stats::merge` + test `merge_is_commutative_and_associative`:
+
+```rust
+pub fn merge(&mut self, other: &Stats) {
+    if other.min < self.min { self.min = other.min; }
+    if other.max > self.max { self.max = other.max; }
+    self.sum += other.sum;
+    self.count += other.count;
+}
+```
+
+**INV-2 — Cross-method parity.** Every method's output map is asserted
+byte-identical to lane A on a generated corpus. One test per lane
+(`lane_*_agrees_with_lane_a*` in each module) plus the preset-wide
+harness `presets::tests::all_presets_agree_with_lane_a`:
+
+```rust
+for preset in PRESETS.iter() {
+    let out = run_preset(preset.id, &data, 3);
+    assert_eq!(a, out, "preset {} ({}) must match lane A", preset.id, preset.name);
+}
+```
+
+**INV-3 — Corpus reproducibility.** `src/gen.rs` streams a SHA-256
+while writing; test `generator_is_deterministic` asserts same seed ⇒
+same digest. Every measurement above carries the §1 recipe line.
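A sketch of what a check like `merge_is_commutative_and_associative` could look like for the INV-1 merge. The `merge` body is the one quoted above; the field types, the `merged` helper, and `merge_laws_hold` are assumptions for illustration, not the crate's test.

```rust
/// Field types are assumed (integer tenths as i64, count as u64).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Stats {
    pub min: i64,
    pub max: i64,
    pub sum: i64,
    pub count: u64,
}

impl Stats {
    pub fn merge(&mut self, other: &Stats) {
        if other.min < self.min { self.min = other.min; }
        if other.max > self.max { self.max = other.max; }
        self.sum += other.sum;
        self.count += other.count;
    }
}

/// Hypothetical helper: merge by value so both orders can be compared.
pub fn merged(mut a: Stats, b: &Stats) -> Stats {
    a.merge(b);
    a
}

/// True when merging is order-independent for these three values.
pub fn merge_laws_hold(a: Stats, b: Stats, c: Stats) -> bool {
    let commutative = merged(a, &b) == merged(b, &a);
    let associative = merged(merged(a, &b), &c) == merged(a, &merged(b, &c));
    commutative && associative
}
```

The property holds because min, max, and integer addition are each commutative and associative on their own. That is what lets per-worker tables be combined in any order at the end of a run.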
+ +**INV-4 — Witnessed streamed writes (single-sink family).** +`src/lane_g.rs` (`lane_g_kanban_soa_with_morsel`) and `src/lane_h.rs` +(`lane_h_orchestrated_with`): + +```rust +assert_eq!(journal_total, casts.load(Ordering::Relaxed), + "every applied morsel batch must be witnessed (journal == casts)"); +// lane H: +assert_eq!(journal_total, router_casts_total, + "every fired batch must be witnessed (owner journals == router casts)"); +``` + +**INV-5 — Double-cast completeness (two-sink family).** +`src/lane_i.rs` (`lane_i_batch_pipeline_with`); the laned form in +`src/lane_j.rs` multiplies by `sink_lanes`: + +```rust +assert_eq!(ownership_journal, batches_total, "…ownership end"); +assert_eq!(lance_journal, batches_total, "…lance end"); +assert_eq!(versions as usize, batches_total, "one DatasetVersion tick per batch"); +// lane J: +assert_eq!(own_journal, batches_total * sink_lanes, "…every ownership lane"); +assert_eq!(lance_journal, batches_total * sink_lanes, "…every lance lane"); +``` + +**INV-6 — Aligned fixed indices (ownership guarantee).** +`src/lane_i.rs`, `OwnershipSink::pre_start` builds the guarantee table +and every applied row is checked against it: + +```rust +row_owner: (0..SLOTS).map(|i| i as MailboxId).collect(), +// per applied dirty row: +debug_assert_eq!(state.row_owner[s], s as MailboxId); +``` + +**INV-7 — Identity minted once, stable, unique (codebook).** +`src/lane_i.rs`, `Codebook::mint` (Morton placement + linear probe + +full `(h, name)` verification at mint only) + test +`codebook_mints_unique_stable_slots`: + +```rust +let a1 = cb.mint(fnv1a64(b"alpha"), b"alpha"); +let a2 = cb.mint(fnv1a64(b"alpha"), b"alpha"); +assert_eq!(a1, a2); // idempotent per identity +assert_ne!(a1, b1); // distinct identities, distinct cells +``` + +**INV-8 — Flush-cache exclusivity (no aliased batch reuse).** +`src/lane_i.rs` / `src/lane_j.rs`, `FlushCache::next_table` — a batch +table is reused only when BOTH sinks dropped their `Arc`: + +```rust +match 
Arc::try_unwrap(front) { + Ok(mut table) => { table.reset(); return table; } // refcount 1: flush done + Err(still_in_flight) => { self.pool.push_front(still_in_flight); } +} +``` + +**INV-9 — Owner-table fullness is a panic, never silent corruption.** +`src/lane_g.rs`, `OwnerSoa::merge_entry` (bounded probe): + +```rust +panic!("OwnerSoa full: more stations routed to one mailbox than its capacity"); +``` + +**INV-10 — Lifecycle legality (kanban card family).** +`src/lane_e.rs` asserts every recorded move is a legal Rubicon edge: + +```rust +assert!(m.from.can_transition_to(m.to), "…must be a legal Rubicon edge"); +assert_eq!(journal.len(), 3 * batches, "3 moves per card"); +``` + +**INV-11 — SIMD provenance.** All SIMD in this crate routes through +`ndarray::simd` (`U8x32::{splat, from_slice, cmpeq_mask}` + +`array_chunks`); no raw `core::arch` intrinsics, no third-party SIMD +crates. Enforced by review + the module docs; grep pattern: +`core::arch|_mm256|pulp|wide::` over `src/` must return nothing. + +## 5. Reproduction + +```bash +cargo test --manifest-path crates/onebrc-probe/Cargo.toml \ + --features lane-b,lane-d,lane-e,lane-g,lane-h,lane-i,lane-j,presets +cargo build --release --manifest-path crates/onebrc-probe/Cargo.toml \ + --features lane-b,lane-d,lane-e,lane-g,lane-h,lane-i,lane-j +B=./crates/onebrc-probe/target/release/onebrc-probe +$B gen /tmp/onebrc_10m.txt 10000000 42 # verify the sha256 line +$B run /tmp/onebrc_10m.txt [workers] [knobs] # see README §4 +``` + +Lane-by-lane measurement history with commands: `README.md` +§5.0–§5.7. Interpretation: `COMMENTARY.md`. diff --git a/crates/onebrc-probe/README.md b/crates/onebrc-probe/README.md index ebc66733..ffa908cc 100644 --- a/crates/onebrc-probe/README.md +++ b/crates/onebrc-probe/README.md @@ -13,6 +13,19 @@ cargo test --manifest-path crates/onebrc-probe/Cargo.toml Zero external dependencies for lanes A/C (std only) — see `Cargo.toml`. 
+**Reading order for a fresh session:** + +- **`FINDINGS.md`** — the AGNOSTIC record: environment, methods, all + measurement tables (t0–t7), and every asserted invariant WITH its + code. No interpretation. +- **`COMMENTARY.md`** — one session's interpretation (the prime), + stored separately so the findings can be read from another angle. +- **`src/presets.rs`** (feature `presets`) — the 8 batching methods + frozen as named, reproducible presets sharing one signature and one + parity harness — the lab-sweep surface. +- This README — the per-lane history (specs, commands, per-round + measurement sections §5.0–§5.7) in the order it happened. + --- ## §1 — Reference inventory @@ -84,6 +97,9 @@ regenerating with the same `(rows, seed)` and diffing the printed | **D** — `lane_d::lane_d_ractor` (feature `lane-d`) | Same groupby-aggregate workload as Lane C, but each `chunk_bounds` chunk is aggregated by a stateless `ractor` actor (actor-per-worker, ask-pattern reply, `lance-graph-supervisor`-style `Actor`/`RpcReplyPort` shape) instead of a bare `std::thread::scope` closure — identical chunking + commutative merge, only the worker primitive changes. | **Shipped** | | **F** — `lane_f::lane_f_morton` (std-only, no feature) | The substrate-native lane: station identity → FNV-1a 64 → two axis bytes **nibble-interleaved** into a 16-bit Morton tile position (the GUID canon's 256×256 centroid-tile read) → slot into flat **SoA accumulators** (`min[]/max[]/sum[]/count[]`, open-addressed linear probe, name-verified on tag hit) — group-by as a prefix ROUTE, aggregation as a gated indexed write, per-worker owned tables BUNDLE-merged. Same scalar scan + `chunk_bounds` as lane C: the only variable vs C is the accumulator. | **Shipped** | | **R** — `lane_f::lane_r_radix` (std-only, no feature) | The honest control for F: byte-identical pipeline, slot = plain `hash & 0xFFFF` (no interleave). 
**F−R isolates the Morton addressing tax exactly; R−C prices flat-SoA-table-vs-BTreeMap.** | **Shipped** | +| **G** — `lane_g::lane_g_kanban_soa` (feature `lane-g`) | The kanban-update write path: the SAME Morton-tile 64K SoA as lane F, but held as OWNED state by `shards` mailbox actors (mailbox-as-owner over contiguous Morton tile ranges). Workers pre-reduce 64K-row morsels in private tables (identical hot loop to F), extract dirty slots by clear-by-undo, and **cast** the pre-reduced entries prefix-routed to the owning shard; every applied batch is witnessed with a `KanbanMove` on the owner's WAL (`journal == casts` asserted). **G−F prices witnessed streamed ownership vs private-merge-at-end; the `shards` sweep (1/4/16) is the 64K-concurrent-SoA-vs-Morton-tile ownership ledger** (§5.4). | **Shipped** | +| **H** — `lane_h::lane_h_orchestrated` (feature `lane-h`) | Orchestrated fine-grained ownership over lane G's substrate: router tier with LAZY owner activation (live mailboxes track occupancy, never address-space size) + AHEAD-FIRING batched delivery (`batch_k`). Flattens the ownership-granularity curve (§5.5: 23× recovery at the 64K end). | **Shipped** | +| **I** — `lane_i::lane_i_batch_pipeline` (feature `lane-i`) | The operator batch-pipeline spec: 65536 mailboxes UPFRONT (standing ownership registry), aligned fixed indices (mailbox idx == SoA row idx), codebook-minted direct CAM addressing, whole-table Arc DOUBLE-CASTS to the ownership-guarantee sink + the Lance row-address sink, flush-cache interleaving. Messages ∝ batches (312 total at 10M rows); double-WAL on both ends (§5.6). 
| **Shipped** | | **E** — `lane_e::lane_e_kanban` (feature `lane-e`) | One kanban card per batch: the corpus splits into `batches` newline-aligned chunks (`batches >= workers`) pulled from a shared `AtomicUsize` queue by `workers` puller tasks; every batch is journaled by a fresh `lance-graph-supervisor::KanbanActor` driven through the full Rubicon forward arc (`Planning->CognitiveWork->Evaluation->Commit`, `drive_version_tick` × 3) around the actual `lane_a_scalar` work. The combined journal is asserted to carry exactly `3 * batches` legal `KanbanMove`s. `batches == workers` vs Lane D isolates the kanban journaling cost (identical chunking + actor-model tax, only the actor type differs); fine-grained batching (`batches >> workers`) prices per-card scheduling overhead (feeds W2d, the 550 ms Libet budget question). | **Shipped** | --- @@ -92,12 +108,12 @@ regenerating with the same `(rows, seed)` and diffing the printed ```text onebrc-probe gen -onebrc-probe run [workers] [batches] +onebrc-probe run [workers] [batches|shards|owners_nominal] ``` Lane `b` requires `--features lane-b`; lane `d` requires `--features lane-d`; lane `e` requires `--features lane-e` (see `Cargo.toml` `[features]`). Lanes -A/C stay dependency-free either way. `batches` is **lane-`e`-only** (ignored +A/C stay dependency-free either way. The 4th positional arg is lane-specific: for lane `g` it is the **shard-owner count** (default 4); for lane `e` it is `batches` (ignored by every other lane): the number of newline-aligned batches the corpus splits into, each journaled as one kanban card (`batches >= workers`, default `workers * 16`): @@ -296,3 +312,268 @@ Readings — the numbers Addendum-13 sent this lane to fetch: bucketing + cascade-ordered sweeps (earns its keep only at high cardinality), kanban tile-batch scheduling (lane E priced it: ~66 µs/card), SIMD scan (lane B's variable — composable later). 
+ +### §5.4 — t4 (lane G, kanban update vs without) — measured 2026-07-02 + +The operator's question: *"compare morton and the kanban vs without — +if 64k concurrent SoA vs Morton tile can help us understand the pros +and cons of our architecture when using kanban update."* Lane G holds +the SAME Morton-tile 64K SoA as lane F, but as owned state behind +shard mailbox actors: workers pre-reduce 64K-row morsels (identical +hot loop to F), cast the dirty entries prefix-routed to the owning +shard, every applied batch witnessed with a `KanbanMove` (journal == +casts asserted). Same recipe corpus, 4-core container, 3 passes each: + +| Config | throughput passes (Mrows/s) | median | +|---|---|---| +| F, workers=4 (no kanban, private merge) | 79.5, 80.0, 78.4 | **79.5** | +| G, workers=4, shards=1 | 42.7, 44.7, 43.0 | **43.0** | +| G, workers=4, shards=4 | 43.0, 39.9, 38.7 | **39.9** | +| G, workers=4, shards=16 | 36.8, 36.0, 11.7 | **36.0** (one collapse) | +| G, workers=3, shards=1 | 37.7, 38.3, 33.3 | 37.7 | +| G, workers=3, shards=4 | 37.7, 35.7, 36.8 | 36.8 | +| F, workers=3 (reference) | 63.5 | 63.5 | + +The pros-and-cons ledger this sweep was sent to fetch: + +- **Kanban update costs ~0.54× at morsel granularity** (43.0 vs 79.5). + The tax decomposes into KNOWN boundary costs, not the witness + itself: the actor-boundary `Arc` corpus copy (lane D's finding), + blocking-pool workers + async owner threads oversubscribing 4 cores + (F runs exactly 4 scoped threads), and per-morsel extraction + + message allocation. Lane E already proved the journal append itself + is ~free. 
+- **What the ~2× buys** (what F cannot do): LIVE state — mid-flight, + the shard owners hold a bounded-staleness view of the whole + aggregation, queryable at any instant; WITNESSED writes — every + applied batch on the WAL, `journal == casts` asserted, replay-ready; + single-writer safety by construction (actor mailbox = serialized + `&mut`, E-CE64-MB-4); bounded worker memory (workers hold one morsel + table, owners hold THE state). +- **Do not shard ownership below contention.** At ~400 stations the + owner's apply work (~150 batch merges total) is trivially absorbed + by ONE mailbox; every added shard is pure scheduling overhead on a + 4-core box (1 → 4 → 16 shards: 43.0 → 39.9 → 36.0, with one + 16-shard collapse to 11.7 when 20 tasks thrashed 4 cores). The + Morton-tile PREFIX ROUTE itself is structurally free (G(4) is + within ~7% of G(1) before thrash) — tile-sharding is the right + MECHANISM, but its trigger is owner-side contention (high + cardinality / heavy per-entry work), never data volume. Shard count + scales with owner WORK, not with rows. +- **Don't starve scanners to feed owners:** workers=3 + dedicated + owner core is strictly worse (37.7 < 43.0) — apply work is too + light to deserve a core at this cardinality. +- **W2d guidance:** if the consumer needs one merged answer at the + end, private-merge (F) is 2× faster; the kanban-update path is what + you pay when the substrate's claims (live view, witness, replay, + ownership) are the product. The 550 ms Libet budget is untouched + either way — the tax is throughput, not latency floor. 
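The prefix route discussed in this ledger can be sketched in a few lines. This is an illustration under stated assumptions: `morton_slot` copies the nibble interleave shown in `lane_f.rs`, `fnv1a64` uses the standard FNV-1a 64-bit constants, and `owner_of` is a hypothetical helper name wrapping the `slot * shards >> 16` rule from lane G's module docs. It assumes `1 <= shards <= 65536`.

```rust
/// FNV-1a 64-bit over the station name bytes (standard offset basis and prime).
fn fnv1a64(bytes: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in bytes {
        h ^= b as u64;
        h = h.wrapping_mul(0x0000_0100_0000_01b3);
    }
    h
}

/// Two axis bytes nibble-interleaved into a 16-bit Morton tile position:
/// coarse nibbles of x and y first, fine nibbles last.
fn morton_slot(h: u64) -> u16 {
    let x = (h & 0xFF) as u16;
    let y = ((h >> 8) & 0xFF) as u16;
    ((x & 0xF0) << 8) | ((y & 0xF0) << 4) | ((x & 0x0F) << 4) | (y & 0x0F)
}

/// Hypothetical helper: the owner is the top bits of the Morton slot, so each
/// owner covers one contiguous tile range. Assumes 1 <= shards <= 65536.
fn owner_of(slot: u16, shards: usize) -> usize {
    (slot as usize * shards) >> 16
}
```

With `shards = 4`, slots `0x0000..=0x3FFF` map to owner 0 and `0x4000..=0x7FFF` to owner 1, so routing is one multiply and one shift per entry. A station's hash is fixed, so it always resolves to the same owner, which is why the owners' tables stay disjoint without any coordination.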
+ +### §5.4a — one mailbox per SoA (topology corrected) + the full ownership-granularity curve + +Correction to §5.4's framing (operator: *"I thought we spawn one ractor +mailbox per SoA?"* — yes, that is the canon): lane G's owners were +always independent (nothing shared), but each owner allocated a +full-64K-slot table and the prose said "sharding the 64K SoA" — an +ownership inversion, and it made the fine-grained end unrunnable. Fixed: +each owner's `State` is now its OWN `OwnerSoa` sized to its tile span +(one mailbox = one SoA, verbatim), which unlocks the sweep out to the +literal "64K concurrent SoAs" — `shards=65536`, one mailbox per tile, +spawn cost included in the measurement. Same recipe corpus, 4 cores, +3 passes, medians: + +| Owners (mailbox=SoA pairs) | tile span/owner | median Mrows/s | +|---|---|---| +| F reference (no kanban) | — | ~76 (61.8–76.4 this round) | +| **1** | 65536 | **43.4** | +| 16 | 4096 | 30.3 (noisy: 29.2–41.9) | +| 256 | 256 | 35.9 | +| 4096 | 16 | **18.3** | +| **65536** (mailbox/tile) | 1 | **2.1** (6.9 s worst pass) | + +Readings — the completed pros-and-cons ledger: + +- **The ownership-granularity curve is a plateau then a cliff.** 1–256 + owners live in the same ~30–43 band (topology within it is + noise-dominated on 4 cores); 4096 owners halves throughput; 65536 + owners — the "64K concurrent SoA" end — collapses **20×** vs one + owner. The costs at the fine end: 64K actor spawns (paid inside the + run), cast fragmentation (each morsel's ~413 stations scatter to + ~413 distinct owners → casts explode from ~150 to ~63K), and 64K + mailbox tasks scheduled over 4 cores. 
+- **Morton tile GROUPING is therefore not an optimization detail — it + is what makes mailbox-as-owner viable.** One mailbox per semantic + cell (per station/tile) is architecturally clean and measurably + catastrophic at OLAP arrival rates; grouping tiles into a few + prefix-contiguous owners (matched to contention, not to data) keeps + the whole kanban-update discipline inside ~0.5× of the unwitnessed + ceiling. The canon's own answer, measured: the mailbox is the OWNER + boundary, the tile is the ADDRESS boundary, and they must not be + conflated 1:1 under load. +- Per-owner SoA memory is now ∝ tile span (the 64K-owner run holds + 64K × 64-slot tables — the collapse above is scheduling + messaging, + not memory). + +### §5.5 — t5 (lane H: orchestration finds the sweet spot) + +Operator: *"the 65536 mailboxes had no Orchestration at all — check with +rs-graph-llm or lance-graph-planner + kanban update to find the sweet +spot."* Correct — t4a's fine end was the FLAT topology (64K eager +spawns, ~63K owner-addressed casts, no orchestration tier). Lane H +(feature `lane-h`) adds the two planner/kanban-executor mechanisms on +top of lane G's unchanged one-mailbox-per-SoA substrate: **lazy +activation** (a router tier spawns an owner only on first traffic — +live mailboxes track OCCUPANCY ~413, never the 64K address space) and +**ahead-firing batched delivery** (routers buffer per-owner entries, +fire one batched `Apply` at `batch_k=64`; drain flushes remainders). +Witness discipline unchanged: `Σ owner journals == Σ router casts` +asserted. (graph-flow/rs-graph-llm orchestrates at TASK granularity — +the M25 persisted-cursor shape; per-morsel it would put a session save +on the hot path, and its in-container build is blocked by the +pre-existing burn-submodule 403 — so it stays the OUTER loop; lane H +measures the in-loop planner-domain mechanisms.) 
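The two mechanisms named above (lazy activation and ahead-firing batched delivery) can be sketched without any actor runtime. This is a simplified, single-threaded illustration, not lane H's code: `Router`, `Owner`, `Entry`, `route`, `fire`, and `drain` are hypothetical names, owners are plain structs in a map rather than `ractor` actors, and an owner is created on its first fired batch.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one pre-reduced entry: (station hash, partial sum).
type Entry = (u64, i64);

/// Stand-in owner: applied entries plus a count of witnessed batches.
#[derive(Default)]
struct Owner {
    applied: Vec<Entry>,
    journal: usize,
}

struct Router {
    batch_k: usize,
    buffers: HashMap<usize, Vec<Entry>>, // pending entries per owner
    owners: HashMap<usize, Owner>,       // live owners only (lazily created)
    casts: usize,                        // batched deliveries fired so far
}

impl Router {
    fn new(batch_k: usize) -> Self {
        Router { batch_k, buffers: HashMap::new(), owners: HashMap::new(), casts: 0 }
    }

    /// Buffer one entry; fire a batch as soon as `batch_k` entries are pending.
    fn route(&mut self, owner_id: usize, e: Entry) {
        let buf = self.buffers.entry(owner_id).or_default();
        buf.push(e);
        if buf.len() >= self.batch_k {
            let batch = std::mem::take(buf);
            self.fire(owner_id, batch);
        }
    }

    /// Deliver one batch. The owner comes into existence here, on first
    /// traffic, so live owners track occupancy, not the nominal address space.
    fn fire(&mut self, owner_id: usize, batch: Vec<Entry>) {
        let owner = self.owners.entry(owner_id).or_default();
        owner.applied.extend(batch);
        owner.journal += 1; // one witness record per applied batch
        self.casts += 1;
    }

    /// Flush every non-empty remainder buffer at the end of the run.
    fn drain(&mut self) {
        let pending: Vec<(usize, Vec<Entry>)> = self
            .buffers
            .drain()
            .filter(|(_, b)| !b.is_empty())
            .collect();
        for (id, batch) in pending {
            self.fire(id, batch);
        }
    }
}
```

After `drain`, the sum of `owner.journal` over all live owners equals `casts`, which mirrors the witness check quoted above. The message count depends on how many entries each occupied owner receives relative to `batch_k`, and not on how many owners are nominally addressable.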
+ +Same recipe corpus, 4 cores, 3 passes, medians; same-session flat +references: + +| Nominal owners | flat (t4a lane G) | **orchestrated (lane H)** | +|---|---|---| +| 16 | 30.3 | **42.2** | +| 256 | 35.9 | 36.8 | +| 4096 | 18.3 | **40.2** | +| 65536 | 2.1 (1.7 same-session) | **39.4** | +| — G(1)=43.2 / F=81.7 same-session | | | + +Readings: + +- **Orchestration flattens the granularity curve.** Flat topology: + plateau then a 20× cliff. Orchestrated: ~37–43 Mrows/s at EVERY + nominal granularity — a **23× recovery at the 64K end** + (1.7 → 39.4), landing within ~9% of the best coarse topology. +- **The sweet spot is not a shard count — it is the orchestration + tier itself.** With lazy activation + ahead-firing batching, the + live-mailbox population tracks occupancy (~413) and message count + tracks batches, both independent of nominal granularity. Ownership + granularity becomes a SEMANTIC choice (per-tile addressability, + per-owner WAL) rather than a performance gamble. The residual gap + to F (~2×) is the same boundary tax G(1) pays — orchestration adds + nothing measurable on top. +- **Architecture consequence (W2d/W2e):** mailbox-as-owner at fine + semantic granularity is viable IF AND ONLY IF producers never + address owners directly — the router/delegation tier (the + ahead-firing batch-writer shape) is a load-bearing part of the + kanban-update architecture, not an optimization. Flat fan-out to + fine-grained owners is the measured anti-pattern (20×). + +### §5.6 — t6 (lane I: the batch pipeline — 65536 upfront, double-cast, flush cache) + +Operator spec: all 65536 mailboxes upfront; two fixed aligned indices +(mailbox idx == SoA row idx); codebook index → direct CAM addressing; +whole-table double-casts into the mailbox-ownership-guarantee table AND +the Lance row-address table; flush cache so flushing and reindexing +interleave. Lane I (feature `lane-i`) implements it verbatim — see +`lane_i.rs` for the mechanism-by-mechanism mapping. 
Same recipe corpus, +4 cores, 3 passes (stderr breakdown per run): + +| Pass | total Mrows/s | mailbox_spawn_ms | steady-state* Mrows/s | +|---|---|---|---| +| 1 | 3.19 | 2667.9 | ~22 | +| 2 | 6.05 | 1135.2 | ~20 | +| 3 | 3.86 | 2107.4 | ~21 | + +*steady-state = rows / (elapsed − spawn − stop); spawn is standing- +infrastructure setup, paid once per process lifetime, amortized to ~0 +in a long-running substrate. + +Fixed per-run facts (identical all passes): `batches=156`, +`versions=156` (one DatasetVersion tick per batch), +`rows_addressed=62,400` (400 stations × 156 batches), +`flush_cache_peak_tables_per_worker=2–3`, `codebook_len=400`, +ownership journal == lance journal == 156 (double-cast completeness +asserted). + +Readings: + +- **The batch pipeline wins the messaging war outright.** 312 messages + TOTAL (156 whole-table Arcs × 2 ends) versus ~63K owner-addressed + casts flat (t4a) and ~2.6K orchestrated (t5). Message count tracks + BATCHES — independent of occupancy AND of address-space size. One + allocation serves both ends (the Arc double-cast); nothing is ever + repacked into per-owner entry lists. +- **The flush cache interleaves as designed.** Peak 2–3 tables per + worker: while both sinks flush batch n, the worker fills n+1 from a + recycled table (refcount-gated). The worker never waits for a flush. +- **The costs are residency + one-time spawn, not routing.** The 64K + upfront spawn costs 1.1–2.7 s on this container (17–40 µs/actor, + high variance under memory churn) — standing infrastructure, + amortizable. Steady state (~20–22 Mrows/s) runs at roughly HALF of + the 1-owner streamed topology (G(1) 43.0) — attribution (CONJECTURE, + not isolated): the resident 64K-actor footprint's cache/memory + pressure on a 4-core container, plus the two serialized sinks. On + real silicon with real RAM the residency term shrinks; the messaging + and witness terms are already optimal. 
+- **The witness story is the strongest of any lane:** every batch is + journaled on BOTH the ownership end and the persistence end with one + KanbanMove each + a version tick — the double-WAL that makes the + batch replayable from either side. Where lanes G/H witnessed the + ownership side only, lane I is the full write-ahead shape the real + batch writer (W1b) needs. +- Composition guidance across t4–t6: **H's lazy activation** (when the + registry need not pre-exist) and **I's whole-table double-cast + + flush cache** (when it does) are complementary; both keep producers + from ever addressing fine-grained owners directly. The 65536 + standing registry is affordable as infrastructure (one spawn, ~2 s), + and the batch data path costs messages ∝ batches — the remaining + optimization surface is residency footprint, not architecture. + +### §5.7 — t7 (lane J: the knob matrix — grid × lanes × registry) + +The operator's four follow-up questions, one parameterized lane +(`lane-j`), one matrix. Same recipe corpus, 4 cores; same-session +references: G(1)=46.3, H(65536 nominal)=40.5, F=70.1. + +| Knob | Config | Mrows/s (passes) | +|---|---|---| +| **grid** | **4096 (64×64 gridlake)**, 1 lane, no registry | **42.0, 46.2, 46.3** | +| grid | 65536 (256×256), 1 lane, no registry | 19.9*, 37.9 / 39.8, 41.5 | +| **lanes** | 4096 grid, 8 lanes | 43.8, 45.1, 44.1 | +| lanes | 4096 grid, 64 lanes | 37.6, 39.2, 42.2 | +| **registry** | 65536 grid, registry ON | 2.6 (spawn 2655 ms), 14.1 (spawn 274 ms) | + +*first-pass cold outlier. + +Answers: + +- **"With Orchestrator it was 39.4?"** — t6's ~20 decomposes into TWO + now-isolated costs: the standing 64K registry (knob: registry ON + halves steady state even net of spawn — the t6 CONJECTURE is now a + FINDING) and the 64K-cell table+memo working set (~2.5 MB/worker, + L2-busting). Remove both and the batch pipeline BEATS the + orchestrator: 46.2 vs 40.5. +- **"Different cache?"** — yes, and it was the wrong one. 
Matching + the SoA batch unit to the 64×64 gridlake tile (4096 cells ≈ 80 KB + integer-exact; the literal 4096×BF16=16 KB pair is ndarray #227's + proven VDPBF16PS tier, ~448 Mrows/s single-thread in its own + probe) recovers the loss completely: grid 4096 → 46 vs grid + 65536 → 40. +- **"8 or 64 lanes?"** — **1 lane suffices; 8 is free; 64 over-lanes** + (37–42). Per-batch apply work here is O(400 dirty rows) — trivial; + sink lanes only pay when per-batch apply work is heavy. Lane count + scales with APPLY work, never with data or address space. +- **"Orchestration with 400 the sweet spot?"** — no: the gridlake + batch pipeline (46.2, double-WAL on both ends, 312 msgs) beats + H's lazy-owner orchestration (40.5, single WAL, ~2.6K msgs). H's + mechanisms remain right when fine-grained OWNERSHIP must live as + actors; when ownership can be the index-aligned guarantee table, + the batch pipeline dominates. +- **"64×64 gridlake SoA?"** — **yes: the measured sweet spot.** + J(4096, 1 lane, no registry) equals the best streamed topology + (G(1)) while carrying the double witness — the kanban-update tax + at gridlake scale is ~0 vs the best owned topology, and the only + remaining gap is the universal actor-boundary cost vs F (70). + +The composed sweet-spot recipe, one line: **64×64 gridlake batch +SoA + codebook CAM addressing + 1–8 sink lane pairs + whole-table +double-cast + flush cache; ownership as the index-aligned guarantee +table (no standing per-cell actor registry); BF16 planes per #227's +tier when tile-GEMM lands.** diff --git a/crates/onebrc-probe/src/lane_b.rs b/crates/onebrc-probe/src/lane_b.rs index 401d92e2..0e7679b2 100644 --- a/crates/onebrc-probe/src/lane_b.rs +++ b/crates/onebrc-probe/src/lane_b.rs @@ -43,7 +43,7 @@ //! `line_start` / `pending_semi` state the SIMD pass left behind. 
use crate::{parse_temp_tenths, Stats}; -use ndarray::simd::U8x32; +use ndarray::simd::{array_chunks, U8x32}; use std::collections::BTreeMap; /// Lane B — SIMD delimiter scan. One pass over `data` in 32-byte strides @@ -63,10 +63,15 @@ pub fn lane_b_simd(data: &[u8]) -> BTreeMap { let nl_needle = U8x32::splat(b'\n'); let semi_needle = U8x32::splat(b';'); + // The non-overlapping 32-byte stride walk routes through + // `ndarray::simd::array_chunks` (simd_ops.rs) — the W1a batch-walk + // primitive; `array_windows` is its OVERLAPPING sibling (GEMM-style + // row windows) and is deliberately NOT used here: delimiter scanning + // never re-reads bytes. let aligned_end = (len / U8x32::LANES) * U8x32::LANES; - let mut pos = 0usize; - while pos < aligned_end { - let block = U8x32::from_slice(&data[pos..pos + U8x32::LANES]); + for (chunk_idx, chunk) in array_chunks::(&data[..aligned_end]).enumerate() { + let pos = chunk_idx * U8x32::LANES; + let block = U8x32::from_slice(chunk); let nl_mask = block.cmpeq_mask(nl_needle); let semi_mask = block.cmpeq_mask(semi_needle); // `;` and `\n` never occupy the same byte, so OR-ing loses no @@ -98,7 +103,6 @@ pub fn lane_b_simd(data: &[u8]) -> BTreeMap { } combined &= combined - 1; // clear the lowest set bit } - pos += U8x32::LANES; } // Tail — fewer than 32 bytes remain. Finish with a plain scalar scan, diff --git a/crates/onebrc-probe/src/lane_f.rs b/crates/onebrc-probe/src/lane_f.rs index bb8c20c7..386ee5ec 100644 --- a/crates/onebrc-probe/src/lane_f.rs +++ b/crates/onebrc-probe/src/lane_f.rs @@ -62,11 +62,11 @@ use crate::{parse_temp_tenths, Stats}; use std::collections::BTreeMap; /// Number of accumulator slots — the full 16-bit tile space (256×256). -const SLOTS: usize = 1 << 16; +pub(crate) const SLOTS: usize = 1 << 16; /// FNV-1a 64-bit over the station name bytes. 
#[inline(always)] -fn fnv1a64(bytes: &[u8]) -> u64 { +pub(crate) fn fnv1a64(bytes: &[u8]) -> u64 { let mut h: u64 = 0xcbf2_9ce4_8422_2325; for &b in bytes { h ^= b as u64; @@ -80,7 +80,7 @@ fn fnv1a64(bytes: &[u8]) -> u64 { /// the GUID canon (alternating-axis refinement; each byte's nibbles are the /// axis's coarse→fine ancestry). #[inline(always)] -fn morton_slot(h: u64) -> u16 { +pub(crate) fn morton_slot(h: u64) -> u16 { let x = (h & 0xFF) as u16; let y = ((h >> 8) & 0xFF) as u16; ((x & 0xF0) << 8) | ((y & 0xF0) << 4) | ((x & 0x0F) << 4) | (y & 0x0F) @@ -98,24 +98,24 @@ fn radix_slot(h: u64) -> u16 { /// slot, open-addressed (linear probe) on collision. Single-writer by /// ownership — each worker builds its own table; cross-worker combination /// is the commutative BUNDLE merge in [`table_to_map`] + `merge_maps`. -struct SoaTable { +pub(crate) struct SoaTable { /// Full 64-bit hash tag per slot; `None`-ness is carried by `names`. - tags: Vec, + pub(crate) tags: Vec, /// Station name owned per occupied slot (empty vec = unoccupied). /// Verified byte-for-byte on every tag hit — see module doc "Hash". - names: Vec>, + pub(crate) names: Vec>, // SoA value arrays — the "SoA-shaped accumulators" of Addendum-13: // one field per column, indexed by slot, updated by gated indexed // writes (min/max/sum/count — each write is a fold, never a blind // overwrite of foreign state). - mins: Vec, - maxs: Vec, - sums: Vec, - counts: Vec, + pub(crate) mins: Vec, + pub(crate) maxs: Vec, + pub(crate) sums: Vec, + pub(crate) counts: Vec, } impl SoaTable { - fn new() -> Self { + pub(crate) fn new() -> Self { Self { tags: vec![0; SLOTS], names: vec![Vec::new(); SLOTS], @@ -127,9 +127,11 @@ impl SoaTable { } /// Route `name` to its slot (slot fn + linear probe) and fold one - /// observation into the SoA columns at that address. + /// observation into the SoA columns at that address. 
Returns the + /// RESOLVED slot index (post-probe) so callers that track dirty slots + /// (lane G's morsel extraction) can record it; lanes F/R ignore it. #[inline(always)] - fn observe(&mut self, slot0: u16, h: u64, name: &[u8], tenths: i32) { + pub(crate) fn observe(&mut self, slot0: u16, h: u64, name: &[u8], tenths: i32) -> usize { let mut s = slot0 as usize; loop { if self.counts[s] == 0 { @@ -155,6 +157,7 @@ impl SoaTable { } self.sums[s] += tenths as i64; self.counts[s] += 1; + s } } @@ -190,7 +193,7 @@ fn accumulate_table(data: &[u8], slot_of: impl Fn(u64) -> u16 + Copy) -> SoaTabl /// shape (occupied slots only) so cross-worker combination reuses the same /// commutative `merge_maps` BUNDLE step every other lane uses — and so the /// parity tests compare like with like. -fn table_to_map(table: SoaTable) -> BTreeMap { +pub(crate) fn table_to_map(table: SoaTable) -> BTreeMap { let mut out = BTreeMap::new(); for s in 0..SLOTS { if table.counts[s] > 0 { diff --git a/crates/onebrc-probe/src/lane_g.rs b/crates/onebrc-probe/src/lane_g.rs new file mode 100644 index 00000000..f55b5289 --- /dev/null +++ b/crates/onebrc-probe/src/lane_g.rs @@ -0,0 +1,575 @@ +//! Lane G — the kanban-update write path: the 64K SoA as OWNED state +//! behind mailbox actors, with every write witnessed on the board. +//! +//! The operator's question (2026-07-02): *"compare morton and the kanban +//! vs without — if 64k concurrent SoA vs Morton tile can help us +//! understand the pros and cons of our architecture when using kanban +//! update."* Lane F answered the ADDRESS question (Morton vs radix, +//! ~10% tax) with per-worker PRIVATE tables merged once at the end — +//! fast, but the accumulator state is invisible until the merge and no +//! write is witnessed. Lane G answers the OWNERSHIP question: the same +//! 64K-slot Morton-tile SoA, but held as **owned state by shard mailbox +//! actors** (mailbox-as-owner), updated by **streamed morsel casts** +//! 
(the kanban-update path), each applied batch journaled as a +//! [`KanbanMove`] — the write-ahead witness trail. +//! +//! ## Topology +//! +//! ```text +//! workers (spawn_blocking, scalar scan) shard owners (ractor actors) +//! ┌─────────────────────────────────┐ ┌──────────────────────────┐ +//! │ chunk → 64K-row morsels │ cast │ shard 0: Morton tiles │ +//! │ per-morsel local SoA pre-reduce │──Apply───► │ [0, 64K/S) SoA + WAL │ +//! │ dirty-slot extract, clear-by-undo│ (by tile │ shard 1: tiles ... │ +//! │ route entries by Morton PREFIX │ prefix) │ shard S-1: tiles ... │ +//! └─────────────────────────────────┘ └──────────────────────────┘ +//! ``` +//! +//! - **One mailbox per SoA (the canon, verbatim):** each owner actor's +//! `State` IS its own complete [`OwnerSoa`], sized to its tile span — +//! there is NO shared table and no "one SoA sharded across owners". +//! `shards` only chooses how many (mailbox, SoA) pairs partition the +//! tile space: 1 = one mailbox owning one SoA covering all tiles … +//! 65536 = one mailbox per tile, each owning its own tiny SoA — the +//! literal "64K concurrent SoAs" end of the operator's question. +//! - **Route by prefix:** an entry's owner is the top bits of its Morton +//! slot (`slot * shards >> 16`) — contiguous tile ranges, the HHTL +//! prefix route. A station's hash always lands with the same owner, so +//! the owners' SoAs are disjoint by construction. +//! - **Mailbox-as-owner:** the serialized message loop is the single +//! writer of the owner's SoA (the same compile-time no-aliasing +//! argument as `KanbanActor`, E-CE64-MB-4). No lock, no shared `&mut`. +//! - **Kanban update = witnessed write:** every applied morsel batch +//! appends one `KanbanMove` (`CognitiveWork → Evaluation`, a legal +//! Rubicon forward edge) to the owner's journal — recorded directly to +//! the WAL trail (the `storage_kanban` journaling precedent: the FSM +//! gates live advancement; post-hoc witness records write the fields). +//! 
The lane asserts `Σ journal lengths == total casts` — nothing +//! applied unwitnessed, nothing witnessed unapplied. +//! - **Morsels** (64K rows, `#227`'s morsel size): workers pre-reduce +//! each morsel in a private table (identical hot loop to lane F), +//! extract only the dirty slots (≤ ~station-count entries), reset them +//! by undo, and cast the pre-reduced entries to the owning shards. +//! Streaming morsels — not one merge at the end — is what makes the +//! owner state LIVE: at any instant the shard mailboxes hold a bounded- +//! staleness view of the whole aggregation (queryable mid-flight, the +//! substrate claim lane F cannot make). +//! +//! ## What G vs F measures +//! +//! Same address (Morton tiles), same scan, same pre-reduction. The delta +//! is pure architecture: streamed witnessed writes through owner +//! mailboxes (+ the actor-boundary `Arc` corpus copy, as lanes D/E) +//! versus private-merge-at-end. `shards` sweeps the GRANULARITY of +//! ownership — 1 mailbox (every update serializes through one queue), +//! through Morton-tile-range groupings (4/16/256/…), out to 65536 (one +//! mailbox per tile, 64K concurrent SoAs, spawn cost included in the +//! measurement). That curve is the pros-and-cons ledger of "64K +//! concurrent SoA vs Morton tile grouping" under kanban update. + +use crate::lane_f::{fnv1a64, morton_slot, SoaTable, SLOTS}; +use crate::{chunk_bounds, merge_maps, parse_temp_tenths, Stats}; +use lance_graph_contract::collapse_gate::MailboxId; +use lance_graph_contract::kanban::{ExecTarget, KanbanColumn, KanbanMove}; +use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; +use std::collections::BTreeMap; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +/// Rows per worker morsel — `#227`'s morsel size (64K rows ≈ L1/L2-resident +/// pre-reduction working set). +pub const DEFAULT_MORSEL_ROWS: usize = 1 << 16; + +/// One pre-reduced station aggregate extracted from a worker morsel. 
+pub(crate) struct MorselEntry { + pub(crate) h: u64, + pub(crate) name: Vec<u8>, + pub(crate) stats: Stats, +} + +/// Messages a shard owner accepts. +pub(crate) enum ShardMsg { + /// One morsel's pre-reduced entries for THIS shard's tile range — + /// fire-and-forget (`cast`): workers never wait on owners, owners + /// drain their mailbox in arrival order (single-writer serialization). + Apply { entries: Vec<MorselEntry> }, + /// Drain point: reply with the shard's aggregate map + journal length. + /// Sent only after every worker has joined, so mailbox FIFO guarantees + /// all `Apply`s are applied first. + Finish { + reply: RpcReplyPort<(BTreeMap<String, Stats>, usize)>, + }, +} + +/// A mailbox's OWN SoA — one per owner, sized to its tile range (the +/// canon: one ractor mailbox per SoA; there is no shared table and no +/// "one SoA sharded across owners"). Same parallel-array shape as lane +/// F's `SoaTable`, capacity ∝ the owner's tile span (clamped: at least +/// 64 slots so tiny per-tile owners can still absorb hash-collided +/// stations; at most 4096 — ~10× the whole corpus's station count — so +/// a 1-owner topology doesn't allocate 64K slots it can never fill). +/// Local placement = `morton_slot & (capacity-1)` + linear probe with +/// full `(h, name)` verification; the PREFIX routing to this mailbox +/// already happened at the worker. +pub(crate) struct OwnerSoa { + capacity: usize, // power of two + tags: Vec<u64>, + names: Vec<Vec<u8>>, + mins: Vec<i32>, + maxs: Vec<i32>, + sums: Vec, + counts: Vec, +} + +impl OwnerSoa { + pub(crate) fn for_span(span: usize) -> Self { + let capacity = span.clamp(64, 4096).next_power_of_two(); + Self { + capacity, + tags: vec![0; capacity], + names: vec![Vec::new(); capacity], + mins: vec![i32::MAX; capacity], + maxs: vec![i32::MIN; capacity], + sums: vec![0; capacity], + counts: vec![0; capacity], + } + } + + /// Fold one pre-reduced entry (the commutative BUNDLE merge) into + /// this mailbox's own SoA. 
Panics if the owner's table is full — + /// a probe-sizing bug, never silent corruption. + fn merge_entry(&mut self, e: &MorselEntry) { + let mut s = morton_slot(e.h) as usize & (self.capacity - 1); + for _ in 0..self.capacity { + if self.counts[s] == 0 { + self.tags[s] = e.h; + self.names[s] = e.name.clone(); + self.mins[s] = e.stats.min; + self.maxs[s] = e.stats.max; + self.sums[s] = e.stats.sum; + self.counts[s] = e.stats.count; + return; + } + if self.tags[s] == e.h && self.names[s] == e.name { + if e.stats.min < self.mins[s] { + self.mins[s] = e.stats.min; + } + if e.stats.max > self.maxs[s] { + self.maxs[s] = e.stats.max; + } + self.sums[s] += e.stats.sum; + self.counts[s] += e.stats.count; + return; + } + s = (s + 1) & (self.capacity - 1); + } + panic!("OwnerSoa full: more stations routed to one mailbox than its capacity"); + } + + /// Sweep occupied slots into the common output map shape. + fn into_map(self) -> BTreeMap<String, Stats> { + let mut out = BTreeMap::new(); + for s in 0..self.capacity { + if self.counts[s] > 0 { + let name = String::from_utf8(self.names[s].clone()).expect("station name utf8"); + out.insert( + name, + Stats { + min: self.mins[s], + max: self.maxs[s], + sum: self.sums[s], + count: self.counts[s], + }, + ); + } + } + out + } +} + +/// Owner state: THIS mailbox's own SoA + its kanban WAL. +pub(crate) struct ShardState { + id: MailboxId, + table: OwnerSoa, + journal: Vec<KanbanMove>, +} + +/// The shard-owner actor — mailbox-as-owner: the actor and its SoA are +/// one thing; `shards` is simply HOW MANY (mailbox, SoA) pairs the tile +/// space is partitioned into, from 1 (one mailbox owns all tiles) to +/// 65536 (one mailbox per tile — the literal "64K concurrent SoAs"). +pub(crate) struct ShardOwner; + +impl Actor for ShardOwner { + type Msg = ShardMsg; + type State = ShardState; + /// `(mailbox id, tile span)` — span sizes this owner's own SoA. 
+ type Arguments = (MailboxId, usize); + + async fn pre_start( + &self, + _myself: ActorRef<Self::Msg>, + (id, span): Self::Arguments, + ) -> Result<Self::State, ActorProcessingErr> { + Ok(ShardState { + id, + table: OwnerSoa::for_span(span), + journal: Vec::new(), + }) + } + + async fn handle( + &self, + _myself: ActorRef<Self::Msg>, + msg: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match msg { + ShardMsg::Apply { entries } => { + for e in &entries { + state.table.merge_entry(e); + } + // The witnessed write: one legal Rubicon forward move per + // applied morsel batch, recorded to the WAL trail (see + // module doc — journaling, not live FSM advancement). + let pos = state.journal.len() as u32 + 1; + state.journal.push(KanbanMove { + mailbox: state.id, + from: KanbanColumn::CognitiveWork, + to: KanbanColumn::Evaluation, + witness_chain_position: pos, + libet_offset_us: 0, + exec: ExecTarget::Native, + }); + } + ShardMsg::Finish { reply } => { + let journal_len = state.journal.len(); + let table = std::mem::replace(&mut state.table, OwnerSoa::for_span(1)); + let _ = reply.send((table.into_map(), journal_len)); + } + } + Ok(()) + } +} + +/// A station's owning shard: the top bits of its Morton slot — contiguous +/// tile ranges (the prefix route). Computed from the hash's PRE-probe slot +/// so it is deterministic per station regardless of worker-local probing. +#[inline(always)] +fn shard_of(h: u64, shards: usize) -> usize { + (morton_slot(h) as usize * shards) >> 16 +} + +/// Extract the dirty slots of a worker's morsel table into per-shard entry +/// batches, resetting each extracted slot by undo (`#227`'s clear-by-undo: +/// O(dirty), never O(SLOTS)), and cast every non-empty batch to its owner. +/// Grouping is sort-based (O(dirty log dirty) on ≤ ~station-count entries), +/// NOT a dense `Vec` per shard — a 64K-owner topology must not allocate +/// 64K empty vecs per morsel flush. 
+fn flush_morsel( + table: &mut SoaTable, + dirty: &mut Vec<usize>, + shards: usize, + owners: &[ActorRef<ShardMsg>], + casts: &AtomicUsize, +) { + if dirty.is_empty() { + return; + } + let mut tagged: Vec<(usize, MorselEntry)> = Vec::with_capacity(dirty.len()); + for &s in dirty.iter() { + let h = table.tags[s]; + tagged.push(( + shard_of(h, shards), + MorselEntry { + h, + name: std::mem::take(&mut table.names[s]), + stats: Stats { + min: table.mins[s], + max: table.maxs[s], + sum: table.sums[s], + count: table.counts[s], + }, + }, + )); + // Clear-by-undo: reset exactly the slots this morsel touched. + table.tags[s] = 0; + table.mins[s] = i32::MAX; + table.maxs[s] = i32::MIN; + table.sums[s] = 0; + table.counts[s] = 0; + } + dirty.clear(); + tagged.sort_unstable_by_key(|(shard, _)| *shard); + let mut it = tagged.into_iter().peekable(); + while let Some((shard, first)) = it.next() { + let mut entries = vec![first]; + while it.peek().is_some_and(|(s, _)| *s == shard) { + entries.push(it.next().expect("peeked").1); + } + owners[shard] + .cast(ShardMsg::Apply { entries }) + .expect("cast morsel batch to shard owner"); + casts.fetch_add(1, Ordering::Relaxed); + } +} + +/// One worker's scan: identical per-record hot loop to lane F (same hash, +/// same Morton slot, same probe), pre-reducing into a private table, with +/// a morsel-boundary flush that streams the dirty entries to the owners. 
+fn worker_scan( + data: &[u8], + start: usize, + end: usize, + morsel_rows: usize, + shards: usize, + owners: &[ActorRef<ShardMsg>], + casts: &AtomicUsize, +) { + let mut table = SoaTable::new(); + let mut dirty: Vec<usize> = Vec::with_capacity(1024); + let mut rows_in_morsel = 0usize; + let mut i = start; + while i < end { + let name_start = i; + while data[i] != b';' { + i += 1; + } + let name = &data[name_start..i]; + i += 1; // skip ';' + let temp_start = i; + while data[i] != b'\n' { + i += 1; + } + let tenths = parse_temp_tenths(&data[temp_start..i]); + i += 1; // skip '\n' + + let h = fnv1a64(name); + let s = table.observe(morton_slot(h), h, name, tenths); + if table.counts[s] == 1 { + dirty.push(s); // first observation of this station this morsel + } + rows_in_morsel += 1; + if rows_in_morsel == morsel_rows { + flush_morsel(&mut table, &mut dirty, shards, owners, casts); + rows_in_morsel = 0; + } + } + flush_morsel(&mut table, &mut dirty, shards, owners, casts); +} + +/// Lane H's worker variant: the SAME scan + morsel pre-reduction as +/// [`worker_scan`], but the morsel flush GROUPS the dirty entries into +/// `groups` buckets by a caller-supplied `group_of(hash) -> group_idx` +/// mapping and hands each non-empty bucket to `sink` instead of casting +/// to owners directly — the orchestration tier owns delivery from there +/// (lazy activation + ahead-firing batching). The mapping is a closure +/// (not the hardwired `slot * groups >> 16`) so lane H can derive the +/// router bucket FROM the owner tile rather than as an independent +/// partition of the slot space — see `lane_h::router_of_owner`. 
+pub(crate) fn worker_scan_grouped( + data: &[u8], + start: usize, + end: usize, + morsel_rows: usize, + groups: usize, + group_of: impl Fn(u64) -> usize, + mut sink: impl FnMut(usize, Vec<MorselEntry>), +) { + let groups = groups.max(1); + let mut table = SoaTable::new(); + let mut dirty: Vec<usize> = Vec::with_capacity(1024); + let mut flush = |table: &mut SoaTable, dirty: &mut Vec<usize>| { + if dirty.is_empty() { + return; + } + let mut per_group: Vec<Vec<MorselEntry>> = (0..groups).map(|_| Vec::new()).collect(); + for &s in dirty.iter() { + let h = table.tags[s]; + let group = group_of(h).min(groups - 1); + per_group[group].push(MorselEntry { + h, + name: std::mem::take(&mut table.names[s]), + stats: Stats { + min: table.mins[s], + max: table.maxs[s], + sum: table.sums[s], + count: table.counts[s], + }, + }); + table.tags[s] = 0; + table.mins[s] = i32::MAX; + table.maxs[s] = i32::MIN; + table.sums[s] = 0; + table.counts[s] = 0; + } + dirty.clear(); + for (group, entries) in per_group.into_iter().enumerate() { + if !entries.is_empty() { + sink(group, entries); + } + } + }; + + let mut rows_in_morsel = 0usize; + let mut i = start; + while i < end { + let name_start = i; + while data[i] != b';' { + i += 1; + } + let name = &data[name_start..i]; + i += 1; // skip ';' + let temp_start = i; + while data[i] != b'\n' { + i += 1; + } + let tenths = parse_temp_tenths(&data[temp_start..i]); + i += 1; // skip '\n' + + let h = fnv1a64(name); + let s = table.observe(morton_slot(h), h, name, tenths); + if table.counts[s] == 1 { + dirty.push(s); + } + rows_in_morsel += 1; + if rows_in_morsel == morsel_rows { + flush(&mut table, &mut dirty); + rows_in_morsel = 0; + } + } + flush(&mut table, &mut dirty); +} + +/// Lane G with an explicit morsel size (tests use a tiny morsel to force +/// multi-morsel flushes + clear-by-undo on small corpora). 
+pub fn lane_g_kanban_soa_with_morsel( + data: &[u8], + workers: usize, + shards: usize, + morsel_rows: usize, +) -> BTreeMap<String, Stats> { + let workers = workers.max(1); + let shards = shards.clamp(1, SLOTS); // 1 mailbox … 1 mailbox per tile + let morsel_rows = morsel_rows.max(1); + let bounds = chunk_bounds(data, workers); + + // Async threads host the shard owners; the scan workers run on the + // blocking pool so a CPU-bound scan cannot starve the owners' mailbox + // processing (real overlap between producing and applying — the + // streamed-write architecture under measurement, not an artifact of + // task scheduling). + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(workers) + .build() + .expect("build tokio runtime for lane G"); + + runtime.block_on(async move { + // Actor-model boundary cost: one upfront corpus copy, as lanes D/E. + let shared: Arc<Vec<u8>> = Arc::new(data.to_vec()); + let casts = Arc::new(AtomicUsize::new(0)); + + let mut owners = Vec::with_capacity(shards); + let mut owner_handles = Vec::with_capacity(shards); + // One (mailbox, SoA) pair per shard — each owner's OWN SoA is + // sized to its tile span (SLOTS/shards, i.e. the whole tile space + // at shards=1 down to one tile per mailbox at shards=65536). 
+ let span = SLOTS.div_ceil(shards); + for sid in 0..shards { + let (actor, handle) = Actor::spawn(None, ShardOwner, (sid as MailboxId, span)) + .await + .expect("spawn shard owner"); + owners.push(actor); + owner_handles.push(handle); + } + let owners = Arc::new(owners); + + let mut worker_handles = Vec::with_capacity(bounds.len()); + for &(start, end) in &bounds { + let shared = Arc::clone(&shared); + let owners = Arc::clone(&owners); + let casts = Arc::clone(&casts); + worker_handles.push(tokio::task::spawn_blocking(move || { + worker_scan(&shared, start, end, morsel_rows, shards, &owners, &casts); + })); + } + for h in worker_handles { + h.await.expect("lane G worker join"); + } + + // All workers joined ⇒ every Apply is already enqueued; mailbox + // FIFO applies them before Finish is handled. + let mut maps = Vec::with_capacity(shards); + let mut journal_total = 0usize; + for actor in owners.iter() { + let (map, journal_len) = ractor::call!(actor, |reply| ShardMsg::Finish { reply }) + .expect("lane G shard finish rpc"); + maps.push(map); + journal_total += journal_len; + } + assert_eq!( + journal_total, + casts.load(Ordering::Relaxed), + "every applied morsel batch must be witnessed (journal == casts)" + ); + + for actor in owners.iter() { + actor.stop(None); + } + for h in owner_handles { + h.await.expect("shard owner join"); + } + + merge_maps(maps) + }) +} + +/// Lane G — kanban-update write path at the default (64K-row) morsel size. +pub fn lane_g_kanban_soa(data: &[u8], workers: usize, shards: usize) -> BTreeMap<String, Stats> { + lane_g_kanban_soa_with_morsel(data, workers, shards, DEFAULT_MORSEL_ROWS) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Lane G must agree byte-for-byte with lane A — across a TINY morsel + /// size (1000 rows) so the multi-morsel flush + clear-by-undo path and + /// the cross-morsel owner merge are exercised, for both the + /// single-mailbox (shards=1) and tile-sharded (shards=4) topologies. 
+ #[test] + fn lane_g_agrees_with_lane_a_across_morsels_and_shard_counts() { + let dir = std::env::temp_dir(); + let path = dir.join(format!("onebrc_probe_test_g_{}.txt", std::process::id())); + let result = crate::gen::gen(&path, 50_000, 55).expect("gen"); + assert_eq!(result.rows, 50_000); + + let data = std::fs::read(&path).expect("read generated corpus"); + std::fs::remove_file(&path).ok(); + + let a = crate::lane_a_scalar(&data); + let g1 = lane_g_kanban_soa_with_morsel(&data, 3, 1, 1000); + let g4 = lane_g_kanban_soa_with_morsel(&data, 3, 4, 1000); + // Fine-grained ownership: 4096 mailboxes, each owning its own + // 16-tile-span SoA (span < 64 → minimum 64-slot capacity per + // owner) — exercises the per-owner sizing + local probing. + let g4096 = lane_g_kanban_soa_with_morsel(&data, 3, 4096, 1000); + assert_eq!(a, g1, "lane G (1 mailbox) must match lane A"); + assert_eq!(a, g4, "lane G (4 mailboxes) must match lane A"); + assert_eq!(a, g4096, "lane G (4096 mailboxes) must match lane A"); + assert!(!a.is_empty()); + } + + /// Prefix routing is total and deterministic: every possible hash maps + /// to a valid shard, and equal hashes always land in the same shard. + #[test] + fn shard_routing_is_total_and_stable() { + for shards in [1usize, 3, 4, 16] { + for h in [0u64, 1, 0xFFFF, 0xDEAD_BEEF, u64::MAX] { + let s = shard_of(h, shards); + assert!(s < shards, "shard {s} out of range for {shards}"); + assert_eq!(s, shard_of(h, shards), "routing must be deterministic"); + } + } + } +} diff --git a/crates/onebrc-probe/src/lane_h.rs b/crates/onebrc-probe/src/lane_h.rs new file mode 100644 index 00000000..2f29256d --- /dev/null +++ b/crates/onebrc-probe/src/lane_h.rs @@ -0,0 +1,391 @@ +//! Lane H — ORCHESTRATED fine-grained ownership: lazy mailbox +//! activation plus ahead-firing batched delivery over lane G's +//! one-mailbox-per-SoA substrate. +//! +//! The operator's follow-up (2026-07-02): *"I understand the 65536 +//! 
mailboxes had no Orchestration at all — can you check with +//! rs-graph-llm or lance-graph-planner + kanban update to find the +//! sweet spot."* Correct: lane G's t4a fine end was the FLAT topology — +//! all 64K owners spawned eagerly up front, every morsel's entries cast +//! owner-by-owner (~63K messages), no orchestration layer. Lane H adds +//! the two orchestration mechanisms from the planner / kanban-executor +//! domain (`v3-kanban-executor-engineer`: delegation + the ahead-firing +//! batch writer; `elevation/`: pay for a level only when traffic asks +//! for it): +//! +//! 1. **Lazy activation** — the tile ADDRESS space is 64K, but the +//! corpus OCCUPANCY is ~413 stations. A router tier spawns an owner +//! mailbox only when its tile range first receives traffic, so a +//! nominal 65536-owner topology activates only the ~413 occupied +//! ones. Spawn cost tracks occupancy, never address-space size. The +//! router grain is derived FROM the owner grain (`router_of_owner`, +//! integer division) rather than as an independent partition of the +//! slot space, so each `owner_idx` is spawned in exactly one router — +//! the live-owner count is true occupancy, never inflated by a +//! router-boundary straddle. +//! 2. **Ahead-firing batched delivery** — routers buffer per-owner +//! entries and fire one batched `Apply` cast when an owner's buffer +//! reaches `batch_k` (the ahead-firing batch-writer shape); the +//! drain at the end flushes remainders. Cast fragmentation collapses +//! from ~63K owner-addressed messages to (spawns + a few flushes per +//! ACTIVE owner). +//! +//! Everything below the orchestration layer is lane G verbatim: one +//! mailbox per SoA (`OwnerSoa` sized to the owner's tile span), morsel +//! pre-reduction with clear-by-undo at the workers, `KanbanMove` +//! witness per applied batch, `Σ owner journals == Σ router casts` +//! asserted. +//! +//! ## Topology +//! +//! ```text +//! 
workers (scan, morsels) routers (R actors, orchestration) owners (LAZY, per occupied range) +//! ┌─────────────────────┐ ┌───────────────────────────────┐ ┌─────────────────────────────┐ +//! │ morsel pre-reduce │ 1 │ buffer entries per owner idx │ K │ OwnerSoa + KanbanMove WAL │ +//! │ group by router │───► │ spawn owner on FIRST entry │───► │ (spawned on demand only — │ +//! │ region (R casts max │cast │ fire Apply at batch_k │cast │ ~occupancy, not 64K) │ +//! │ per morsel) │ │ drain-flush at finish │ │ │ +//! └─────────────────────┘ └───────────────────────────────┘ └─────────────────────────────┘ +//! ``` +//! +//! ## Why NOT rs-graph-llm's graph-flow here +//! +//! graph-flow (the M25 `KanbanSessionStorage` arc) orchestrates at TASK +//! granularity — a persisted session cursor per step. Putting it on the +//! per-morsel hot path would insert a session save per batch, measuring +//! storage latency rather than orchestration structure; its in-container +//! build is also blocked by the pre-existing burn-submodule 403 (W3b). +//! graph-flow belongs at the OUTER loop (the job that runs this lane); +//! the in-loop mechanisms lane H measures are the planner/kanban-executor +//! domain's own (lazy delegation + ahead-firing batch writer), driven +//! against the same contract types. + +use crate::lane_f::{morton_slot, SLOTS}; +use crate::lane_g::{MorselEntry, ShardMsg, ShardOwner}; +use crate::{merge_maps, Stats}; +use lance_graph_contract::collapse_gate::MailboxId; +use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +/// Default ahead-firing threshold: entries buffered per owner before the +/// router fires a batched `Apply`. +pub const DEFAULT_BATCH_K: usize = 64; + +/// A station's owner tile index — the fine, occupancy-matched grain: the +/// `owners_nominal`-quantized Morton prefix. 
A stable GLOBAL id (the same +/// station always yields the same `owner_idx`, independent of routing), so +/// it doubles as the owner mailbox's [`MailboxId`]. +#[inline] +fn owner_of(h: u64, owners_nominal: usize) -> usize { + (morton_slot(h) as usize * owners_nominal) >> 16 +} + +/// The single router that owns `owner_idx`. The router grain is derived +/// FROM the owner grain by integer division — NOT an independent partition +/// of the Morton slot space — so every `owner_idx` maps to exactly one +/// router. That keeps the lazy activation honest: an owner is spawned in +/// one router only, `MailboxId`s never duplicate, and the live-owner count +/// equals true occupancy. (Fixes the router-boundary straddle CodeRabbit +/// flagged on #635 r3515365083: when `routers_n` and `owners_nominal` +/// partitioned the slot space independently — e.g. 3 routers vs 16 owners — +/// one `owner_idx` range crossed a router boundary and was lazily spawned +/// as a separate `ShardOwner` in two routers, inflating the very count +/// this lane exists to measure.) +#[inline] +fn router_of_owner(owner_idx: usize, owners_nominal: usize, routers_n: usize) -> usize { + // owner_idx ∈ [0, owners_nominal); map monotonically onto [0, routers_n). + (owner_idx * routers_n / owners_nominal).min(routers_n - 1) +} + +/// Messages the router (orchestration) tier accepts. +enum RouteMsg { + /// One worker morsel's pre-reduced entries for THIS router's tile + /// region (already grouped by the worker — at most one cast per + /// morsel per router). + Morsel { entries: Vec<MorselEntry> }, + /// Workers have joined: flush every remaining buffer to its (lazily + /// spawned) owner, then reply with the owner_idxs this router + /// activated, their live owner refs, and how many `Apply` casts it + /// fired in total. The owner_idxs let the coordinator assert that no + /// `owner_idx` was activated in more than one router. 
+ Drain { + reply: RpcReplyPort<(Vec<usize>, Vec<ActorRef<ShardMsg>>, usize)>, + }, +} + +/// Router state: per-owner entry buffers + the lazily-activated owners +/// of this router's tile region. +struct RouterState { + owners_nominal: usize, + owner_span: usize, + batch_k: usize, + buffers: HashMap<usize, Vec<MorselEntry>>, + live: HashMap<usize, ActorRef<ShardMsg>>, + casts_fired: usize, +} + +impl RouterState { + /// Ensure the owner mailbox for `owner_idx` is live — LAZY spawn on + /// first traffic (the orchestration mechanism #1: activation tracks + /// occupancy, never address-space size). + async fn owner(&mut self, owner_idx: usize) -> Result<ActorRef<ShardMsg>, ActorProcessingErr> { + if let Some(actor) = self.live.get(&owner_idx) { + return Ok(actor.clone()); + } + let (actor, _handle) = + Actor::spawn(None, ShardOwner, (owner_idx as MailboxId, self.owner_span)).await?; + // The join handle is detached: owners are collected + stopped by + // the coordinator after Drain hands their refs over. + self.live.insert(owner_idx, actor.clone()); + Ok(actor) + } + + async fn fire(&mut self, owner_idx: usize) -> Result<(), ActorProcessingErr> { + if let Some(entries) = self.buffers.remove(&owner_idx) { + if !entries.is_empty() { + let owner = self.owner(owner_idx).await?; + owner + .cast(ShardMsg::Apply { entries }) + .expect("router fires batched Apply"); + self.casts_fired += 1; + } + } + Ok(()) + } +} + +/// The router actor — the in-loop orchestration tier (delegation cache + +/// ahead-firing batch writer over lazy owners). +struct Router; + +impl Actor for Router { + type Msg = RouteMsg; + type State = RouterState; + /// `(owners_nominal, owner_span, batch_k)`. 
+ type Arguments = (usize, usize, usize); + + async fn pre_start( + &self, + _myself: ActorRef<Self::Msg>, + (owners_nominal, owner_span, batch_k): Self::Arguments, + ) -> Result<Self::State, ActorProcessingErr> { + Ok(RouterState { + owners_nominal, + owner_span, + batch_k, + buffers: HashMap::new(), + live: HashMap::new(), + casts_fired: 0, + }) + } + + async fn handle( + &self, + _myself: ActorRef<Self::Msg>, + msg: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match msg { + RouteMsg::Morsel { entries } => { + let mut ready: Vec<usize> = Vec::new(); + for e in entries { + let owner_idx = owner_of(e.h, state.owners_nominal); + let buf = state.buffers.entry(owner_idx).or_default(); + buf.push(e); + // Ahead-firing: mark for delivery once the buffer + // reaches the batch threshold (mechanism #2). + if buf.len() >= state.batch_k { + ready.push(owner_idx); + } + } + for owner_idx in ready { + state.fire(owner_idx).await?; + } + } + RouteMsg::Drain { reply } => { + let pending: Vec<usize> = state.buffers.keys().copied().collect(); + for owner_idx in pending { + state.fire(owner_idx).await?; + } + let owner_idxs: Vec<usize> = state.live.keys().copied().collect(); + let owners: Vec<ActorRef<ShardMsg>> = state.live.values().cloned().collect(); + let _ = reply.send((owner_idxs, owners, state.casts_fired)); + } + } + Ok(()) + } +} + +/// Lane H with explicit knobs (tests use tiny morsels / small `batch_k` +/// to force multi-flush paths). +pub fn lane_h_orchestrated_with( + data: &[u8], + workers: usize, + owners_nominal: usize, + morsel_rows: usize, + batch_k: usize, +) -> BTreeMap<String, Stats> { + let workers = workers.max(1); + let owners_nominal = owners_nominal.clamp(1, SLOTS); + let morsel_rows = morsel_rows.max(1); + let batch_k = batch_k.max(1); + let bounds = crate::chunk_bounds(data, workers); + // Router tier sized to the machine, never to the address space — + // the orchestration layer is the COARSE, contention-matched grain; + // the owners are the fine, occupancy-matched grain. 
+ let routers_n = workers; + let owner_span = SLOTS.div_ceil(owners_nominal); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(workers) + .build() + .expect("build tokio runtime for lane H"); + + runtime.block_on(async move { + let shared: Arc<Vec<u8>> = Arc::new(data.to_vec()); + let worker_casts = Arc::new(AtomicUsize::new(0)); + + let mut routers = Vec::with_capacity(routers_n); + let mut router_handles = Vec::with_capacity(routers_n); + for _ in 0..routers_n { + let (actor, handle) = Actor::spawn(None, Router, (owners_nominal, owner_span, batch_k)) + .await + .expect("spawn router"); + routers.push(actor); + router_handles.push(handle); + } + let routers = Arc::new(routers); + + let mut worker_handles = Vec::with_capacity(bounds.len()); + for &(start, end) in &bounds { + let shared = Arc::clone(&shared); + let routers = Arc::clone(&routers); + let worker_casts = Arc::clone(&worker_casts); + let routers_n = routers.len(); + worker_handles.push(tokio::task::spawn_blocking(move || { + crate::lane_g::worker_scan_grouped( + &shared, + start, + end, + morsel_rows, + routers_n, + // Route each entry to the ONE router that owns its + // owner tile — router grain derived from owner grain, + // never an independent partition — so no owner_idx is + // ever spawned in two routers. + |h| router_of_owner(owner_of(h, owners_nominal), owners_nominal, routers_n), + |router_idx, entries| { + routers[router_idx] + .cast(RouteMsg::Morsel { entries }) + .expect("cast morsel to router"); + worker_casts.fetch_add(1, Ordering::Relaxed); + }, + ); + })); + } + for h in worker_handles { + h.await.expect("lane H worker join"); + } + + // Drain the orchestration tier: remaining buffers flush, lazily + // activated owners are handed to the coordinator. 
+ let mut all_owners: Vec<ActorRef<ShardMsg>> = Vec::new(); + let mut all_owner_idxs: Vec<usize> = Vec::new(); + let mut router_casts_total = 0usize; + for router in routers.iter() { + let (owner_idxs, owners, casts) = + ractor::call!(router, |reply| RouteMsg::Drain { reply }).expect("router drain rpc"); + all_owner_idxs.extend(owner_idxs); + all_owners.extend(owners); + router_casts_total += casts; + } + // Router grain is derived from owner grain, so each owner_idx is + // activated in exactly one router — no MailboxId spawned twice, so + // the live-owner count is true occupancy, never inflated by a + // router-boundary straddle (#635 r3515365083). + let unique_owner_idxs: HashSet<usize> = all_owner_idxs.iter().copied().collect(); + assert_eq!( + unique_owner_idxs.len(), + all_owner_idxs.len(), + "each owner_idx must activate in exactly one router (no straddle-duplicated MailboxIds)" + ); + + let mut maps = Vec::with_capacity(all_owners.len()); + let mut journal_total = 0usize; + for owner in &all_owners { + let (map, journal_len) = ractor::call!(owner, |reply| ShardMsg::Finish { reply }) + .expect("lane H owner finish rpc"); + maps.push(map); + journal_total += journal_len; + } + assert_eq!( + journal_total, router_casts_total, + "every fired batch must be witnessed (owner journals == router casts)" + ); + + for owner in &all_owners { + owner.stop(None); + } + for router in routers.iter() { + router.stop(None); + } + for h in router_handles { + h.await.expect("router join"); + } + + merge_maps(maps) + }) +} + +/// Lane H — orchestrated fine-grained ownership at the default morsel +/// size (64K rows) and ahead-firing threshold ([`DEFAULT_BATCH_K`]). 
+pub fn lane_h_orchestrated( + data: &[u8], + workers: usize, + owners_nominal: usize, +) -> BTreeMap<String, Stats> { + lane_h_orchestrated_with( + data, + workers, + owners_nominal, + crate::lane_g::DEFAULT_MORSEL_ROWS, + DEFAULT_BATCH_K, + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Lane H must agree byte-for-byte with lane A across a tiny morsel + /// (multi-flush) + tiny batch_k (forces ahead-firing mid-stream) at + /// both a coarse and the full fine-grained nominal granularity — + /// and the fine-grained run must activate only ~occupancy owners + /// (lazy activation), never the nominal 64K. + /// + /// The coarse run uses 16 nominal owners with 3 routers (workers) — a + /// DELIBERATELY misaligned pair (16 is not a multiple of 3), the exact + /// router-boundary-straddle case CodeRabbit flagged (#635 r3515365083). + /// The in-run `unique_owner_idxs.len() == all_owner_idxs.len()` assert + /// inside `lane_h_orchestrated_with` fails here if any `owner_idx` is + /// activated in more than one router, so this test guards the fix. 
+ #[test] + fn lane_h_agrees_with_lane_a_and_activates_only_occupied_owners() { + let dir = std::env::temp_dir(); + let path = dir.join(format!("onebrc_probe_test_h_{}.txt", std::process::id())); + let result = crate::gen::gen(&path, 50_000, 77).expect("gen"); + assert_eq!(result.rows, 50_000); + + let data = std::fs::read(&path).expect("read generated corpus"); + std::fs::remove_file(&path).ok(); + + let a = crate::lane_a_scalar(&data); + let h_coarse = lane_h_orchestrated_with(&data, 3, 16, 1000, 8); + let h_fine = lane_h_orchestrated_with(&data, 3, SLOTS, 1000, 8); + assert_eq!(a, h_coarse, "lane H (16 nominal owners) must match lane A"); + assert_eq!(a, h_fine, "lane H (65536 nominal owners) must match lane A"); + assert!(!a.is_empty()); + } +} diff --git a/crates/onebrc-probe/src/lane_i.rs b/crates/onebrc-probe/src/lane_i.rs new file mode 100644 index 00000000..84f25283 --- /dev/null +++ b/crates/onebrc-probe/src/lane_i.rs @@ -0,0 +1,693 @@ +//! Lane I — the substrate-native batch pipeline, per the operator's +//! spec (2026-07-02): all 65536 mailboxes spawned UPFRONT, two fixed +//! aligned indices (mailbox index == SoA row index), codebook-minted +//! identity for direct CAM addressing, whole-table double-casts into +//! the mailbox-ownership-guarantee table AND the Lance row-address +//! table, and a flush cache so flushing and reindexing interleave. +//! +//! ## The spec, mechanism by mechanism +//! +//! 1. **All 65536 mailboxes upfront.** [`RowOwner`] actors are standing +//! infrastructure — the ownership REGISTRY — spawned eagerly before +//! any data moves (spawn wall-time measured and reported separately +//! on stderr). Unlike lanes G/H they are NOT a data path: in steady +//! state no record, morsel, or batch is ever addressed to an +//! individual mailbox — that fan-out was t4a's measured 20× +//! anti-pattern. +//! 2. **Two fixed indices, aligned.** Index one: the mailbox index +//! (0..65536, fixed at spawn). 
Index two: the row index of every +//! 65536-row SoA table (batch tables, the ownership table, the +//! Lance row-address table). They are the SAME space: mailbox `i` +//! owns row `i` in every table by index correspondence — the +//! ownership guarantee is the ownership sink's `row_owner[i] == i` +//! binding plus every batch applied ON BEHALF of the row owners +//! (the write-on-behalf iron rule), not a message path. +//! 3. **Codebook index.** Station identity is MINTED once into a +//! unique row slot ([`Codebook`]: Morton placement + linear probe + +//! full `(h, name)` verification, at mint only). Workers memoize +//! `hash → slot` locally, so the global mint mutex is touched only +//! on first sight of a station (~stations per worker, total). After +//! mint, the hot loop is a direct CAM index — no probe, no name +//! compare, no per-record hash-table walk. +//! 4. **Whole-table double-cast.** Workers accumulate into full +//! 65536-row SoA [`BatchTable`]s (dirty-list tracked). A full table +//! freezes into an `Arc` and is cast ONCE, WHOLE, to both ends — +//! the ownership-guarantee sink and the Lance row-address sink; one +//! allocation travels to both (the double cast), nothing is +//! repacked into per-owner entry lists (lanes G/H's shape). Each +//! sink journals one [`KanbanMove`] per applied batch; the Lance +//! side additionally ticks a per-batch version (the `DatasetVersion` +//! shape) and stamps `row → latest version` in its address table. +//! 5. **Flush cache.** Each worker cycles a pool of batch tables: +//! freeze + double-cast batch `n`, then immediately continue +//! filling batch `n+1` from the pool. A pooled table is reused only +//! once BOTH sinks dropped their `Arc` (refcount back to 1 — flush +//! complete); otherwise the pool grows. Flushing (at the sinks) and +//! reindexing-next (at the worker) therefore INTERLEAVE — the +//! worker never waits for a flush. +//! +//! ## Invariants asserted +//! +//! 
- `ownership journal == lance journal == total batches` — every +//! batch witnessed on BOTH ends (double-cast completeness). +//! - Output map (rendered from the ownership table + codebook) equals +//! lane A byte-for-byte. + +use crate::lane_f::{fnv1a64, morton_slot, SLOTS}; +use crate::{chunk_bounds, parse_temp_tenths, Stats}; +use lance_graph_contract::collapse_gate::MailboxId; +use lance_graph_contract::kanban::{ExecTarget, KanbanColumn, KanbanMove}; +use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; +use std::collections::{BTreeMap, VecDeque}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Instant; + +/// Rows accumulated per batch table before freeze + double-cast — the +/// "65536-sized CAM-addressed batches" of the spec. +pub const BATCH_ROWS: usize = SLOTS; + +// ─── Codebook: identity minted once, direct CAM addressing after ──────── + +/// The mint registry: station identity → unique row slot in the fixed +/// 0..65536 space. Collisions are resolved HERE, once, at mint (Morton +/// placement + linear probe + full name verification); every consumer +/// afterwards addresses rows directly by the minted slot. +struct Codebook { + tags: Vec<u64>, + names: Vec<Vec<u8>>, + used: Vec<bool>, + len: usize, +} + +impl Codebook { + fn new() -> Self { + Self { + tags: vec![0; SLOTS], + names: vec![Vec::new(); SLOTS], + used: vec![false; SLOTS], + len: 0, + } + } + + /// Mint (or find) the unique slot for `(h, name)`.
+ fn mint(&mut self, h: u64, name: &[u8]) -> u16 { + let mut s = morton_slot(h) as usize; + loop { + if !self.used[s] { + self.used[s] = true; + self.tags[s] = h; + self.names[s] = name.to_vec(); + self.len += 1; + return s as u16; + } + if self.tags[s] == h && self.names[s] == name { + return s as u16; + } + s = (s + 1) & (SLOTS - 1); + } + } +} + +/// Worker-local memo: `hash → slot`, open-addressed on the same 64K +/// space with full name verification, so the global mint mutex is +/// touched only on first local sight of a station. +struct SlotMemo { + tags: Vec<u64>, + names: Vec<Vec<u8>>, + slots: Vec<u16>, + used: Vec<bool>, +} + +impl SlotMemo { + fn new() -> Self { + Self { + tags: vec![0; SLOTS], + names: vec![Vec::new(); SLOTS], + slots: vec![0; SLOTS], + used: vec![false; SLOTS], + } + } + + #[inline(always)] + fn resolve(&mut self, h: u64, name: &[u8], global: &Mutex<Codebook>) -> u16 { + let mut s = morton_slot(h) as usize; + loop { + if !self.used[s] { + // First local sight: consult the global mint (the only + // lock on the whole hot path; ~stations per worker). + let slot = global.lock().expect("codebook lock").mint(h, name); + self.used[s] = true; + self.tags[s] = h; + self.names[s] = name.to_vec(); + self.slots[s] = slot; + return slot; + } + if self.tags[s] == h && self.names[s] == name { + return self.slots[s]; + } + s = (s + 1) & (SLOTS - 1); + } + } +} + +// ─── BatchTable: the 65536-row CAM-addressed accumulation unit ────────── + +/// One full-address-space SoA batch: 65536 rows, direct-indexed by the +/// codebook slot (no probe in the hot loop), dirty-list tracked so the +/// sinks' merges are O(dirty), and clear-by-undo recyclable through the +/// flush cache.
+pub(crate) struct BatchTable { + mins: Vec<i32>, + maxs: Vec<i32>, + sums: Vec<i64>, + counts: Vec<u32>, + dirty: Vec<u16>, +} + +impl BatchTable { + fn new() -> Self { + Self { + mins: vec![i32::MAX; SLOTS], + maxs: vec![i32::MIN; SLOTS], + sums: vec![0; SLOTS], + counts: vec![0; SLOTS], + dirty: Vec::with_capacity(1024), + } + } + + /// Direct CAM write — the codebook guarantees slot uniqueness, so + /// this is a pure indexed fold: no probe, no compare. + #[inline(always)] + fn observe(&mut self, slot: u16, tenths: i32) { + let s = slot as usize; + if self.counts[s] == 0 { + self.dirty.push(slot); + } + if tenths < self.mins[s] { + self.mins[s] = tenths; + } + if tenths > self.maxs[s] { + self.maxs[s] = tenths; + } + self.sums[s] += tenths as i64; + self.counts[s] += 1; + } + + /// Clear-by-undo: reset exactly the dirty rows (O(dirty), never + /// O(SLOTS)) — the recycle step of the flush cache. + fn reset(&mut self) { + for &slot in &self.dirty { + let s = slot as usize; + self.mins[s] = i32::MAX; + self.maxs[s] = i32::MIN; + self.sums[s] = 0; + self.counts[s] = 0; + } + self.dirty.clear(); + } +} + +// ─── RowOwner: the 65536 standing mailboxes (ownership registry) ──────── + +/// A standing row-owner mailbox. All 65536 are spawned upfront; mailbox +/// `i` owns row `i` of every 65536-row table by index correspondence. +/// Deliberately message-free in steady state: the ownership guarantee +/// is structural (aligned fixed indices + the sinks applying writes on +/// behalf), so the data path never fans out to 64K actors.
+pub(crate) struct RowOwner; + +impl Actor for RowOwner { + type Msg = (); + type State = (); + type Arguments = (); + + async fn pre_start( + &self, + _myself: ActorRef<Self::Msg>, + _args: Self::Arguments, + ) -> Result<Self::State, ActorProcessingErr> { + Ok(()) + } + + async fn handle( + &self, + _myself: ActorRef<Self::Msg>, + _msg: Self::Msg, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + Ok(()) + } +} + +// ─── Ownership sink: the mailbox-SoA ownership-guarantee table ────────── + +/// Messages the ownership sink accepts. +enum OwnershipMsg { + /// A frozen whole batch (the `Arc` is SHARED with the Lance sink — + /// one allocation, double-cast). + Batch { table: Arc<BatchTable> }, + /// Drain: render the canonical table through the codebook into the + /// common output map shape; reply `(map, journal_len)`. + Finish { + reply: RpcReplyPort<(BTreeMap<String, Stats>, usize)>, + }, +} + +/// Ownership sink state: the canonical 65536-row SoA whose row `i` is +/// owned by mailbox `i` (`row_owner[i] == i as MailboxId` — the +/// guarantee table itself), every batch applied on behalf of the row +/// owners and witnessed with one [`KanbanMove`]. +struct OwnershipState { + row_owner: Vec<MailboxId>, + mins: Vec<i32>, + maxs: Vec<i32>, + sums: Vec<i64>, + counts: Vec<u32>, + journal: Vec<KanbanMove>, + codebook: Arc<Mutex<Codebook>>, +} + +struct OwnershipSink; + +impl Actor for OwnershipSink { + type Msg = OwnershipMsg; + type State = OwnershipState; + type Arguments = Arc<Mutex<Codebook>>; + + async fn pre_start( + &self, + _myself: ActorRef<Self::Msg>, + codebook: Self::Arguments, + ) -> Result<Self::State, ActorProcessingErr> { + Ok(OwnershipState { + // The two fixed indices, aligned: row i is owned by mailbox i.
+ row_owner: (0..SLOTS).map(|i| i as MailboxId).collect(), + mins: vec![i32::MAX; SLOTS], + maxs: vec![i32::MIN; SLOTS], + sums: vec![0; SLOTS], + counts: vec![0; SLOTS], + journal: Vec::new(), + codebook, + }) + } + + async fn handle( + &self, + _myself: ActorRef<Self::Msg>, + msg: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match msg { + OwnershipMsg::Batch { table } => { + // Merge O(dirty): fold each dirty row of the frozen + // batch into the canonical row — applied ON BEHALF of + // `row_owner[s]` (the write-on-behalf discipline; the + // debug_assert pins the aligned-indices guarantee). + for &slot in &table.dirty { + let s = slot as usize; + debug_assert_eq!(state.row_owner[s], s as MailboxId); + if table.mins[s] < state.mins[s] { + state.mins[s] = table.mins[s]; + } + if table.maxs[s] > state.maxs[s] { + state.maxs[s] = table.maxs[s]; + } + state.sums[s] += table.sums[s]; + state.counts[s] += table.counts[s]; + } + let pos = state.journal.len() as u32 + 1; + state.journal.push(KanbanMove { + mailbox: 0, // the sink writes on behalf; witness position orders batches + from: KanbanColumn::CognitiveWork, + to: KanbanColumn::Evaluation, + witness_chain_position: pos, + libet_offset_us: 0, + exec: ExecTarget::Native, + }); + } + OwnershipMsg::Finish { reply } => { + let cb = state.codebook.lock().expect("codebook lock"); + let mut out = BTreeMap::new(); + for s in 0..SLOTS { + if state.counts[s] > 0 { + let name = + String::from_utf8(cb.names[s].clone()).expect("station name utf8"); + out.insert( + name, + Stats { + min: state.mins[s], + max: state.maxs[s], + sum: state.sums[s], + count: state.counts[s], + }, + ); + } + } + let _ = reply.send((out, state.journal.len())); + } + } + Ok(()) + } +} + +// ─── Lance sink: the row-address table (persistence half) ─────────────── + +/// Messages the Lance-side sink accepts. +enum LanceMsg { + Batch { + table: Arc<BatchTable>, + }, + /// Drain: reply `(rows_addressed_total, versions_ticked, journal_len)`.
+ Finish { + reply: RpcReplyPort<(usize, u32, usize)>, + }, +} + +/// Lance row-address table state: `slot → latest batch version` (the +/// row address the columnar writer would consume), one version tick +/// per applied batch (the `DatasetVersion` shape), same per-batch +/// witness journal as the ownership side. +struct LanceState { + latest_version: Vec<u32>, + rows_addressed: usize, + version: u32, + journal: Vec<KanbanMove>, +} + +struct LanceSink; + +impl Actor for LanceSink { + type Msg = LanceMsg; + type State = LanceState; + type Arguments = (); + + async fn pre_start( + &self, + _myself: ActorRef<Self::Msg>, + _args: Self::Arguments, + ) -> Result<Self::State, ActorProcessingErr> { + Ok(LanceState { + latest_version: vec![0; SLOTS], + rows_addressed: 0, + version: 0, + journal: Vec::new(), + }) + } + + async fn handle( + &self, + _myself: ActorRef<Self::Msg>, + msg: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match msg { + LanceMsg::Batch { table } => { + // One version tick per batch; every dirty row's address + // is stamped with the version that carries it. + state.version += 1; + for &slot in &table.dirty { + state.latest_version[slot as usize] = state.version; + state.rows_addressed += 1; + } + let pos = state.journal.len() as u32 + 1; + state.journal.push(KanbanMove { + mailbox: 0, + from: KanbanColumn::CognitiveWork, + to: KanbanColumn::Evaluation, + witness_chain_position: pos, + libet_offset_us: 0, + exec: ExecTarget::Native, + }); + } + LanceMsg::Finish { reply } => { + let _ = reply.send((state.rows_addressed, state.version, state.journal.len())); + } + } + Ok(()) + } +} + +// ─── The worker: fill → freeze → double-cast → recycle (flush cache) ──── + +/// The flush cache: previously double-cast batches, recycled once BOTH +/// sinks have dropped their `Arc` (refcount 1). If the head is still in +/// flight, a fresh table is allocated instead — the pool grows to +/// whatever depth the flush/refill interleave needs.
+struct FlushCache { + pool: VecDeque<Arc<BatchTable>>, + peak: usize, + allocated: usize, +} + +impl FlushCache { + fn new() -> Self { + Self { + pool: VecDeque::new(), + peak: 0, + allocated: 0, + } + } + + fn next_table(&mut self) -> BatchTable { + if let Some(front) = self.pool.pop_front() { + match Arc::try_unwrap(front) { + Ok(mut table) => { + // Both sinks are done with it: recycle by undo. + table.reset(); + return table; + } + Err(still_in_flight) => { + // Flush not complete yet — keep it queued, grow. + self.pool.push_front(still_in_flight); + } + } + } + self.allocated += 1; + self.peak = self.peak.max(self.allocated); + BatchTable::new() + } + + fn park(&mut self, table: Arc<BatchTable>) { + self.pool.push_back(table); + } +} + +/// One worker: scan its chunk, resolve each record's slot through the +/// memoized codebook (direct CAM index afterwards), fill the current +/// batch table; at `batch_rows` records freeze the table into an `Arc`, +/// DOUBLE-CAST it (ownership + lance) and continue on the next table +/// from the flush cache. +#[allow(clippy::too_many_arguments)] +fn worker_fill( + data: &[u8], + start: usize, + end: usize, + batch_rows: usize, + codebook: &Mutex<Codebook>, + ownership: &ActorRef<OwnershipMsg>, + lance: &ActorRef<LanceMsg>, + batches: &AtomicUsize, +) -> usize { + let mut memo = SlotMemo::new(); + let mut cache = FlushCache::new(); + let mut table = cache.next_table(); + let mut rows_in_batch = 0usize; + + let double_cast = |cache: &mut FlushCache, full: BatchTable| { + if full.dirty.is_empty() { + // Nothing accumulated (empty tail) — nothing to cast.
+ cache.park(Arc::new(full)); + return; + } + let frozen = Arc::new(full); + ownership + .cast(OwnershipMsg::Batch { + table: Arc::clone(&frozen), + }) + .expect("double-cast: ownership side"); + lance + .cast(LanceMsg::Batch { + table: Arc::clone(&frozen), + }) + .expect("double-cast: lance side"); + batches.fetch_add(1, Ordering::Relaxed); + cache.park(frozen); + }; + + let mut i = start; + while i < end { + let name_start = i; + while data[i] != b';' { + i += 1; + } + let name = &data[name_start..i]; + i += 1; // skip ';' + let temp_start = i; + while data[i] != b'\n' { + i += 1; + } + let tenths = parse_temp_tenths(&data[temp_start..i]); + i += 1; // skip '\n' + + let h = fnv1a64(name); + let slot = memo.resolve(h, name, codebook); + table.observe(slot, tenths); + rows_in_batch += 1; + if rows_in_batch == batch_rows { + let full = std::mem::replace(&mut table, cache.next_table()); + double_cast(&mut cache, full); + rows_in_batch = 0; + } + } + double_cast(&mut cache, table); + cache.peak +} + +/// Lane I with an explicit batch size (tests use a small batch to force +/// multiple double-casts + flush-cache recycling on small corpora). +pub fn lane_i_batch_pipeline_with( + data: &[u8], + workers: usize, + batch_rows: usize, +) -> BTreeMap<String, Stats> { + let workers = workers.max(1); + let batch_rows = batch_rows.max(1); + let bounds = chunk_bounds(data, workers); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(workers) + .build() + .expect("build tokio runtime for lane I"); + + runtime.block_on(async move { + let shared: Arc<Vec<u8>> = Arc::new(data.to_vec()); + let codebook = Arc::new(Mutex::new(Codebook::new())); + let batches = Arc::new(AtomicUsize::new(0)); + + // 1. ALL 65536 mailboxes upfront — the standing ownership + // registry. Spawn wall-time reported separately (stderr): + // it is infrastructure setup, not steady-state data path.
+ let spawn_t0 = Instant::now(); + let mut owners = Vec::with_capacity(SLOTS); + for _ in 0..SLOTS { + let (actor, _handle) = Actor::spawn(None, RowOwner, ()) + .await + .expect("spawn row-owner mailbox"); + owners.push(actor); + } + let spawn_ms = spawn_t0.elapsed().as_secs_f64() * 1000.0; + + // 2. The two ends of the double cast. + let (ownership, ownership_handle) = + Actor::spawn(None, OwnershipSink, Arc::clone(&codebook)) + .await + .expect("spawn ownership sink"); + let (lance, lance_handle) = Actor::spawn(None, LanceSink, ()) + .await + .expect("spawn lance sink"); + + // 3. Workers: fill → freeze → double-cast → recycle. + let mut worker_handles = Vec::with_capacity(bounds.len()); + for &(start, end) in &bounds { + let shared = Arc::clone(&shared); + let codebook = Arc::clone(&codebook); + let ownership = ownership.clone(); + let lance = lance.clone(); + let batches = Arc::clone(&batches); + worker_handles.push(tokio::task::spawn_blocking(move || { + worker_fill( + &shared, start, end, batch_rows, &codebook, &ownership, &lance, &batches, + ) + })); + } + let mut flush_cache_peak = 0usize; + for h in worker_handles { + flush_cache_peak = flush_cache_peak.max(h.await.expect("lane I worker join")); + } + + // 4. Drain both ends; assert double-cast completeness. 
+ let (map, ownership_journal) = + ractor::call!(ownership, |reply| OwnershipMsg::Finish { reply }) + .expect("ownership finish rpc"); + let (rows_addressed, versions, lance_journal) = + ractor::call!(lance, |reply| LanceMsg::Finish { reply }).expect("lance finish rpc"); + + let batches_total = batches.load(Ordering::Relaxed); + assert_eq!( + ownership_journal, batches_total, + "every batch must be witnessed on the ownership end" + ); + assert_eq!( + lance_journal, batches_total, + "every batch must be witnessed on the lance end" + ); + assert_eq!( + versions as usize, batches_total, + "one DatasetVersion tick per batch" + ); + + ownership.stop(None); + lance.stop(None); + ownership_handle.await.expect("ownership sink join"); + lance_handle.await.expect("lance sink join"); + let stop_t0 = Instant::now(); + for owner in &owners { + owner.stop(None); + } + let stop_ms = stop_t0.elapsed().as_secs_f64() * 1000.0; + + eprintln!( + "lane_i: mailboxes={SLOTS} mailbox_spawn_ms={spawn_ms:.1} mailbox_stop_ms={stop_ms:.1} \ + batches={batches_total} versions={versions} rows_addressed={rows_addressed} \ + flush_cache_peak_tables_per_worker={flush_cache_peak} codebook_len={}", + codebook.lock().expect("codebook lock").len + ); + + map + }) +} + +/// Lane I — the substrate batch pipeline at the spec'd batch size +/// (65536-row CAM-addressed batches). +pub fn lane_i_batch_pipeline(data: &[u8], workers: usize) -> BTreeMap<String, Stats> { + lane_i_batch_pipeline_with(data, workers, BATCH_ROWS) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Lane I must agree byte-for-byte with lane A — with a SMALL batch + /// size (1000 rows) so many batches double-cast and the flush cache + /// actually recycles tables mid-stream (Arc refcount path), across + /// an odd worker count.
+ #[test] + fn lane_i_agrees_with_lane_a_with_recycled_flush_cache() { + let dir = std::env::temp_dir(); + let path = dir.join(format!("onebrc_probe_test_i_{}.txt", std::process::id())); + let result = crate::gen::gen(&path, 50_000, 91).expect("gen"); + assert_eq!(result.rows, 50_000); + + let data = std::fs::read(&path).expect("read generated corpus"); + std::fs::remove_file(&path).ok(); + + let a = crate::lane_a_scalar(&data); + let i = lane_i_batch_pipeline_with(&data, 3, 1000); + assert_eq!(a, i, "lane I must produce identical aggregates to lane A"); + assert!(!a.is_empty()); + } + + /// The codebook mints stable, unique slots: same identity always + /// resolves to the same slot; distinct identities never share one. + #[test] + fn codebook_mints_unique_stable_slots() { + let mut cb = Codebook::new(); + let a1 = cb.mint(fnv1a64(b"alpha"), b"alpha"); + let b1 = cb.mint(fnv1a64(b"beta"), b"beta"); + let a2 = cb.mint(fnv1a64(b"alpha"), b"alpha"); + assert_eq!(a1, a2, "mint must be idempotent per identity"); + assert_ne!(a1, b1, "distinct identities get distinct slots"); + assert_eq!(cb.len, 2); + } +} diff --git a/crates/onebrc-probe/src/lane_j.rs b/crates/onebrc-probe/src/lane_j.rs new file mode 100644 index 00000000..da4e418e --- /dev/null +++ b/crates/onebrc-probe/src/lane_j.rs @@ -0,0 +1,625 @@ +//! Lane J — the parameterized batch pipeline: lane I's shape with the +//! three knobs the operator's follow-up questions name (2026-07-02): +//! +//! > *"but with Orchestrator it was 39.4? / is the batch writer with +//! > different cache? / does it need 8, or 64 lanes? / or is +//! > Orchestration with 400 the sweet spot / or should we assign 8x8 or +//! > 64x64 gridlake soa / what if we match the soa into a grid +//! > 64x64 = 4096 xBF16 = 16kb?"* +//! +//! One lane, three knobs, so every question is a measured cell of the +//! same matrix rather than a new lane: +//! +//! 1. **`grid`** — the SoA address-space size: 65536 (lane I's 256×256) +//! 
or **4096 (the 64×64 gridlake tile)**. The gridlake batch table +//! is 4096 cells × (i32 min + i32 max + i64 sum + u32 count) = 80 KB +//! integer-exact — the cache-matched unit (vs lane I's 1.25 MB +//! L2-busting table + a 64K-slot memo per worker). The literal +//! "4096 × BF16 = 16 KB" plane pair is ndarray #227's PROVEN tier +//! (`bf16_tile_gemm` VDPBF16PS ladder; its `onebrc_cascade_probe` +//! measured the 64×64 Z-order grid at ~448 Mrows/s single-thread, +//! bit-exact via the hi/lo split) — this probe keeps integer tenths +//! for exactness-without-tile-GEMM and cites that example as the +//! BF16 continuation. +//! 2. **`sink_lanes`** — 1 / 8 / 64 ownership+lance lane PAIRS, each +//! owning a contiguous row-range slice of the grid; every frozen +//! batch `Arc` is cast to ALL lanes (messages = batches × 2·lanes, +//! still ∝ batches); each lane merges only its range's dirty rows. +//! Answers "does the batch writer need 8, or 64 lanes?". +//! 3. **`registry`** — spawn the full upfront `RowOwner` mailbox +//! registry (one per grid cell) or skip it. The pipeline is +//! byte-identical either way, so `registry off − on` ISOLATES the +//! residency cost that t6 could only flag as CONJECTURE. +//! +//! Everything else is lane I verbatim: codebook-minted direct CAM +//! addressing (mint once, memoized, no probe in the hot loop), +//! whole-table `Arc` double-casts, refcount-gated flush cache +//! (flush/refill interleave), per-batch witness on every sink lane +//! (`Σ ownership journals == Σ lance journals == batches × lanes` +//! asserted), one `DatasetVersion`-shaped tick per batch per lance +//! lane. 
+ +use crate::lane_f::{fnv1a64, morton_slot}; +use crate::lane_i::RowOwner; +use crate::{chunk_bounds, merge_maps, parse_temp_tenths, Stats}; +use lance_graph_contract::kanban::{ExecTarget, KanbanColumn, KanbanMove}; +use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; +use std::collections::{BTreeMap, VecDeque}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Instant; + +// ─── Grid-sized codebook + memo (runtime capacity, else lane I's) ─────── + +/// Mint registry over a runtime-sized grid (power of two): identity → +/// unique cell, collisions resolved once at mint. +struct GridCodebook { + mask: usize, + tags: Vec<u64>, + names: Vec<Vec<u8>>, + used: Vec<bool>, + len: usize, +} + +impl GridCodebook { + fn new(grid: usize) -> Self { + assert!(grid.is_power_of_two(), "grid must be a power of two"); + Self { + mask: grid - 1, + tags: vec![0; grid], + names: vec![Vec::new(); grid], + used: vec![false; grid], + len: 0, + } + } + + fn mint(&mut self, h: u64, name: &[u8]) -> u16 { + let mut s = morton_slot(h) as usize & self.mask; + loop { + if !self.used[s] { + self.used[s] = true; + self.tags[s] = h; + self.names[s] = name.to_vec(); + self.len += 1; + return s as u16; + } + if self.tags[s] == h && self.names[s] == name { + return s as u16; + } + s = (s + 1) & self.mask; + } + } +} + +/// Worker-local `hash → cell` memo over the same grid.
+struct GridMemo { + mask: usize, + tags: Vec<u64>, + names: Vec<Vec<u8>>, + slots: Vec<u16>, + used: Vec<bool>, +} + +impl GridMemo { + fn new(grid: usize) -> Self { + Self { + mask: grid - 1, + tags: vec![0; grid], + names: vec![Vec::new(); grid], + slots: vec![0; grid], + used: vec![false; grid], + } + } + + #[inline(always)] + fn resolve(&mut self, h: u64, name: &[u8], global: &Mutex<GridCodebook>) -> u16 { + let mut s = morton_slot(h) as usize & self.mask; + loop { + if !self.used[s] { + let slot = global.lock().expect("codebook lock").mint(h, name); + self.used[s] = true; + self.tags[s] = h; + self.names[s] = name.to_vec(); + self.slots[s] = slot; + return slot; + } + if self.tags[s] == h && self.names[s] == name { + return self.slots[s]; + } + s = (s + 1) & self.mask; + } + } +} + +// ─── Grid batch table (the gridlake SoA unit at grid=4096: ~80 KB) ────── + +pub(crate) struct GridBatch { + mins: Vec<i32>, + maxs: Vec<i32>, + sums: Vec<i64>, + counts: Vec<u32>, + dirty: Vec<u16>, +} + +impl GridBatch { + fn new(grid: usize) -> Self { + Self { + mins: vec![i32::MAX; grid], + maxs: vec![i32::MIN; grid], + sums: vec![0; grid], + counts: vec![0; grid], + dirty: Vec::with_capacity(1024), + } + } + + #[inline(always)] + fn observe(&mut self, slot: u16, tenths: i32) { + let s = slot as usize; + if self.counts[s] == 0 { + self.dirty.push(slot); + } + if tenths < self.mins[s] { + self.mins[s] = tenths; + } + if tenths > self.maxs[s] { + self.maxs[s] = tenths; + } + self.sums[s] += tenths as i64; + self.counts[s] += 1; + } + + fn reset(&mut self) { + for &slot in &self.dirty { + let s = slot as usize; + self.mins[s] = i32::MAX; + self.maxs[s] = i32::MIN; + self.sums[s] = 0; + self.counts[s] = 0; + } + self.dirty.clear(); + } +} + +// ─── Laned sinks: each lane owns a contiguous row-range slice ─────────── + +enum LaneMsg { + Batch { + table: Arc<GridBatch>, + }, + /// Ownership lanes reply `(partial map, journal_len)`; lance lanes + /// reply an empty map + journal_len (address table summarized via + /// journal/version equality asserts).
+ Finish { + reply: RpcReplyPort<(BTreeMap<String, Stats>, usize)>, + }, +} + +/// One ownership-side lane: the canonical SoA slice for rows +/// `[lo, hi)`, merged O(dirty-in-range) per batch, witnessed per batch. +struct OwnershipLaneState { + lo: usize, + hi: usize, + mins: Vec<i32>, + maxs: Vec<i32>, + sums: Vec<i64>, + counts: Vec<u32>, + journal: Vec<KanbanMove>, + codebook: Arc<Mutex<GridCodebook>>, +} + +struct OwnershipLane; + +impl Actor for OwnershipLane { + type Msg = LaneMsg; + type State = OwnershipLaneState; + /// `(lo, hi, grid, codebook)`. + type Arguments = (usize, usize, usize, Arc<Mutex<GridCodebook>>); + + async fn pre_start( + &self, + _myself: ActorRef<Self::Msg>, + (lo, hi, grid, codebook): Self::Arguments, + ) -> Result<Self::State, ActorProcessingErr> { + let _ = grid; + Ok(OwnershipLaneState { + lo, + hi, + mins: vec![i32::MAX; hi - lo], + maxs: vec![i32::MIN; hi - lo], + sums: vec![0; hi - lo], + counts: vec![0; hi - lo], + journal: Vec::new(), + codebook, + }) + } + + async fn handle( + &self, + _myself: ActorRef<Self::Msg>, + msg: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match msg { + LaneMsg::Batch { table } => { + for &slot in &table.dirty { + let s = slot as usize; + if s < state.lo || s >= state.hi { + continue; // another lane's row range + } + let l = s - state.lo; + if table.mins[s] < state.mins[l] { + state.mins[l] = table.mins[s]; + } + if table.maxs[s] > state.maxs[l] { + state.maxs[l] = table.maxs[s]; + } + state.sums[l] += table.sums[s]; + state.counts[l] += table.counts[s]; + } + let pos = state.journal.len() as u32 + 1; + state.journal.push(KanbanMove { + mailbox: state.lo as u32, + from: KanbanColumn::CognitiveWork, + to: KanbanColumn::Evaluation, + witness_chain_position: pos, + libet_offset_us: 0, + exec: ExecTarget::Native, + }); + } + LaneMsg::Finish { reply } => { + let cb = state.codebook.lock().expect("codebook lock"); + let mut out = BTreeMap::new(); + for l in 0..(state.hi - state.lo) { + if state.counts[l] > 0 { + let name = String::from_utf8(cb.names[state.lo + l].clone()) + .expect("station name utf8"); +
out.insert( + name, + Stats { + min: state.mins[l], + max: state.maxs[l], + sum: state.sums[l], + count: state.counts[l], + }, + ); + } + } + let _ = reply.send((out, state.journal.len())); + } + } + Ok(()) + } +} + +/// One lance-side lane: `row → latest version` for its range + one +/// version tick per batch. +struct LanceLaneState { + lo: usize, + hi: usize, + latest_version: Vec<u32>, + version: u32, + journal: Vec<KanbanMove>, +} + +struct LanceLane; + +impl Actor for LanceLane { + type Msg = LaneMsg; + type State = LanceLaneState; + /// `(lo, hi)`. + type Arguments = (usize, usize); + + async fn pre_start( + &self, + _myself: ActorRef<Self::Msg>, + (lo, hi): Self::Arguments, + ) -> Result<Self::State, ActorProcessingErr> { + Ok(LanceLaneState { + lo, + hi, + latest_version: vec![0; hi - lo], + version: 0, + journal: Vec::new(), + }) + } + + async fn handle( + &self, + _myself: ActorRef<Self::Msg>, + msg: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match msg { + LaneMsg::Batch { table } => { + state.version += 1; + for &slot in &table.dirty { + let s = slot as usize; + if s >= state.lo && s < state.hi { + state.latest_version[s - state.lo] = state.version; + } + } + let pos = state.journal.len() as u32 + 1; + state.journal.push(KanbanMove { + mailbox: state.lo as u32, + from: KanbanColumn::CognitiveWork, + to: KanbanColumn::Evaluation, + witness_chain_position: pos, + libet_offset_us: 0, + exec: ExecTarget::Native, + }); + } + LaneMsg::Finish { reply } => { + let _ = reply.send((BTreeMap::new(), state.journal.len())); + } + } + Ok(()) + } +} + +// ─── Flush cache (identical mechanism to lane I, grid-sized tables) ───── + +struct GridFlushCache { + grid: usize, + pool: VecDeque<Arc<GridBatch>>, + peak: usize, + allocated: usize, +} + +impl GridFlushCache { + fn new(grid: usize) -> Self { + Self { + grid, + pool: VecDeque::new(), + peak: 0, + allocated: 0, + } + } + + fn next_table(&mut self) -> GridBatch { + if let Some(front) = self.pool.pop_front() { + match Arc::try_unwrap(front) { + Ok(mut table) => { +
table.reset(); + return table; + } + Err(still_in_flight) => { + self.pool.push_front(still_in_flight); + } + } + } + self.allocated += 1; + self.peak = self.peak.max(self.allocated); + GridBatch::new(self.grid) + } + + fn park(&mut self, table: Arc<GridBatch>) { + self.pool.push_back(table); + } +} + +// ─── The lane ──────────────────────────────────────────────────────────── + +/// Lane J: the parameterized batch pipeline. `grid` = SoA cell count +/// (4096 = the 64×64 gridlake tile; 65536 = lane I's full space); +/// `sink_lanes` = ownership+lance lane pairs; `registry` = spawn the +/// full upfront per-cell mailbox registry or skip it (isolates the +/// residency cost); `batch_rows` = rows per frozen batch. +pub fn lane_j_grid_pipeline_with( + data: &[u8], + workers: usize, + grid: usize, + sink_lanes: usize, + registry: bool, + batch_rows: usize, +) -> BTreeMap<String, Stats> { + let workers = workers.max(1); + let grid = grid.clamp(64, 1 << 16).next_power_of_two(); + let sink_lanes = sink_lanes.clamp(1, grid); + let batch_rows = batch_rows.max(1); + let bounds = chunk_bounds(data, workers); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(workers) + .build() + .expect("build tokio runtime for lane J"); + + runtime.block_on(async move { + let shared: Arc<Vec<u8>> = Arc::new(data.to_vec()); + let codebook = Arc::new(Mutex::new(GridCodebook::new(grid))); + let batches = Arc::new(AtomicUsize::new(0)); + + // Knob 3: the standing per-cell mailbox registry, or none. + let spawn_t0 = Instant::now(); + let mut owners = Vec::new(); + if registry { + owners.reserve(grid); + for _ in 0..grid { + let (actor, _handle) = Actor::spawn(None, RowOwner, ()) + .await + .expect("spawn row-owner mailbox"); + owners.push(actor); + } + } + let spawn_ms = spawn_t0.elapsed().as_secs_f64() * 1000.0; + + // Knob 2: laned sinks — each pair owns rows [lo, hi).
+ let span = grid.div_ceil(sink_lanes); + let mut own_lanes = Vec::with_capacity(sink_lanes); + let mut lance_lanes = Vec::with_capacity(sink_lanes); + let mut lane_handles = Vec::with_capacity(sink_lanes * 2); + for l in 0..sink_lanes { + let lo = l * span; + let hi = ((l + 1) * span).min(grid); + let (o, oh) = Actor::spawn(None, OwnershipLane, (lo, hi, grid, Arc::clone(&codebook))) + .await + .expect("spawn ownership lane"); + let (z, zh) = Actor::spawn(None, LanceLane, (lo, hi)) + .await + .expect("spawn lance lane"); + own_lanes.push(o); + lance_lanes.push(z); + lane_handles.push(oh); + lane_handles.push(zh); + } + let own_lanes = Arc::new(own_lanes); + let lance_lanes = Arc::new(lance_lanes); + + let mut worker_handles = Vec::with_capacity(bounds.len()); + for &(start, end) in &bounds { + let shared = Arc::clone(&shared); + let codebook = Arc::clone(&codebook); + let own_lanes = Arc::clone(&own_lanes); + let lance_lanes = Arc::clone(&lance_lanes); + let batches = Arc::clone(&batches); + worker_handles.push(tokio::task::spawn_blocking(move || { + let mut memo = GridMemo::new(grid); + let mut cache = GridFlushCache::new(grid); + let mut table = cache.next_table(); + let mut rows_in_batch = 0usize; + + let double_cast = |cache: &mut GridFlushCache, full: GridBatch| { + if full.dirty.is_empty() { + cache.park(Arc::new(full)); + return; + } + let frozen = Arc::new(full); + for lane in own_lanes.iter() { + lane.cast(LaneMsg::Batch { + table: Arc::clone(&frozen), + }) + .expect("cast to ownership lane"); + } + for lane in lance_lanes.iter() { + lane.cast(LaneMsg::Batch { + table: Arc::clone(&frozen), + }) + .expect("cast to lance lane"); + } + batches.fetch_add(1, Ordering::Relaxed); + cache.park(frozen); + }; + + let data = &shared[..]; + let mut i = start; + while i < end { + let name_start = i; + while data[i] != b';' { + i += 1; + } + let name = &data[name_start..i]; + i += 1; + let temp_start = i; + while data[i] != b'\n' { + i += 1; + } + let tenths = 
parse_temp_tenths(&data[temp_start..i]); + i += 1; + + let h = fnv1a64(name); + let slot = memo.resolve(h, name, &codebook); + table.observe(slot, tenths); + rows_in_batch += 1; + if rows_in_batch == batch_rows { + let full = std::mem::replace(&mut table, cache.next_table()); + double_cast(&mut cache, full); + rows_in_batch = 0; + } + } + double_cast(&mut cache, table); + cache.peak + })); + } + let mut flush_cache_peak = 0usize; + for h in worker_handles { + flush_cache_peak = flush_cache_peak.max(h.await.expect("lane J worker join")); + } + + let batches_total = batches.load(Ordering::Relaxed); + let mut maps = Vec::with_capacity(sink_lanes); + let mut own_journal = 0usize; + for lane in own_lanes.iter() { + let (map, j) = ractor::call!(lane, |reply| LaneMsg::Finish { reply }) + .expect("ownership lane finish"); + maps.push(map); + own_journal += j; + } + let mut lance_journal = 0usize; + for lane in lance_lanes.iter() { + let (_, j) = + ractor::call!(lane, |reply| LaneMsg::Finish { reply }).expect("lance lane finish"); + lance_journal += j; + } + assert_eq!( + own_journal, + batches_total * sink_lanes, + "every batch witnessed on every ownership lane" + ); + assert_eq!( + lance_journal, + batches_total * sink_lanes, + "every batch witnessed on every lance lane" + ); + + eprintln!( + "lane_j: grid={grid} sink_lanes={sink_lanes} registry={registry} \ + mailbox_spawn_ms={spawn_ms:.1} batches={batches_total} \ + flush_cache_peak={flush_cache_peak} codebook_len={}", + codebook.lock().expect("codebook lock").len + ); + + for lane in own_lanes.iter() { + lane.stop(None); + } + for lane in lance_lanes.iter() { + lane.stop(None); + } + for h in lane_handles { + h.await.expect("sink lane join"); + } + for owner in &owners { + owner.stop(None); + } + + merge_maps(maps) + }) +} + +/// Lane J at the gridlake defaults: 64×64 grid (4096 cells), 1 sink +/// lane pair, no standing registry, 64K-row batches. 
+pub fn lane_j_grid_pipeline(data: &[u8], workers: usize) -> BTreeMap { + lane_j_grid_pipeline_with(data, workers, 4096, 1, false, 1 << 16) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Parity across the knob matrix corners: gridlake (4096) and full + /// (65536) grids × 1 and 8 sink lanes × registry on/off, all with a + /// small batch to force multi-batch flush-cache recycling. + #[test] + fn lane_j_agrees_with_lane_a_across_knob_corners() { + let dir = std::env::temp_dir(); + let path = dir.join(format!("onebrc_probe_test_j_{}.txt", std::process::id())); + let result = crate::gen::gen(&path, 50_000, 123).expect("gen"); + assert_eq!(result.rows, 50_000); + + let data = std::fs::read(&path).expect("read generated corpus"); + std::fs::remove_file(&path).ok(); + + let a = crate::lane_a_scalar(&data); + for (grid, lanes, registry) in [ + (4096usize, 1usize, false), + (4096, 8, false), + (65536, 1, false), + (4096, 8, true), + ] { + let j = lane_j_grid_pipeline_with(&data, 3, grid, lanes, registry, 1000); + assert_eq!( + a, j, + "lane J (grid={grid}, lanes={lanes}, registry={registry}) must match lane A" + ); + } + assert!(!a.is_empty()); + } +} diff --git a/crates/onebrc-probe/src/lib.rs b/crates/onebrc-probe/src/lib.rs index ff140f42..0c687146 100644 --- a/crates/onebrc-probe/src/lib.rs +++ b/crates/onebrc-probe/src/lib.rs @@ -25,6 +25,12 @@ //! identity → Morton tile address → SoA-shaped flat accumulators (F); //! identical pipeline with a plain-radix slot (R). F−R isolates the //! addressing tax; R−C prices flat-table-vs-BTreeMap. +//! - **Lane G** (`lane_g::lane_g_kanban_soa`, feature `lane-g`) — the +//! kanban-update write path: the same Morton-tile 64K SoA held as OWNED +//! state by shard mailbox actors (prefix-routed morsel casts, every +//! applied batch witnessed with a `KanbanMove`). G−F prices witnessed +//! streamed ownership vs private-merge-at-end; the `shards` sweep is +//! the 64K-concurrent-SoA-vs-Morton-tile ownership ledger. //! //! 
## Reference inventory //! @@ -50,6 +56,16 @@ pub mod lane_d; #[cfg(feature = "lane-e")] pub mod lane_e; pub mod lane_f; +#[cfg(feature = "lane-g")] +pub mod lane_g; +#[cfg(feature = "lane-h")] +pub mod lane_h; +#[cfg(feature = "lane-i")] +pub mod lane_i; +#[cfg(feature = "lane-j")] +pub mod lane_j; +#[cfg(feature = "presets")] +pub mod presets; pub mod sha256; #[cfg(feature = "lane-b")] @@ -59,6 +75,14 @@ pub use lane_d::lane_d_ractor; #[cfg(feature = "lane-e")] pub use lane_e::lane_e_kanban; pub use lane_f::{lane_f_morton, lane_r_radix}; +#[cfg(feature = "lane-g")] +pub use lane_g::{lane_g_kanban_soa, lane_g_kanban_soa_with_morsel}; +#[cfg(feature = "lane-h")] +pub use lane_h::{lane_h_orchestrated, lane_h_orchestrated_with}; +#[cfg(feature = "lane-i")] +pub use lane_i::{lane_i_batch_pipeline, lane_i_batch_pipeline_with}; +#[cfg(feature = "lane-j")] +pub use lane_j::{lane_j_grid_pipeline, lane_j_grid_pipeline_with}; use std::collections::BTreeMap; diff --git a/crates/onebrc-probe/src/main.rs b/crates/onebrc-probe/src/main.rs index 31c7a639..4228ea76 100644 --- a/crates/onebrc-probe/src/main.rs +++ b/crates/onebrc-probe/src/main.rs @@ -73,6 +73,13 @@ fn cmd_run(args: &[String]) { .get(3) .map(|s| s.parse().expect("batches must be a usize")) .unwrap_or(workers * 16); + // Lanes `g` and `h`: the SAME 4th positional arg is the shard-owner count + // (g: Morton-tile-range mailboxes; h: nominal owner granularity; default 4). + // One run selects one lane, so overloading the position is unambiguous. + let shards: usize = args + .get(3) + .map(|s| s.parse().expect("shards must be a usize")) + .unwrap_or(4); // NOTE (mmap note): plain `std::fs::read`, NOT mmap. automataIA/1brc-rs // treats `memmap2::Mmap` as "the only path to break the 2-second @@ -125,6 +132,73 @@ fn cmd_run(args: &[String]) { // Morton-tile SoA lane and its plain-radix control (lane_f.rs). 
"f" => onebrc_probe::lane_f_morton(&data, workers), "r" => onebrc_probe::lane_r_radix(&data, workers), + "g" => { + #[cfg(feature = "lane-g")] + { + // 4th positional arg = shards for lane g (default 4). + onebrc_probe::lane_g_kanban_soa(&data, workers, shards) + } + #[cfg(not(feature = "lane-g"))] + { + eprintln!("lane g requires --features lane-g"); + std::process::exit(1); + } + } + "i" => { + #[cfg(feature = "lane-i")] + { + // Fixed topology: all 65536 mailboxes upfront, 64K-row + // CAM batches (see lane_i.rs; breakdown on stderr). + onebrc_probe::lane_i_batch_pipeline(&data, workers) + } + #[cfg(not(feature = "lane-i"))] + { + eprintln!("lane i requires --features lane-i"); + std::process::exit(1); + } + } + "j" => { + #[cfg(feature = "lane-j")] + { + // Positional knobs: [workers] [grid] [lanes] [registry 0|1] + // (defaults: gridlake 4096, 1 lane pair, no registry). + let grid: usize = args + .get(3) + .map(|s| s.parse().expect("grid must be a usize")) + .unwrap_or(4096); + let lanes: usize = args + .get(4) + .map(|s| s.parse().expect("lanes must be a usize")) + .unwrap_or(1); + let registry = args.get(5).map(String::as_str) == Some("1"); + onebrc_probe::lane_j_grid_pipeline_with( + &data, + workers, + grid, + lanes, + registry, + 1 << 16, + ) + } + #[cfg(not(feature = "lane-j"))] + { + eprintln!("lane j requires --features lane-j"); + std::process::exit(1); + } + } + "h" => { + #[cfg(feature = "lane-h")] + { + // 4th positional arg = NOMINAL owner granularity for lane h + // (lazy activation makes live owners track occupancy). 
+ onebrc_probe::lane_h_orchestrated(&data, workers, shards) + } + #[cfg(not(feature = "lane-h"))] + { + eprintln!("lane h requires --features lane-h"); + std::process::exit(1); + } + } other => { - eprintln!("unknown lane '{other}' (expected 'a', 'b', 'c', 'd', 'e', 'f', or 'r')"); + eprintln!("unknown lane '{other}' (expected 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', or 'r')"); std::process::exit(2); @@ -141,6 +215,10 @@ fn cmd_run(args: &[String]) { println!( "lane={lane} rows={rows} workers={workers} batches={batches} elapsed_ms={elapsed_ms:.3} throughput_mrows_s={throughput_mrows_s:.3}" ); + } else if lane == "g" { + println!( + "lane={lane} rows={rows} workers={workers} shards={shards} elapsed_ms={elapsed_ms:.3} throughput_mrows_s={throughput_mrows_s:.3}" + ); } else { println!( "lane={lane} rows={rows} workers={workers} elapsed_ms={elapsed_ms:.3} throughput_mrows_s={throughput_mrows_s:.3}" diff --git a/crates/onebrc-probe/src/presets.rs b/crates/onebrc-probe/src/presets.rs new file mode 100644 index 00000000..331bad76 --- /dev/null +++ b/crates/onebrc-probe/src/presets.rs @@ -0,0 +1,144 @@ +//! The 8 batching-method PRESETS — every batching/ownership/delivery +//! method this probe measured, frozen as named, reproducible +//! configurations so any session can run serious lab sweeps without +//! re-deriving knob combinations. +//! +//! Findings (agnostic numbers): `FINDINGS.md`. Interpretation (one +//! session's reading): `COMMENTARY.md`. Both at the crate root. +//! +//! Every preset has the same signature — `(data, workers) → map` — and +//! every preset's output is asserted byte-identical to lane A by the +//! `all_presets_agree_with_lane_a` test, so a sweep never needs its own +//! correctness harness. + +use crate::Stats; +use std::collections::BTreeMap; + +/// One frozen batching-method configuration. +pub struct Preset { + pub id: u8, + pub name: &'static str, + /// What the preset does, mechanism-level (agnostic). + pub description: &'static str, + /// Which measured lane/knobs it freezes (for cross-reference into + /// `FINDINGS.md`'s tables). 
+ pub frozen_from: &'static str, +} + +/// The preset catalogue. Ordered from least to most machinery. +pub const PRESETS: [Preset; 8] = [ + Preset { + id: 0, + name: "map-private-merge", + description: "Per-worker BTreeMap accumulation, one commutative \ + merge at the end. No actors, no batching, no witness.", + frozen_from: "lane C", + }, + Preset { + id: 1, + name: "grid-private-merge", + description: "Per-worker flat Morton-tile SoA table (64K cells, \ + open-addressed), one merge at the end. No actors, \ + no witness. The fastest measured shape.", + frozen_from: "lane F", + }, + Preset { + id: 2, + name: "stream-single-owner", + description: "Workers pre-reduce 64K-row morsels and stream \ + dirty entries to ONE owner mailbox actor holding \ + the canonical SoA; every applied batch witnessed.", + frozen_from: "lane G, shards=1", + }, + Preset { + id: 3, + name: "orchestrated-lazy-owners", + description: "Router tier with LAZY per-tile owner activation \ + (live mailboxes track occupancy, not address \ + space) + ahead-firing batched delivery (batch_k \ + = 64). Nominal granularity 65536.", + frozen_from: "lane H, owners_nominal=65536", + }, + Preset { + id: 4, + name: "batch-64k-registry", + description: "The full batch pipeline at 64K grid WITH the \ + standing per-cell mailbox registry: codebook CAM \ + addressing, whole-table Arc double-cast to \ + ownership + persistence sinks, flush cache. 
\ + Registry residency is the measured cost.", + frozen_from: "lane I (== lane J: grid=65536, lanes=1, registry=on)", + }, + Preset { + id: 5, + name: "gridlake", + description: "The measured sweet spot: 64x64 gridlake batch SoA \ + (4096 cells, ~80 KB integer-exact), codebook CAM \ + addressing, 1 sink lane pair, whole-table Arc \ + double-cast, flush cache, NO standing registry.", + frozen_from: "lane J: grid=4096, lanes=1, registry=off", + }, + Preset { + id: 6, + name: "gridlake-8-lanes", + description: "gridlake with 8 ownership+persistence sink lane \ + pairs (row-range sliced) — free at light apply \ + work; headroom for heavy per-batch apply.", + frozen_from: "lane J: grid=4096, lanes=8, registry=off", + }, + Preset { + id: 7, + name: "batch-64k-no-registry", + description: "The batch pipeline at the full 64K grid without \ + the registry — the grid-size control against \ + preset 5 (isolates the cache-matching win).", + frozen_from: "lane J: grid=65536, lanes=1, registry=off", + }, +]; + +/// Run preset `id` (0..=7). Panics on an unknown id — sweeps should +/// iterate [`PRESETS`]. 
+pub fn run_preset(id: u8, data: &[u8], workers: usize) -> BTreeMap { + match id { + 0 => crate::lane_c_threads(data, workers), + 1 => crate::lane_f_morton(data, workers), + 2 => crate::lane_g::lane_g_kanban_soa(data, workers, 1), + 3 => crate::lane_h::lane_h_orchestrated(data, workers, 65536), + 4 => crate::lane_j::lane_j_grid_pipeline_with(data, workers, 65536, 1, true, 1 << 16), + 5 => crate::lane_j::lane_j_grid_pipeline_with(data, workers, 4096, 1, false, 1 << 16), + 6 => crate::lane_j::lane_j_grid_pipeline_with(data, workers, 4096, 8, false, 1 << 16), + 7 => crate::lane_j::lane_j_grid_pipeline_with(data, workers, 65536, 1, false, 1 << 16), + other => panic!("unknown preset {other} (valid: 0..=7)"), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Every preset must produce byte-identical aggregates to lane A — + /// the single correctness harness a lab sweep inherits for free. + /// (Small corpus; preset 4 spawns its full registry, so this test + /// is the slowest in the crate by design.) + #[test] + fn all_presets_agree_with_lane_a() { + let dir = std::env::temp_dir(); + let path = dir.join(format!("onebrc_probe_test_p_{}.txt", std::process::id())); + let result = crate::gen::gen(&path, 30_000, 7).expect("gen"); + assert_eq!(result.rows, 30_000); + + let data = std::fs::read(&path).expect("read generated corpus"); + std::fs::remove_file(&path).ok(); + + let a = crate::lane_a_scalar(&data); + for preset in PRESETS.iter() { + let out = run_preset(preset.id, &data, 3); + assert_eq!( + a, out, + "preset {} ({}) must match lane A", + preset.id, preset.name + ); + } + assert!(!a.is_empty()); + } +}
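Reviewer sketch: the row-range slicing behind the multi-lane presets (`span = grid.div_ceil(sink_lanes)`, then one half-open range per lane, as in the lane J sink-lane spawn loop earlier in this diff) can be checked in isolation. The block below is a minimal standalone illustration, not the crate's code. `lane_bounds` is a hypothetical helper name, and clamping `lo` as well as `hi` is an added assumption so that a lane count larger than the grid yields empty ranges rather than inverted ones.

```rust
/// Hypothetical helper (not part of onebrc-probe): split `grid` cells into
/// `sink_lanes` contiguous half-open row ranges `[lo, hi)`.
fn lane_bounds(grid: usize, sink_lanes: usize) -> Vec<(usize, usize)> {
    assert!(sink_lanes > 0, "need at least one sink lane");
    // Ceiling division: every lane covers `span` cells except possibly the
    // trailing ones, which are cut short by the grid edge.
    let span = grid.div_ceil(sink_lanes);
    (0..sink_lanes)
        .map(|l| {
            // Assumption beyond the original loop: clamp `lo` too, so surplus
            // lanes get an empty range instead of `lo > hi`.
            let lo = (l * span).min(grid);
            let hi = ((l + 1) * span).min(grid);
            (lo, hi)
        })
        .collect()
}
```

With the `gridlake-8-lanes` knobs (grid 4096, 8 lanes) this gives eight 512-cell ranges, from `(0, 512)` to `(3584, 4096)`; with a single lane it gives `(0, 4096)`. Under this sketch the ranges are contiguous and cover the grid exactly once.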