diff --git a/Cargo.lock b/Cargo.lock index 6cbea840..d17c685f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1127,6 +1127,7 @@ dependencies = [ "elf-chunking", "elf-cli", "elf-config", + "elf-domain", "elf-providers", "elf-storage", "qdrant-client", diff --git a/apps/elf-api/src/routes.rs b/apps/elf-api/src/routes.rs index 3887ba2d..9c212ab7 100644 --- a/apps/elf-api/src/routes.rs +++ b/apps/elf-api/src/routes.rs @@ -16,7 +16,7 @@ use axum::{ routing, }; use serde::{Deserialize, Serialize}; -use serde_json::Value; +use serde_json::{Map, Value}; use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use utoipa::{OpenApi, ToSchema}; use utoipa_scalar::{Scalar, Servable}; @@ -24,7 +24,14 @@ use uuid::Uuid; use crate::state::AppState; use elf_config::{SecurityAuthKey, SecurityAuthRole}; -use elf_domain::{english_gate, writegate::WritePolicy}; +use elf_domain::{ + consolidation::{ + ConsolidationInputRef, ConsolidationLineage, ConsolidationReviewAction, + ConsolidationReviewState, + }, + english_gate, + writegate::WritePolicy, +}; use elf_service::{ AddEventRequest, AddEventResponse, AddNoteInput, AddNoteRequest, AddNoteResponse, AdminGraphPredicateAliasAddRequest, AdminGraphPredicateAliasesListRequest, @@ -34,15 +41,20 @@ use elf_service::{ AdminIngestionProfileDefaultResponse, AdminIngestionProfileDefaultSetRequest, AdminIngestionProfileGetRequest, AdminIngestionProfileListRequest, AdminIngestionProfileResponse, AdminIngestionProfileVersionsListRequest, - AdminIngestionProfileVersionsListResponse, AdminIngestionProfilesListResponse, DeleteRequest, - DeleteResponse, DocType, DocsExcerptResponse, DocsExcerptsGetRequest, DocsGetRequest, - DocsGetResponse, DocsPutRequest, DocsPutResponse, DocsSearchL0Request, DocsSearchL0Response, - Error, EventMessage, GranteeKind, GraphQueryEntityRef, GraphQueryPredicateRef, - GraphQueryRequest, GraphQueryResponse, IngestionProfileSelector, ListRequest, ListResponse, - NoteFetchRequest, NoteFetchResponse, NoteProvenanceBundleResponse, NoteProvenanceGetRequest, - PayloadLevel, PublishNoteRequest, QueryPlan, RankingRequestOverride, RebuildReport, - SearchDetailsRequest, SearchDetailsResult, SearchExplainRequest, SearchExplainResponse, - SearchIndexItem, SearchRequest, SearchResponse, SearchSessionGetRequest, SearchTimelineGroup, + AdminIngestionProfileVersionsListResponse, AdminIngestionProfilesListResponse, + ConsolidationProposalGetRequest, ConsolidationProposalInput, ConsolidationProposalResponse, + ConsolidationProposalReviewRequest, ConsolidationProposalsListRequest, + ConsolidationProposalsListResponse, ConsolidationRunCreateRequest, + ConsolidationRunCreateResponse, ConsolidationRunGetRequest, ConsolidationRunResponse, + ConsolidationRunsListRequest, ConsolidationRunsListResponse, DeleteRequest, DeleteResponse, + DocType, DocsExcerptResponse, DocsExcerptsGetRequest, DocsGetRequest, DocsGetResponse, + DocsPutRequest, DocsPutResponse, DocsSearchL0Request, DocsSearchL0Response, Error, + EventMessage, GranteeKind, GraphQueryEntityRef, GraphQueryPredicateRef, GraphQueryRequest, + GraphQueryResponse, IngestionProfileSelector, ListRequest, ListResponse, NoteFetchRequest, + NoteFetchResponse, NoteProvenanceBundleResponse, NoteProvenanceGetRequest, PayloadLevel, + PublishNoteRequest, QueryPlan, RankingRequestOverride, RebuildReport, SearchDetailsRequest, + SearchDetailsResult, SearchExplainRequest, SearchExplainResponse, SearchIndexItem, + SearchRequest, SearchResponse, SearchSessionGetRequest, SearchTimelineGroup, SearchTimelineRequest, SearchTrajectoryResponse, SearchTrajectorySummary, ShareScope, SpaceGrantRevokeRequest, SpaceGrantRevokeResponse, SpaceGrantUpsertRequest, SpaceGrantsListRequest, TextPositionSelector, TextQuoteSelector, TraceBundleGetRequest, @@ -115,6 +127,12 @@ const VIEWER_HTML: &str = include_str!("../static/viewer.html"); admin_ingestion_profile_versions_list, admin_ingestion_profile_default_get, admin_ingestion_profile_default_set, + consolidation_run_create, + consolidation_runs_list, + consolidation_run_get, + consolidation_proposals_list, + consolidation_proposal_get, + consolidation_proposal_review, rebuild_qdrant, searches_raw, trace_recent_list, @@ -140,6 +158,7 @@ const VIEWER_HTML: &str = include_str!("../static/viewer.html"); (name = "docs", description = "Document extension ingestion, search, and excerpt retrieval."), (name = "search", description = "Progressive search sessions and raw search diagnostics."), (name = "graph", description = "Graph query and predicate administration."), + (name = "consolidation", description = "Reviewable derived consolidation proposals."), (name = "admin", description = "Local admin and operator inspection routes."), ) )] @@ -314,6 +333,35 @@ struct AdminIngestionProfileDefaultSetBody { version: Option, } +#[derive(Clone, Debug, Deserialize)] +struct ConsolidationRunCreateBody { + job_kind: String, + input_refs: Vec, + #[serde(default = "empty_json_object")] + source_snapshot: Value, + lineage: ConsolidationLineage, + #[serde(default)] + proposals: Vec, +} + +#[derive(Clone, Debug, Deserialize)] +struct ConsolidationRunsListQuery { + limit: Option, +} + +#[derive(Clone, Debug, Deserialize)] +struct ConsolidationProposalsListQuery { + run_id: Option, + review_state: Option, + limit: Option, +} + +#[derive(Clone, Debug, Deserialize)] +struct ConsolidationProposalReviewBody { + action: ConsolidationReviewAction, + review_comment: Option, +} + #[derive(Clone, Debug, Serialize, ToSchema)] struct AdminIngestionProfileDefaultResponseV2 { profile_id: String, @@ -583,6 +631,20 @@ pub fn admin_router(state: AppState) -> Router { "/v2/admin/events/ingestion-profiles", routing::get(admin_ingestion_profiles_list).post(admin_ingestion_profile_create), ) + .route( + "/v2/admin/consolidation/runs", + routing::get(consolidation_runs_list).post(consolidation_run_create), + ) + .route("/v2/admin/consolidation/runs/{run_id}", routing::get(consolidation_run_get)) + .route("/v2/admin/consolidation/proposals", routing::get(consolidation_proposals_list)) + .route( + "/v2/admin/consolidation/proposals/{proposal_id}", + routing::get(consolidation_proposal_get), + ) + .route( + "/v2/admin/consolidation/proposals/{proposal_id}/review", + routing::post(consolidation_proposal_review), + ) .route("/v2/admin/qdrant/rebuild", routing::post(rebuild_qdrant)) .route("/v2/admin/searches/raw", routing::post(searches_raw)) .route("/v2/admin/traces/recent", routing::get(trace_recent_list)) @@ -620,6 +682,10 @@ where .merge(Scalar::with_url(SCALAR_DOCS_PATH, ::openapi())) } +fn empty_json_object() -> Value { + Value::Object(Map::new()) +} + fn json_error( status: StatusCode, code: &str, @@ -2370,6 +2436,241 @@ async fn admin_note_provenance_get( Ok(Json(response)) } +#[utoipa::path( + post, + path = "/v2/admin/consolidation/runs", + tag = "consolidation", + request_body = Value, + responses( + (status = 200, description = "Consolidation run was created.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Admin access required.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn consolidation_run_create( + State(state): State, + headers: HeaderMap, + payload: Result, JsonRejection>, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let Json(payload) = payload.map_err(|err| { + tracing::warn!(error = %err, "Invalid request payload."); + + json_error(StatusCode::BAD_REQUEST, "INVALID_REQUEST", "Invalid request payload.", None) + })?; + let response = state + .service + .consolidation_run_create(ConsolidationRunCreateRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + agent_id: ctx.agent_id, + job_kind: payload.job_kind, + input_refs: payload.input_refs, + source_snapshot: payload.source_snapshot, + lineage: payload.lineage, + proposals: payload.proposals, + }) + .await?; + + Ok(Json(response)) +} + +#[utoipa::path( + get, + path = "/v2/admin/consolidation/runs", + tag = "consolidation", + params(("limit" = Option, Query, description = "Maximum runs to return.")), + responses( + (status = 200, description = "Consolidation runs.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Admin access required.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn consolidation_runs_list( + State(state): State, + headers: HeaderMap, + query: Result, QueryRejection>, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let Query(query) = query.map_err(|err| { + tracing::warn!(error = %err, "Invalid query parameters."); + + json_error( + StatusCode::BAD_REQUEST, + "INVALID_REQUEST", + "Invalid query parameters.".to_string(), + None, + ) + })?; + let response = state + .service + .consolidation_runs_list(ConsolidationRunsListRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + limit: query.limit, + }) + .await?; + + Ok(Json(response)) +} + +#[utoipa::path( + get, + path = "/v2/admin/consolidation/runs/{run_id}", + tag = "consolidation", + params(("run_id" = Uuid, Path, description = "Consolidation run ID.")), + responses( + (status = 200, description = "Consolidation run.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Admin access required.", body = ErrorBody), + (status = 404, description = "Consolidation run was not found.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn consolidation_run_get( + State(state): State, + headers: HeaderMap, + Path(run_id): Path, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let response = state + .service + .consolidation_run_get(ConsolidationRunGetRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + run_id, + }) + .await?; + + Ok(Json(response)) +} + +#[utoipa::path( + get, + path = "/v2/admin/consolidation/proposals", + tag = "consolidation", + params( + ("run_id" = Option, Query, description = "Optional run filter."), + ("review_state" = Option, Query, description = "Optional review-state filter."), + ("limit" = Option, Query, description = "Maximum proposals to return."), + ), + responses( + (status = 200, description = "Consolidation proposals.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Admin access required.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn consolidation_proposals_list( + State(state): State, + headers: HeaderMap, + query: Result, QueryRejection>, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let Query(query) = query.map_err(|err| { + tracing::warn!(error = %err, "Invalid query parameters."); + + json_error( + StatusCode::BAD_REQUEST, + "INVALID_REQUEST", + "Invalid query parameters.".to_string(), + None, + ) + })?; + let response = state + .service + .consolidation_proposals_list(ConsolidationProposalsListRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + run_id: query.run_id, + review_state: query.review_state, + limit: query.limit, + }) + .await?; + + Ok(Json(response)) +} + +#[utoipa::path( + get, + path = "/v2/admin/consolidation/proposals/{proposal_id}", + tag = "consolidation", + params(("proposal_id" = Uuid, Path, description = "Consolidation proposal ID.")), + responses( + (status = 200, description = "Consolidation proposal.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Admin access required.", body = ErrorBody), + (status = 404, description = "Consolidation proposal was not found.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn consolidation_proposal_get( + State(state): State, + headers: HeaderMap, + Path(proposal_id): Path, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let response = state + .service + .consolidation_proposal_get(ConsolidationProposalGetRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + proposal_id, + }) + .await?; + + Ok(Json(response)) +} + +#[utoipa::path( + post, + path = "/v2/admin/consolidation/proposals/{proposal_id}/review", + tag = "consolidation", + params(("proposal_id" = Uuid, Path, description = "Consolidation proposal ID.")), + request_body = Value, + responses( + (status = 200, description = "Consolidation proposal review action was applied.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Admin access required.", body = ErrorBody), + (status = 404, description = "Consolidation proposal was not found.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn consolidation_proposal_review( + State(state): State, + headers: HeaderMap, + Path(proposal_id): Path, + payload: Result, JsonRejection>, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let Json(payload) = payload.map_err(|err| { + tracing::warn!(error = %err, "Invalid request payload."); + + json_error(StatusCode::BAD_REQUEST, "INVALID_REQUEST", "Invalid request payload.", None) + })?; + let response = state + .service + .consolidation_proposal_review(ConsolidationProposalReviewRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + reviewer_agent_id: ctx.agent_id, + proposal_id, + review_action: payload.action, + review_comment: payload.review_comment, + }) + .await?; + + Ok(Json(response)) +} + #[utoipa::path( get, path = "/v2/admin/events/ingestion-profiles", diff --git a/apps/elf-api/tests/http.rs b/apps/elf-api/tests/http.rs index 92a5b113..fc7c7339 100644 --- a/apps/elf-api/tests/http.rs +++ b/apps/elf-api/tests/http.rs @@ -844,6 +844,12 @@ async fn openapi_json_route_serves_generated_contract() { assert_openapi_method(&spec, "/v2/admin/searches/raw", "post"); assert_openapi_method(&spec, "/v2/admin/events/ingestion-profiles/default", "get"); assert_openapi_method(&spec, "/v2/admin/events/ingestion-profiles/default", "put"); + assert_openapi_method(&spec, "/v2/admin/consolidation/runs", "post"); + assert_openapi_method(&spec, "/v2/admin/consolidation/runs", "get"); + assert_openapi_method(&spec, "/v2/admin/consolidation/runs/{run_id}", "get"); + assert_openapi_method(&spec, "/v2/admin/consolidation/proposals", "get"); + assert_openapi_method(&spec, "/v2/admin/consolidation/proposals/{proposal_id}", "get"); + assert_openapi_method(&spec, "/v2/admin/consolidation/proposals/{proposal_id}/review", "post"); } #[tokio::test] @@ -868,6 +874,7 @@ async fn scalar_docs_route_serves_api_reference_html() { assert!(html.contains("@scalar/api-reference")); assert!(html.contains("/v2/admin/events/ingestion-profiles/default")); + assert!(html.contains("/v2/admin/consolidation/proposals")); } #[tokio::test] diff --git a/apps/elf-eval/fixtures/real_world_memory/consolidation/contradiction_report_discard.json b/apps/elf-eval/fixtures/real_world_memory/consolidation/contradiction_report_discard.json index e24e82a9..86a0266f 100644 --- a/apps/elf-eval/fixtures/real_world_memory/consolidation/contradiction_report_discard.json +++ b/apps/elf-eval/fixtures/real_world_memory/consolidation/contradiction_report_discard.json @@ -109,6 +109,13 @@ "actual_review_action": "discard", "source_mutations": [], "unsupported_claim_count": 1, + "unsupported_claim_flags": [ + { + "claim_id": "unsupported-applied-worker-claim", + "message": "The fixture has no evidence that a consolidation worker applied source note edits in production.", + "source_ref": "unsupported-applied-draft" + } + ], "diff": { "summary": "Reject a stale source-rewrite synthesis and preserve it as a contradiction report.", "before": {}, @@ -122,14 +129,6 @@ } } } - ], - "executable_gaps": [ - { - "primitive": "live_consolidation_worker_generation", - "follow_up_issue": "[ELF vNext P1] Implement reviewable consolidation worker and proposal review flow", - "reason": "This fixture scores checked-in proposal payloads; it does not execute scheduled LLM generation.", - "blocks_fixture_pass": false - } ] } } diff --git a/apps/elf-eval/fixtures/real_world_memory/consolidation/preference_candidate_defer.json b/apps/elf-eval/fixtures/real_world_memory/consolidation/preference_candidate_defer.json index 5af09e1d..715a17cc 100644 --- a/apps/elf-eval/fixtures/real_world_memory/consolidation/preference_candidate_defer.json +++ b/apps/elf-eval/fixtures/real_world_memory/consolidation/preference_candidate_defer.json @@ -100,14 +100,6 @@ } } } - ], - "executable_gaps": [ - { - "primitive": "live_consolidation_worker_generation", - "follow_up_issue": "[ELF vNext P1] Implement reviewable consolidation worker and proposal review flow", - "reason": "This fixture scores checked-in proposal payloads; it does not execute scheduled LLM generation.", - "blocks_fixture_pass": false - } ] } } diff --git a/apps/elf-eval/fixtures/real_world_memory/consolidation/project_summary_apply.json b/apps/elf-eval/fixtures/real_world_memory/consolidation/project_summary_apply.json index 7bb750d3..0424673d 100644 --- a/apps/elf-eval/fixtures/real_world_memory/consolidation/project_summary_apply.json +++ b/apps/elf-eval/fixtures/real_world_memory/consolidation/project_summary_apply.json @@ -114,14 +114,6 @@ } } } - ], - "executable_gaps": [ - { - "primitive": "live_consolidation_worker_generation", - "follow_up_issue": "[ELF vNext P1] Implement reviewable consolidation worker and proposal review flow", - "reason": "This fixture scores checked-in proposal payloads; it does not execute scheduled LLM generation.", - "blocks_fixture_pass": false - } ] } } diff --git a/apps/elf-eval/fixtures/real_world_memory/consolidation/weekly_decision_summary_apply.json b/apps/elf-eval/fixtures/real_world_memory/consolidation/weekly_decision_summary_apply.json index 20b73944..135d5bfa 100644 --- a/apps/elf-eval/fixtures/real_world_memory/consolidation/weekly_decision_summary_apply.json +++ b/apps/elf-eval/fixtures/real_world_memory/consolidation/weekly_decision_summary_apply.json @@ -103,14 +103,6 @@ } } } - ], - "executable_gaps": [ - { - "primitive": "live_consolidation_worker_generation", - "follow_up_issue": "[ELF vNext P1] Implement reviewable consolidation worker and proposal review flow", - "reason": "This fixture scores checked-in proposal payloads; it does not execute scheduled LLM generation.", - "blocks_fixture_pass": false - } ] } } diff --git a/apps/elf-eval/src/bin/real_world_job_benchmark.rs b/apps/elf-eval/src/bin/real_world_job_benchmark.rs index 1751a7be..9c41027f 100644 --- a/apps/elf-eval/src/bin/real_world_job_benchmark.rs +++ b/apps/elf-eval/src/bin/real_world_job_benchmark.rs @@ -460,6 +460,8 @@ struct ConsolidationProposalFixture { #[serde(default)] unsupported_claim_count: usize, #[serde(default)] + unsupported_claim_flags: Vec, + #[serde(default)] diff: Value, } @@ -1484,6 +1486,12 @@ fn validate_consolidation_proposal( path.display() )); } + if proposal.unsupported_claim_flags.iter().any(|flag| !flag.is_object()) { + return Err(eyre::eyre!( + "{} consolidation unsupported-claim flags must be JSON objects.", + path.display() + )); + } Ok(()) } @@ -2852,7 +2860,9 @@ fn consolidation_proposal_report( review_action_correct: proposal.expected_review_action == proposal.actual_review_action, source_mutation_count: proposal.source_mutations.len() + forbidden_diff_key_count(&proposal.diff), - unsupported_claim_count: proposal.unsupported_claim_count, + unsupported_claim_count: proposal + .unsupported_claim_count + .max(proposal.unsupported_claim_flags.len()), } } diff --git a/apps/elf-eval/tests/real_world_job_benchmark.rs b/apps/elf-eval/tests/real_world_job_benchmark.rs index a48c3226..4199148b 100644 --- a/apps/elf-eval/tests/real_world_job_benchmark.rs +++ b/apps/elf-eval/tests/real_world_job_benchmark.rs @@ -351,7 +351,7 @@ fn consolidation_fixtures_report_reviewable_proposal_metrics() -> Result<()> { ); assert_eq!( report.pointer("/summary/consolidation/executable_gap_count").and_then(Value::as_u64), - Some(4) + Some(0) ); assert_eq!( report.pointer("/summary/consolidation/lineage_completeness").and_then(Value::as_f64), @@ -1140,8 +1140,10 @@ fn consolidation_report_renders_markdown_metrics_and_gaps() -> Result<()> { assert!(markdown.contains("## Consolidation")); assert!(markdown.contains("Source Mutations")); - assert!(markdown.contains("live_consolidation_worker_generation")); - assert!(markdown.contains("[ELF vNext P1] Implement reviewable consolidation worker")); + assert!(markdown.contains("Proposal Unsupported Claims")); + assert!(markdown.contains("Executable Gaps")); + assert!(markdown.contains("consolidation-contradiction-report-discard-001")); + assert!(!markdown.contains("live_consolidation_worker_generation")); Ok(()) } diff --git a/apps/elf-worker/Cargo.toml b/apps/elf-worker/Cargo.toml index 4f51afb6..12445e57 100644 --- a/apps/elf-worker/Cargo.toml +++ b/apps/elf-worker/Cargo.toml @@ -21,6 +21,7 @@ uuid = { workspace = true } elf-chunking = { workspace = true } elf-cli = { workspace = true } elf-config = { workspace = true } +elf-domain = { workspace = true } elf-providers = { workspace = true } elf-storage = { workspace = true } diff --git a/apps/elf-worker/src/worker.rs b/apps/elf-worker/src/worker.rs index a89604b6..53511239 100644 --- a/apps/elf-worker/src/worker.rs +++ b/apps/elf-worker/src/worker.rs @@ -17,11 +17,19 @@ use uuid::Uuid; use crate::{Error, Result}; use elf_chunking::{Chunk, ChunkingConfig, Tokenizer}; use elf_config::EmbeddingProviderConfig; +use elf_domain::consolidation::{ + CONSOLIDATION_CONTRACT_SCHEMA_V1, ConsolidationJobPayload, ConsolidationProposalContract, + ConsolidationReviewState, ConsolidationRunState, ConsolidationValidationError, +}; use elf_providers::embedding; use elf_storage::{ + consolidation::{self, ConsolidationRunStateUpdate}, db::Db, doc_outbox, docs, - models::{DocIndexingOutboxEntry, IndexingOutboxEntry, MemoryNote, TraceOutboxJob}, + models::{ + ConsolidationProposal, ConsolidationRunJob, DocIndexingOutboxEntry, IndexingOutboxEntry, + MemoryNote, TraceOutboxJob, + }, outbox, qdrant::{BM25_MODEL, BM25_VECTOR_NAME, DENSE_VECTOR_NAME, QdrantStore}, queries, @@ -35,6 +43,7 @@ const BASE_BACKOFF_MS: i64 = 500; const MAX_BACKOFF_MS: i64 = 30_000; const TRACE_CLEANUP_INTERVAL_SECONDS: i64 = 900; const TRACE_OUTBOX_LEASE_SECONDS: i64 = 30; +const CONSOLIDATION_JOB_LEASE_SECONDS: i64 = 30; const MAX_OUTBOX_ERROR_CHARS: usize = 1_024; /// Shared runtime state used by the worker loop. @@ -228,6 +237,9 @@ pub async fn run_worker(state: WorkerState) -> Result<()> { if let Err(err) = process_trace_outbox_once(&state).await { tracing::error!(error = %err, "Search trace outbox processing failed."); } + if let Err(err) = process_consolidation_run_job_once(&state).await { + tracing::error!(error = %err, "Consolidation run job processing failed."); + } let now = OffsetDateTime::now_utc(); @@ -257,6 +269,7 @@ pub async fn process_once(state: &WorkerState) -> Result<()> { process_indexing_outbox_once(state).await?; process_doc_indexing_outbox_once(state).await?; process_trace_outbox_once(state).await?; + process_consolidation_run_job_once(state).await?; Ok(()) } @@ -473,6 +486,54 @@ fn project_doc_ref_fields( Ok((doc_ts, thread_id, domain, repo)) } +fn proposal_row_from_contract( + job: &ConsolidationRunJob, + now: OffsetDateTime, + proposal: ConsolidationProposalContract, +) -> Result { + proposal.validate().map_err(consolidation_validation_error)?; + + Ok(ConsolidationProposal { + proposal_id: Uuid::new_v4(), + run_id: job.run_id, + tenant_id: job.tenant_id.clone(), + project_id: job.project_id.clone(), + agent_id: job.agent_id.clone(), + contract_schema: CONSOLIDATION_CONTRACT_SCHEMA_V1.to_string(), + proposal_kind: proposal.proposal_kind, + apply_intent: proposal.apply_intent.as_str().to_string(), + review_state: ConsolidationReviewState::Proposed.as_str().to_string(), + source_refs: encode_json(&proposal.source_refs, "consolidation source_refs")?, + source_snapshot: proposal.source_snapshot, + lineage: encode_json(&proposal.lineage, "consolidation lineage")?, + diff: encode_json(&proposal.diff, "consolidation diff")?, + confidence: proposal.confidence, + unsupported_claim_flags: encode_json( + &proposal.unsupported_claim_flags, + "consolidation unsupported_claim_flags", + )?, + contradiction_markers: encode_json( + &proposal.markers.contradictions, + "consolidation contradiction_markers", + )?, + staleness_markers: encode_json( + &proposal.markers.staleness, + "consolidation staleness_markers", + )?, + target_ref: proposal.target_ref, + proposed_payload: proposal.proposed_payload, + reviewer_agent_id: None, + review_comment: None, + reviewed_at: None, + created_at: now, + updated_at: now, + }) +} + +fn consolidation_validation_error(err: ConsolidationValidationError) -> Error { + Error::Validation(err.to_string()) +} + async fn process_indexing_outbox_once(state: &WorkerState) -> Result<()> { let now = OffsetDateTime::now_utc(); let job = outbox::claim_next_indexing_outbox_job(&state.db, now, CLAIM_LEASE_SECONDS).await?; @@ -566,6 +627,34 @@ async fn process_trace_outbox_once(state: &WorkerState) -> Result<()> { Ok(()) } +async fn process_consolidation_run_job_once(state: &WorkerState) -> Result<()> { + let now = OffsetDateTime::now_utc(); + let job = consolidation::claim_next_consolidation_run_job( + &state.db, + now, + CONSOLIDATION_JOB_LEASE_SECONDS, + ) + .await?; + let Some(job) = job else { return Ok(()) }; + let result = handle_consolidation_job(&state.db, &job).await; + + match result { + Ok(()) => {}, + Err(err) => { + tracing::error!( + error = %err, + job_id = %job.job_id, + run_id = %job.run_id, + "Consolidation run job failed." + ); + + mark_consolidation_failed(&state.db, job.job_id, job.attempts, &err).await?; + }, + } + + Ok(()) +} + async fn handle_upsert(state: &WorkerState, job: &IndexingOutboxEntry) -> Result<()> { let note = fetch_note(&state.db, job.note_id).await?; let Some(note) = note else { @@ -865,6 +954,92 @@ async fn handle_trace_job(db: &Db, job: &TraceOutboxJob) -> Result<()> { Ok(()) } +async fn handle_consolidation_job(db: &Db, job: &ConsolidationRunJob) -> Result<()> { + let payload: ConsolidationJobPayload = serde_json::from_value(job.payload.clone())?; + + payload.validate().map_err(consolidation_validation_error)?; + + let existing = consolidation::get_consolidation_run( + &db.pool, + job.tenant_id.as_str(), + job.project_id.as_str(), + job.run_id, + ) + .await? + .ok_or_else(|| Error::Validation("Consolidation run does not exist.".to_string()))?; + let current_state = + ConsolidationRunState::parse(existing.status.as_str()).ok_or_else(|| { + Error::Validation("Stored consolidation run status is invalid.".to_string()) + })?; + let now = OffsetDateTime::now_utc(); + let mut tx = db.pool.begin().await?; + + match current_state { + ConsolidationRunState::Pending => { + current_state + .validate_transition(ConsolidationRunState::Running) + .map_err(consolidation_validation_error)?; + + let empty_error = Value::Object(Default::default()); + + consolidation::update_consolidation_run_state( + &mut *tx, + ConsolidationRunStateUpdate { + tenant_id: job.tenant_id.as_str(), + project_id: job.project_id.as_str(), + run_id: job.run_id, + status: ConsolidationRunState::Running.as_str(), + error: &empty_error, + now, + }, + ) + .await? + .ok_or_else(|| Error::Validation("Consolidation run disappeared.".to_string()))?; + }, + ConsolidationRunState::Running => {}, + ConsolidationRunState::Completed + | ConsolidationRunState::Failed + | ConsolidationRunState::Cancelled => { + consolidation::mark_consolidation_run_job_done(&mut *tx, job.job_id, now).await?; + + tx.commit().await?; + + return Ok(()); + }, + } + + for proposal in payload.proposals { + let row = proposal_row_from_contract(job, now, proposal)?; + + consolidation::insert_consolidation_proposal(&mut *tx, &row).await?; + } + + ConsolidationRunState::Running + .validate_transition(ConsolidationRunState::Completed) + .map_err(consolidation_validation_error)?; + + let empty_error = Value::Object(Default::default()); + + consolidation::update_consolidation_run_state( + &mut *tx, + ConsolidationRunStateUpdate { + tenant_id: job.tenant_id.as_str(), + project_id: job.project_id.as_str(), + run_id: job.run_id, + status: ConsolidationRunState::Completed.as_str(), + error: &empty_error, + now, + }, + ) + .await? + .ok_or_else(|| Error::Validation("Consolidation run disappeared.".to_string()))?; + consolidation::mark_consolidation_run_job_done(&mut *tx, job.job_id, now).await?; + + tx.commit().await?; + + Ok(()) +} + async fn insert_trace_stages_tx( executor: &mut PgConnection, trace_id: Uuid, @@ -1441,6 +1616,31 @@ async fn mark_trace_failed(db: &Db, outbox_id: Uuid, attempts: i32, err: &Error) Ok(()) } +async fn mark_consolidation_failed( + db: &Db, + job_id: Uuid, + attempts: i32, + err: &Error, +) -> Result<()> { + let next_attempts = attempts.saturating_add(1); + let backoff = backoff_for_attempt(next_attempts); + let now = OffsetDateTime::now_utc(); + let available_at = now + backoff; + let error_text = sanitize_outbox_error(&err.to_string()); + + consolidation::mark_consolidation_run_job_failed( + db, + job_id, + next_attempts, + error_text.as_str(), + available_at, + now, + ) + .await?; + + Ok(()) +} + #[cfg(test)] mod tests { use serde_json; diff --git a/docs/guide/benchmarking/real_world_agent_memory_benchmark.md b/docs/guide/benchmarking/real_world_agent_memory_benchmark.md index f26afadb..cef2c363 100644 --- a/docs/guide/benchmarking/real_world_agent_memory_benchmark.md +++ b/docs/guide/benchmarking/real_world_agent_memory_benchmark.md @@ -314,9 +314,9 @@ proposal usefulness, lineage completeness, review action correctness, proposal unsupported-claim count, executable gap count, and source mutation count. Source mutation count must remain `0` for proposal-only cases. -These fixtures encode proposal expectations only. They do not claim that a live -scheduled consolidation worker generated the proposals; the report records that missing -primitive as an executable gap with a follow-up issue title. +These fixtures use the same reviewable proposal shape as the runtime manual/fixture +consolidation service. They remain offline fixture responses and do not claim scheduled +provider-backed proposal generation. Current checked-in knowledge-compilation increment: diff --git a/docs/spec/real_world_agent_memory_benchmark_v1.md b/docs/spec/real_world_agent_memory_benchmark_v1.md index 67bdba04..9da9c453 100644 --- a/docs/spec/real_world_agent_memory_benchmark_v1.md +++ b/docs/spec/real_world_agent_memory_benchmark_v1.md @@ -558,10 +558,10 @@ Consolidation suite reports MUST also include: - source mutation count. For proposal-only consolidation jobs, source mutation count MUST be `0`. If the runner -cannot execute a live consolidation primitive, the report MUST include an executable -gap with a precise follow-up issue or issue title. A proposal-only fixture MAY still -pass when it verifies checked-in proposal payloads and the gap explicitly says that no -live worker generation claim is being made. +or adapter cannot execute the consolidation primitive it claims to evaluate, the report +MUST include an executable gap with a precise follow-up issue or issue title. Offline +fixtures MAY still pass when they verify checked-in proposal payloads and clearly avoid +claiming scheduled provider-backed generation. ## Claim Rules diff --git a/docs/spec/system_consolidation_proposals_v1.md b/docs/spec/system_consolidation_proposals_v1.md index ff27cd1a..35f2f95a 100644 --- a/docs/spec/system_consolidation_proposals_v1.md +++ b/docs/spec/system_consolidation_proposals_v1.md @@ -97,6 +97,57 @@ Allowed run transitions: Terminal states are `completed`, `failed`, and `cancelled`. +## Worker Job Contract + +Storage table: `consolidation_run_jobs`. + +The first runtime implementation is queue-backed and deterministic. Creating a +fixture or manual consolidation run stores the immutable run input snapshot, enqueues +one worker job, and returns the run plus `job_id`. The worker materializes queued +proposal payloads into `consolidation_proposals`; API creation must not call LLM, +embedding, rerank, or external provider adapters. + +Required fields: + +- `job_id` +- `run_id` +- `tenant_id` +- `project_id` +- `agent_id` +- `job_kind` +- `status` +- `payload` +- `attempts` +- `last_error` +- `available_at` +- `created_at` +- `updated_at` + +Job states: + +- `PENDING` +- `CLAIMED` +- `DONE` +- `FAILED` + +`payload` is a JSON object with: + +- `contract_schema = "elf.consolidation/v1"` +- `proposals`: array of proposal contracts matching this spec + +Worker rules: + +- Claim one due `PENDING`, expired `CLAIMED`, or retryable `FAILED` job with a lease. +- Validate `payload.contract_schema` and every proposal before persistence. +- Transition the run through `pending -> running -> completed` when materialization + succeeds. +- Insert proposals with `review_state = proposed`. +- Mark the job `DONE` in the same transaction as the proposal and run-state writes. +- On failure, mark the job `FAILED`, increment attempts, preserve a bounded error, and + schedule retry. +- Never mutate authoritative source notes, events, docs, traces, graph facts, or + search traces. + ## Proposal Contract Storage table: `consolidation_proposals`. @@ -117,6 +168,7 @@ Required fields: - `lineage` - `diff` - `confidence` +- `unsupported_claim_flags` - `contradiction_markers` - `staleness_markers` - `target_ref` @@ -132,6 +184,12 @@ Required fields: `lineage` must include non-empty `source_refs`. It may also include `parent_run_id` and `parent_proposal_ids`. +`unsupported_claim_flags` is a reviewer prompt array. Each flag has: + +- `claim_id`: optional stable claim identifier +- `message`: non-empty reviewer-facing text +- `source`: optional source reference + `contradiction_markers` and `staleness_markers` are review prompts. Each marker has: - `severity`: `low`, `medium`, or `high` @@ -186,16 +244,28 @@ Terminal states are `rejected`, `applied`, and `archived`. `applied` means the proposal has been approved and marked as applied to the derived target. It does not mean authoritative source memory was changed. +Operator review actions map to the lifecycle states: + +- `approve`: `proposed -> approved` +- `apply`: `approved -> applied`, or `proposed -> approved -> applied` with both + transitions audited +- `discard`: `proposed|approved -> rejected` +- `defer`: `proposed|approved -> archived` + +Every review transition must write an append-only audit event with proposal id, run id, +reviewer agent id, action, prior state, next state, optional comment, and timestamp. + ## Service Boundary The first implementation exposes fixture-driven service flows: -- create a consolidation run with optional proposal payloads +- create a consolidation run with optional proposal payloads and queued worker `job_id` - list consolidation runs - get a consolidation run - list consolidation proposals - get a consolidation proposal -- transition proposal review state +- transition proposal review state through `approve`, `apply`, `discard`, and `defer` + actions with review-event readback These flows must not call LLM, embedding, rerank, or external provider adapters. diff --git a/docs/spec/system_elf_memory_service_v2.md b/docs/spec/system_elf_memory_service_v2.md index d103944a..a9fe99c8 100644 --- a/docs/spec/system_elf_memory_service_v2.md +++ b/docs/spec/system_elf_memory_service_v2.md @@ -980,6 +980,32 @@ Behavior: - These endpoints mirror the public note list/detail reads for local admin viewer use. - Note metadata that includes `created_at`, `hit_count`, and `last_hit_at` is available through `GET /v2/admin/notes/{note_id}/provenance`. +Admin consolidation proposal review: +- POST /v2/admin/consolidation/runs +- GET /v2/admin/consolidation/runs +- GET /v2/admin/consolidation/runs/{run_id} +- GET /v2/admin/consolidation/proposals +- GET /v2/admin/consolidation/proposals/{proposal_id} +- POST /v2/admin/consolidation/proposals/{proposal_id}/review + +Behavior: +- These endpoints expose fixture-driven or manually supplied consolidation runs and + reviewable derived proposals. +- Creating a consolidation run enqueues a deterministic `consolidation_run_jobs` + worker job and returns `job_id`; the worker materializes supplied proposal payloads + into `consolidation_proposals`. +- Proposal payloads must follow `elf.consolidation/v1`, carry source refs and + snapshots, and may include unsupported-claim flags, contradiction markers, and + staleness markers for reviewer inspection. +- Review action values are `approve`, `apply`, `discard`, and `defer`. +- `apply` records an approval transition before the applied transition when a proposal + starts from `proposed`. +- Every review action writes append-only review audit events returned by proposal + detail readback. +- These endpoints must not call LLM, embedding, rerank, or external provider adapters. +- They must not mutate authoritative source notes, docs, events, traces, graph facts, + or search traces. + POST /v2/admin/qdrant/rebuild Behavior: diff --git a/packages/elf-domain/src/consolidation.rs b/packages/elf-domain/src/consolidation.rs index cd957554..e9af2075 100644 --- a/packages/elf-domain/src/consolidation.rs +++ b/packages/elf-domain/src/consolidation.rs @@ -63,6 +63,8 @@ pub enum ConsolidationValidationError { /// Name of the invalid field. field: &'static str, }, + /// The queued contract schema did not match the consolidation v1 contract. + InvalidContractSchema, } impl Display for ConsolidationValidationError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -79,6 +81,8 @@ impl Display for ConsolidationValidationError { Self::InvalidRunTransition { from, to } => write!(f, "invalid consolidation run transition from {from:?} to {to:?}"), Self::UnknownState { field } => write!(f, "{field} is not a known state"), + Self::InvalidContractSchema => + write!(f, "contract_schema must be elf.consolidation/v1"), } } } @@ -234,6 +238,40 @@ impl ConsolidationMarkers { } } +/// Unsupported-claim marker attached to a proposal for reviewer inspection. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct ConsolidationUnsupportedClaimFlag { + /// Stable claim identifier when the source fixture or worker supplies one. + pub claim_id: Option, + /// Human-readable unsupported-claim description. + pub message: String, + /// Optional source that demonstrates why the claim is unsupported. + pub source: Option, +} +impl ConsolidationUnsupportedClaimFlag { + /// Validates unsupported-claim marker content and optional source evidence. + pub fn validate(&self) -> Result<(), ConsolidationValidationError> { + if self.message.trim().is_empty() { + return Err(ConsolidationValidationError::EmptyText { + field: "unsupported_claim_flags.message", + }); + } + + if let Some(claim_id) = &self.claim_id + && claim_id.trim().is_empty() + { + return Err(ConsolidationValidationError::EmptyText { + field: "unsupported_claim_flags.claim_id", + }); + } + if let Some(source) = &self.source { + source.validate()?; + } + + Ok(()) + } +} + /// Derived-output apply intent for a reviewable proposal. #[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)] #[serde(rename_all = "snake_case")] @@ -265,6 +303,31 @@ impl ConsolidationApplyIntent { } } +/// Reviewer action requested for a consolidation proposal. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum ConsolidationReviewAction { + /// Approve a proposal for later application. + Approve, + /// Apply an approved proposal to a derived target. + Apply, + /// Discard a proposal as rejected. + Discard, + /// Defer a proposal by archiving it for later audit. + Defer, +} +impl ConsolidationReviewAction { + /// Returns the canonical storage string. + pub fn as_str(self) -> &'static str { + match self { + Self::Approve => "approve", + Self::Apply => "apply", + Self::Discard => "discard", + Self::Defer => "defer", + } + } +} + /// Review lifecycle for a consolidation proposal. #[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)] #[serde(rename_all = "snake_case")] @@ -439,6 +502,9 @@ pub struct ConsolidationProposalContract { pub lineage: ConsolidationLineage, /// Model or fixture confidence in the proposal. pub confidence: f32, + #[serde(default)] + /// Unsupported claims that the reviewer must inspect before accepting a proposal. + pub unsupported_claim_flags: Vec, /// Review markers for contradiction and staleness checks. pub markers: ConsolidationMarkers, /// Reviewable derived-output diff. @@ -467,6 +533,11 @@ impl ConsolidationProposalContract { } self.markers.validate()?; + + for flag in &self.unsupported_claim_flags { + flag.validate()?; + } + self.diff.validate()?; validate_json_object("target_ref", &self.target_ref)?; @@ -476,6 +547,30 @@ impl ConsolidationProposalContract { } } +/// Worker payload for materializing one consolidation run. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct ConsolidationJobPayload { + /// Versioned consolidation contract schema. + pub contract_schema: String, + #[serde(default)] + /// Proposals to persist for review. + pub proposals: Vec, +} +impl ConsolidationJobPayload { + /// Validates the queued worker payload and all proposal contracts. + pub fn validate(&self) -> Result<(), ConsolidationValidationError> { + if self.contract_schema != CONSOLIDATION_CONTRACT_SCHEMA_V1 { + return Err(ConsolidationValidationError::InvalidContractSchema); + } + + for proposal in &self.proposals { + proposal.validate()?; + } + + Ok(()) + } +} + /// Validates a source reference list. pub fn validate_source_refs( source_refs: &[ConsolidationInputRef], diff --git a/packages/elf-domain/tests/consolidation.rs b/packages/elf-domain/tests/consolidation.rs index 6d815d0f..65828267 100644 --- a/packages/elf-domain/tests/consolidation.rs +++ b/packages/elf-domain/tests/consolidation.rs @@ -6,10 +6,11 @@ use time::OffsetDateTime; use uuid::Uuid; use elf_domain::consolidation::{ - ConsolidationApplyIntent, ConsolidationInputRef, ConsolidationLineage, ConsolidationMarkers, - ConsolidationProposalContract, ConsolidationProposalDiff, ConsolidationReviewState, - ConsolidationRunState, ConsolidationSourceKind, ConsolidationSourceSnapshot, - ConsolidationValidationError, + CONSOLIDATION_CONTRACT_SCHEMA_V1, ConsolidationApplyIntent, ConsolidationInputRef, + ConsolidationJobPayload, ConsolidationLineage, ConsolidationMarkers, + ConsolidationProposalContract, ConsolidationProposalDiff, ConsolidationReviewAction, + ConsolidationReviewState, ConsolidationRunState, ConsolidationSourceKind, + ConsolidationSourceSnapshot, ConsolidationUnsupportedClaimFlag, ConsolidationValidationError, }; #[test] @@ -62,6 +63,23 @@ fn proposal_contract_rejects_destructive_diff_payloads() { assert_eq!(proposal.validate(), Err(ConsolidationValidationError::DestructiveDiff)); } +#[test] +fn unsupported_claim_flags_require_reviewer_text() { + let source = source_ref(); + let mut proposal = proposal_contract(source.clone()); + + proposal.unsupported_claim_flags = vec![ConsolidationUnsupportedClaimFlag { + claim_id: Some("unsupported-worker-claim".to_string()), + message: " ".to_string(), + source: Some(source), + }]; + + assert_eq!( + proposal.validate(), + Err(ConsolidationValidationError::EmptyText { field: "unsupported_claim_flags.message" }) + ); +} + #[test] fn destructive_apply_intents_are_not_part_of_the_contract() { let parsed = @@ -70,6 +88,19 @@ fn destructive_apply_intents_are_not_part_of_the_contract() { assert!(parsed.is_err()); } +#[test] +fn review_actions_use_explicit_operator_vocabulary() { + let action = serde_json::from_value::(serde_json::json!("defer")) + .expect("review action should parse"); + + assert_eq!(action.as_str(), "defer"); + + let parsed = + serde_json::from_value::(serde_json::json!("silently_apply")); + + assert!(parsed.is_err()); +} + #[test] fn proposal_lifecycle_requires_approval_before_apply() { assert!( @@ -111,6 +142,21 @@ fn run_lifecycle_rejects_skipping_generation_state() { ); } +#[test] +fn queued_payload_requires_consolidation_contract_schema() { + let source = source_ref(); + let mut payload = ConsolidationJobPayload { + contract_schema: CONSOLIDATION_CONTRACT_SCHEMA_V1.to_string(), + proposals: vec![proposal_contract(source)], + }; + + assert!(payload.validate().is_ok()); + + payload.contract_schema = "elf.consolidation/v0".to_string(); + + assert_eq!(payload.validate(), Err(ConsolidationValidationError::InvalidContractSchema)); +} + fn proposal_contract(source: ConsolidationInputRef) -> ConsolidationProposalContract { let lineage = ConsolidationLineage { source_refs: vec![source.clone()], @@ -125,6 +171,7 @@ fn proposal_contract(source: ConsolidationInputRef) -> ConsolidationProposalCont source_snapshot: serde_json::json!({ "window": "fixture" }), lineage, confidence: 0.85, + unsupported_claim_flags: Vec::new(), markers: ConsolidationMarkers::default(), diff: ConsolidationProposalDiff { summary: "Create one derived note from stable evidence.".to_string(), diff --git a/packages/elf-service/src/consolidation.rs b/packages/elf-service/src/consolidation.rs index b5194834..9ac2a32f 100644 --- a/packages/elf-service/src/consolidation.rs +++ b/packages/elf-service/src/consolidation.rs @@ -2,19 +2,23 @@ use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; -use time::OffsetDateTime; +use time::{Duration, OffsetDateTime}; use uuid::Uuid; use crate::{ElfService, Error, Result}; use elf_domain::consolidation::{ self, CONSOLIDATION_CONTRACT_SCHEMA_V1, ConsolidationApplyIntent, ConsolidationInputRef, - ConsolidationLineage, ConsolidationMarkers, ConsolidationProposalContract, - ConsolidationProposalDiff, ConsolidationReviewState, ConsolidationRunState, + ConsolidationJobPayload, ConsolidationLineage, ConsolidationMarkers, + ConsolidationProposalContract, ConsolidationProposalDiff, ConsolidationReviewAction, + ConsolidationReviewState, ConsolidationRunState, ConsolidationUnsupportedClaimFlag, ConsolidationValidationError, }; use elf_storage::{ - consolidation::{ConsolidationProposalReviewUpdate, ConsolidationRunStateUpdate}, - models::{ConsolidationProposal, ConsolidationRun}, + consolidation::{ + ConsolidationProposalReviewEventInsert, ConsolidationProposalReviewUpdate, + ConsolidationRunJobInsert, + }, + models::{ConsolidationProposal, ConsolidationProposalReviewEvent, ConsolidationRun}, }; const DEFAULT_LIST_LIMIT: i64 = 50; @@ -29,11 +33,11 @@ pub struct ConsolidationRunCreateRequest { pub project_id: String, /// Agent registering the run. pub agent_id: String, - /// Job kind, such as `fixture`, `manual`, or `scheduled`. + /// Job kind, such as `fixture` or `manual`. pub job_kind: String, /// Input references considered by the run. pub input_refs: Vec, - #[serde(default)] + #[serde(default = "empty_object")] /// Aggregate source snapshot metadata for the run. pub source_snapshot: Value, /// Run lineage. @@ -52,7 +56,7 @@ pub struct ConsolidationProposalInput { pub apply_intent: ConsolidationApplyIntent, /// Source references directly supporting the proposal. pub source_refs: Vec, - #[serde(default)] + #[serde(default = "empty_object")] /// Aggregate source snapshot metadata for reviewer inspection. pub source_snapshot: Value, /// Proposal lineage. @@ -60,33 +64,35 @@ pub struct ConsolidationProposalInput { /// Fixture confidence in the proposal. pub confidence: f32, #[serde(default)] + /// Unsupported claims reviewers must inspect before accepting the proposal. + pub unsupported_claim_flags: Vec, + #[serde(default)] /// Review markers for contradiction and staleness checks. pub markers: ConsolidationMarkers, /// Reviewable derived-output diff. pub diff: ConsolidationProposalDiff, - #[serde(default)] + #[serde(default = "empty_object")] /// Derived target reference, when the target already exists. pub target_ref: Value, - #[serde(default)] + #[serde(default = "empty_object")] /// Proposed derived output payload. pub proposed_payload: Value, } impl ConsolidationProposalInput { - fn validate(&self) -> Result<()> { - let contract = ConsolidationProposalContract { - proposal_kind: self.proposal_kind.clone(), + fn into_contract(self) -> ConsolidationProposalContract { + ConsolidationProposalContract { + proposal_kind: self.proposal_kind, apply_intent: self.apply_intent, - source_refs: self.source_refs.clone(), - source_snapshot: self.source_snapshot.clone(), - lineage: self.lineage.clone(), + source_refs: self.source_refs, + source_snapshot: self.source_snapshot, + lineage: self.lineage, confidence: self.confidence, - markers: self.markers.clone(), - diff: self.diff.clone(), - target_ref: self.target_ref.clone(), - proposed_payload: self.proposed_payload.clone(), - }; - - contract.validate().map_err(validation_error) + unsupported_claim_flags: self.unsupported_claim_flags, + markers: self.markers, + diff: self.diff, + target_ref: self.target_ref, + proposed_payload: self.proposed_payload, + } } } @@ -95,6 +101,8 @@ impl ConsolidationProposalInput { pub struct ConsolidationRunCreateResponse { /// Created run. pub run: ConsolidationRunResponse, + /// Enqueued worker job identifier. + pub job_id: Uuid, /// Proposals stored with the run. pub proposals: Vec, } @@ -141,7 +149,7 @@ pub struct ConsolidationRunResponse { pub agent_id: String, /// Versioned consolidation contract schema. pub contract_schema: String, - /// Job kind, such as fixture, manual, or scheduled. + /// Job kind, such as fixture or manual. pub job_kind: String, /// Current run state. pub status: String, @@ -214,23 +222,67 @@ pub struct ConsolidationProposalsListResponse { pub proposals: Vec, } -/// Request to transition a proposal review state. +/// Request to apply one proposal review action. #[derive(Clone, Debug, Deserialize)] pub struct ConsolidationProposalReviewRequest { /// Tenant that owns the proposal. pub tenant_id: String, /// Project that owns the proposal. pub project_id: String, - /// Agent performing the review transition. + /// Agent performing the review action. pub reviewer_agent_id: String, /// Proposal identifier. pub proposal_id: Uuid, - /// Requested review state. - pub review_state: ConsolidationReviewState, + /// Requested review action. + pub review_action: ConsolidationReviewAction, /// Optional reviewer comment. pub review_comment: Option, } +/// Public consolidation proposal review audit DTO. +#[derive(Clone, Debug, Serialize)] +pub struct ConsolidationProposalReviewEventResponse { + /// Review event identifier. + pub review_id: Uuid, + /// Reviewed proposal identifier. + pub proposal_id: Uuid, + /// Parent consolidation run identifier. + pub run_id: Uuid, + /// Tenant that owns the proposal. + pub tenant_id: String, + /// Project that owns the proposal. + pub project_id: String, + /// Agent that performed the review action. + pub reviewer_agent_id: String, + /// Review action requested by the reviewer. + pub action: String, + /// Review state before the transition. + pub from_review_state: String, + /// Review state after the transition. + pub to_review_state: String, + /// Optional reviewer comment. + pub review_comment: Option, + /// Creation timestamp. + pub created_at: OffsetDateTime, +} +impl From for ConsolidationProposalReviewEventResponse { + fn from(event: ConsolidationProposalReviewEvent) -> Self { + Self { + review_id: event.review_id, + proposal_id: event.proposal_id, + run_id: event.run_id, + tenant_id: event.tenant_id, + project_id: event.project_id, + reviewer_agent_id: event.reviewer_agent_id, + action: event.action, + from_review_state: event.from_review_state, + to_review_state: event.to_review_state, + review_comment: event.review_comment, + created_at: event.created_at, + } + } +} + /// Public consolidation proposal DTO. #[derive(Clone, Debug, Serialize)] pub struct ConsolidationProposalResponse { @@ -262,6 +314,8 @@ pub struct ConsolidationProposalResponse { pub diff: Value, /// Proposal confidence score. pub confidence: f32, + /// Serialized unsupported-claim flags. + pub unsupported_claim_flags: Value, /// Serialized contradiction markers. pub contradiction_markers: Value, /// Serialized staleness markers. @@ -280,6 +334,8 @@ pub struct ConsolidationProposalResponse { pub created_at: OffsetDateTime, /// Last update timestamp. pub updated_at: OffsetDateTime, + /// Append-only review events for detail readback. + pub review_events: Vec, } impl From for ConsolidationProposalResponse { fn from(proposal: ConsolidationProposal) -> Self { @@ -298,6 +354,7 @@ impl From for ConsolidationProposalResponse { lineage: proposal.lineage, diff: proposal.diff, confidence: proposal.confidence, + unsupported_claim_flags: proposal.unsupported_claim_flags, contradiction_markers: proposal.contradiction_markers, staleness_markers: proposal.staleness_markers, target_ref: proposal.target_ref, @@ -307,6 +364,7 @@ impl From for ConsolidationProposalResponse { reviewed_at: proposal.reviewed_at, created_at: proposal.created_at, updated_at: proposal.updated_at, + review_events: Vec::new(), } } } @@ -326,25 +384,26 @@ impl ElfService { req.lineage.validate().map_err(validation_error)?; - for proposal in &req.proposals { - proposal.validate()?; - } + let proposal_contracts = + req.proposals.into_iter().map(ConsolidationProposalInput::into_contract).collect(); + let payload = ConsolidationJobPayload { + contract_schema: CONSOLIDATION_CONTRACT_SCHEMA_V1.to_string(), + proposals: proposal_contracts, + }; + + payload.validate().map_err(validation_error)?; - let has_proposals = !req.proposals.is_empty(); let now = OffsetDateTime::now_utc(); - let run_state = if has_proposals { - ConsolidationRunState::Running - } else { - ConsolidationRunState::Pending - }; + let run_state = ConsolidationRunState::Pending; let run_id = Uuid::new_v4(); - let mut run = ConsolidationRun { + let job_id = Uuid::new_v4(); + let run = ConsolidationRun { run_id, tenant_id: req.tenant_id.clone(), project_id: req.project_id.clone(), agent_id: req.agent_id.clone(), contract_schema: CONSOLIDATION_CONTRACT_SCHEMA_V1.to_string(), - job_kind: req.job_kind, + job_kind: req.job_kind.clone(), status: run_state.as_str().to_string(), input_refs: to_value(&req.input_refs)?, source_snapshot: req.source_snapshot, @@ -354,53 +413,32 @@ impl ElfService { updated_at: now, completed_at: terminal_time(run_state, now), }; - let mut proposals = Vec::with_capacity(req.proposals.len()); + let payload_value = to_value(&payload)?; let mut tx = self.db.pool.begin().await?; elf_storage::consolidation::insert_consolidation_run(&mut *tx, &run).await?; - - for input in req.proposals { - let proposal = proposal_row_from_input( + elf_storage::consolidation::insert_consolidation_run_job( + &mut *tx, + ConsolidationRunJobInsert { + job_id, run_id, - req.tenant_id.as_str(), - req.project_id.as_str(), - req.agent_id.as_str(), + tenant_id: req.tenant_id.as_str(), + project_id: req.project_id.as_str(), + agent_id: req.agent_id.as_str(), + job_kind: req.job_kind.as_str(), + payload: &payload_value, now, - input, - )?; - - elf_storage::consolidation::insert_consolidation_proposal(&mut *tx, &proposal).await?; - - proposals.push(ConsolidationProposalResponse::from(proposal)); - } - - if has_proposals { - run_state - .validate_transition(ConsolidationRunState::Completed) - .map_err(validation_error)?; - - let terminal_error = empty_object(); - - run = elf_storage::consolidation::update_consolidation_run_state( - &mut *tx, - ConsolidationRunStateUpdate { - tenant_id: req.tenant_id.as_str(), - project_id: req.project_id.as_str(), - run_id, - status: ConsolidationRunState::Completed.as_str(), - error: &terminal_error, - now, - }, - ) - .await? - .ok_or_else(|| Error::NotFound { - message: "consolidation run not found".to_string(), - })?; - } + }, + ) + .await?; tx.commit().await?; - Ok(ConsolidationRunCreateResponse { run: ConsolidationRunResponse::from(run), proposals }) + Ok(ConsolidationRunCreateResponse { + run: ConsolidationRunResponse::from(run), + job_id, + proposals: Vec::new(), + }) } /// Fetches one consolidation run. @@ -453,8 +491,18 @@ impl ElfService { .ok_or_else(|| Error::NotFound { message: "consolidation proposal not found".to_string(), })?; + let review_events = self + .consolidation_proposal_review_events( + req.tenant_id.as_str(), + req.project_id.as_str(), + req.proposal_id, + ) + .await?; + let mut response = ConsolidationProposalResponse::from(proposal); + + response.review_events = review_events; - Ok(ConsolidationProposalResponse::from(proposal)) + Ok(response) } /// Lists consolidation proposals. @@ -478,7 +526,7 @@ impl ElfService { Ok(ConsolidationProposalsListResponse { proposals }) } - /// Applies one allowed proposal review-state transition. + /// Applies one allowed proposal review action. pub async fn consolidation_proposal_review( &self, req: ConsolidationProposalReviewRequest, @@ -505,65 +553,88 @@ impl ElfService { message: "stored proposal review_state is invalid".to_string(), } })?; + let now = OffsetDateTime::now_utc(); + let steps = review_steps(current, req.review_action)?; + let mut tx = self.db.pool.begin().await?; + let mut last_state = current; + let mut updated = existing; + + for (step_index, (action, next_state)) in steps.into_iter().enumerate() { + last_state.validate_transition(next_state).map_err(validation_error)?; + + let transition_time = now.saturating_add(Duration::milliseconds(step_index as i64)); + + elf_storage::consolidation::insert_consolidation_proposal_review_event( + &mut *tx, + ConsolidationProposalReviewEventInsert { + review_id: Uuid::new_v4(), + proposal_id: req.proposal_id, + run_id: updated.run_id, + tenant_id: req.tenant_id.as_str(), + project_id: req.project_id.as_str(), + reviewer_agent_id: req.reviewer_agent_id.as_str(), + action: action.as_str(), + from_review_state: last_state.as_str(), + to_review_state: next_state.as_str(), + review_comment: req.review_comment.as_deref(), + created_at: transition_time, + }, + ) + .await?; + + updated = elf_storage::consolidation::update_consolidation_proposal_review( + &mut *tx, + ConsolidationProposalReviewUpdate { + tenant_id: req.tenant_id.as_str(), + project_id: req.project_id.as_str(), + proposal_id: req.proposal_id, + review_state: next_state.as_str(), + reviewer_agent_id: req.reviewer_agent_id.as_str(), + review_comment: req.review_comment.as_deref(), + now: transition_time, + }, + ) + .await? + .ok_or_else(|| Error::NotFound { + message: "consolidation proposal not found".to_string(), + })?; + last_state = next_state; + } - current.validate_transition(req.review_state).map_err(validation_error)?; + tx.commit().await?; - let updated = elf_storage::consolidation::update_consolidation_proposal_review( + let review_events = self + .consolidation_proposal_review_events( + req.tenant_id.as_str(), + req.project_id.as_str(), + req.proposal_id, + ) + .await?; + let mut response = ConsolidationProposalResponse::from(updated); + + response.review_events = review_events; + + Ok(response) + } + + async fn consolidation_proposal_review_events( + &self, + tenant_id: &str, + project_id: &str, + proposal_id: Uuid, + ) -> Result> { + let events = elf_storage::consolidation::list_consolidation_proposal_review_events( &self.db.pool, - ConsolidationProposalReviewUpdate { - tenant_id: req.tenant_id.as_str(), - project_id: req.project_id.as_str(), - proposal_id: req.proposal_id, - review_state: req.review_state.as_str(), - reviewer_agent_id: req.reviewer_agent_id.as_str(), - review_comment: req.review_comment.as_deref(), - now: OffsetDateTime::now_utc(), - }, + tenant_id, + project_id, + proposal_id, ) - .await? - .ok_or_else(|| Error::NotFound { - message: "consolidation proposal not found".to_string(), - })?; + .await?; - Ok(ConsolidationProposalResponse::from(updated)) + Ok(events.into_iter().map(ConsolidationProposalReviewEventResponse::from).collect()) } } -fn proposal_row_from_input( - run_id: Uuid, - tenant_id: &str, - project_id: &str, - agent_id: &str, - now: OffsetDateTime, - input: ConsolidationProposalInput, -) -> Result { - Ok(ConsolidationProposal { - proposal_id: Uuid::new_v4(), - run_id, - tenant_id: tenant_id.to_string(), - project_id: project_id.to_string(), - agent_id: agent_id.to_string(), - contract_schema: CONSOLIDATION_CONTRACT_SCHEMA_V1.to_string(), - proposal_kind: input.proposal_kind, - apply_intent: input.apply_intent.as_str().to_string(), - review_state: ConsolidationReviewState::Proposed.as_str().to_string(), - source_refs: to_value(&input.source_refs)?, - source_snapshot: input.source_snapshot, - lineage: to_value(&input.lineage)?, - diff: to_value(&input.diff)?, - confidence: input.confidence, - contradiction_markers: to_value(&input.markers.contradictions)?, - staleness_markers: to_value(&input.markers.staleness)?, - target_ref: input.target_ref, - proposed_payload: input.proposed_payload, - reviewer_agent_id: None, - review_comment: None, - reviewed_at: None, - created_at: now, - updated_at: now, - }) -} - fn validate_context(tenant_id: &str, project_id: &str, agent_id: &str) -> Result<()> { validate_non_empty("tenant_id", tenant_id)?; validate_non_empty("project_id", project_id)?; @@ -572,7 +643,14 @@ fn validate_context(tenant_id: &str, project_id: &str, agent_id: &str) -> Result } fn validate_job_kind(job_kind: &str) -> Result<()> { - validate_non_empty("job_kind", job_kind) + validate_non_empty("job_kind", job_kind)?; + + match job_kind { + "fixture" | "manual" => Ok(()), + _ => Err(Error::InvalidRequest { + message: "job_kind must be fixture or manual for consolidation v1.".to_string(), + }), + } } fn validate_non_empty(field: &'static str, value: &str) -> Result<()> { @@ -595,6 +673,41 @@ fn validation_error(err: ConsolidationValidationError) -> Error { Error::InvalidRequest { message: err.to_string() } } +fn review_steps( + current: ConsolidationReviewState, + action: ConsolidationReviewAction, +) -> Result> { + let steps = match action { + ConsolidationReviewAction::Approve => + vec![(ConsolidationReviewAction::Approve, ConsolidationReviewState::Approved)], + ConsolidationReviewAction::Apply => match current { + ConsolidationReviewState::Proposed => vec![ + (ConsolidationReviewAction::Approve, ConsolidationReviewState::Approved), + (ConsolidationReviewAction::Apply, ConsolidationReviewState::Applied), + ], + ConsolidationReviewState::Approved => + vec![(ConsolidationReviewAction::Apply, ConsolidationReviewState::Applied)], + ConsolidationReviewState::Rejected + | ConsolidationReviewState::Applied + | ConsolidationReviewState::Archived => + vec![(ConsolidationReviewAction::Apply, ConsolidationReviewState::Applied)], + }, + ConsolidationReviewAction::Discard => + vec![(ConsolidationReviewAction::Discard, ConsolidationReviewState::Rejected)], + ConsolidationReviewAction::Defer => + vec![(ConsolidationReviewAction::Defer, ConsolidationReviewState::Archived)], + }; + let mut state = current; + + for (_, next_state) in &steps { + state.validate_transition(*next_state).map_err(validation_error)?; + + state = *next_state; + } + + Ok(steps) +} + fn bounded_limit(limit: Option) -> i64 { limit.map(i64::from).unwrap_or(DEFAULT_LIST_LIMIT).clamp(1, MAX_LIST_LIMIT) } diff --git a/packages/elf-service/src/lib.rs b/packages/elf-service/src/lib.rs index 55f98c4d..af866289 100644 --- a/packages/elf-service/src/lib.rs +++ b/packages/elf-service/src/lib.rs @@ -40,10 +40,10 @@ pub use self::{ }, consolidation::{ ConsolidationProposalGetRequest, ConsolidationProposalInput, ConsolidationProposalResponse, - ConsolidationProposalReviewRequest, ConsolidationProposalsListRequest, - ConsolidationProposalsListResponse, ConsolidationRunCreateRequest, - ConsolidationRunCreateResponse, ConsolidationRunGetRequest, ConsolidationRunResponse, - ConsolidationRunsListRequest, ConsolidationRunsListResponse, + ConsolidationProposalReviewEventResponse, ConsolidationProposalReviewRequest, + ConsolidationProposalsListRequest, ConsolidationProposalsListResponse, + ConsolidationRunCreateRequest, ConsolidationRunCreateResponse, ConsolidationRunGetRequest, + ConsolidationRunResponse, ConsolidationRunsListRequest, ConsolidationRunsListResponse, }, delete::{DeleteRequest, DeleteResponse}, docs::{ diff --git a/packages/elf-service/tests/acceptance/consolidation.rs b/packages/elf-service/tests/acceptance/consolidation.rs new file mode 100644 index 00000000..696776e0 --- /dev/null +++ b/packages/elf-service/tests/acceptance/consolidation.rs @@ -0,0 +1,380 @@ +use std::sync::{Arc, atomic::AtomicUsize}; + +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; +use elf_chunking::ChunkingConfig; +use elf_domain::consolidation::{ + ConsolidationApplyIntent, ConsolidationInputRef, ConsolidationLineage, ConsolidationMarker, + ConsolidationMarkerSeverity, ConsolidationMarkers, ConsolidationProposalDiff, + ConsolidationReviewAction, ConsolidationSourceKind, ConsolidationSourceSnapshot, + ConsolidationUnsupportedClaimFlag, +}; +use elf_service::{ + AddNoteInput, AddNoteRequest, ConsolidationProposalGetRequest, ConsolidationProposalInput, + ConsolidationProposalReviewRequest, ConsolidationProposalsListRequest, + ConsolidationProposalsListResponse, ConsolidationRunCreateRequest, + ConsolidationRunCreateResponse, ConsolidationRunGetRequest, ElfService, Providers, +}; +use elf_storage::{db::Db, qdrant::QdrantStore}; +use elf_testkit::TestDatabase; +use elf_worker::worker::{self, WorkerState}; + +const TENANT_ID: &str = "tenant_consolidation"; +const PROJECT_ID: &str = "project_consolidation"; +const AGENT_ID: &str = "agent_consolidation"; + +struct ConsolidationFixture { + service: ElfService, + _test_db: TestDatabase, +} + +fn source_ref(note_id: Uuid) -> ConsolidationInputRef { + ConsolidationInputRef { + kind: ConsolidationSourceKind::Note, + id: note_id, + snapshot: ConsolidationSourceSnapshot { + status: Some("active".to_string()), + updated_at: Some(OffsetDateTime::UNIX_EPOCH), + content_hash: Some("blake3:acceptance-source".to_string()), + embedding_version: Some("test:test:4096".to_string()), + trace_version: None, + source_ref: serde_json::json!({ "schema": "acceptance/v1" }), + metadata: serde_json::json!({ "fixture": "consolidation" }), + }, + } +} + +fn lineage(source: &ConsolidationInputRef) -> ConsolidationLineage { + ConsolidationLineage { + source_refs: vec![source.clone()], + parent_run_id: None, + parent_proposal_ids: Vec::new(), + } +} + +fn proposal_input(source: &ConsolidationInputRef, kind: &str) -> ConsolidationProposalInput { + ConsolidationProposalInput { + proposal_kind: kind.to_string(), + apply_intent: ConsolidationApplyIntent::CreateDerivedNote, + source_refs: vec![source.clone()], + source_snapshot: serde_json::json!({ "source_count": 1 }), + lineage: lineage(source), + confidence: 0.82, + unsupported_claim_flags: vec![ConsolidationUnsupportedClaimFlag { + claim_id: Some("unsupported-claim".to_string()), + message: "The source does not prove that source notes may be rewritten.".to_string(), + source: Some(source.clone()), + }], + markers: ConsolidationMarkers { + contradictions: vec![ConsolidationMarker { + severity: ConsolidationMarkerSeverity::High, + message: "Stale rewrite evidence conflicts with the proposal-only rule." + .to_string(), + source: Some(source.clone()), + }], + staleness: Vec::new(), + }, + diff: ConsolidationProposalDiff { + summary: "Create a reviewed derived note without changing source evidence.".to_string(), + before: serde_json::json!({}), + after: serde_json::json!({ + "target": "derived_note", + "text": "Fact: Consolidation proposals are derived and reviewable." + }), + }, + target_ref: serde_json::json!({}), + proposed_payload: serde_json::json!({ + "type": "fact", + "text": "Fact: Consolidation proposals are derived and reviewable." + }), + } +} + +fn proposal_id_by_kind(response: &ConsolidationProposalsListResponse, proposal_kind: &str) -> Uuid { + response + .proposals + .iter() + .find(|proposal| proposal.proposal_kind == proposal_kind) + .map(|proposal| proposal.proposal_id) + .expect("proposal kind should be present") +} + +async fn setup_service(test_name: &str) -> Option { + let Some(test_db) = acceptance::test_db().await else { + eprintln!("Skipping {test_name}; set ELF_PG_DSN to run this test."); + + return None; + }; + let Some(qdrant_url) = acceptance::test_qdrant_url() else { + eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run this test."); + + return None; + }; + let collection = test_db.collection_name("elf_acceptance"); + let docs_collection = test_db.collection_name("elf_acceptance_docs"); + let cfg = acceptance::test_config( + test_db.dsn().to_string(), + qdrant_url, + 4_096, + collection, + docs_collection, + ); + let extractor = SpyExtractor { + calls: Arc::new(AtomicUsize::new(0)), + payload: serde_json::json!({ "notes": [] }), + }; + let providers = Providers::new( + Arc::new(StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(extractor), + ); + let service = + acceptance::build_service(cfg, providers).await.expect("Failed to build service."); + + acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); + + Some(ConsolidationFixture { service, _test_db: test_db }) +} + +async fn insert_source_note(service: &ElfService, key: &str, text: &str) -> Uuid { + let response = service + .add_note(AddNoteRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + scope: "agent_private".to_string(), + notes: vec![AddNoteInput { + r#type: "fact".to_string(), + key: Some(key.to_string()), + text: text.to_string(), + structured: None, + importance: 0.7, + confidence: 0.9, + ttl_days: None, + source_ref: serde_json::json!({ "schema": "acceptance/v1", "key": key }), + write_policy: None, + }], + }) + .await + .expect("add_note should persist source note"); + + response.results[0].note_id.expect("source note id should be present") +} + +async fn create_run_with_proposals( + service: &ElfService, + source: &ConsolidationInputRef, + proposals: Vec, +) -> ConsolidationRunCreateResponse { + service + .consolidation_run_create(ConsolidationRunCreateRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + job_kind: "manual".to_string(), + input_refs: vec![source.clone()], + source_snapshot: serde_json::json!({ "source_count": 1 }), + lineage: lineage(source), + proposals, + }) + .await + .expect("consolidation run should be created") +} + +async fn process_consolidation_worker(service: &ElfService) { + let tokenizer = elf_chunking::load_tokenizer(&service.cfg.chunking.tokenizer_repo) + .expect("worker tokenizer should load"); + let mut embedding = acceptance::dummy_embedding_provider(); + + embedding.dimensions = service.cfg.storage.qdrant.vector_dim; + + let worker_state = WorkerState { + db: Db::connect(&service.cfg.storage.postgres).await.expect("Failed to connect worker DB."), + qdrant: QdrantStore::new(&service.cfg.storage.qdrant) + .expect("Failed to build Qdrant store."), + docs_qdrant: QdrantStore::new_with_collection( + &service.cfg.storage.qdrant, + &service.cfg.storage.qdrant.docs_collection, + ) + .expect("Failed to build docs Qdrant store."), + embedding, + chunking: ChunkingConfig { + max_tokens: service.cfg.chunking.max_tokens, + overlap_tokens: service.cfg.chunking.overlap_tokens, + }, + tokenizer, + }; + + worker::process_once(&worker_state).await.expect("consolidation worker should process once"); +} + +async fn materialized_proposals( + service: &ElfService, + run_id: Uuid, +) -> ConsolidationProposalsListResponse { + service + .consolidation_proposals_list(ConsolidationProposalsListRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + run_id: Some(run_id), + review_state: None, + limit: None, + }) + .await + .expect("consolidation proposals should be listed") +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run this test."] +async fn apply_action_is_audited_without_source_rewrite() { + let Some(fixture) = setup_service("apply_action_is_audited_without_source_rewrite").await + else { + return; + }; + let service = &fixture.service; + let source_text = + "Fact: Current consolidation output is derived and never rewrites source notes."; + let note_id = insert_source_note(service, "consolidation_source_rule", source_text).await; + let source = source_ref(note_id); + let created = + create_run_with_proposals(service, &source, vec![proposal_input(&source, "derived_note")]) + .await; + + assert_eq!(created.run.status, "pending"); + assert!(created.proposals.is_empty()); + + process_consolidation_worker(service).await; + + let completed = service + .consolidation_run_get(ConsolidationRunGetRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + run_id: created.run.run_id, + }) + .await + .expect("consolidation run should remain readable"); + let materialized = materialized_proposals(service, created.run.run_id).await; + let proposal = &materialized.proposals[0]; + let job_status: String = + sqlx::query_scalar("SELECT status FROM consolidation_run_jobs WHERE job_id = $1") + .bind(created.job_id) + .fetch_one(&service.db.pool) + .await + .expect("consolidation job should be queryable"); + + assert_eq!(completed.status, "completed"); + assert_eq!(job_status, "DONE"); + assert_eq!(materialized.proposals.len(), 1); + assert_eq!(proposal.review_state, "proposed"); + assert_eq!(proposal.unsupported_claim_flags.as_array().map(Vec::len), Some(1)); + assert_eq!(proposal.contradiction_markers.as_array().map(Vec::len), Some(1)); + + let reviewed = service + .consolidation_proposal_review(ConsolidationProposalReviewRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + reviewer_agent_id: AGENT_ID.to_string(), + proposal_id: proposal.proposal_id, + review_action: ConsolidationReviewAction::Apply, + review_comment: Some("Apply reviewed derived proposal.".to_string()), + }) + .await + .expect("review action should apply"); + + assert_eq!(reviewed.review_state, "applied"); + assert_eq!(reviewed.review_events.len(), 2); + assert_eq!(reviewed.review_events[0].action, "approve"); + assert_eq!(reviewed.review_events[0].from_review_state, "proposed"); + assert_eq!(reviewed.review_events[0].to_review_state, "approved"); + assert_eq!(reviewed.review_events[1].action, "apply"); + assert_eq!(reviewed.review_events[1].from_review_state, "approved"); + assert_eq!(reviewed.review_events[1].to_review_state, "applied"); + + let stored_text: String = + sqlx::query_scalar("SELECT text FROM memory_notes WHERE note_id = $1") + .bind(note_id) + .fetch_one(&service.db.pool) + .await + .expect("source note should still exist"); + let version_count: i64 = + sqlx::query_scalar("SELECT count(*) FROM memory_note_versions WHERE note_id = $1") + .bind(note_id) + .fetch_one(&service.db.pool) + .await + .expect("source note versions should be queryable"); + + assert_eq!(stored_text, source_text); + assert_eq!(version_count, 1); +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run this test."] +async fn discard_and_defer_actions_remain_auditable() { + let Some(fixture) = setup_service("discard_and_defer_actions_remain_auditable").await else { + return; + }; + let service = &fixture.service; + let note_id = insert_source_note( + service, + "consolidation_review_actions", + "Fact: Discarded and deferred proposals remain auditable.", + ) + .await; + let source = source_ref(note_id); + let created = create_run_with_proposals( + service, + &source, + vec![ + proposal_input(&source, "contradiction_report"), + proposal_input(&source, "preference_candidate"), + ], + ) + .await; + + process_consolidation_worker(service).await; + + let materialized = materialized_proposals(service, created.run.run_id).await; + let discarded_id = proposal_id_by_kind(&materialized, "contradiction_report"); + let deferred_id = proposal_id_by_kind(&materialized, "preference_candidate"); + let discarded = service + .consolidation_proposal_review(ConsolidationProposalReviewRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + reviewer_agent_id: AGENT_ID.to_string(), + proposal_id: discarded_id, + review_action: ConsolidationReviewAction::Discard, + review_comment: Some("Discard stale synthesis.".to_string()), + }) + .await + .expect("discard should be allowed"); + let deferred = service + .consolidation_proposal_review(ConsolidationProposalReviewRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + reviewer_agent_id: AGENT_ID.to_string(), + proposal_id: deferred_id, + review_action: ConsolidationReviewAction::Defer, + review_comment: Some("Defer until more evidence is available.".to_string()), + }) + .await + .expect("defer should be allowed"); + let deferred_readback = service + .consolidation_proposal_get(ConsolidationProposalGetRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + proposal_id: deferred_id, + }) + .await + .expect("deferred proposal should remain readable"); + + assert_eq!(discarded.review_state, "rejected"); + assert_eq!(discarded.review_events.len(), 1); + assert_eq!(discarded.review_events[0].action, "discard"); + assert_eq!(deferred.review_state, "archived"); + assert_eq!(deferred.review_events.len(), 1); + assert_eq!(deferred.review_events[0].action, "defer"); + assert_eq!(deferred_readback.review_events.len(), 1); + assert_eq!(deferred_readback.review_events[0].to_review_state, "archived"); +} diff --git a/packages/elf-service/tests/acceptance/suite.rs b/packages/elf-service/tests/acceptance/suite.rs index 0d9839f4..abc17fa7 100644 --- a/packages/elf-service/tests/acceptance/suite.rs +++ b/packages/elf-service/tests/acceptance/suite.rs @@ -1,6 +1,7 @@ mod add_note_no_llm; mod chunk_search; mod chunking; +mod consolidation; mod docs_extension_v1; mod english_only_boundary; mod evidence_binding; @@ -488,6 +489,10 @@ TRUNCATE doc_chunk_embeddings, doc_chunks, doc_documents, + consolidation_run_jobs, + consolidation_proposal_reviews, + consolidation_proposals, + consolidation_runs, memory_notes", ) .execute(executor) diff --git a/packages/elf-storage/src/consolidation.rs b/packages/elf-storage/src/consolidation.rs index c8baeae6..d6699e36 100644 --- a/packages/elf-storage/src/consolidation.rs +++ b/packages/elf-storage/src/consolidation.rs @@ -2,12 +2,16 @@ use serde_json::Value; use sqlx::PgExecutor; -use time::OffsetDateTime; +use time::{Duration, OffsetDateTime}; use uuid::Uuid; use crate::{ Result, - models::{ConsolidationProposal, ConsolidationRun}, + db::Db, + models::{ + ConsolidationProposal, ConsolidationProposalReviewEvent, ConsolidationRun, + ConsolidationRunJob, + }, }; const CONSOLIDATION_RUN_SELECT: &str = "\ @@ -45,6 +49,7 @@ SELECT lineage, diff, confidence, + COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, COALESCE(target_ref, '{}'::jsonb) AS target_ref, @@ -92,6 +97,52 @@ pub struct ConsolidationProposalReviewUpdate<'a> { pub now: OffsetDateTime, } +/// Arguments for inserting a consolidation proposal review event. +pub struct ConsolidationProposalReviewEventInsert<'a> { + /// Review event identifier. + pub review_id: Uuid, + /// Reviewed proposal identifier. + pub proposal_id: Uuid, + /// Parent consolidation run identifier. + pub run_id: Uuid, + /// Tenant that owns the proposal. + pub tenant_id: &'a str, + /// Project that owns the proposal. + pub project_id: &'a str, + /// Reviewing agent identifier. + pub reviewer_agent_id: &'a str, + /// Review action requested by the reviewer. + pub action: &'a str, + /// Review state before the transition. + pub from_review_state: &'a str, + /// Review state after the transition. + pub to_review_state: &'a str, + /// Optional reviewer comment. + pub review_comment: Option<&'a str>, + /// Creation timestamp. + pub created_at: OffsetDateTime, +} + +/// Arguments for inserting a consolidation worker job. +pub struct ConsolidationRunJobInsert<'a> { + /// Worker job identifier. + pub job_id: Uuid, + /// Consolidation run to materialize. + pub run_id: Uuid, + /// Tenant that owns the run. + pub tenant_id: &'a str, + /// Project that owns the run. + pub project_id: &'a str, + /// Agent that registered the run. + pub agent_id: &'a str, + /// Job kind, such as fixture or manual. + pub job_kind: &'a str, + /// Queued proposal payload. + pub payload: &'a Value, + /// Creation timestamp. + pub now: OffsetDateTime, +} + /// Inserts one consolidation run. pub async fn insert_consolidation_run<'e, E>(executor: E, run: &ConsolidationRun) -> Result<()> where @@ -137,6 +188,45 @@ VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14)", Ok(()) } +/// Enqueues one consolidation worker job. +pub async fn insert_consolidation_run_job<'e, E>( + executor: E, + args: ConsolidationRunJobInsert<'_>, +) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO consolidation_run_jobs ( + job_id, + run_id, + tenant_id, + project_id, + agent_id, + job_kind, + status, + payload, + available_at, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,$5,$6,'PENDING',$7,$8,$8,$8)", + ) + .bind(args.job_id) + .bind(args.run_id) + .bind(args.tenant_id) + .bind(args.project_id) + .bind(args.agent_id) + .bind(args.job_kind) + .bind(args.payload) + .bind(args.now) + .execute(executor) + .await?; + + Ok(()) +} + /// Fetches one consolidation run by tenant and run identifier. pub async fn get_consolidation_run<'e, E>( executor: E, @@ -198,6 +288,120 @@ LIMIT $3", Ok(rows) } +/// Claims the next due consolidation worker job and leases it until `lease_seconds`. +pub async fn claim_next_consolidation_run_job( + db: &Db, + now: OffsetDateTime, + lease_seconds: i64, +) -> Result> { + let mut tx = db.pool.begin().await?; + let row = sqlx::query_as::<_, ConsolidationRunJob>( + "\ +SELECT + job_id, + run_id, + tenant_id, + project_id, + agent_id, + job_kind, + status, + payload, + attempts, + last_error, + available_at, + created_at, + updated_at +FROM consolidation_run_jobs +WHERE status IN ('PENDING','FAILED','CLAIMED') AND available_at <= $1 +ORDER BY available_at ASC +LIMIT 1 +FOR UPDATE SKIP LOCKED", + ) + .bind(now) + .fetch_optional(&mut *tx) + .await?; + let job = if let Some(mut job) = row { + let lease_until = now + Duration::seconds(lease_seconds); + + sqlx::query( + "\ +UPDATE consolidation_run_jobs +SET status = 'CLAIMED', available_at = $1, updated_at = $2 +WHERE job_id = $3", + ) + .bind(lease_until) + .bind(now) + .bind(job.job_id) + .execute(&mut *tx) + .await?; + + job.status = "CLAIMED".to_string(); + job.available_at = lease_until; + job.updated_at = now; + + Some(job) + } else { + None + }; + + tx.commit().await?; + + Ok(job) +} + +/// Marks a consolidation worker job as completed. +pub async fn mark_consolidation_run_job_done<'e, E>( + executor: E, + job_id: Uuid, + now: OffsetDateTime, +) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +UPDATE consolidation_run_jobs +SET status = 'DONE', updated_at = $1 +WHERE job_id = $2", + ) + .bind(now) + .bind(job_id) + .execute(executor) + .await?; + + Ok(()) +} + +/// Marks a consolidation worker job as failed and schedules its retry. +pub async fn mark_consolidation_run_job_failed( + db: &Db, + job_id: Uuid, + attempts: i32, + error_text: &str, + available_at: OffsetDateTime, + now: OffsetDateTime, +) -> Result<()> { + sqlx::query( + "\ +UPDATE consolidation_run_jobs +SET status = 'FAILED', + attempts = $1, + last_error = $2, + available_at = $3, + updated_at = $4 +WHERE job_id = $5", + ) + .bind(attempts) + .bind(error_text) + .bind(available_at) + .bind(now) + .bind(job_id) + .execute(&db.pool) + .await?; + + Ok(()) +} + /// Updates one consolidation run state. pub async fn update_consolidation_run_state<'e, E>( executor: E, @@ -271,6 +475,7 @@ INSERT INTO consolidation_proposals ( lineage, diff, confidence, + unsupported_claim_flags, contradiction_markers, staleness_markers, target_ref, @@ -281,7 +486,7 @@ INSERT INTO consolidation_proposals ( created_at, updated_at ) -VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23)", +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24)", ) .bind(proposal.proposal_id) .bind(proposal.run_id) @@ -297,6 +502,7 @@ VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$ .bind(&proposal.lineage) .bind(&proposal.diff) .bind(proposal.confidence) + .bind(&proposal.unsupported_claim_flags) .bind(&proposal.contradiction_markers) .bind(&proposal.staleness_markers) .bind(&proposal.target_ref) @@ -361,6 +567,7 @@ SELECT lineage, diff, confidence, + COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, COALESCE(target_ref, '{}'::jsonb) AS target_ref, @@ -422,6 +629,7 @@ RETURNING lineage, diff, confidence, + COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, COALESCE(target_ref, '{}'::jsonb) AS target_ref, @@ -444,3 +652,82 @@ RETURNING Ok(row) } + +/// Inserts one proposal review audit event. +pub async fn insert_consolidation_proposal_review_event<'e, E>( + executor: E, + args: ConsolidationProposalReviewEventInsert<'_>, +) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO consolidation_proposal_reviews ( + review_id, + proposal_id, + run_id, + tenant_id, + project_id, + reviewer_agent_id, + action, + from_review_state, + to_review_state, + review_comment, + created_at +) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)", + ) + .bind(args.review_id) + .bind(args.proposal_id) + .bind(args.run_id) + .bind(args.tenant_id) + .bind(args.project_id) + .bind(args.reviewer_agent_id) + .bind(args.action) + .bind(args.from_review_state) + .bind(args.to_review_state) + .bind(args.review_comment) + .bind(args.created_at) + .execute(executor) + .await?; + + Ok(()) +} + +/// Lists review events for one consolidation proposal. +pub async fn list_consolidation_proposal_review_events<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + proposal_id: Uuid, +) -> Result> +where + E: PgExecutor<'e>, +{ + let rows = sqlx::query_as::<_, ConsolidationProposalReviewEvent>( + "\ +SELECT + review_id, + proposal_id, + run_id, + tenant_id, + project_id, + reviewer_agent_id, + action, + from_review_state, + to_review_state, + review_comment, + created_at +FROM consolidation_proposal_reviews +WHERE tenant_id = $1 AND project_id = $2 AND proposal_id = $3 +ORDER BY created_at ASC, review_id ASC", + ) + .bind(tenant_id) + .bind(project_id) + .bind(proposal_id) + .fetch_all(executor) + .await?; + + Ok(rows) +} diff --git a/packages/elf-storage/src/models.rs b/packages/elf-storage/src/models.rs index baf9afb8..2e3711d2 100644 --- a/packages/elf-storage/src/models.rs +++ b/packages/elf-storage/src/models.rs @@ -344,6 +344,8 @@ pub struct ConsolidationProposal { pub diff: Value, /// Proposal confidence score. pub confidence: f32, + /// Serialized unsupported-claim flags. + pub unsupported_claim_flags: Value, /// Serialized contradiction markers. pub contradiction_markers: Value, /// Serialized staleness markers. @@ -364,6 +366,64 @@ pub struct ConsolidationProposal { pub updated_at: OffsetDateTime, } +/// Persisted consolidation proposal review event row. +#[derive(Debug, FromRow)] +pub struct ConsolidationProposalReviewEvent { + /// Review event identifier. + pub review_id: Uuid, + /// Reviewed proposal identifier. + pub proposal_id: Uuid, + /// Parent consolidation run identifier. + pub run_id: Uuid, + /// Tenant that owns the proposal. + pub tenant_id: String, + /// Project that owns the proposal. + pub project_id: String, + /// Agent that performed the review action. + pub reviewer_agent_id: String, + /// Review action requested by the reviewer. + pub action: String, + /// Review state before the transition. + pub from_review_state: String, + /// Review state after the transition. + pub to_review_state: String, + /// Optional reviewer comment. + pub review_comment: Option, + /// Creation timestamp. + pub created_at: OffsetDateTime, +} + +/// Persisted consolidation worker job row. +#[derive(Debug, FromRow)] +pub struct ConsolidationRunJob { + /// Worker job identifier. + pub job_id: Uuid, + /// Consolidation run to materialize. + pub run_id: Uuid, + /// Tenant that owns the run. + pub tenant_id: String, + /// Project that owns the run. + pub project_id: String, + /// Agent that registered the run. + pub agent_id: String, + /// Job kind, such as fixture or manual. + pub job_kind: String, + /// Current job status. + pub status: String, + /// Queued proposal payload. + pub payload: Value, + /// Number of attempts already made. + pub attempts: i32, + /// Most recent failure text, if any. + pub last_error: Option, + /// Earliest time the job may be claimed again. + pub available_at: OffsetDateTime, + /// Creation timestamp. + pub created_at: OffsetDateTime, + /// Last update timestamp. + pub updated_at: OffsetDateTime, +} + /// Persisted document row. #[derive(Debug, FromRow)] pub struct DocDocument { diff --git a/packages/elf-storage/src/schema.rs b/packages/elf-storage/src/schema.rs index 4b7e29fd..bd39ed1b 100644 --- a/packages/elf-storage/src/schema.rs +++ b/packages/elf-storage/src/schema.rs @@ -79,6 +79,11 @@ fn expand_includes(sql: &str) -> String { out.push_str(include_str!("../../../sql/tables/031_consolidation_runs.sql")), "tables/032_consolidation_proposals.sql" => out .push_str(include_str!("../../../sql/tables/032_consolidation_proposals.sql")), + "tables/033_consolidation_proposal_reviews.sql" => out.push_str(include_str!( + "../../../sql/tables/033_consolidation_proposal_reviews.sql" + )), + "tables/034_consolidation_run_jobs.sql" => + out.push_str(include_str!("../../../sql/tables/034_consolidation_run_jobs.sql")), "tables/023_memory_ingest_decisions.sql" => out .push_str(include_str!("../../../sql/tables/023_memory_ingest_decisions.sql")), "tables/024_memory_space_grants.sql" => diff --git a/packages/elf-storage/tests/db_smoke.rs b/packages/elf-storage/tests/db_smoke.rs index 07577e9c..7807c199 100644 --- a/packages/elf-storage/tests/db_smoke.rs +++ b/packages/elf-storage/tests/db_smoke.rs @@ -61,6 +61,15 @@ fn chunk_tables_exist_after_bootstrap() { assert_eq!(count, 1); + let count: i64 = sqlx::query_scalar( + "SELECT count(*) FROM information_schema.tables WHERE table_name = 'consolidation_proposal_reviews'", + ) + .fetch_one(&db.pool) + .await + .expect("Failed to query schema tables."); + + assert_eq!(count, 1); + let count: i64 = sqlx::query_scalar( "SELECT count(*) FROM information_schema.tables WHERE table_name = 'memory_space_grants'", ) diff --git a/sql/init.sql b/sql/init.sql index 780778f4..9e0b06fb 100644 --- a/sql/init.sql +++ b/sql/init.sql @@ -31,3 +31,5 @@ \ir tables/030_memory_ingestion_profile_defaults.sql \ir tables/031_consolidation_runs.sql \ir tables/032_consolidation_proposals.sql +\ir tables/033_consolidation_proposal_reviews.sql +\ir tables/034_consolidation_run_jobs.sql diff --git a/sql/tables/032_consolidation_proposals.sql b/sql/tables/032_consolidation_proposals.sql index 3b3addc5..bdb470b4 100644 --- a/sql/tables/032_consolidation_proposals.sql +++ b/sql/tables/032_consolidation_proposals.sql @@ -13,6 +13,7 @@ CREATE TABLE IF NOT EXISTS consolidation_proposals ( lineage jsonb NOT NULL, diff jsonb NOT NULL, confidence real NOT NULL, + unsupported_claim_flags jsonb NOT NULL DEFAULT '[]'::jsonb, contradiction_markers jsonb NOT NULL DEFAULT '[]'::jsonb, staleness_markers jsonb NOT NULL DEFAULT '[]'::jsonb, target_ref jsonb NOT NULL DEFAULT '{}'::jsonb, @@ -75,6 +76,15 @@ ALTER TABLE consolidation_proposals ADD CONSTRAINT ck_consolidation_proposals_confidence CHECK (confidence >= 0.0 AND confidence <= 1.0); +ALTER TABLE consolidation_proposals + ADD COLUMN IF NOT EXISTS unsupported_claim_flags jsonb NOT NULL DEFAULT '[]'::jsonb; + +ALTER TABLE consolidation_proposals + DROP CONSTRAINT IF EXISTS ck_consolidation_proposals_unsupported_claim_flags; +ALTER TABLE consolidation_proposals + ADD CONSTRAINT ck_consolidation_proposals_unsupported_claim_flags + CHECK (jsonb_typeof(unsupported_claim_flags) = 'array'); + ALTER TABLE consolidation_proposals DROP CONSTRAINT IF EXISTS ck_consolidation_proposals_contradiction_markers; ALTER TABLE consolidation_proposals diff --git a/sql/tables/033_consolidation_proposal_reviews.sql b/sql/tables/033_consolidation_proposal_reviews.sql new file mode 100644 index 00000000..1ce15c73 --- /dev/null +++ b/sql/tables/033_consolidation_proposal_reviews.sql @@ -0,0 +1,37 @@ +CREATE TABLE IF NOT EXISTS consolidation_proposal_reviews ( + review_id uuid PRIMARY KEY, + proposal_id uuid NOT NULL REFERENCES consolidation_proposals(proposal_id) ON DELETE CASCADE, + run_id uuid NOT NULL REFERENCES consolidation_runs(run_id) ON DELETE CASCADE, + tenant_id text NOT NULL, + project_id text NOT NULL, + reviewer_agent_id text NOT NULL, + action text NOT NULL, + from_review_state text NOT NULL, + to_review_state text NOT NULL, + review_comment text NULL, + created_at timestamptz NOT NULL DEFAULT now() +); + +ALTER TABLE consolidation_proposal_reviews + DROP CONSTRAINT IF EXISTS ck_consolidation_proposal_reviews_action; +ALTER TABLE consolidation_proposal_reviews + ADD CONSTRAINT ck_consolidation_proposal_reviews_action + CHECK (action IN ('approve', 'apply', 'discard', 'defer')); + +ALTER TABLE consolidation_proposal_reviews + DROP CONSTRAINT IF EXISTS ck_consolidation_proposal_reviews_from_state; +ALTER TABLE consolidation_proposal_reviews + ADD CONSTRAINT ck_consolidation_proposal_reviews_from_state + CHECK (from_review_state IN ('proposed', 'approved', 'rejected', 'applied', 'archived')); + +ALTER TABLE consolidation_proposal_reviews + DROP CONSTRAINT IF EXISTS ck_consolidation_proposal_reviews_to_state; +ALTER TABLE consolidation_proposal_reviews + ADD CONSTRAINT ck_consolidation_proposal_reviews_to_state + CHECK (to_review_state IN ('proposed', 'approved', 'rejected', 'applied', 'archived')); + +CREATE INDEX IF NOT EXISTS idx_consolidation_proposal_reviews_proposal_created + ON consolidation_proposal_reviews (proposal_id, created_at ASC, review_id ASC); + +CREATE INDEX IF NOT EXISTS idx_consolidation_proposal_reviews_context_created + ON consolidation_proposal_reviews (tenant_id, project_id, created_at DESC); diff --git a/sql/tables/034_consolidation_run_jobs.sql b/sql/tables/034_consolidation_run_jobs.sql new file mode 100644 index 00000000..600bf102 --- /dev/null +++ b/sql/tables/034_consolidation_run_jobs.sql @@ -0,0 +1,33 @@ +CREATE TABLE IF NOT EXISTS consolidation_run_jobs ( + job_id uuid PRIMARY KEY, + run_id uuid NOT NULL REFERENCES consolidation_runs(run_id) ON DELETE CASCADE, + tenant_id text NOT NULL, + project_id text NOT NULL, + agent_id text NOT NULL, + job_kind text NOT NULL, + status text NOT NULL, + payload jsonb NOT NULL, + attempts int NOT NULL DEFAULT 0, + last_error text NULL, + available_at timestamptz NOT NULL DEFAULT now(), + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +ALTER TABLE consolidation_run_jobs + DROP CONSTRAINT IF EXISTS ck_consolidation_run_jobs_status; +ALTER TABLE consolidation_run_jobs + ADD CONSTRAINT ck_consolidation_run_jobs_status + CHECK (status IN ('PENDING', 'CLAIMED', 'DONE', 'FAILED')); + +ALTER TABLE consolidation_run_jobs + DROP CONSTRAINT IF EXISTS ck_consolidation_run_jobs_payload; +ALTER TABLE consolidation_run_jobs + ADD CONSTRAINT ck_consolidation_run_jobs_payload + CHECK (jsonb_typeof(payload) = 'object'); + +CREATE INDEX IF NOT EXISTS idx_consolidation_run_jobs_status_available + ON consolidation_run_jobs (status, available_at); + +CREATE INDEX IF NOT EXISTS idx_consolidation_run_jobs_run_status + ON consolidation_run_jobs (run_id, status);