diff --git a/graph-flow/Cargo.toml b/graph-flow/Cargo.toml index e531042..a697d2e 100644 --- a/graph-flow/Cargo.toml +++ b/graph-flow/Cargo.toml @@ -35,8 +35,15 @@ rig = { workspace = true, optional = true } # package = "rig-core", aliased to ` # the `engine::local::Lance` backend; optional + off by default because it pulls # the heavy Lance tree. Enable with `--features surreal-lance`. surrealdb = { git = "https://github.com/AdaWorldAPI/surrealdb", branch = "main", optional = true, default-features = false, features = ["kv-lance"] } +# D-V3-W3b: KanbanSessionStorage — the replayable-langgraph-handler backend. +# graph-flow-kanban is the outer planning-phase envelope (Rubicon kanban +# cycle); graph-flow composes it (per that crate's own description). Both +# optional + off by default; enable with `--features kanban`. +graph-flow-kanban = { path = "../graph-flow-kanban", optional = true } +lance-graph-contract = { path = "../../lance-graph/crates/lance-graph-contract", optional = true } [features] default = [] rig = ["dep:rig"] -surreal-lance = ["dep:surrealdb"] \ No newline at end of file +surreal-lance = ["dep:surrealdb"] +kanban = ["dep:graph-flow-kanban", "dep:lance-graph-contract"] \ No newline at end of file diff --git a/graph-flow/examples/dispatch_bench.rs b/graph-flow/examples/dispatch_bench.rs new file mode 100644 index 0000000..6171bfe --- /dev/null +++ b/graph-flow/examples/dispatch_bench.rs @@ -0,0 +1,234 @@ +//! dispatch_bench — measures graph-flow per-task-step dispatch cost. +//! +//! Added as static/inventory artifact for the lance-graph W2e/W3b evaluation +//! (graph-flow as replayable kanban-integrated langgraph handler). Runs +//! standalone against InMemorySessionStorage — no network, no external deps +//! beyond the graph-flow default feature set. +//! +//! NOTE ON RUNNING THIS FILE IN THIS WORKSPACE: `rs-graph-llm` is a Cargo +//! workspace, and `cargo run --example dispatch_bench -p graph-flow` from +//! this checkout resolves a *workspace-wide* Cargo.lock. Other members +//! (`crates/episodic-arc-task` -> `lance-graph-contract` git dep; +//! `graph-flow`'s own optional `surreal-lance` feature -> `AdaWorldAPI/surrealdb` +//! -> `AdaWorldAPI/ndarray` -> a `burn` git submodule) pull git sources during +//! lockfile generation regardless of whether those features are active for +//! THIS binary. In a network-sandboxed environment those git fetches 403 and +//! the whole-workspace build fails before this example is ever reached. The +//! numbers quoted in the accompanying report were produced by copying this +//! exact file into an isolated single-package crate (`[workspace]` header + +//! a `graph-flow = { path = "..." }` dep, default features only) so cargo +//! never needs to touch the git-sourced optional deps. Once this workspace +//! has network access (or a checked-in Cargo.lock), `cargo run --release +//! --example dispatch_bench -p graph-flow` from the repo root reproduces the +//! same numbers directly. +//! +//! Two execution paths through a linear 4-task no-op graph: +//! (a) "continue_and_execute" — a single `FlowRunner::run()` call drives the +//! whole 4-task chain to completion in one in-process recursion +//! (`Graph::execute_session` recursing via `NextAction::ContinueAndExecute`). +//! Storage cost: exactly 1x SessionStorage::get + 1x SessionStorage::save +//! per full chain, regardless of chain length. +//! (b) "continue_stepwise" — every task returns `NextAction::Continue`, so the +//! caller must call `FlowRunner::run()` once PER task. Storage cost: +//! 1x get + 1x save PER STEP (4x per chain for a 4-task graph). +//! +//! This isolates the storage-roundtrip overhead FlowRunner::run() pays per call, +//! which is exactly the overhead a kanban-board-backed SessionStorage impl would +//! also pay on every step in stepwise mode. + +use async_trait::async_trait; +use graph_flow::{ + Context, ExecutionStatus, FlowRunner, GraphBuilder, InMemorySessionStorage, NextAction, + Session, SessionStorage, Task, TaskResult, +}; +use std::sync::Arc; +use std::time::Instant; + +/// A task that does nothing but report the configured NextAction. +/// No context access — isolates Graph/FlowRunner dispatch cost from +/// Context (DashMap + serde_json) cost. +struct NoOpTask { + id: String, + next: NextAction, +} + +#[async_trait] +impl Task for NoOpTask { + fn id(&self) -> &str { + &self.id + } + + async fn run(&self, _context: Context) -> graph_flow::Result { + Ok(TaskResult::new(None, self.next.clone())) + } +} + +fn build_chain(prefix: &str, step_action: NextAction) -> (Arc, String) { + let ids: Vec = (0..4).map(|i| format!("{prefix}_t{i}")).collect(); + let tasks: Vec> = ids + .iter() + .enumerate() + .map(|(i, id)| { + let next = if i == 3 { + NextAction::End + } else { + step_action.clone() + }; + Arc::new(NoOpTask { + id: id.clone(), + next, + }) + }) + .collect(); + + let mut builder = GraphBuilder::new(format!("{prefix}_graph")); + for t in &tasks { + builder = builder.add_task(t.clone()); + } + for w in ids.windows(2) { + builder = builder.add_edge(&w[0], &w[1]); + } + let graph = Arc::new(builder.build()); + (graph, ids[0].clone()) +} + +/// Run the "continue_and_execute" chain once via a single FlowRunner::run() call. +/// Returns elapsed wall time for the whole 4-task chain. +async fn run_continue_and_execute_once( + runner: &FlowRunner, + storage: &InMemorySessionStorage, + session_id: &str, + start_task: &str, +) -> std::time::Duration { + let session = Session::new_from_task(session_id.to_string(), start_task); + storage.save(session).await.unwrap(); + + let t0 = Instant::now(); + let result = runner.run(session_id).await.unwrap(); + let elapsed = t0.elapsed(); + assert!(matches!(result.status, ExecutionStatus::Completed)); + elapsed +} + +/// Run the "continue_stepwise" chain: call FlowRunner::run() once per task, +/// each call doing its own storage get+save. Returns elapsed wall time for +/// the whole 4-task chain (sum of the 4 per-step runner.run() calls). +async fn run_continue_stepwise_once( + runner: &FlowRunner, + storage: &InMemorySessionStorage, + session_id: &str, + start_task: &str, +) -> std::time::Duration { + let session = Session::new_from_task(session_id.to_string(), start_task); + storage.save(session).await.unwrap(); + + let t0 = Instant::now(); + loop { + let result = runner.run(session_id).await.unwrap(); + match result.status { + ExecutionStatus::Completed => break, + ExecutionStatus::Paused { .. } => continue, + other => panic!("unexpected status: {other:?}"), + } + } + t0.elapsed() +} + +const TASKS_PER_CHAIN: u64 = 4; + +async fn bench_variant( + label: &str, + batch_sizes: &[u64], + warmup_iters: u64, + mut run_once: F, +) where + F: FnMut(u64) -> Fut, + Fut: std::future::Future, +{ + println!("\n=== {label} ==="); + + // Warm up: run a handful of iterations untimed first (allocator, DashMap + // shard init, tokio task-local caches, etc. settle here — NOT included + // in any reported number below). + for i in 0..warmup_iters { + run_once(u64::MAX - i).await; + } + + println!( + "{:>10} | {:>14} | {:>16} | {:>18}", + "batch", "total_ns", "ns_per_chain", "ns_per_task_step" + ); + for &batch in batch_sizes { + let mut total = std::time::Duration::ZERO; + for i in 0..batch { + total += run_once(i).await; + } + let total_ns = total.as_nanos(); + let ns_per_chain = total_ns / batch as u128; + let ns_per_step = total_ns / (batch as u128 * TASKS_PER_CHAIN as u128); + println!( + "{:>10} | {:>14} | {:>16} | {:>18}", + batch, total_ns, ns_per_chain, ns_per_step + ); + } +} + +#[tokio::main] +async fn main() { + let batch_sizes: [u64; 3] = [1, 64, 4096]; + + // --- (a) continue_and_execute: 1 storage get + 1 storage save per whole chain --- + { + let (graph, start) = build_chain("cae", NextAction::ContinueAndExecute); + let storage = Arc::new(InMemorySessionStorage::new()); + let runner = FlowRunner::new(graph, storage.clone()); + let start = start.clone(); + bench_variant( + "continue_and_execute (1 get+save per 4-task chain)", + &batch_sizes, + 200, + |i| { + let runner = runner.clone(); + let storage = storage.clone(); + let start = start.clone(); + async move { + let sid = format!("cae_session_{i}"); + run_continue_and_execute_once(&runner, &storage, &sid, &start).await + } + }, + ) + .await; + } + + // --- (b) continue_stepwise: 1 storage get + 1 storage save PER TASK STEP --- + { + let (graph, start) = build_chain("stw", NextAction::Continue); + let storage = Arc::new(InMemorySessionStorage::new()); + let runner = FlowRunner::new(graph, storage.clone()); + let start = start.clone(); + bench_variant( + "continue_stepwise (1 get+save per task step, 4x per chain)", + &batch_sizes, + 200, + |i| { + let runner = runner.clone(); + let storage = storage.clone(); + let start = start.clone(); + async move { + let sid = format!("stw_session_{i}"); + run_continue_stepwise_once(&runner, &storage, &sid, &start).await + } + }, + ) + .await; + } + + println!( + "\nNote: single tokio runtime for the whole process (#[tokio::main]) — \ + runtime bootstrap happens BEFORE any Instant::now() call above, so it \ + is NOT included in the batch=1 numbers. batch=1 numbers do still carry \ + first-call effects (cold DashMap shard allocation inside a *fresh* \ + InMemorySessionStorage would show here, but warmup above reuses the \ + same storage instance, so that's already amortized)." + ); +} diff --git a/graph-flow/src/lib.rs b/graph-flow/src/lib.rs index 5c143f9..2a75970 100644 --- a/graph-flow/src/lib.rs +++ b/graph-flow/src/lib.rs @@ -171,6 +171,8 @@ pub mod error; pub mod graph; pub mod runner; pub mod storage; +#[cfg(feature = "kanban")] +pub mod storage_kanban; pub mod storage_postgres; #[cfg(feature = "surreal-lance")] pub mod storage_surreal; @@ -185,6 +187,8 @@ pub use runner::FlowRunner; pub use storage::{ GraphStorage, InMemoryGraphStorage, InMemorySessionStorage, Session, SessionStorage, }; +#[cfg(feature = "kanban")] +pub use storage_kanban::KanbanSessionStorage; pub use storage_postgres::PostgresSessionStorage; #[cfg(feature = "surreal-lance")] pub use storage_surreal::SurrealSessionStorage; diff --git a/graph-flow/src/storage_kanban.rs b/graph-flow/src/storage_kanban.rs new file mode 100644 index 0000000..3e12465 --- /dev/null +++ b/graph-flow/src/storage_kanban.rs @@ -0,0 +1,402 @@ +//! Kanban-move-log-backed [`SessionStorage`] — D-V3-W3b. +//! +//! Makes every graph-flow execution **replayable**: the kanban move log is +//! both the WAL and the state (M25: "the board is BOTH the WAL and the +//! state"). Every [`Session`] snapshot is upserted alongside a +//! [`lance_graph_contract::kanban::KanbanMove`] log derived from the observed +//! transition, so a killed-mid-graph session can be resumed from the same +//! storage and its lifecycle audited via [`KanbanSessionStorage::moves`]. +//! +//! Behind the `kanban` feature — it pulls the sibling `graph-flow-kanban` +//! envelope crate and `lance-graph-contract`, so it is off by default. +//! +//! `graph-flow` is the **composer**: this module hosts a +//! [`graph_flow_kanban::KanbanPlanEnvelope`] per session as the kanban-state +//! carrier (column + move log), matching that crate's own description +//! ("`rs-graph-llm`'s `graph-flow` composes this outer envelope"). See the +//! private `save_inner` method below for the one documented deviation from +//! the envelope's gated `advance`/`try_transition` API. + +use async_trait::async_trait; +use std::collections::HashMap; +use tokio::sync::RwLock; + +use graph_flow_kanban::KanbanPlanEnvelope; +use lance_graph_contract::collapse_gate::MailboxId; +use lance_graph_contract::kanban::{ExecTarget, KanbanColumn, KanbanMove}; + +use crate::{error::Result, graph::ExecutionStatus, storage::SessionStorage, Session}; + +/// One session's kanban-tracked state: the latest [`Session`] snapshot plus +/// the [`KanbanPlanEnvelope`] (column + move log) derived from its save +/// history. +struct KanbanSessionRecord { + snapshot: Session, + envelope: KanbanPlanEnvelope, +} + +/// A [`SessionStorage`] backed by an in-memory kanban move log, one +/// [`KanbanPlanEnvelope`] per session, all attributed to a single `mailbox`. +/// +/// The `mailbox` is a field of the **storage** (the storage acts on behalf +/// of one mailbox), never of the [`Session`] — sessions carry no +/// owner/mailbox/tenant field (V3 DTO purity). +pub struct KanbanSessionStorage { + mailbox: MailboxId, + inner: RwLock>, +} + +impl KanbanSessionStorage { + /// Construct a storage whose emitted [`KanbanMove`]s are all attributed + /// to `mailbox`. + #[must_use] + pub fn new(mailbox: MailboxId) -> Self { + Self { + mailbox, + inner: RwLock::new(HashMap::new()), + } + } + + /// The full [`KanbanMove`] log for `session_id` — the auditable WAL + /// trail. Empty if the session is unknown or has never crossed a kanban + /// column boundary (e.g. it has only ever been saved once). + pub async fn moves(&self, session_id: &str) -> Vec { + let map = self.inner.read().await; + map.get(session_id) + .map(|record| record.envelope.moves.clone()) + .unwrap_or_default() + } + + /// Replay `session_id`: the last saved [`Session`] snapshot. The move + /// log ([`Self::moves`]) is the auditable trail of how it got there; + /// v1 replay is "snapshot + log" per M25's design (no per-move + /// context reconstruction yet). + pub async fn replay(&self, session_id: &str) -> Option { + let map = self.inner.read().await; + map.get(session_id).map(|record| record.snapshot.clone()) + } + + /// Save `session`, deriving the kanban-column transition from the + /// observed change AND (when available) the caller's + /// [`ExecutionStatus`] — the precise terminal-column form of + /// [`SessionStorage::save`], used when the caller has `status` in hand + /// (e.g. from [`crate::runner::FlowRunner::run`]'s returned + /// [`crate::graph::ExecutionResult`]). + pub async fn save_with_status(&self, session: Session, status: &ExecutionStatus) -> Result<()> { + self.save_inner(session, Some(status)).await + } + + /// V1 Rubicon mapping (orchestrator-decided 2026-07-02; revisable, tests + /// pin it): + /// + /// 1. first save of a session (no prior snapshot) -> `Planning`-equivalent + /// (the [`KanbanPlanEnvelope`] spawn state — no move is recorded, there + /// is no real predecessor to transition from); + /// 2. else, if `current_task_id` CHANGED from the prior snapshot -> + /// `CognitiveWork`-equivalent (takes priority over the status-based + /// rules below — a task hop is cognitive work regardless of the + /// caller-supplied status); + /// 3. else (task unchanged), a status of `WaitingForInput` / `Paused`-like + /// (or no status supplied — `save()`'s plain-trait path cannot see + /// one) -> `Evaluation`-equivalent (the safe "review" default); + /// 4. else (task unchanged), a status of `Completed`-like -> + /// `Commit`-equivalent; + /// 5. else (task unchanged), a status of `Error`-like -> `Prune`-equivalent. + /// + /// A move is appended to the envelope only when the computed target + /// column differs from the envelope's current column (repeated task hops + /// that both classify to `CognitiveWork`, for example, collapse to a + /// single column occupancy — a [`KanbanMove`] is a *transition* record, + /// not a per-save heartbeat). + async fn save_inner(&self, session: Session, status: Option<&ExecutionStatus>) -> Result<()> { + let mut map = self.inner.write().await; + match map.get_mut(&session.id) { + None => { + // Rule 1: first save -> Planning, the envelope's spawn state. + // No move recorded (no real predecessor), matching + // `KanbanPlanEnvelope::new`'s own empty-moves spawn. + let id = session.id.clone(); + let envelope = KanbanPlanEnvelope::new(self.mailbox, ExecTarget::Native); + map.insert( + id, + KanbanSessionRecord { + snapshot: session, + envelope, + }, + ); + } + Some(record) => { + let to = classify_column(&record.snapshot.current_task_id, &session.current_task_id, status); + if to != record.envelope.column { + let from = record.envelope.column; + record.envelope.moves.push(KanbanMove { + mailbox: self.mailbox, + from, + to, + // Deliberate V1 divergence from `KanbanPlanEnvelope::record_move`'s + // own "cycle stamp" convention: no re-deliberation-cycle counter + // exists at this layer, so `witness_chain_position` is simply the + // move-log length before this push — monotonic per session, per + // the brief's explicit instruction. + witness_chain_position: record.envelope.moves.len() as u32, + libet_offset_us: 0, + exec: ExecTarget::Native, + }); + record.envelope.column = to; + } + record.snapshot = session; + } + } + Ok(()) + } +} + +/// See [`KanbanSessionStorage::save_inner`] doc comment for the full V1 +/// Rubicon mapping this function implements (rules 2-5; rule 1, the +/// first-save case, is handled structurally by the envelope's spawn state +/// and never reaches this function). +fn classify_column( + prior_task_id: &str, + new_task_id: &str, + status: Option<&ExecutionStatus>, +) -> KanbanColumn { + if prior_task_id != new_task_id { + return KanbanColumn::CognitiveWork; + } + match status { + Some(ExecutionStatus::WaitingForInput) | Some(ExecutionStatus::Paused { .. }) | None => { + KanbanColumn::Evaluation + } + Some(ExecutionStatus::Completed) => KanbanColumn::Commit, + Some(ExecutionStatus::Error(_)) => KanbanColumn::Prune, + } +} + +#[async_trait] +impl SessionStorage for KanbanSessionStorage { + async fn save(&self, session: Session) -> Result<()> { + self.save_inner(session, None).await + } + + async fn get(&self, id: &str) -> Result> { + let map = self.inner.read().await; + Ok(map.get(id).map(|record| record.snapshot.clone())) + } + + async fn delete(&self, id: &str) -> Result<()> { + let mut map = self.inner.write().await; + map.remove(id); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + context::Context, graph::Graph, runner::FlowRunner, task::TaskResult, GraphBuilder, + NextAction, Task, + }; + use std::sync::Arc; + + fn session_at(id: &str, task_id: &str) -> Session { + Session { + id: id.to_string(), + graph_id: "g".to_string(), + current_task_id: task_id.to_string(), + status_message: None, + context: Context::new(), + } + } + + // --- unit test 1: mapping-per-status ------------------------------ + + #[test] + fn v1_mapping_per_status() { + // Task unchanged: status decides the target column. + assert_eq!(classify_column("t", "t", None), KanbanColumn::Evaluation); + assert_eq!( + classify_column("t", "t", Some(&ExecutionStatus::WaitingForInput)), + KanbanColumn::Evaluation + ); + assert_eq!( + classify_column( + "t", + "t", + Some(&ExecutionStatus::Paused { + next_task_id: "t".to_string(), + reason: "no outgoing edge".to_string(), + }) + ), + KanbanColumn::Evaluation + ); + assert_eq!( + classify_column("t", "t", Some(&ExecutionStatus::Completed)), + KanbanColumn::Commit + ); + assert_eq!( + classify_column("t", "t", Some(&ExecutionStatus::Error("boom".to_string()))), + KanbanColumn::Prune + ); + + // Task changed: CognitiveWork wins regardless of status. + assert_eq!( + classify_column("t1", "t2", Some(&ExecutionStatus::Completed)), + KanbanColumn::CognitiveWork + ); + assert_eq!( + classify_column("t1", "t2", Some(&ExecutionStatus::Error("x".to_string()))), + KanbanColumn::CognitiveWork + ); + } + + // --- unit test 2: moves() monotonic witness_chain_position -------- + + #[tokio::test] + async fn moves_witness_chain_position_is_monotonic() { + let storage = KanbanSessionStorage::new(1); + let id = "sess-mono"; + + storage.save(session_at(id, "a")).await.unwrap(); // first save: no move + storage.save(session_at(id, "b")).await.unwrap(); // task changed -> CognitiveWork (move 0) + storage + .save_with_status(session_at(id, "b"), &ExecutionStatus::Completed) + .await + .unwrap(); // unchanged, Completed -> Commit (move 1) + storage + .save_with_status(session_at(id, "c"), &ExecutionStatus::Error("x".to_string())) + .await + .unwrap(); // task changed -> CognitiveWork (move 2) + + let moves = storage.moves(id).await; + assert_eq!(moves.len(), 3); + let positions: Vec = moves.iter().map(|m| m.witness_chain_position).collect(); + assert_eq!(positions, vec![0, 1, 2]); + let cols: Vec = moves.iter().map(|m| m.to).collect(); + assert_eq!( + cols, + vec![ + KanbanColumn::CognitiveWork, + KanbanColumn::Commit, + KanbanColumn::CognitiveWork, + ] + ); + } + + // --- gate test: M25 kill-mid-graph replay -------------------------- + + struct CountingTask { + task_id: &'static str, + next: NextAction, + } + + #[async_trait] + impl Task for CountingTask { + fn id(&self) -> &str { + self.task_id + } + + async fn run(&self, context: Context) -> crate::error::Result { + let mut trace: Vec = context.get("trace").await.unwrap_or_default(); + trace.push(self.task_id.to_string()); + context.set("trace", trace).await; + Ok(TaskResult::new( + Some(format!("{} done", self.task_id)), + self.next.clone(), + )) + } + } + + /// Builds a fresh 3-task linear graph (task1 -> task2 -> task3, End at + /// task3). Called twice in the gate test: once for the pre-kill steps, + /// once (a fresh, identically-built instance) for the resume. + fn build_graph() -> Graph { + let t1 = Arc::new(CountingTask { + task_id: "task1", + next: NextAction::Continue, + }); + let t2 = Arc::new(CountingTask { + task_id: "task2", + next: NextAction::Continue, + }); + let t3 = Arc::new(CountingTask { + task_id: "task3", + next: NextAction::End, + }); + GraphBuilder::new("kanban_replay_graph") + .add_task(t1.clone()) + .add_task(t2.clone()) + .add_task(t3.clone()) + .add_edge(t1.id(), t2.id()) + .add_edge(t2.id(), t3.id()) + .build() + } + + #[tokio::test] + async fn m25_kill_mid_graph_replay_resumes_without_repeats_or_gaps() { + let storage = Arc::new(KanbanSessionStorage::new(7)); + let session_id = "sess-m25".to_string(); + + // Seed the session (this is the "first save" -> Planning). + let session = Session::new_from_task(session_id.clone(), "task1"); + storage.save(session).await.unwrap(); + + // Run task1 and task2 step-by-step, then "kill" by dropping the + // FlowRunner and its graph. + { + let graph = Arc::new(build_graph()); + let runner = FlowRunner::new(graph, storage.clone()); + + let result = runner.run(&session_id).await.unwrap(); + assert!(matches!(result.status, crate::graph::ExecutionStatus::Paused { .. })); + + let result = runner.run(&session_id).await.unwrap(); + assert!(matches!(result.status, crate::graph::ExecutionStatus::Paused { .. })); + // `runner` and its `graph` are dropped at the end of this block — + // the "kill". + } + + // From the SAME storage state, a FRESH FlowRunner over a FRESH + // (identically built) graph resumes the session to completion. + let graph2 = Arc::new(build_graph()); + let runner2 = FlowRunner::new(graph2, storage.clone()); + let final_result = runner2.run(&session_id).await.unwrap(); + assert!(matches!( + final_result.status, + crate::graph::ExecutionStatus::Completed + )); + + // The caller has `final_result.status` in hand (FlowRunner::run's + // return value) — the natural real-world finalization call: refine + // the plain-`save()` landing column (`Evaluation`, since the trait + // path saw no status) into the precise terminal column. + let settled = storage.get(&session_id).await.unwrap().unwrap(); + storage + .save_with_status(settled, &final_result.status) + .await + .unwrap(); + + // No repeats, no gaps: each task ran exactly once, in order. + let final_session = storage.replay(&session_id).await.unwrap(); + let trace: Vec = final_session.context.get("trace").await.unwrap(); + assert_eq!(trace, vec!["task1", "task2", "task3"]); + + // The recorded move-log column sequence matches the expected V1 + // mapping sequence exactly: + // task1->task2 hop -> CognitiveWork + // task2->task3 hop -> CognitiveWork again, collapsed + // (same column, no move appended) + // task3 End via plain save() -> Evaluation (no status available) + // follow-up save_with_status -> Commit (status now known) + let moves = storage.moves(&session_id).await; + let cols: Vec = moves.iter().map(|m| m.to).collect(); + assert_eq!( + cols, + vec![ + KanbanColumn::CognitiveWork, + KanbanColumn::Evaluation, + KanbanColumn::Commit, + ] + ); + } +}