diff --git a/apps/elf-api/src/routes.rs b/apps/elf-api/src/routes.rs index 9c212ab7..ff51fb3f 100644 --- a/apps/elf-api/src/routes.rs +++ b/apps/elf-api/src/routes.rs @@ -30,6 +30,7 @@ use elf_domain::{ ConsolidationReviewState, }, english_gate, + knowledge::KnowledgePageKind, writegate::WritePolicy, }; use elf_service::{ @@ -50,17 +51,19 @@ use elf_service::{ DocType, DocsExcerptResponse, DocsExcerptsGetRequest, DocsGetRequest, DocsGetResponse, DocsPutRequest, DocsPutResponse, DocsSearchL0Request, DocsSearchL0Response, Error, EventMessage, GranteeKind, GraphQueryEntityRef, GraphQueryPredicateRef, GraphQueryRequest, - GraphQueryResponse, IngestionProfileSelector, ListRequest, ListResponse, NoteFetchRequest, - NoteFetchResponse, NoteProvenanceBundleResponse, NoteProvenanceGetRequest, PayloadLevel, - PublishNoteRequest, QueryPlan, RankingRequestOverride, RebuildReport, SearchDetailsRequest, - SearchDetailsResult, SearchExplainRequest, SearchExplainResponse, SearchIndexItem, - SearchRequest, SearchResponse, SearchSessionGetRequest, SearchTimelineGroup, - SearchTimelineRequest, SearchTrajectoryResponse, SearchTrajectorySummary, ShareScope, - SpaceGrantRevokeRequest, SpaceGrantRevokeResponse, SpaceGrantUpsertRequest, - SpaceGrantsListRequest, TextPositionSelector, TextQuoteSelector, TraceBundleGetRequest, - TraceBundleResponse, TraceGetRequest, TraceGetResponse, TraceRecentListRequest, - TraceRecentListResponse, TraceTrajectoryGetRequest, UnpublishNoteRequest, UpdateRequest, - UpdateResponse, search::TraceBundleMode, + GraphQueryResponse, IngestionProfileSelector, KnowledgePageGetRequest, + KnowledgePageLintRequest, KnowledgePageLintResponse, KnowledgePageRebuildRequest, + KnowledgePageRebuildResponse, KnowledgePageResponse, KnowledgePagesListRequest, + KnowledgePagesListResponse, ListRequest, ListResponse, NoteFetchRequest, NoteFetchResponse, + NoteProvenanceBundleResponse, NoteProvenanceGetRequest, PayloadLevel, PublishNoteRequest, + QueryPlan, RankingRequestOverride, RebuildReport, SearchDetailsRequest, SearchDetailsResult, + SearchExplainRequest, SearchExplainResponse, SearchIndexItem, SearchRequest, SearchResponse, + SearchSessionGetRequest, SearchTimelineGroup, SearchTimelineRequest, SearchTrajectoryResponse, + SearchTrajectorySummary, ShareScope, SpaceGrantRevokeRequest, SpaceGrantRevokeResponse, + SpaceGrantUpsertRequest, SpaceGrantsListRequest, TextPositionSelector, TextQuoteSelector, + TraceBundleGetRequest, TraceBundleResponse, TraceGetRequest, TraceGetResponse, + TraceRecentListRequest, TraceRecentListResponse, TraceTrajectoryGetRequest, + UnpublishNoteRequest, UpdateRequest, UpdateResponse, search::TraceBundleMode, }; /// JSON OpenAPI contract route. @@ -133,6 +136,10 @@ const VIEWER_HTML: &str = include_str!("../static/viewer.html"); consolidation_proposals_list, consolidation_proposal_get, consolidation_proposal_review, + knowledge_page_rebuild, + knowledge_pages_list, + knowledge_page_get, + knowledge_page_lint, rebuild_qdrant, searches_raw, trace_recent_list, @@ -159,6 +166,7 @@ const VIEWER_HTML: &str = include_str!("../static/viewer.html"); (name = "search", description = "Progressive search sessions and raw search diagnostics."), (name = "graph", description = "Graph query and predicate administration."), (name = "consolidation", description = "Reviewable derived consolidation proposals."), + (name = "knowledge", description = "Derived knowledge page rebuild and lint readback."), (name = "admin", description = "Local admin and operator inspection routes."), ) )] @@ -362,6 +370,29 @@ struct ConsolidationProposalReviewBody { review_comment: Option, } +#[derive(Clone, Debug, Deserialize)] +struct KnowledgePageRebuildBody { + page_kind: KnowledgePageKind, + page_key: String, + title: Option, + #[serde(default)] + note_ids: Vec, + #[serde(default)] + event_ids: Vec, + #[serde(default)] + relation_ids: Vec, + #[serde(default)] + proposal_ids: Vec, + #[serde(default = "empty_json_object")] + provider_metadata: Value, +} + +#[derive(Clone, Debug, Deserialize)] +struct KnowledgePagesListQuery { + page_kind: Option, + limit: Option, +} + #[derive(Clone, Debug, Serialize, ToSchema)] struct AdminIngestionProfileDefaultResponseV2 { profile_id: String, @@ -645,6 +676,10 @@ pub fn admin_router(state: AppState) -> Router { "/v2/admin/consolidation/proposals/{proposal_id}/review", routing::post(consolidation_proposal_review), ) + .route("/v2/admin/knowledge/pages", routing::get(knowledge_pages_list)) + .route("/v2/admin/knowledge/pages/rebuild", routing::post(knowledge_page_rebuild)) + .route("/v2/admin/knowledge/pages/{page_id}", routing::get(knowledge_page_get)) + .route("/v2/admin/knowledge/pages/{page_id}/lint", routing::post(knowledge_page_lint)) .route("/v2/admin/qdrant/rebuild", routing::post(rebuild_qdrant)) .route("/v2/admin/searches/raw", routing::post(searches_raw)) .route("/v2/admin/traces/recent", routing::get(trace_recent_list)) @@ -2671,6 +2706,159 @@ async fn consolidation_proposal_review( Ok(Json(response)) } +#[utoipa::path( + post, + path = "/v2/admin/knowledge/pages/rebuild", + tag = "knowledge", + request_body = Value, + responses( + (status = 200, description = "Knowledge page was rebuilt.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Admin access required.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn knowledge_page_rebuild( + State(state): State, + headers: HeaderMap, + payload: Result, JsonRejection>, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let Json(payload) = payload.map_err(|err| { + tracing::warn!(error = %err, "Invalid request payload."); + + json_error(StatusCode::BAD_REQUEST, "INVALID_REQUEST", "Invalid request payload.", None) + })?; + let response = state + .service + .knowledge_page_rebuild(KnowledgePageRebuildRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + agent_id: ctx.agent_id, + page_kind: payload.page_kind, + page_key: payload.page_key, + title: payload.title, + note_ids: payload.note_ids, + event_ids: payload.event_ids, + relation_ids: payload.relation_ids, + proposal_ids: payload.proposal_ids, + provider_metadata: payload.provider_metadata, + }) + .await?; + + Ok(Json(response)) +} + +#[utoipa::path( + get, + path = "/v2/admin/knowledge/pages", + tag = "knowledge", + params( + ("page_kind" = Option, Query, description = "Optional page-kind filter."), + ("limit" = Option, Query, description = "Maximum pages to return."), + ), + responses( + (status = 200, description = "Knowledge pages.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Admin access required.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn knowledge_pages_list( + State(state): State, + headers: HeaderMap, + query: Result, QueryRejection>, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let Query(query) = query.map_err(|err| { + tracing::warn!(error = %err, "Invalid query parameters."); + + json_error( + StatusCode::BAD_REQUEST, + "INVALID_REQUEST", + "Invalid query parameters.".to_string(), + None, + ) + })?; + let response = state + .service + .knowledge_pages_list(KnowledgePagesListRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + page_kind: query.page_kind, + limit: query.limit, + }) + .await?; + + Ok(Json(response)) +} + +#[utoipa::path( + get, + path = "/v2/admin/knowledge/pages/{page_id}", + tag = "knowledge", + params(("page_id" = Uuid, Path, description = "Knowledge page ID.")), + responses( + (status = 200, description = "Knowledge page.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Admin access required.", body = ErrorBody), + (status = 404, description = "Knowledge page was not found.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn knowledge_page_get( + State(state): State, + headers: HeaderMap, + Path(page_id): Path, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let response = state + .service + .knowledge_page_get(KnowledgePageGetRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + page_id, + }) + .await?; + + Ok(Json(response)) +} + +#[utoipa::path( + post, + path = "/v2/admin/knowledge/pages/{page_id}/lint", + tag = "knowledge", + params(("page_id" = Uuid, Path, description = "Knowledge page ID.")), + responses( + (status = 200, description = "Knowledge page lint findings.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Admin access required.", body = ErrorBody), + (status = 404, description = "Knowledge page was not found.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn knowledge_page_lint( + State(state): State, + headers: HeaderMap, + Path(page_id): Path, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let response = state + .service + .knowledge_page_lint(KnowledgePageLintRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + page_id, + }) + .await?; + + Ok(Json(response)) +} + #[utoipa::path( get, path = "/v2/admin/events/ingestion-profiles", diff --git a/apps/elf-api/tests/http.rs b/apps/elf-api/tests/http.rs index fc7c7339..5e34928d 100644 --- a/apps/elf-api/tests/http.rs +++ b/apps/elf-api/tests/http.rs @@ -850,6 +850,10 @@ async fn openapi_json_route_serves_generated_contract() { assert_openapi_method(&spec, "/v2/admin/consolidation/proposals", "get"); assert_openapi_method(&spec, "/v2/admin/consolidation/proposals/{proposal_id}", "get"); assert_openapi_method(&spec, "/v2/admin/consolidation/proposals/{proposal_id}/review", "post"); + assert_openapi_method(&spec, "/v2/admin/knowledge/pages/rebuild", "post"); + assert_openapi_method(&spec, "/v2/admin/knowledge/pages", "get"); + assert_openapi_method(&spec, "/v2/admin/knowledge/pages/{page_id}", "get"); + assert_openapi_method(&spec, "/v2/admin/knowledge/pages/{page_id}/lint", "post"); } #[tokio::test] @@ -875,6 +879,7 @@ async fn scalar_docs_route_serves_api_reference_html() { assert!(html.contains("@scalar/api-reference")); assert!(html.contains("/v2/admin/events/ingestion-profiles/default")); assert!(html.contains("/v2/admin/consolidation/proposals")); + assert!(html.contains("/v2/admin/knowledge/pages")); } #[tokio::test] diff --git a/docs/spec/index.md b/docs/spec/index.md index 228c81a8..127baf7d 100644 --- a/docs/spec/index.md +++ b/docs/spec/index.md @@ -35,6 +35,8 @@ Question this index answers: "what must remain true?" and storage invariants. - `system_consolidation_proposals_v1.md`: Reviewable derived consolidation run and proposal contract over immutable source evidence. +- `system_knowledge_pages_v1.md`: Derived project/entity/concept/issue/decision page + storage, rebuild, citation, and stale-source lint contract. - `system_competitive_parity_gate_v1.md`: Docker-only adoption gate that decides whether ELF meets or exceeds selected external memory-system baselines. - `production_corpus_manifest_v1.md`: Sanitized/private coding-agent production diff --git a/docs/spec/system_elf_memory_service_v2.md b/docs/spec/system_elf_memory_service_v2.md index ac8f313a..0db9c469 100644 --- a/docs/spec/system_elf_memory_service_v2.md +++ b/docs/spec/system_elf_memory_service_v2.md @@ -1006,6 +1006,22 @@ Behavior: - They must not mutate authoritative source notes, docs, events, traces, graph facts, or search traces. +Admin derived knowledge pages: +- POST /v2/admin/knowledge/pages/rebuild +- GET /v2/admin/knowledge/pages +- GET /v2/admin/knowledge/pages/{page_id} +- POST /v2/admin/knowledge/pages/{page_id}/lint + +Behavior: +- These endpoints expose deterministic rebuild, list/detail readback, and stale-source + lint for derived knowledge pages. +- Page payloads must follow `elf.knowledge_page/v1`, preserve section citations, and + write normalized source refs for lint. +- Pages are derived and rebuildable; rebuilding or linting a page must not mutate + authoritative notes, event audits, graph facts, consolidation proposals, docs, + traces, or source pointers. +- The detailed contract is defined in `system_knowledge_pages_v1.md`. + POST /v2/admin/qdrant/rebuild Behavior: diff --git a/docs/spec/system_knowledge_pages_v1.md b/docs/spec/system_knowledge_pages_v1.md new file mode 100644 index 00000000..17496c16 --- /dev/null +++ b/docs/spec/system_knowledge_pages_v1.md @@ -0,0 +1,130 @@ +# Derived Knowledge Pages v1 Specification + +Purpose: Define derived knowledge page storage, rebuild, citation, and lint contracts. +Status: normative +Read this when: You are implementing, validating, or reviewing project/entity/concept/issue/decision page rebuild behavior. +Not this document: Viewer integration, search ranking, live LLM page generation, or source-note mutation. +Defines: `elf.knowledge_page/v1` pages, sections, source refs, lint findings, and deterministic rebuild metadata. + +## Core Rule + +Knowledge pages are derived artifacts. They must never replace or mutate authoritative +notes, docs, event audits, graph facts, consolidation proposals, traces, or source +pointers. + +Postgres remains the storage authority for both source memory and derived page records. +Knowledge pages are rebuildable from explicit source references and may be deleted or +rebuilt without changing source memory. + +## Storage + +The v1 storage tables are: + +- `knowledge_pages` +- `knowledge_page_sections` +- `knowledge_page_source_refs` +- `knowledge_page_lint_findings` + +`knowledge_pages.contract_schema` must be `elf.knowledge_page/v1`. + +Allowed `knowledge_pages.page_kind` values: + +- `project` +- `entity` +- `concept` +- `issue` +- `decision` + +Allowed `knowledge_page_source_refs.source_kind` values: + +- `note` +- `event` +- `relation` +- `proposal` + +`event` currently means a durable `add_event` audit row in `memory_ingest_decisions`. + +## Citation Contract + +Every persisted page section must have at least one citation or an explicit +`unsupported_reason`. + +Each citation must be persisted twice: + +- in `knowledge_page_sections.citations` for section-local readback +- in `knowledge_page_source_refs` for normalized lint and stale-source detection + +The normalized source ref must preserve: + +- `source_kind` +- `source_id` +- source status when available +- source `updated_at` or equivalent freshness timestamp when available +- source content hash when available +- source snapshot metadata + +## Rebuild Contract + +The v1 rebuild path is deterministic for the same explicit source snapshot. + +Rebuild input sources may include: + +- active or historical `memory_notes` +- durable `add_event` audit rows from `memory_ingest_decisions` +- `graph_facts` plus `graph_fact_evidence` +- applied `consolidation_proposals` + +Unreviewed consolidation proposals must not be used as source input for persisted pages. + +`knowledge_pages.source_coverage` must include: + +- `schema = "elf.knowledge_page.source_coverage/v1"` +- page kind and page key +- per-kind source counts +- total source count +- cited source count +- section count +- unsupported section count +- `coverage_complete` + +`knowledge_pages.rebuild_metadata` must include: + +- `schema = "elf.knowledge_page.rebuild/v1"` +- `source_snapshot_hash` +- `deterministic` +- `provider_metadata` +- `allowed_variance` + +When future provider-backed or LLM-derived page text is persisted, +`rebuild_metadata.deterministic` must be false unless the provider output is fully +replayable from recorded metadata. + +## Lint Contract + +The v1 lint path compares stored normalized source refs with current source rows. + +At minimum, lint must detect: + +- missing source rows +- changed source status +- changed source freshness timestamp +- changed source content hash + +Stale or missing source references must be stored in `knowledge_page_lint_findings` +with `finding_type = "stale_source_ref"` and enough `details` to show stored versus +current source state. + +Lint findings are derived diagnostics. They must not mutate authoritative source +memory. + +## Admin API + +Minimal admin readback endpoints: + +- `POST /v2/admin/knowledge/pages/rebuild` +- `GET /v2/admin/knowledge/pages` +- `GET /v2/admin/knowledge/pages/{page_id}` +- `POST /v2/admin/knowledge/pages/{page_id}/lint` + +These endpoints are local admin/operator surfaces. They must not call LLM, embedding, +rerank, or external provider adapters in v1. diff --git a/packages/elf-domain/src/knowledge.rs b/packages/elf-domain/src/knowledge.rs new file mode 100644 index 00000000..ce933b42 --- /dev/null +++ b/packages/elf-domain/src/knowledge.rs @@ -0,0 +1,86 @@ +//! Derived knowledge page contract identifiers and storage enums. + +use serde::{Deserialize, Serialize}; + +/// Current derived knowledge page contract schema identifier. +pub const KNOWLEDGE_PAGE_CONTRACT_SCHEMA_V1: &str = "elf.knowledge_page/v1"; +/// Current deterministic rebuild metadata schema identifier. +pub const KNOWLEDGE_PAGE_REBUILD_SCHEMA_V1: &str = "elf.knowledge_page.rebuild/v1"; +/// Current source coverage metadata schema identifier. +pub const KNOWLEDGE_PAGE_SOURCE_COVERAGE_SCHEMA_V1: &str = "elf.knowledge_page.source_coverage/v1"; + +/// Derived knowledge page category. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum KnowledgePageKind { + /// Project overview page. + Project, + /// Entity dossier page. + Entity, + /// Concept page. + Concept, + /// Issue timeline or issue dossier page. + Issue, + /// Decision page. + Decision, +} +impl KnowledgePageKind { + /// Returns the canonical storage string. + pub fn as_str(self) -> &'static str { + match self { + Self::Project => "project", + Self::Entity => "entity", + Self::Concept => "concept", + Self::Issue => "issue", + Self::Decision => "decision", + } + } + + /// Parses a canonical storage string. + pub fn parse(raw: &str) -> Option { + match raw { + "project" => Some(Self::Project), + "entity" => Some(Self::Entity), + "concept" => Some(Self::Concept), + "issue" => Some(Self::Issue), + "decision" => Some(Self::Decision), + _ => None, + } + } +} + +/// Authoritative source kind used by a derived page citation. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum KnowledgeSourceKind { + /// Memory note source. + Note, + /// Event source reserved for future durable event rows. + Event, + /// Graph relation fact source. + Relation, + /// Reviewed consolidation proposal source. + Proposal, +} +impl KnowledgeSourceKind { + /// Returns the canonical storage string. + pub fn as_str(self) -> &'static str { + match self { + Self::Note => "note", + Self::Event => "event", + Self::Relation => "relation", + Self::Proposal => "proposal", + } + } + + /// Parses a canonical storage string. + pub fn parse(raw: &str) -> Option { + match raw { + "note" => Some(Self::Note), + "event" => Some(Self::Event), + "relation" => Some(Self::Relation), + "proposal" => Some(Self::Proposal), + _ => None, + } + } +} diff --git a/packages/elf-domain/src/lib.rs b/packages/elf-domain/src/lib.rs index ec1d2fec..9e9747b8 100644 --- a/packages/elf-domain/src/lib.rs +++ b/packages/elf-domain/src/lib.rs @@ -3,6 +3,7 @@ pub mod consolidation; pub mod english_gate; pub mod evidence; +pub mod knowledge; pub mod memory_policy; pub mod ttl; pub mod writegate; diff --git a/packages/elf-service/src/knowledge.rs b/packages/elf-service/src/knowledge.rs new file mode 100644 index 00000000..dab31375 --- /dev/null +++ b/packages/elf-service/src/knowledge.rs @@ -0,0 +1,1411 @@ +//! Deterministic derived knowledge page rebuild and readback service APIs. + +use std::collections::{BTreeMap, BTreeSet}; + +use serde::{Deserialize, Serialize}; +use serde_json::{self, Map, Value}; +use sqlx::{Postgres, Transaction}; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{ElfService, Error, Result}; +use elf_domain::knowledge::{ + KNOWLEDGE_PAGE_CONTRACT_SCHEMA_V1, KNOWLEDGE_PAGE_REBUILD_SCHEMA_V1, + KNOWLEDGE_PAGE_SOURCE_COVERAGE_SCHEMA_V1, KnowledgePageKind, KnowledgeSourceKind, +}; +use elf_storage::{ + knowledge::{ + self, KnowledgeEventSource, KnowledgeNoteSource, KnowledgePageLintFindingInsert, + KnowledgePageSectionInsert, KnowledgePageSourceRefInsert, KnowledgePageUpsert, + KnowledgeProposalSource, KnowledgeRelationSource, + }, + models::{ + KnowledgePage, KnowledgePageLintFinding, KnowledgePageSection, KnowledgePageSourceRef, + }, +}; + +const DEFAULT_LIST_LIMIT: i64 = 50; +const MAX_LIST_LIMIT: i64 = 200; + +/// Request to rebuild one derived knowledge page from explicit source ids. +#[derive(Clone, Debug, Deserialize)] +pub struct KnowledgePageRebuildRequest { + /// Tenant that owns the page and source records. + pub tenant_id: String, + /// Project that owns the page and source records. + pub project_id: String, + /// Agent requesting the rebuild. + pub agent_id: String, + /// Page kind. + pub page_kind: KnowledgePageKind, + /// Stable page key within the tenant/project/kind namespace. + pub page_key: String, + /// Optional display title; a deterministic title is generated when omitted. + pub title: Option, + #[serde(default)] + /// Memory note sources to compile into the page. + pub note_ids: Vec, + #[serde(default)] + /// Durable add_event audit source ids to compile into the page. + pub event_ids: Vec, + #[serde(default)] + /// Graph relation fact ids to compile into the page. + pub relation_ids: Vec, + #[serde(default)] + /// Applied consolidation proposal ids to compile into the page. + pub proposal_ids: Vec, + #[serde(default = "empty_object")] + /// Provider metadata for nondeterministic or future LLM-derived rebuilds. + pub provider_metadata: Value, +} + +/// Response returned after rebuilding a derived knowledge page. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageRebuildResponse { + /// Rebuilt page with sections, source refs, and lint findings. + pub page: KnowledgePageResponse, +} + +/// Request to get one derived knowledge page. +#[derive(Clone, Debug, Deserialize)] +pub struct KnowledgePageGetRequest { + /// Tenant that owns the page. + pub tenant_id: String, + /// Project that owns the page. + pub project_id: String, + /// Page identifier. + pub page_id: Uuid, +} + +/// Request to list derived knowledge pages. +#[derive(Clone, Debug, Deserialize)] +pub struct KnowledgePagesListRequest { + /// Tenant that owns the pages. + pub tenant_id: String, + /// Project that owns the pages. + pub project_id: String, + /// Optional page-kind filter. + pub page_kind: Option, + /// Maximum number of pages to return. + pub limit: Option, +} + +/// Response returned by derived knowledge page listing. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePagesListResponse { + /// Returned pages. + pub pages: Vec, +} + +/// Request to lint one derived knowledge page against current source snapshots. +#[derive(Clone, Debug, Deserialize)] +pub struct KnowledgePageLintRequest { + /// Tenant that owns the page. + pub tenant_id: String, + /// Project that owns the page. + pub project_id: String, + /// Page identifier. + pub page_id: Uuid, +} + +/// Response returned after linting one knowledge page. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageLintResponse { + /// Page identifier. + pub page_id: Uuid, + /// Current lint findings. + pub findings: Vec, +} + +/// Summary DTO for one derived knowledge page. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageSummary { + /// Page identifier. + pub page_id: Uuid, + /// Tenant that owns the page. + pub tenant_id: String, + /// Project that owns the page. + pub project_id: String, + /// Page kind. + pub page_kind: String, + /// Stable page key. + pub page_key: String, + /// Page title. + pub title: String, + /// Versioned page contract schema. + pub contract_schema: String, + /// Page lifecycle status. + pub status: String, + /// Canonical source snapshot hash. + pub rebuild_source_hash: String, + /// Canonical page content hash. + pub content_hash: String, + /// Source coverage metadata. + pub source_coverage: Value, + /// Rebuild metadata. + pub rebuild_metadata: Value, + /// Creation timestamp. + pub created_at: OffsetDateTime, + /// Last update timestamp. + pub updated_at: OffsetDateTime, + /// Last rebuild timestamp. + pub rebuilt_at: OffsetDateTime, +} +impl From for KnowledgePageSummary { + fn from(page: KnowledgePage) -> Self { + Self { + page_id: page.page_id, + tenant_id: page.tenant_id, + project_id: page.project_id, + page_kind: page.page_kind, + page_key: page.page_key, + title: page.title, + contract_schema: page.contract_schema, + status: page.status, + rebuild_source_hash: page.rebuild_source_hash, + content_hash: page.content_hash, + source_coverage: page.source_coverage, + rebuild_metadata: page.rebuild_metadata, + created_at: page.created_at, + updated_at: page.updated_at, + rebuilt_at: page.rebuilt_at, + } + } +} + +/// Full readback DTO for one derived knowledge page. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageResponse { + /// Page summary. + pub page: KnowledgePageSummary, + /// Page sections. + pub sections: Vec, + /// Normalized source refs. + pub source_refs: Vec, + /// Lint findings. + pub lint_findings: Vec, +} + +/// Readback DTO for one page section. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageSectionResponse { + /// Section identifier. + pub section_id: Uuid, + /// Parent page identifier. + pub page_id: Uuid, + /// Stable section key. + pub section_key: String, + /// Section heading. + pub heading: String, + /// Section role. + pub role: String, + /// Section content. + pub content: String, + /// Display order. + pub ordinal: i32, + /// Serialized citation array. + pub citations: Value, + /// Reason this section is intentionally unsupported, when present. + pub unsupported_reason: Option, + /// Section content hash. + pub content_hash: String, + /// Creation timestamp. + pub created_at: OffsetDateTime, + /// Last update timestamp. + pub updated_at: OffsetDateTime, +} +impl From for KnowledgePageSectionResponse { + fn from(section: KnowledgePageSection) -> Self { + Self { + section_id: section.section_id, + page_id: section.page_id, + section_key: section.section_key, + heading: section.heading, + role: section.role, + content: section.content, + ordinal: section.ordinal, + citations: section.citations, + unsupported_reason: section.unsupported_reason, + content_hash: section.content_hash, + created_at: section.created_at, + updated_at: section.updated_at, + } + } +} + +/// Readback DTO for one normalized source reference. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageSourceRefResponse { + /// Source-reference row identifier. + pub ref_id: Uuid, + /// Parent page identifier. + pub page_id: Uuid, + /// Citing section, when section-scoped. + pub section_id: Option, + /// Source kind. + pub source_kind: String, + /// Authoritative source identifier. + pub source_id: Uuid, + /// Captured source status. + pub source_status: Option, + /// Captured source update timestamp. + pub source_updated_at: Option, + /// Captured source content hash. + pub source_content_hash: Option, + /// Captured source snapshot. + pub source_snapshot: Value, + /// Citation-local metadata. + pub citation_metadata: Value, + /// Creation timestamp. + pub created_at: OffsetDateTime, +} +impl From for KnowledgePageSourceRefResponse { + fn from(source_ref: KnowledgePageSourceRef) -> Self { + Self { + ref_id: source_ref.ref_id, + page_id: source_ref.page_id, + section_id: source_ref.section_id, + source_kind: source_ref.source_kind, + source_id: source_ref.source_id, + source_status: source_ref.source_status, + source_updated_at: source_ref.source_updated_at, + source_content_hash: source_ref.source_content_hash, + source_snapshot: source_ref.source_snapshot, + citation_metadata: source_ref.citation_metadata, + created_at: source_ref.created_at, + } + } +} + +/// Readback DTO for one knowledge page lint finding. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageLintFindingResponse { + /// Lint finding identifier. + pub finding_id: Uuid, + /// Parent page identifier. + pub page_id: Uuid, + /// Associated section, when available. + pub section_id: Option, + /// Finding type. + pub finding_type: String, + /// Finding severity. + pub severity: String, + /// Source kind associated with the finding, when available. + pub source_kind: Option, + /// Source identifier associated with the finding, when available. + pub source_id: Option, + /// Human-readable finding message. + pub message: String, + /// Structured finding details. + pub details: Value, + /// Creation timestamp. + pub created_at: OffsetDateTime, +} +impl From for KnowledgePageLintFindingResponse { + fn from(finding: KnowledgePageLintFinding) -> Self { + Self { + finding_id: finding.finding_id, + page_id: finding.page_id, + section_id: finding.section_id, + finding_type: finding.finding_type, + severity: finding.severity, + source_kind: finding.source_kind, + source_id: finding.source_id, + message: finding.message, + details: finding.details, + created_at: finding.created_at, + } + } +} + +#[derive(Clone, Debug)] +struct SourceSnapshot { + kind: KnowledgeSourceKind, + id: Uuid, + status: Option, + updated_at: Option, + content_hash: Option, + snapshot: Value, + citation_metadata: Value, + line: String, +} + +#[derive(Clone, Debug)] +struct DraftSection { + section_id: Uuid, + section_key: String, + heading: String, + role: String, + content: String, + ordinal: i32, + source_indexes: Vec, + unsupported_reason: Option, + content_hash: String, + citations: Value, +} + +#[derive(Clone, Debug)] +struct LintDraft { + section_id: Option, + finding_type: String, + severity: String, + source_kind: Option, + source_id: Option, + message: String, + details: Value, +} + +#[derive(Clone, Debug)] +struct SourceIds { + note_ids: Vec, + event_ids: Vec, + relation_ids: Vec, + proposal_ids: Vec, +} +impl SourceIds { + fn from_request(req: &KnowledgePageRebuildRequest) -> Result { + let ids = Self { + note_ids: sorted_unique(&req.note_ids), + event_ids: sorted_unique(&req.event_ids), + relation_ids: sorted_unique(&req.relation_ids), + proposal_ids: sorted_unique(&req.proposal_ids), + }; + + ids.validate_non_empty()?; + + Ok(ids) + } + + fn from_source_refs(source_refs: &[KnowledgePageSourceRef]) -> Result { + let mut note_ids = Vec::new(); + let mut event_ids = Vec::new(); + let mut relation_ids = Vec::new(); + let mut proposal_ids = Vec::new(); + + for source_ref in source_refs { + match KnowledgeSourceKind::parse(source_ref.source_kind.as_str()) { + Some(KnowledgeSourceKind::Note) => note_ids.push(source_ref.source_id), + Some(KnowledgeSourceKind::Event) => event_ids.push(source_ref.source_id), + Some(KnowledgeSourceKind::Relation) => relation_ids.push(source_ref.source_id), + Some(KnowledgeSourceKind::Proposal) => proposal_ids.push(source_ref.source_id), + None => { + return Err(Error::InvalidRequest { + message: "stored knowledge page source kind is invalid".to_string(), + }); + }, + } + } + + Ok(Self { + note_ids: sorted_unique(¬e_ids), + event_ids: sorted_unique(&event_ids), + relation_ids: sorted_unique(&relation_ids), + proposal_ids: sorted_unique(&proposal_ids), + }) + } + + fn validate_non_empty(&self) -> Result<()> { + if self.note_ids.is_empty() + && self.event_ids.is_empty() + && self.relation_ids.is_empty() + && self.proposal_ids.is_empty() + { + return Err(Error::InvalidRequest { + message: "at least one source id is required for a knowledge page rebuild" + .to_string(), + }); + } + + Ok(()) + } + + fn require_counts( + &self, + notes: usize, + events: usize, + relations: usize, + proposals: usize, + ) -> Result<()> { + if notes != self.note_ids.len() + || events != self.event_ids.len() + || relations != self.relation_ids.len() + || proposals != self.proposal_ids.len() + { + return Err(Error::InvalidRequest { + message: + "all requested knowledge page sources must exist and proposals must be applied" + .to_string(), + }); + } + + Ok(()) + } +} + +impl ElfService { + /// Rebuilds and persists one derived knowledge page from explicit source ids. + pub async fn knowledge_page_rebuild( + &self, + req: KnowledgePageRebuildRequest, + ) -> Result { + validate_context(req.tenant_id.as_str(), req.project_id.as_str(), req.agent_id.as_str())?; + validate_non_empty("page_key", req.page_key.as_str())?; + validate_object("provider_metadata", &req.provider_metadata)?; + + let ids = SourceIds::from_request(&req)?; + let title = + req.title.clone().unwrap_or_else(|| generated_title(req.page_kind, &req.page_key)); + let sources = self.resolve_sources(&req, &ids).await?; + let now = OffsetDateTime::now_utc(); + let source_snapshot = source_snapshot_value(&sources); + let source_hash = hash_json(&source_snapshot)?; + let mut sections = build_sections(&sources)?; + let lint = lint_unsupported_sections(§ions); + + for section in &mut sections { + section.citations = citations_value(section, &sources); + section.content_hash = hash_json(§ion_hash_payload(section))?; + } + + let source_coverage = + source_coverage_value(req.page_kind, &req.page_key, §ions, &sources); + let rebuild_metadata = rebuild_metadata(&source_hash, &req.provider_metadata); + let content_hash = + page_content_hash(&title, §ions, &source_coverage, &rebuild_metadata)?; + let page_id = Uuid::new_v4(); + let mut tx = self.db.pool.begin().await?; + let page = knowledge::upsert_knowledge_page( + &mut *tx, + KnowledgePageUpsert { + page_id, + tenant_id: req.tenant_id.as_str(), + project_id: req.project_id.as_str(), + page_kind: req.page_kind.as_str(), + page_key: req.page_key.as_str(), + title: title.as_str(), + contract_schema: KNOWLEDGE_PAGE_CONTRACT_SCHEMA_V1, + status: "active", + rebuild_source_hash: source_hash.as_str(), + content_hash: content_hash.as_str(), + source_coverage: &source_coverage, + source_snapshot: &source_snapshot, + rebuild_metadata: &rebuild_metadata, + now, + }, + ) + .await?; + + replace_page_children(&mut tx, page.page_id, §ions, &sources, &lint, now).await?; + + tx.commit().await?; + + Ok(KnowledgePageRebuildResponse { page: self.knowledge_page_response(page).await? }) + } + + /// Gets one derived knowledge page with sections, source refs, and lint findings. + pub async fn knowledge_page_get( + &self, + req: KnowledgePageGetRequest, + ) -> Result { + let page = knowledge::get_knowledge_page( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + req.page_id, + ) + .await? + .ok_or_else(|| Error::NotFound { message: "knowledge page not found".to_string() })?; + + self.knowledge_page_response(page).await + } + + /// Lists derived knowledge pages. + pub async fn knowledge_pages_list( + &self, + req: KnowledgePagesListRequest, + ) -> Result { + let page_kind = req.page_kind.map(KnowledgePageKind::as_str); + let pages = knowledge::list_knowledge_pages( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + page_kind, + bounded_limit(req.limit), + ) + .await? + .into_iter() + .map(KnowledgePageSummary::from) + .collect(); + + Ok(KnowledgePagesListResponse { pages }) + } + + /// Lints a derived knowledge page against current source snapshots. + pub async fn knowledge_page_lint( + &self, + req: KnowledgePageLintRequest, + ) -> Result { + let page = knowledge::get_knowledge_page( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + req.page_id, + ) + .await? + .ok_or_else(|| Error::NotFound { message: "knowledge page not found".to_string() })?; + let source_refs = + knowledge::list_knowledge_page_source_refs(&self.db.pool, page.page_id).await?; + let findings = self.lint_source_refs(&page, &source_refs).await?; + let now = OffsetDateTime::now_utc(); + let mut tx = self.db.pool.begin().await?; + + knowledge::delete_knowledge_page_lint_findings(&mut *tx, page.page_id).await?; + + for finding in &findings { + insert_lint_finding(&mut tx, page.page_id, finding, now).await?; + } + + tx.commit().await?; + + let persisted = knowledge::list_knowledge_page_lint_findings(&self.db.pool, page.page_id) + .await? + .into_iter() + .map(KnowledgePageLintFindingResponse::from) + .collect(); + + Ok(KnowledgePageLintResponse { page_id: page.page_id, findings: persisted }) + } + + async fn knowledge_page_response(&self, page: KnowledgePage) -> Result { + let page_id = page.page_id; + let sections = knowledge::list_knowledge_page_sections(&self.db.pool, page_id) + .await? + .into_iter() + .map(KnowledgePageSectionResponse::from) + .collect(); + let source_refs = knowledge::list_knowledge_page_source_refs(&self.db.pool, page_id) + .await? + .into_iter() + .map(KnowledgePageSourceRefResponse::from) + .collect(); + let lint_findings = knowledge::list_knowledge_page_lint_findings(&self.db.pool, page_id) + .await? + .into_iter() + .map(KnowledgePageLintFindingResponse::from) + .collect(); + + Ok(KnowledgePageResponse { + page: KnowledgePageSummary::from(page), + sections, + source_refs, + lint_findings, + }) + } + + async fn resolve_sources( + &self, + req: &KnowledgePageRebuildRequest, + ids: &SourceIds, + ) -> Result> { + let notes = knowledge::fetch_knowledge_note_sources( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + &ids.note_ids, + ) + .await?; + let events = knowledge::fetch_knowledge_event_sources( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + &ids.event_ids, + ) + .await?; + let relations = knowledge::fetch_knowledge_relation_sources( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + &ids.relation_ids, + ) + .await?; + let proposals = knowledge::fetch_knowledge_proposal_sources( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + &ids.proposal_ids, + ) + .await?; + + ids.require_counts(notes.len(), events.len(), relations.len(), proposals.len())?; + + let mut sources = Vec::new(); + + sources.extend(notes.into_iter().map(note_source_snapshot)); + sources.extend(events.into_iter().map(event_source_snapshot)); + sources.extend(relations.into_iter().map(relation_source_snapshot)); + sources.extend(proposals.into_iter().map(proposal_source_snapshot)); + sources.sort_by_key(source_sort_key); + + Ok(sources) + } + + async fn lint_source_refs( + &self, + page: &KnowledgePage, + source_refs: &[KnowledgePageSourceRef], + ) -> Result> { + let ids = SourceIds::from_source_refs(source_refs)?; + let current = self.resolve_current_source_map(page, &ids).await?; + let mut findings = Vec::new(); + + for source_ref in source_refs { + let key = current_key(source_ref.source_kind.as_str(), source_ref.source_id); + let Some(snapshot) = current.get(&key) else { + findings.push(missing_source_finding(source_ref)); + + continue; + }; + + if source_changed(source_ref, snapshot) { + findings.push(stale_source_finding(source_ref, snapshot)); + } + } + + Ok(findings) + } + + async fn resolve_current_source_map( + &self, + page: &KnowledgePage, + ids: &SourceIds, + ) -> Result> { + let req = KnowledgePageRebuildRequest { + tenant_id: page.tenant_id.clone(), + project_id: page.project_id.clone(), + agent_id: String::new(), + page_kind: KnowledgePageKind::parse(page.page_kind.as_str()).ok_or_else(|| { + Error::InvalidRequest { + message: "stored knowledge page kind is invalid".to_string(), + } + })?, + page_key: page.page_key.clone(), + title: Some(page.title.clone()), + note_ids: ids.note_ids.clone(), + event_ids: ids.event_ids.clone(), + relation_ids: ids.relation_ids.clone(), + proposal_ids: ids.proposal_ids.clone(), + provider_metadata: empty_object(), + }; + let mut sources = self.resolve_sources(&req, ids).await?; + + Ok(sources.drain(..).map(|source| (source_key(&source), source)).collect()) + } +} + +fn build_sections(sources: &[SourceSnapshot]) -> Result> { + let note_indexes = source_indexes(sources, KnowledgeSourceKind::Note); + let event_indexes = source_indexes(sources, KnowledgeSourceKind::Event); + let relation_indexes = source_indexes(sources, KnowledgeSourceKind::Relation); + let proposal_indexes = source_indexes(sources, KnowledgeSourceKind::Proposal); + let mut sections = Vec::new(); + + push_section( + &mut sections, + "source-notes", + "Source Notes", + "current_truth", + sources, + note_indexes, + ); + push_section(&mut sections, "event-audits", "Event Audits", "history", sources, event_indexes); + push_section(&mut sections, "relations", "Relations", "relations", sources, relation_indexes); + push_section( + &mut sections, + "reviewed-proposals", + "Reviewed Proposals", + "proposals", + sources, + proposal_indexes, + ); + + if sections.is_empty() { + return Err(Error::InvalidRequest { + message: "knowledge page rebuild did not produce any cited sections".to_string(), + }); + } + + Ok(sections) +} + +fn push_section( + sections: &mut Vec, + section_key: &str, + heading: &str, + role: &str, + sources: &[SourceSnapshot], + source_indexes: Vec, +) { + if source_indexes.is_empty() { + return; + } + + let ordinal = i32::try_from(sections.len()).unwrap_or(i32::MAX); + let content = source_indexes + .iter() + .filter_map(|index| sources.get(*index)) + .map(|source| format!("- {}", source.line)) + .collect::>() + .join("\n"); + + sections.push(DraftSection { + section_id: Uuid::new_v4(), + section_key: section_key.to_string(), + heading: heading.to_string(), + role: role.to_string(), + content, + ordinal, + source_indexes, + unsupported_reason: None, + content_hash: String::new(), + citations: Value::Array(Vec::new()), + }); +} + +fn lint_unsupported_sections(sections: &[DraftSection]) -> Vec { + sections + .iter() + .filter_map(|section| { + section.unsupported_reason.as_ref().map(|reason| LintDraft { + section_id: Some(section.section_id), + finding_type: "unsupported_section".to_string(), + severity: "warning".to_string(), + source_kind: None, + source_id: None, + message: format!("Knowledge page section lacks citations: {reason}"), + details: serde_json::json!({ "section_key": section.section_key }), + }) + }) + .collect() +} + +fn source_indexes(sources: &[SourceSnapshot], kind: KnowledgeSourceKind) -> Vec { + sources + .iter() + .enumerate() + .filter_map(|(index, source)| (source.kind == kind).then_some(index)) + .collect() +} + +fn citations_value(section: &DraftSection, sources: &[SourceSnapshot]) -> Value { + Value::Array( + section + .source_indexes + .iter() + .filter_map(|index| sources.get(*index)) + .map(source_citation_value) + .collect(), + ) +} + +fn note_source_snapshot(row: KnowledgeNoteSource) -> SourceSnapshot { + let content_hash = hash_text(row.text.as_str()); + let line = format!("{}{}", note_prefix(&row), row.text); + let snapshot = serde_json::json!({ + "kind": "note", + "note_id": row.note_id, + "agent_id": row.agent_id.clone(), + "scope": row.scope.clone(), + "type": row.note_type.clone(), + "key": row.key.clone(), + "status": row.status.clone(), + "updated_at": row.updated_at, + "created_at": row.created_at, + "expires_at": row.expires_at, + "embedding_version": row.embedding_version.clone(), + "content_hash": content_hash, + "source_ref": row.source_ref.clone(), + "importance": row.importance, + "confidence": row.confidence, + }); + + SourceSnapshot { + kind: KnowledgeSourceKind::Note, + id: row.note_id, + status: Some(row.status), + updated_at: Some(row.updated_at), + content_hash: Some(content_hash), + snapshot, + citation_metadata: serde_json::json!({ "section_role": "source_note" }), + line, + } +} + +fn event_source_snapshot(row: KnowledgeEventSource) -> SourceSnapshot { + let content_hash = hash_json_lossy(&row.details); + let line = format!( + "add_event audit {} {} for {}{}", + row.note_op, + row.policy_decision, + row.note_type, + row.note_key.as_ref().map(|key| format!(" key {key}")).unwrap_or_default() + ); + let snapshot = serde_json::json!({ + "kind": "event", + "decision_id": row.decision_id, + "agent_id": row.agent_id.clone(), + "scope": row.scope.clone(), + "pipeline": row.pipeline.clone(), + "note_type": row.note_type.clone(), + "note_key": row.note_key.clone(), + "note_id": row.note_id, + "policy_decision": row.policy_decision.clone(), + "note_op": row.note_op.clone(), + "reason_code": row.reason_code.clone(), + "details_hash": content_hash, + "ts": row.ts, + }); + + SourceSnapshot { + kind: KnowledgeSourceKind::Event, + id: row.decision_id, + status: Some(row.policy_decision), + updated_at: Some(row.ts), + content_hash: Some(content_hash), + snapshot, + citation_metadata: serde_json::json!({ "section_role": "event_audit" }), + line, + } +} + +fn relation_source_snapshot(row: KnowledgeRelationSource) -> SourceSnapshot { + let object = row.object_entity.clone().or(row.object_value.clone()).unwrap_or_default(); + let temporal_status = if row.valid_to.is_some() { "historical" } else { "current" }; + let line = format!("{} {} {} ({temporal_status}).", row.subject, row.predicate, object); + let content_hash = hash_text(line.as_str()); + let snapshot = serde_json::json!({ + "kind": "relation", + "fact_id": row.fact_id, + "agent_id": row.agent_id.clone(), + "scope": row.scope.clone(), + "subject": { "canonical": row.subject.clone(), "kind": row.subject_kind.clone() }, + "predicate": row.predicate.clone(), + "object": { + "entity": row.object_entity.clone(), + "kind": row.object_kind.clone(), + "value": row.object_value.clone() + }, + "valid_from": row.valid_from, + "valid_to": row.valid_to, + "updated_at": row.updated_at, + "content_hash": content_hash, + "evidence_notes": row.evidence_notes.clone(), + }); + + SourceSnapshot { + kind: KnowledgeSourceKind::Relation, + id: row.fact_id, + status: Some(temporal_status.to_string()), + updated_at: Some(row.updated_at), + content_hash: Some(content_hash), + snapshot, + citation_metadata: serde_json::json!({ "section_role": "relation_fact" }), + line, + } +} + +fn proposal_source_snapshot(row: KnowledgeProposalSource) -> SourceSnapshot { + let content_hash = hash_json_lossy(&serde_json::json!({ + "diff": row.diff.clone(), + "proposed_payload": row.proposed_payload.clone(), + "review_state": row.review_state.clone(), + })); + let summary = + row.diff.get("summary").and_then(Value::as_str).unwrap_or("Applied consolidation proposal"); + let line = format!("Applied proposal {}: {summary}", row.proposal_kind); + let snapshot = serde_json::json!({ + "kind": "proposal", + "proposal_id": row.proposal_id, + "run_id": row.run_id, + "agent_id": row.agent_id.clone(), + "proposal_kind": row.proposal_kind.clone(), + "apply_intent": row.apply_intent.clone(), + "review_state": row.review_state.clone(), + "source_refs": row.source_refs.clone(), + "source_snapshot": row.source_snapshot.clone(), + "lineage": row.lineage.clone(), + "diff": row.diff.clone(), + "confidence": row.confidence, + "unsupported_claim_flags": row.unsupported_claim_flags.clone(), + "contradiction_markers": row.contradiction_markers.clone(), + "staleness_markers": row.staleness_markers.clone(), + "target_ref": row.target_ref.clone(), + "proposed_payload_hash": content_hash, + "updated_at": row.updated_at, + }); + + SourceSnapshot { + kind: KnowledgeSourceKind::Proposal, + id: row.proposal_id, + status: Some(row.review_state), + updated_at: Some(row.updated_at), + content_hash: Some(content_hash), + snapshot, + citation_metadata: serde_json::json!({ "section_role": "reviewed_proposal" }), + line, + } +} + +fn source_citation_value(source: &SourceSnapshot) -> Value { + serde_json::json!({ + "source_kind": source.kind.as_str(), + "source_id": source.id, + "source_status": source.status.clone(), + "source_updated_at": source.updated_at, + "source_content_hash": source.content_hash.clone(), + "source_snapshot": source.snapshot.clone(), + "citation_metadata": source.citation_metadata.clone(), + }) +} + +fn source_snapshot_value(sources: &[SourceSnapshot]) -> Value { + serde_json::json!({ + "schema": KNOWLEDGE_PAGE_CONTRACT_SCHEMA_V1, + "sources": sources.iter().map(source_citation_value).collect::>(), + }) +} + +fn source_coverage_value( + page_kind: KnowledgePageKind, + page_key: &str, + sections: &[DraftSection], + sources: &[SourceSnapshot], +) -> Value { + let cited = sections + .iter() + .flat_map(|section| section.source_indexes.iter().copied()) + .collect::>(); + let counts = source_counts(sources); + + serde_json::json!({ + "schema": KNOWLEDGE_PAGE_SOURCE_COVERAGE_SCHEMA_V1, + "page_kind": page_kind.as_str(), + "page_key": page_key, + "source_counts": counts, + "source_count": sources.len(), + "cited_source_count": cited.len(), + "section_count": sections.len(), + "unsupported_section_count": sections.iter().filter(|section| section.unsupported_reason.is_some()).count(), + "coverage_complete": cited.len() == sources.len(), + }) +} + +fn source_counts(sources: &[SourceSnapshot]) -> Value { + let mut counts = BTreeMap::<&str, usize>::new(); + + for source in sources { + *counts.entry(source.kind.as_str()).or_insert(0) += 1; + } + + serde_json::json!(counts) +} + +fn rebuild_metadata(source_hash: &str, provider_metadata: &Value) -> Value { + let llm_derived = + provider_metadata.get("llm_derived").and_then(Value::as_bool).unwrap_or(false); + + serde_json::json!({ + "schema": KNOWLEDGE_PAGE_REBUILD_SCHEMA_V1, + "source_snapshot_hash": source_hash, + "deterministic": !llm_derived, + "provider_metadata": provider_metadata, + "allowed_variance": if llm_derived { + serde_json::json!(["LLM-derived page text may vary; provider metadata records the nondeterministic input path."]) + } else { + serde_json::json!([]) + }, + }) +} + +fn section_hash_payload(section: &DraftSection) -> Value { + serde_json::json!({ + "section_key": section.section_key.clone(), + "heading": section.heading.clone(), + "role": section.role.clone(), + "content": section.content.clone(), + "citations": section.citations.clone(), + "unsupported_reason": section.unsupported_reason.clone(), + }) +} + +fn page_content_hash( + title: &str, + sections: &[DraftSection], + source_coverage: &Value, + rebuild_metadata: &Value, +) -> Result { + hash_json(&serde_json::json!({ + "title": title, + "sections": sections.iter().map(section_hash_payload).collect::>(), + "source_coverage": source_coverage, + "rebuild_metadata": rebuild_metadata, + })) +} + +fn missing_source_finding(source_ref: &KnowledgePageSourceRef) -> LintDraft { + LintDraft { + section_id: source_ref.section_id, + finding_type: "stale_source_ref".to_string(), + severity: "error".to_string(), + source_kind: KnowledgeSourceKind::parse(source_ref.source_kind.as_str()), + source_id: Some(source_ref.source_id), + message: "Knowledge page source reference no longer resolves.".to_string(), + details: serde_json::json!({ + "source_kind": source_ref.source_kind.clone(), + "source_id": source_ref.source_id, + }), + } +} + +fn stale_source_finding( + source_ref: &KnowledgePageSourceRef, + current: &SourceSnapshot, +) -> LintDraft { + LintDraft { + section_id: source_ref.section_id, + finding_type: "stale_source_ref".to_string(), + severity: "warning".to_string(), + source_kind: Some(current.kind), + source_id: Some(current.id), + message: "Knowledge page source reference snapshot is stale.".to_string(), + details: serde_json::json!({ + "stored": { + "status": source_ref.source_status.clone(), + "updated_at": source_ref.source_updated_at, + "content_hash": source_ref.source_content_hash.clone(), + }, + "current": { + "status": current.status.clone(), + "updated_at": current.updated_at, + "content_hash": current.content_hash.clone(), + }, + }), + } +} + +fn source_changed(source_ref: &KnowledgePageSourceRef, current: &SourceSnapshot) -> bool { + source_ref.source_status.as_deref() != current.status.as_deref() + || source_ref.source_updated_at != current.updated_at + || source_ref.source_content_hash.as_deref() != current.content_hash.as_deref() +} + +fn source_sort_key(source: &SourceSnapshot) -> (String, Uuid) { + (source.kind.as_str().to_string(), source.id) +} + +fn source_key(source: &SourceSnapshot) -> String { + current_key(source.kind.as_str(), source.id) +} + +fn current_key(kind: &str, source_id: Uuid) -> String { + format!("{kind}:{source_id}") +} + +fn note_prefix(row: &KnowledgeNoteSource) -> String { + row.key + .as_ref() + .map(|key| format!("[{}:{key}] ", row.note_type)) + .unwrap_or_else(|| format!("[{}] ", row.note_type)) +} + +fn generated_title(page_kind: KnowledgePageKind, page_key: &str) -> String { + format!("{} Knowledge Page: {page_key}", title_kind(page_kind)) +} + +fn title_kind(page_kind: KnowledgePageKind) -> &'static str { + match page_kind { + KnowledgePageKind::Project => "Project", + KnowledgePageKind::Entity => "Entity", + KnowledgePageKind::Concept => "Concept", + KnowledgePageKind::Issue => "Issue", + KnowledgePageKind::Decision => "Decision", + } +} + +fn sorted_unique(ids: &[Uuid]) -> Vec { + ids.iter().copied().collect::>().into_iter().collect() +} + +fn bounded_limit(limit: Option) -> i64 { + limit.map(i64::from).unwrap_or(DEFAULT_LIST_LIMIT).clamp(1, MAX_LIST_LIMIT) +} + +fn validate_context(tenant_id: &str, project_id: &str, agent_id: &str) -> Result<()> { + validate_non_empty("tenant_id", tenant_id)?; + validate_non_empty("project_id", project_id)?; + + validate_non_empty("agent_id", agent_id) +} + +fn validate_non_empty(field: &'static str, value: &str) -> Result<()> { + if value.trim().is_empty() { + return Err(Error::InvalidRequest { message: format!("{field} must not be empty.") }); + } + + Ok(()) +} + +fn validate_object(field: &str, value: &Value) -> Result<()> { + if matches!(value, Value::Object(_)) { + Ok(()) + } else { + Err(Error::InvalidRequest { message: format!("{field} must be a JSON object.") }) + } +} + +fn empty_object() -> Value { + Value::Object(Map::new()) +} + +fn hash_text(text: &str) -> String { + blake3::hash(text.as_bytes()).to_hex().to_string() +} + +fn hash_json_lossy(value: &Value) -> String { + serde_json::to_vec(value) + .map(|raw| blake3::hash(&raw).to_hex().to_string()) + .unwrap_or_else(|_| hash_text(value.to_string().as_str())) +} + +fn hash_json(value: &Value) -> Result { + let raw = serde_json::to_vec(value).map_err(|err| Error::InvalidRequest { + message: format!("failed to serialize knowledge page payload: {err}"), + })?; + + Ok(blake3::hash(&raw).to_hex().to_string()) +} + +async fn replace_page_children( + tx: &mut Transaction<'_, Postgres>, + page_id: Uuid, + sections: &[DraftSection], + sources: &[SourceSnapshot], + lint: &[LintDraft], + now: OffsetDateTime, +) -> Result<()> { + knowledge::delete_knowledge_page_children(&mut **tx, page_id).await?; + + for section in sections { + insert_section(tx, page_id, section, now).await?; + + for source_index in §ion.source_indexes { + let source = sources.get(*source_index).ok_or_else(|| Error::InvalidRequest { + message: "knowledge page section referenced an unknown source".to_string(), + })?; + + insert_source_ref(tx, page_id, section.section_id, source, now).await?; + } + } + for finding in lint { + insert_lint_finding(tx, page_id, finding, now).await?; + } + + Ok(()) +} + +async fn insert_section( + tx: &mut Transaction<'_, Postgres>, + page_id: Uuid, + section: &DraftSection, + now: OffsetDateTime, +) -> Result<()> { + knowledge::insert_knowledge_page_section( + &mut **tx, + KnowledgePageSectionInsert { + section_id: section.section_id, + page_id, + section_key: section.section_key.as_str(), + heading: section.heading.as_str(), + role: section.role.as_str(), + content: section.content.as_str(), + ordinal: section.ordinal, + citations: §ion.citations, + unsupported_reason: section.unsupported_reason.as_deref(), + content_hash: section.content_hash.as_str(), + now, + }, + ) + .await + .map_err(Error::from) +} + +async fn insert_source_ref( + tx: &mut Transaction<'_, Postgres>, + page_id: Uuid, + section_id: Uuid, + source: &SourceSnapshot, + now: OffsetDateTime, +) -> Result<()> { + knowledge::insert_knowledge_page_source_ref( + &mut **tx, + KnowledgePageSourceRefInsert { + ref_id: Uuid::new_v4(), + page_id, + section_id: Some(section_id), + source_kind: source.kind.as_str(), + source_id: source.id, + source_status: source.status.as_deref(), + source_updated_at: source.updated_at, + source_content_hash: source.content_hash.as_deref(), + source_snapshot: &source.snapshot, + citation_metadata: &source.citation_metadata, + now, + }, + ) + .await + .map_err(Error::from) +} + +async fn insert_lint_finding( + tx: &mut Transaction<'_, Postgres>, + page_id: Uuid, + finding: &LintDraft, + now: OffsetDateTime, +) -> Result<()> { + knowledge::insert_knowledge_page_lint_finding( + &mut **tx, + KnowledgePageLintFindingInsert { + finding_id: Uuid::new_v4(), + page_id, + section_id: finding.section_id, + finding_type: finding.finding_type.as_str(), + severity: finding.severity.as_str(), + source_kind: finding.source_kind.map(KnowledgeSourceKind::as_str), + source_id: finding.source_id, + message: finding.message.as_str(), + details: &finding.details, + now, + }, + ) + .await + .map_err(Error::from) +} + +#[cfg(test)] +mod tests { + use crate::knowledge::{ + self, KnowledgePageKind, KnowledgePageSourceRef, KnowledgeSourceKind, OffsetDateTime, + SourceSnapshot, Uuid, + }; + + fn test_source(kind: KnowledgeSourceKind, raw_id: u128, line: &str) -> SourceSnapshot { + let id = Uuid::from_u128(raw_id); + let content_hash = knowledge::hash_text(line); + + SourceSnapshot { + kind, + id, + status: Some("active".to_string()), + updated_at: Some(OffsetDateTime::UNIX_EPOCH), + content_hash: Some(content_hash.clone()), + snapshot: serde_json::json!({ + "kind": kind.as_str(), + "id": id, + "status": "active", + "updated_at": OffsetDateTime::UNIX_EPOCH, + "content_hash": content_hash, + }), + citation_metadata: serde_json::json!({ "fixture": "knowledge_unit" }), + line: line.to_string(), + } + } + + #[test] + fn build_sections_preserves_citations_and_deterministic_hashes() { + let sources = vec![ + test_source(KnowledgeSourceKind::Note, 1, "A source note supports the page."), + test_source(KnowledgeSourceKind::Event, 2, "An event audit supports the page."), + test_source(KnowledgeSourceKind::Relation, 3, "A relation supports the page."), + test_source(KnowledgeSourceKind::Proposal, 4, "An applied proposal supports the page."), + ]; + let mut first_sections = + knowledge::build_sections(&sources).expect("sections should build"); + + for section in &mut first_sections { + section.citations = knowledge::citations_value(section, &sources); + section.content_hash = knowledge::hash_json(&knowledge::section_hash_payload(section)) + .expect("section hash should serialize"); + } + + assert_eq!(first_sections.len(), 4); + assert!(first_sections.iter().all(|section| { + section.citations.as_array().is_some_and(|citations| !citations.is_empty()) + })); + + let coverage = knowledge::source_coverage_value( + KnowledgePageKind::Project, + "elf", + &first_sections, + &sources, + ); + let metadata = knowledge::rebuild_metadata("source-hash", &knowledge::empty_object()); + let first_hash = knowledge::page_content_hash("ELF", &first_sections, &coverage, &metadata) + .expect("page hash should serialize"); + let second_hash = + knowledge::page_content_hash("ELF", &first_sections, &coverage, &metadata) + .expect("page hash should serialize"); + + assert_eq!(coverage["coverage_complete"], true); + assert_eq!(metadata["deterministic"], true); + assert_eq!(first_hash, second_hash); + } + + #[test] + fn rebuild_metadata_records_llm_variance() { + let metadata = knowledge::rebuild_metadata( + "source-hash", + &serde_json::json!({ + "llm_derived": true, + "provider_id": "fixture", + "model": "fixture-model", + }), + ); + + assert_eq!(metadata["deterministic"], false); + assert!(metadata["allowed_variance"].as_array().is_some_and(|items| !items.is_empty())); + assert_eq!(metadata["provider_metadata"]["provider_id"], "fixture"); + } + + #[test] + fn stale_source_comparison_detects_changed_snapshot() { + let source_id = Uuid::from_u128(42); + let stored = KnowledgePageSourceRef { + ref_id: Uuid::from_u128(1), + page_id: Uuid::from_u128(2), + section_id: Some(Uuid::from_u128(3)), + source_kind: "note".to_string(), + source_id, + source_status: Some("active".to_string()), + source_updated_at: Some(OffsetDateTime::UNIX_EPOCH), + source_content_hash: Some("old-hash".to_string()), + source_snapshot: serde_json::json!({}), + citation_metadata: serde_json::json!({}), + created_at: OffsetDateTime::UNIX_EPOCH, + }; + let current = SourceSnapshot { + kind: KnowledgeSourceKind::Note, + id: source_id, + status: Some("active".to_string()), + updated_at: Some(OffsetDateTime::UNIX_EPOCH), + content_hash: Some("new-hash".to_string()), + snapshot: serde_json::json!({}), + citation_metadata: serde_json::json!({}), + line: "Updated note source.".to_string(), + }; + let finding = knowledge::stale_source_finding(&stored, ¤t); + + assert!(knowledge::source_changed(&stored, ¤t)); + assert_eq!(finding.finding_type, "stale_source_ref"); + assert_eq!(finding.source_kind, Some(KnowledgeSourceKind::Note)); + assert_eq!(finding.source_id, Some(source_id)); + } +} diff --git a/packages/elf-service/src/lib.rs b/packages/elf-service/src/lib.rs index 7e2c350f..7ba4f202 100644 --- a/packages/elf-service/src/lib.rs +++ b/packages/elf-service/src/lib.rs @@ -11,6 +11,7 @@ pub mod delete; pub mod docs; pub mod graph; pub mod graph_query; +pub mod knowledge; pub mod list; pub mod notes; pub mod progressive_search; @@ -66,6 +67,12 @@ pub use self::{ AdminIngestionProfileVersionsListRequest, AdminIngestionProfileVersionsListResponse, AdminIngestionProfilesListResponse, IngestionProfileRef, IngestionProfileSelector, }, + knowledge::{ + KnowledgePageGetRequest, KnowledgePageLintFindingResponse, KnowledgePageLintRequest, + KnowledgePageLintResponse, KnowledgePageRebuildRequest, KnowledgePageRebuildResponse, + KnowledgePageResponse, KnowledgePageSectionResponse, KnowledgePageSourceRefResponse, + KnowledgePageSummary, KnowledgePagesListRequest, KnowledgePagesListResponse, + }, list::{ListItem, ListRequest, ListResponse}, notes::{NoteFetchRequest, NoteFetchResponse}, progressive_search::{ diff --git a/packages/elf-service/tests/acceptance/knowledge_pages.rs b/packages/elf-service/tests/acceptance/knowledge_pages.rs new file mode 100644 index 00000000..81ad83f3 --- /dev/null +++ b/packages/elf-service/tests/acceptance/knowledge_pages.rs @@ -0,0 +1,385 @@ +use std::sync::{Arc, atomic::AtomicUsize}; + +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; +use elf_domain::knowledge::KnowledgePageKind; +use elf_service::{ + AddNoteInput, AddNoteRequest, ElfService, KnowledgePageLintRequest, + KnowledgePageRebuildRequest, Providers, +}; +use elf_testkit::TestDatabase; + +const TENANT_ID: &str = "tenant_knowledge"; +const PROJECT_ID: &str = "project_knowledge"; +const AGENT_ID: &str = "agent_knowledge"; + +struct KnowledgeFixture { + service: ElfService, + _test_db: TestDatabase, +} + +async fn setup_service(test_name: &str) -> Option { + let Some(test_db) = acceptance::test_db().await else { + eprintln!("Skipping {test_name}; set ELF_PG_DSN to run this test."); + + return None; + }; + let Some(qdrant_url) = acceptance::test_qdrant_url() else { + eprintln!("Skipping {test_name}; set ELF_QDRANT_URL to run this test."); + + return None; + }; + let collection = test_db.collection_name("elf_acceptance"); + let docs_collection = test_db.collection_name("elf_acceptance_docs"); + let cfg = acceptance::test_config( + test_db.dsn().to_string(), + qdrant_url, + 4_096, + collection, + docs_collection, + ); + let extractor = SpyExtractor { + calls: Arc::new(AtomicUsize::new(0)), + payload: serde_json::json!({ "notes": [] }), + }; + let providers = Providers::new( + Arc::new(StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(extractor), + ); + let service = + acceptance::build_service(cfg, providers).await.expect("Failed to build service."); + + acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); + + Some(KnowledgeFixture { service, _test_db: test_db }) +} + +async fn insert_source_note(service: &ElfService, key: &str, text: &str) -> Uuid { + let response = service + .add_note(AddNoteRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + scope: "agent_private".to_string(), + notes: vec![AddNoteInput { + r#type: "fact".to_string(), + key: Some(key.to_string()), + text: text.to_string(), + structured: None, + importance: 0.7, + confidence: 0.9, + ttl_days: None, + source_ref: serde_json::json!({ "schema": "acceptance/v1", "key": key }), + write_policy: None, + }], + }) + .await + .expect("add_note should persist source note"); + + response.results[0].note_id.expect("source note id should be present") +} + +async fn insert_event_audit(service: &ElfService, note_id: Uuid) -> Uuid { + let decision_id = Uuid::new_v4(); + + sqlx::query( + "\ +INSERT INTO memory_ingest_decisions ( + decision_id, + tenant_id, + project_id, + agent_id, + scope, + pipeline, + note_type, + note_key, + note_id, + base_decision, + policy_decision, + note_op, + reason_code, + details, + ts +) +VALUES ($1,$2,$3,$4,'agent_private','add_event','fact','knowledge_event',$5,'remember','remember','ADD',NULL,$6,$7)", + ) + .bind(decision_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(note_id) + .bind(serde_json::json!({ "fixture": "knowledge_page_event_audit" })) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("event audit should be inserted"); + + decision_id +} + +async fn insert_relation(service: &ElfService, note_id: Uuid) -> Uuid { + let subject_id = Uuid::new_v4(); + let fact_id = Uuid::new_v4(); + let evidence_id = Uuid::new_v4(); + + sqlx::query( + "\ +INSERT INTO graph_entities ( + entity_id, + tenant_id, + project_id, + canonical, + canonical_norm, + kind, + created_at, + updated_at +) +VALUES ($1,$2,$3,'ELF knowledge pages','elf knowledge pages','concept',$4,$4)", + ) + .bind(subject_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("graph entity should be inserted"); + sqlx::query( + "\ +INSERT INTO graph_facts ( + fact_id, + tenant_id, + project_id, + agent_id, + scope, + subject_entity_id, + predicate, + predicate_id, + object_entity_id, + object_value, + valid_from, + valid_to, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,'project_shared',$5,'compile from',NULL,NULL,'authoritative source memory',$6,NULL,$6,$6)", + ) + .bind(fact_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(subject_id) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("graph fact should be inserted"); + sqlx::query( + "\ +INSERT INTO graph_fact_evidence (evidence_id, fact_id, note_id, created_at) +VALUES ($1,$2,$3,$4)", + ) + .bind(evidence_id) + .bind(fact_id) + .bind(note_id) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("graph fact evidence should be inserted"); + + fact_id +} + +async fn insert_applied_proposal(service: &ElfService, note_id: Uuid) -> Uuid { + let run_id = Uuid::new_v4(); + let proposal_id = Uuid::new_v4(); + let source_refs = serde_json::json!([ + { + "kind": "note", + "id": note_id, + "snapshot": { + "status": "active", + "updated_at": "1970-01-01T00:00:00Z", + "metadata": { "fixture": "knowledge_pages" }, + "source_ref": {} + } + } + ]); + let lineage = serde_json::json!({ "source_refs": source_refs }); + + sqlx::query( + "\ +INSERT INTO consolidation_runs ( + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + job_kind, + status, + input_refs, + source_snapshot, + lineage, + error, + created_at, + updated_at, + completed_at +) +VALUES ($1,$2,$3,$4,'elf.consolidation/v1','manual','completed',$5,$6,$7,'{}'::jsonb,$8,$8,$8)", + ) + .bind(run_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(&source_refs) + .bind(serde_json::json!({ "source_count": 1 })) + .bind(&lineage) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("consolidation run should be inserted"); + sqlx::query( + "\ +INSERT INTO consolidation_proposals ( + proposal_id, + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + unsupported_claim_flags, + contradiction_markers, + staleness_markers, + target_ref, + proposed_payload, + reviewer_agent_id, + review_comment, + reviewed_at, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,$5,'elf.consolidation/v1','knowledge_page','create_derived_knowledge_page','applied',$6,$7,$8,$9,0.9,'[]'::jsonb,'[]'::jsonb,'[]'::jsonb,'{}'::jsonb,$10,$5,'Apply derived page proposal.',$11,$11,$11)", + ) + .bind(proposal_id) + .bind(run_id) + .bind(TENANT_ID) + .bind(PROJECT_ID) + .bind(AGENT_ID) + .bind(&source_refs) + .bind(serde_json::json!({ "source_count": 1 })) + .bind(&lineage) + .bind(serde_json::json!({ + "summary": "Create a derived knowledge page from cited source memory.", + "before": {}, + "after": { "page_key": "knowledge-foundation" } + })) + .bind(serde_json::json!({ "page_key": "knowledge-foundation" })) + .bind(OffsetDateTime::UNIX_EPOCH) + .execute(&service.db.pool) + .await + .expect("consolidation proposal should be inserted"); + + proposal_id +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run this test."] +async fn rebuilds_pages_with_citations_and_detects_stale_sources() { + let Some(fixture) = + setup_service("rebuilds_pages_with_citations_and_detects_stale_sources").await + else { + return; + }; + let service = &fixture.service; + let note_id = insert_source_note( + service, + "knowledge_pages_foundation", + "Fact: Derived knowledge pages are rebuilt from authoritative source memory and keep citations.", + ) + .await; + let event_id = insert_event_audit(service, note_id).await; + let fact_id = insert_relation(service, note_id).await; + let proposal_id = insert_applied_proposal(service, note_id).await; + let first = service + .knowledge_page_rebuild(KnowledgePageRebuildRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + page_kind: KnowledgePageKind::Project, + page_key: "knowledge-foundation".to_string(), + title: Some("Knowledge Foundation".to_string()), + note_ids: vec![note_id], + event_ids: vec![event_id], + relation_ids: vec![fact_id], + proposal_ids: vec![proposal_id], + provider_metadata: serde_json::json!({}), + }) + .await + .expect("knowledge page should rebuild"); + + assert_eq!(first.page.sections.len(), 4); + assert_eq!(first.page.source_refs.len(), 4); + assert!(first.page.sections.iter().all(|section| { + section.citations.as_array().is_some_and(|citations| !citations.is_empty()) + })); + assert_eq!(first.page.page.source_coverage["coverage_complete"], true); + assert_eq!(first.page.page.rebuild_metadata["deterministic"], true); + + let second = service + .knowledge_page_rebuild(KnowledgePageRebuildRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + agent_id: AGENT_ID.to_string(), + page_kind: KnowledgePageKind::Project, + page_key: "knowledge-foundation".to_string(), + title: Some("Knowledge Foundation".to_string()), + note_ids: vec![note_id], + event_ids: vec![event_id], + relation_ids: vec![fact_id], + proposal_ids: vec![proposal_id], + provider_metadata: serde_json::json!({}), + }) + .await + .expect("knowledge page should rebuild deterministically"); + + assert_eq!(first.page.page.page_id, second.page.page.page_id); + assert_eq!(first.page.page.rebuild_source_hash, second.page.page.rebuild_source_hash); + assert_eq!(first.page.page.content_hash, second.page.page.content_hash); + + sqlx::query( + "\ +UPDATE memory_notes +SET text = $1, updated_at = $2 +WHERE note_id = $3", + ) + .bind("Fact: Derived knowledge pages changed after the page snapshot was rebuilt.") + .bind(OffsetDateTime::now_utc()) + .bind(note_id) + .execute(&service.db.pool) + .await + .expect("source note should update"); + + let lint = service + .knowledge_page_lint(KnowledgePageLintRequest { + tenant_id: TENANT_ID.to_string(), + project_id: PROJECT_ID.to_string(), + page_id: first.page.page.page_id, + }) + .await + .expect("knowledge page lint should run"); + + assert!(lint.findings.iter().any(|finding| { + finding.finding_type == "stale_source_ref" + && finding.source_kind.as_deref() == Some("note") + && finding.source_id == Some(note_id) + })); +} diff --git a/packages/elf-service/tests/acceptance/suite.rs b/packages/elf-service/tests/acceptance/suite.rs index abc17fa7..e7d102ef 100644 --- a/packages/elf-service/tests/acceptance/suite.rs +++ b/packages/elf-service/tests/acceptance/suite.rs @@ -7,6 +7,7 @@ mod english_only_boundary; mod evidence_binding; mod graph_ingestion; mod idempotency; +mod knowledge_pages; mod outbox_eventual_consistency; mod rebuild_qdrant; mod sot_vectors; @@ -489,6 +490,10 @@ TRUNCATE doc_chunk_embeddings, doc_chunks, doc_documents, + knowledge_page_lint_findings, + knowledge_page_source_refs, + knowledge_page_sections, + knowledge_pages, consolidation_run_jobs, consolidation_proposal_reviews, consolidation_proposals, diff --git a/packages/elf-storage/src/knowledge.rs b/packages/elf-storage/src/knowledge.rs new file mode 100644 index 00000000..cee88f0f --- /dev/null +++ b/packages/elf-storage/src/knowledge.rs @@ -0,0 +1,896 @@ +//! Derived knowledge page persistence and source-snapshot queries. + +use serde_json::Value; +use sqlx::{FromRow, PgExecutor}; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{ + Result, + models::{ + KnowledgePage, KnowledgePageLintFinding, KnowledgePageSection, KnowledgePageSourceRef, + }, +}; + +/// Arguments for upserting one derived knowledge page. +pub struct KnowledgePageUpsert<'a> { + /// Page identifier to use for a newly created page. + pub page_id: Uuid, + /// Tenant that owns the page. + pub tenant_id: &'a str, + /// Project that owns the page. + pub project_id: &'a str, + /// Page kind. + pub page_kind: &'a str, + /// Stable page key. + pub page_key: &'a str, + /// Page title. + pub title: &'a str, + /// Versioned page contract schema. + pub contract_schema: &'a str, + /// Page lifecycle status. + pub status: &'a str, + /// Canonical source snapshot hash. + pub rebuild_source_hash: &'a str, + /// Canonical page content hash. + pub content_hash: &'a str, + /// Source coverage metadata. + pub source_coverage: &'a Value, + /// Aggregate source snapshot metadata. + pub source_snapshot: &'a Value, + /// Rebuild metadata. + pub rebuild_metadata: &'a Value, + /// Rebuild timestamp. + pub now: OffsetDateTime, +} + +/// Arguments for inserting one knowledge page section. +pub struct KnowledgePageSectionInsert<'a> { + /// Section identifier. + pub section_id: Uuid, + /// Parent page identifier. + pub page_id: Uuid, + /// Stable section key. + pub section_key: &'a str, + /// Section heading. + pub heading: &'a str, + /// Section role. + pub role: &'a str, + /// Section content. + pub content: &'a str, + /// Section display order. + pub ordinal: i32, + /// Section citations. + pub citations: &'a Value, + /// Reason the section has no citations, when intentionally unsupported. + pub unsupported_reason: Option<&'a str>, + /// Section content hash. + pub content_hash: &'a str, + /// Creation/update timestamp. + pub now: OffsetDateTime, +} + +/// Arguments for inserting one normalized knowledge page citation. +pub struct KnowledgePageSourceRefInsert<'a> { + /// Source-reference row identifier. + pub ref_id: Uuid, + /// Parent page identifier. + pub page_id: Uuid, + /// Section that cites the source, if section-scoped. + pub section_id: Option, + /// Source kind. + pub source_kind: &'a str, + /// Authoritative source identifier. + pub source_id: Uuid, + /// Captured source status. + pub source_status: Option<&'a str>, + /// Captured source updated timestamp. + pub source_updated_at: Option, + /// Captured source content hash. + pub source_content_hash: Option<&'a str>, + /// Captured source snapshot. + pub source_snapshot: &'a Value, + /// Citation-local metadata. + pub citation_metadata: &'a Value, + /// Creation timestamp. + pub now: OffsetDateTime, +} + +/// Arguments for inserting one knowledge page lint finding. +pub struct KnowledgePageLintFindingInsert<'a> { + /// Lint finding identifier. + pub finding_id: Uuid, + /// Parent page identifier. + pub page_id: Uuid, + /// Section associated with the finding, when available. + pub section_id: Option, + /// Finding type. + pub finding_type: &'a str, + /// Finding severity. + pub severity: &'a str, + /// Source kind associated with the finding, when available. + pub source_kind: Option<&'a str>, + /// Source identifier associated with the finding, when available. + pub source_id: Option, + /// Human-readable finding message. + pub message: &'a str, + /// Structured finding details. + pub details: &'a Value, + /// Creation timestamp. + pub now: OffsetDateTime, +} + +/// Authoritative note source row used by the knowledge page rebuilder. +#[derive(Debug, FromRow)] +pub struct KnowledgeNoteSource { + /// Note identifier. + pub note_id: Uuid, + /// Agent that owns the note. + pub agent_id: String, + /// Note scope. + pub scope: String, + /// Note type. + pub note_type: String, + /// Optional note key. + pub key: Option, + /// Note text. + pub text: String, + /// Note importance. + pub importance: f32, + /// Note confidence. + pub confidence: f32, + /// Note status. + pub status: String, + /// Note creation timestamp. + pub created_at: OffsetDateTime, + /// Note update timestamp. + pub updated_at: OffsetDateTime, + /// Optional note expiry timestamp. + pub expires_at: Option, + /// Note embedding version. + pub embedding_version: String, + /// Opaque note source reference. + pub source_ref: Value, +} + +/// Durable add_event audit source row used by the knowledge page rebuilder. +#[derive(Debug, FromRow)] +pub struct KnowledgeEventSource { + /// Ingest decision identifier. + pub decision_id: Uuid, + /// Agent that wrote the audited event-derived note decision. + pub agent_id: String, + /// Scope associated with the audited decision. + pub scope: String, + /// Ingestion pipeline name. + pub pipeline: String, + /// Event-derived note type. + pub note_type: String, + /// Optional note key. + pub note_key: Option, + /// Note identifier affected by the decision, when persisted. + pub note_id: Option, + /// Policy decision. + pub policy_decision: String, + /// Note operation. + pub note_op: String, + /// Optional reason code. + pub reason_code: Option, + /// Structured audit details. + pub details: Value, + /// Audit timestamp. + pub ts: OffsetDateTime, +} + +/// Authoritative graph relation source row used by the knowledge page rebuilder. +#[derive(Debug, FromRow)] +pub struct KnowledgeRelationSource { + /// Graph fact identifier. + pub fact_id: Uuid, + /// Agent that wrote the fact. + pub agent_id: String, + /// Fact scope. + pub scope: String, + /// Subject canonical text. + pub subject: String, + /// Optional subject kind. + pub subject_kind: Option, + /// Predicate text. + pub predicate: String, + /// Optional object entity canonical text. + pub object_entity: Option, + /// Optional object entity kind. + pub object_kind: Option, + /// Optional scalar object value. + pub object_value: Option, + /// Fact validity window start. + pub valid_from: OffsetDateTime, + /// Fact validity window end, when historical. + pub valid_to: Option, + /// Fact update timestamp. + pub updated_at: OffsetDateTime, + /// Evidence notes linked to this fact. + pub evidence_notes: Value, +} + +/// Reviewed consolidation proposal source row used by the knowledge page rebuilder. +#[derive(Debug, FromRow)] +pub struct KnowledgeProposalSource { + /// Consolidation proposal identifier. + pub proposal_id: Uuid, + /// Parent consolidation run identifier. + pub run_id: Uuid, + /// Agent that registered the proposal. + pub agent_id: String, + /// Proposal kind. + pub proposal_kind: String, + /// Proposal apply intent. + pub apply_intent: String, + /// Proposal review state. + pub review_state: String, + /// Serialized proposal source references. + pub source_refs: Value, + /// Serialized proposal source snapshot. + pub source_snapshot: Value, + /// Serialized proposal lineage. + pub lineage: Value, + /// Serialized proposal diff. + pub diff: Value, + /// Proposal confidence. + pub confidence: f32, + /// Unsupported claim flags. + pub unsupported_claim_flags: Value, + /// Contradiction markers. + pub contradiction_markers: Value, + /// Staleness markers. + pub staleness_markers: Value, + /// Derived target reference. + pub target_ref: Value, + /// Proposed derived payload. + pub proposed_payload: Value, + /// Proposal update timestamp. + pub updated_at: OffsetDateTime, +} + +/// Upserts one derived knowledge page and returns the persisted row. +pub async fn upsert_knowledge_page<'e, E>( + executor: E, + args: KnowledgePageUpsert<'_>, +) -> Result +where + E: PgExecutor<'e>, +{ + let row = sqlx::query_as::<_, KnowledgePage>( + "\ +INSERT INTO knowledge_pages ( + page_id, + tenant_id, + project_id, + page_kind, + page_key, + title, + contract_schema, + status, + rebuild_source_hash, + content_hash, + source_coverage, + source_snapshot, + rebuild_metadata, + created_at, + updated_at, + rebuilt_at +) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$14,$14) +ON CONFLICT (tenant_id, project_id, page_kind, page_key) DO UPDATE +SET + title = EXCLUDED.title, + contract_schema = EXCLUDED.contract_schema, + status = EXCLUDED.status, + rebuild_source_hash = EXCLUDED.rebuild_source_hash, + content_hash = EXCLUDED.content_hash, + source_coverage = EXCLUDED.source_coverage, + source_snapshot = EXCLUDED.source_snapshot, + rebuild_metadata = EXCLUDED.rebuild_metadata, + updated_at = EXCLUDED.updated_at, + rebuilt_at = EXCLUDED.rebuilt_at +RETURNING + page_id, + tenant_id, + project_id, + page_kind, + page_key, + title, + contract_schema, + status, + rebuild_source_hash, + content_hash, + source_coverage, + source_snapshot, + rebuild_metadata, + created_at, + updated_at, + rebuilt_at", + ) + .bind(args.page_id) + .bind(args.tenant_id) + .bind(args.project_id) + .bind(args.page_kind) + .bind(args.page_key) + .bind(args.title) + .bind(args.contract_schema) + .bind(args.status) + .bind(args.rebuild_source_hash) + .bind(args.content_hash) + .bind(args.source_coverage) + .bind(args.source_snapshot) + .bind(args.rebuild_metadata) + .bind(args.now) + .fetch_one(executor) + .await?; + + Ok(row) +} + +/// Deletes all section, citation, and lint child rows for a page before rebuild. +pub async fn delete_knowledge_page_children<'e, E>(executor: E, page_id: Uuid) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ + WITH deleted_lint AS ( + DELETE FROM knowledge_page_lint_findings + WHERE page_id = $1 + ), + deleted_source_refs AS ( + DELETE FROM knowledge_page_source_refs + WHERE page_id = $1 + ) + DELETE FROM knowledge_page_sections + WHERE page_id = $1", + ) + .bind(page_id) + .execute(executor) + .await?; + + Ok(()) +} + +/// Inserts one derived knowledge page section. +pub async fn insert_knowledge_page_section<'e, E>( + executor: E, + args: KnowledgePageSectionInsert<'_>, +) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO knowledge_page_sections ( + section_id, + page_id, + section_key, + heading, + role, + content, + ordinal, + citations, + unsupported_reason, + content_hash, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$11)", + ) + .bind(args.section_id) + .bind(args.page_id) + .bind(args.section_key) + .bind(args.heading) + .bind(args.role) + .bind(args.content) + .bind(args.ordinal) + .bind(args.citations) + .bind(args.unsupported_reason) + .bind(args.content_hash) + .bind(args.now) + .execute(executor) + .await?; + + Ok(()) +} + +/// Inserts one normalized knowledge page citation/source reference. +pub async fn insert_knowledge_page_source_ref<'e, E>( + executor: E, + args: KnowledgePageSourceRefInsert<'_>, +) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO knowledge_page_source_refs ( + ref_id, + page_id, + section_id, + source_kind, + source_id, + source_status, + source_updated_at, + source_content_hash, + source_snapshot, + citation_metadata, + created_at +) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)", + ) + .bind(args.ref_id) + .bind(args.page_id) + .bind(args.section_id) + .bind(args.source_kind) + .bind(args.source_id) + .bind(args.source_status) + .bind(args.source_updated_at) + .bind(args.source_content_hash) + .bind(args.source_snapshot) + .bind(args.citation_metadata) + .bind(args.now) + .execute(executor) + .await?; + + Ok(()) +} + +/// Inserts one knowledge page lint finding. +pub async fn insert_knowledge_page_lint_finding<'e, E>( + executor: E, + args: KnowledgePageLintFindingInsert<'_>, +) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO knowledge_page_lint_findings ( + finding_id, + page_id, + section_id, + finding_type, + severity, + source_kind, + source_id, + message, + details, + created_at +) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)", + ) + .bind(args.finding_id) + .bind(args.page_id) + .bind(args.section_id) + .bind(args.finding_type) + .bind(args.severity) + .bind(args.source_kind) + .bind(args.source_id) + .bind(args.message) + .bind(args.details) + .bind(args.now) + .execute(executor) + .await?; + + Ok(()) +} + +/// Deletes persisted lint findings for one page. +pub async fn delete_knowledge_page_lint_findings<'e, E>(executor: E, page_id: Uuid) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query("DELETE FROM knowledge_page_lint_findings WHERE page_id = $1") + .bind(page_id) + .execute(executor) + .await?; + + Ok(()) +} + +/// Fetches one knowledge page by identifier. +pub async fn get_knowledge_page<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + page_id: Uuid, +) -> Result> +where + E: PgExecutor<'e>, +{ + let row = sqlx::query_as::<_, KnowledgePage>( + "\ +SELECT + page_id, + tenant_id, + project_id, + page_kind, + page_key, + title, + contract_schema, + status, + rebuild_source_hash, + content_hash, + source_coverage, + source_snapshot, + rebuild_metadata, + created_at, + updated_at, + rebuilt_at +FROM knowledge_pages +WHERE tenant_id = $1 AND project_id = $2 AND page_id = $3 +LIMIT 1", + ) + .bind(tenant_id) + .bind(project_id) + .bind(page_id) + .fetch_optional(executor) + .await?; + + Ok(row) +} + +/// Lists knowledge pages for a tenant and project. +pub async fn list_knowledge_pages<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + page_kind: Option<&str>, + limit: i64, +) -> Result> +where + E: PgExecutor<'e>, +{ + let rows = sqlx::query_as::<_, KnowledgePage>( + "\ +SELECT + page_id, + tenant_id, + project_id, + page_kind, + page_key, + title, + contract_schema, + status, + rebuild_source_hash, + content_hash, + source_coverage, + source_snapshot, + rebuild_metadata, + created_at, + updated_at, + rebuilt_at +FROM knowledge_pages +WHERE tenant_id = $1 + AND project_id = $2 + AND ($3::text IS NULL OR page_kind = $3) +ORDER BY updated_at DESC, page_id DESC +LIMIT $4", + ) + .bind(tenant_id) + .bind(project_id) + .bind(page_kind) + .bind(limit) + .fetch_all(executor) + .await?; + + Ok(rows) +} + +/// Lists sections for one knowledge page. +pub async fn list_knowledge_page_sections<'e, E>( + executor: E, + page_id: Uuid, +) -> Result> +where + E: PgExecutor<'e>, +{ + let rows = sqlx::query_as::<_, KnowledgePageSection>( + "\ +SELECT + section_id, + page_id, + section_key, + heading, + role, + content, + ordinal, + citations, + unsupported_reason, + content_hash, + created_at, + updated_at +FROM knowledge_page_sections +WHERE page_id = $1 +ORDER BY ordinal ASC, section_key ASC", + ) + .bind(page_id) + .fetch_all(executor) + .await?; + + Ok(rows) +} + +/// Lists normalized source refs for one knowledge page. +pub async fn list_knowledge_page_source_refs<'e, E>( + executor: E, + page_id: Uuid, +) -> Result> +where + E: PgExecutor<'e>, +{ + let rows = sqlx::query_as::<_, KnowledgePageSourceRef>( + "\ +SELECT + ref_id, + page_id, + section_id, + source_kind, + source_id, + source_status, + source_updated_at, + source_content_hash, + source_snapshot, + citation_metadata, + created_at +FROM knowledge_page_source_refs +WHERE page_id = $1 +ORDER BY source_kind ASC, source_id ASC, ref_id ASC", + ) + .bind(page_id) + .fetch_all(executor) + .await?; + + Ok(rows) +} + +/// Lists lint findings for one knowledge page. +pub async fn list_knowledge_page_lint_findings<'e, E>( + executor: E, + page_id: Uuid, +) -> Result> +where + E: PgExecutor<'e>, +{ + let rows = sqlx::query_as::<_, KnowledgePageLintFinding>( + "\ +SELECT + finding_id, + page_id, + section_id, + finding_type, + severity, + source_kind, + source_id, + message, + details, + created_at +FROM knowledge_page_lint_findings +WHERE page_id = $1 +ORDER BY severity DESC, created_at ASC, finding_id ASC", + ) + .bind(page_id) + .fetch_all(executor) + .await?; + + Ok(rows) +} + +/// Fetches note sources by identifier for a knowledge page rebuild. +pub async fn fetch_knowledge_note_sources<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + note_ids: &[Uuid], +) -> Result> +where + E: PgExecutor<'e>, +{ + if note_ids.is_empty() { + return Ok(Vec::new()); + } + + let rows = sqlx::query_as::<_, KnowledgeNoteSource>( + "\ +SELECT + note_id, + agent_id, + scope, + type AS note_type, + key, + text, + importance, + confidence, + status, + created_at, + updated_at, + expires_at, + embedding_version, + source_ref +FROM memory_notes +WHERE tenant_id = $1 + AND project_id = $2 + AND note_id = ANY($3::uuid[]) +ORDER BY updated_at ASC, note_id ASC", + ) + .bind(tenant_id) + .bind(project_id) + .bind(note_ids) + .fetch_all(executor) + .await?; + + Ok(rows) +} + +/// Fetches durable add_event audit sources by decision identifier. +pub async fn fetch_knowledge_event_sources<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + decision_ids: &[Uuid], +) -> Result> +where + E: PgExecutor<'e>, +{ + if decision_ids.is_empty() { + return Ok(Vec::new()); + } + + let rows = sqlx::query_as::<_, KnowledgeEventSource>( + "\ +SELECT + decision_id, + agent_id, + scope, + pipeline, + note_type, + note_key, + note_id, + policy_decision, + note_op, + reason_code, + details, + ts +FROM memory_ingest_decisions +WHERE tenant_id = $1 + AND project_id = $2 + AND decision_id = ANY($3::uuid[]) + AND pipeline = 'add_event' +ORDER BY ts ASC, decision_id ASC", + ) + .bind(tenant_id) + .bind(project_id) + .bind(decision_ids) + .fetch_all(executor) + .await?; + + Ok(rows) +} + +/// Fetches relation sources by graph fact identifier for a knowledge page rebuild. +pub async fn fetch_knowledge_relation_sources<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + fact_ids: &[Uuid], +) -> Result> +where + E: PgExecutor<'e>, +{ + if fact_ids.is_empty() { + return Ok(Vec::new()); + } + + let rows = sqlx::query_as::<_, KnowledgeRelationSource>( + "\ +SELECT + gf.fact_id, + gf.agent_id, + gf.scope, + subject.canonical AS subject, + subject.kind AS subject_kind, + gf.predicate, + object_entity.canonical AS object_entity, + object_entity.kind AS object_kind, + gf.object_value, + gf.valid_from, + gf.valid_to, + gf.updated_at, + COALESCE( + jsonb_agg( + jsonb_build_object( + 'note_id', evidence.note_id, + 'status', note.status, + 'updated_at', note.updated_at + ) + ORDER BY evidence.created_at ASC, evidence.note_id ASC + ) FILTER (WHERE evidence.note_id IS NOT NULL), + '[]'::jsonb + ) AS evidence_notes +FROM graph_facts gf +JOIN graph_entities subject ON subject.entity_id = gf.subject_entity_id +LEFT JOIN graph_entities object_entity ON object_entity.entity_id = gf.object_entity_id +LEFT JOIN graph_fact_evidence evidence ON evidence.fact_id = gf.fact_id +LEFT JOIN memory_notes note ON note.note_id = evidence.note_id +WHERE gf.tenant_id = $1 + AND gf.project_id = $2 + AND gf.fact_id = ANY($3::uuid[]) +GROUP BY + gf.fact_id, + gf.agent_id, + gf.scope, + subject.canonical, + subject.kind, + gf.predicate, + object_entity.canonical, + object_entity.kind, + gf.object_value, + gf.valid_from, + gf.valid_to, + gf.updated_at +ORDER BY gf.updated_at ASC, gf.fact_id ASC", + ) + .bind(tenant_id) + .bind(project_id) + .bind(fact_ids) + .fetch_all(executor) + .await?; + + Ok(rows) +} + +/// Fetches applied proposal sources by identifier for a knowledge page rebuild. +pub async fn fetch_knowledge_proposal_sources<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + proposal_ids: &[Uuid], +) -> Result> +where + E: PgExecutor<'e>, +{ + if proposal_ids.is_empty() { + return Ok(Vec::new()); + } + + let rows = sqlx::query_as::<_, KnowledgeProposalSource>( + "\ +SELECT + proposal_id, + run_id, + agent_id, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + COALESCE(unsupported_claim_flags, '[]'::jsonb) AS unsupported_claim_flags, + COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, + COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, + COALESCE(target_ref, '{}'::jsonb) AS target_ref, + COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, + updated_at +FROM consolidation_proposals +WHERE tenant_id = $1 + AND project_id = $2 + AND proposal_id = ANY($3::uuid[]) + AND review_state = 'applied' +ORDER BY updated_at ASC, proposal_id ASC", + ) + .bind(tenant_id) + .bind(project_id) + .bind(proposal_ids) + .fetch_all(executor) + .await?; + + Ok(rows) +} diff --git a/packages/elf-storage/src/lib.rs b/packages/elf-storage/src/lib.rs index 91c3d369..0631dabc 100644 --- a/packages/elf-storage/src/lib.rs +++ b/packages/elf-storage/src/lib.rs @@ -7,6 +7,7 @@ pub mod db; pub mod doc_outbox; pub mod docs; pub mod graph; +pub mod knowledge; pub mod models; pub mod outbox; pub mod qdrant; diff --git a/packages/elf-storage/src/models.rs b/packages/elf-storage/src/models.rs index 2e3711d2..7343b713 100644 --- a/packages/elf-storage/src/models.rs +++ b/packages/elf-storage/src/models.rs @@ -424,6 +424,124 @@ pub struct ConsolidationRunJob { pub updated_at: OffsetDateTime, } +/// Persisted derived knowledge page row. +#[derive(Debug, FromRow)] +pub struct KnowledgePage { + /// Derived page identifier. + pub page_id: Uuid, + /// Tenant that owns the page. + pub tenant_id: String, + /// Project that owns the page. + pub project_id: String, + /// Page kind, such as project, entity, concept, issue, or decision. + pub page_kind: String, + /// Stable page key within the tenant/project/kind namespace. + pub page_key: String, + /// Human-readable page title. + pub title: String, + /// Versioned knowledge page contract schema. + pub contract_schema: String, + /// Derived page lifecycle status. + pub status: String, + /// BLAKE3 hash of the canonical source snapshot. + pub rebuild_source_hash: String, + /// BLAKE3 hash of the canonical page payload. + pub content_hash: String, + /// Source coverage metadata. + pub source_coverage: Value, + /// Aggregate source snapshot metadata captured during rebuild. + pub source_snapshot: Value, + /// Rebuild metadata, including deterministic/provider information. + pub rebuild_metadata: Value, + /// Creation timestamp. + pub created_at: OffsetDateTime, + /// Last update timestamp. + pub updated_at: OffsetDateTime, + /// Last rebuild timestamp. + pub rebuilt_at: OffsetDateTime, +} + +/// Persisted derived knowledge page section row. +#[derive(Debug, FromRow)] +pub struct KnowledgePageSection { + /// Section identifier. + pub section_id: Uuid, + /// Parent page identifier. + pub page_id: Uuid, + /// Stable section key within one page. + pub section_key: String, + /// Section heading. + pub heading: String, + /// Section role, such as current_truth, history, relations, or proposals. + pub role: String, + /// Section content. + pub content: String, + /// Display order within the page. + pub ordinal: i32, + /// Serialized citation array for this section. + pub citations: Value, + /// Reason a section lacks citations, when intentionally unsupported. + pub unsupported_reason: Option, + /// BLAKE3 hash of the section content and citations. + pub content_hash: String, + /// Creation timestamp. + pub created_at: OffsetDateTime, + /// Last update timestamp. + pub updated_at: OffsetDateTime, +} + +/// Persisted normalized citation/source reference for a knowledge page. +#[derive(Debug, FromRow)] +pub struct KnowledgePageSourceRef { + /// Source-reference row identifier. + pub ref_id: Uuid, + /// Parent page identifier. + pub page_id: Uuid, + /// Section that cites the source, if section-scoped. + pub section_id: Option, + /// Source kind, such as note, relation, proposal, or event. + pub source_kind: String, + /// Authoritative source identifier. + pub source_id: Uuid, + /// Source lifecycle status captured during rebuild. + pub source_status: Option, + /// Source last-update timestamp captured during rebuild. + pub source_updated_at: Option, + /// Source content hash captured during rebuild. + pub source_content_hash: Option, + /// Full source snapshot captured during rebuild. + pub source_snapshot: Value, + /// Citation-local metadata. + pub citation_metadata: Value, + /// Creation timestamp. + pub created_at: OffsetDateTime, +} + +/// Persisted lint finding for one derived knowledge page. +#[derive(Debug, FromRow)] +pub struct KnowledgePageLintFinding { + /// Lint finding identifier. + pub finding_id: Uuid, + /// Parent page identifier. + pub page_id: Uuid, + /// Section associated with the finding, when available. + pub section_id: Option, + /// Finding type, such as stale_source_ref or unsupported_section. + pub finding_type: String, + /// Finding severity. + pub severity: String, + /// Source kind associated with the finding, when available. + pub source_kind: Option, + /// Source identifier associated with the finding, when available. + pub source_id: Option, + /// Human-readable finding message. + pub message: String, + /// Structured finding details. + pub details: Value, + /// Creation timestamp. + pub created_at: OffsetDateTime, +} + /// Persisted document row. #[derive(Debug, FromRow)] pub struct DocDocument { diff --git a/packages/elf-storage/src/schema.rs b/packages/elf-storage/src/schema.rs index bd39ed1b..e12d31a7 100644 --- a/packages/elf-storage/src/schema.rs +++ b/packages/elf-storage/src/schema.rs @@ -84,6 +84,16 @@ fn expand_includes(sql: &str) -> String { )), "tables/034_consolidation_run_jobs.sql" => out.push_str(include_str!("../../../sql/tables/034_consolidation_run_jobs.sql")), + "tables/035_knowledge_pages.sql" => + out.push_str(include_str!("../../../sql/tables/035_knowledge_pages.sql")), + "tables/036_knowledge_page_sections.sql" => out + .push_str(include_str!("../../../sql/tables/036_knowledge_page_sections.sql")), + "tables/037_knowledge_page_source_refs.sql" => out.push_str(include_str!( + "../../../sql/tables/037_knowledge_page_source_refs.sql" + )), + "tables/038_knowledge_page_lint_findings.sql" => out.push_str(include_str!( + "../../../sql/tables/038_knowledge_page_lint_findings.sql" + )), "tables/023_memory_ingest_decisions.sql" => out .push_str(include_str!("../../../sql/tables/023_memory_ingest_decisions.sql")), "tables/024_memory_space_grants.sql" => @@ -99,3 +109,22 @@ fn expand_includes(sql: &str) -> String { out } + +#[cfg(test)] +mod tests { + use crate::schema; + + #[test] + fn render_schema_expands_all_includes() { + let schema = schema::render_schema(4_096); + + assert!( + !schema.contains("\\ir "), + "rendered schema must not leave psql include directives" + ); + assert!(schema.contains("CREATE TABLE IF NOT EXISTS knowledge_pages")); + assert!(schema.contains("CREATE TABLE IF NOT EXISTS knowledge_page_sections")); + assert!(schema.contains("CREATE TABLE IF NOT EXISTS knowledge_page_source_refs")); + assert!(schema.contains("CREATE TABLE IF NOT EXISTS knowledge_page_lint_findings")); + } +} diff --git a/sql/init.sql b/sql/init.sql index 9e0b06fb..98b2ee45 100644 --- a/sql/init.sql +++ b/sql/init.sql @@ -33,3 +33,7 @@ \ir tables/032_consolidation_proposals.sql \ir tables/033_consolidation_proposal_reviews.sql \ir tables/034_consolidation_run_jobs.sql +\ir tables/035_knowledge_pages.sql +\ir tables/036_knowledge_page_sections.sql +\ir tables/037_knowledge_page_source_refs.sql +\ir tables/038_knowledge_page_lint_findings.sql diff --git a/sql/tables/035_knowledge_pages.sql b/sql/tables/035_knowledge_pages.sql new file mode 100644 index 00000000..a13f3cbe --- /dev/null +++ b/sql/tables/035_knowledge_pages.sql @@ -0,0 +1,54 @@ +CREATE TABLE IF NOT EXISTS knowledge_pages ( + page_id uuid PRIMARY KEY, + tenant_id text NOT NULL, + project_id text NOT NULL, + page_kind text NOT NULL, + page_key text NOT NULL, + title text NOT NULL, + contract_schema text NOT NULL, + status text NOT NULL, + rebuild_source_hash text NOT NULL, + content_hash text NOT NULL, + source_coverage jsonb NOT NULL DEFAULT '{}'::jsonb, + source_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb, + rebuild_metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + rebuilt_at timestamptz NOT NULL DEFAULT now() +); + +ALTER TABLE knowledge_pages + DROP CONSTRAINT IF EXISTS ck_knowledge_pages_page_kind; +ALTER TABLE knowledge_pages + ADD CONSTRAINT ck_knowledge_pages_page_kind + CHECK (page_kind IN ('project', 'entity', 'concept', 'issue', 'decision')); + +ALTER TABLE knowledge_pages + DROP CONSTRAINT IF EXISTS ck_knowledge_pages_status; +ALTER TABLE knowledge_pages + ADD CONSTRAINT ck_knowledge_pages_status + CHECK (status IN ('active', 'stale', 'archived')); + +ALTER TABLE knowledge_pages + DROP CONSTRAINT IF EXISTS ck_knowledge_pages_source_coverage; +ALTER TABLE knowledge_pages + ADD CONSTRAINT ck_knowledge_pages_source_coverage + CHECK (jsonb_typeof(source_coverage) = 'object'); + +ALTER TABLE knowledge_pages + DROP CONSTRAINT IF EXISTS ck_knowledge_pages_source_snapshot; +ALTER TABLE knowledge_pages + ADD CONSTRAINT ck_knowledge_pages_source_snapshot + CHECK (jsonb_typeof(source_snapshot) = 'object'); + +ALTER TABLE knowledge_pages + DROP CONSTRAINT IF EXISTS ck_knowledge_pages_rebuild_metadata; +ALTER TABLE knowledge_pages + ADD CONSTRAINT ck_knowledge_pages_rebuild_metadata + CHECK (jsonb_typeof(rebuild_metadata) = 'object'); + +CREATE UNIQUE INDEX IF NOT EXISTS uq_knowledge_pages_context_key + ON knowledge_pages (tenant_id, project_id, page_kind, page_key); + +CREATE INDEX IF NOT EXISTS idx_knowledge_pages_context_updated + ON knowledge_pages (tenant_id, project_id, updated_at DESC); diff --git a/sql/tables/036_knowledge_page_sections.sql b/sql/tables/036_knowledge_page_sections.sql new file mode 100644 index 00000000..0312f5e4 --- /dev/null +++ b/sql/tables/036_knowledge_page_sections.sql @@ -0,0 +1,32 @@ +CREATE TABLE IF NOT EXISTS knowledge_page_sections ( + section_id uuid PRIMARY KEY, + page_id uuid NOT NULL REFERENCES knowledge_pages(page_id) ON DELETE CASCADE, + section_key text NOT NULL, + heading text NOT NULL, + role text NOT NULL, + content text NOT NULL, + ordinal int NOT NULL, + citations jsonb NOT NULL DEFAULT '[]'::jsonb, + unsupported_reason text NULL, + content_hash text NOT NULL, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +ALTER TABLE knowledge_page_sections + DROP CONSTRAINT IF EXISTS ck_knowledge_page_sections_citations; +ALTER TABLE knowledge_page_sections + ADD CONSTRAINT ck_knowledge_page_sections_citations + CHECK (jsonb_typeof(citations) = 'array'); + +ALTER TABLE knowledge_page_sections + DROP CONSTRAINT IF EXISTS ck_knowledge_page_sections_cited_or_unsupported; +ALTER TABLE knowledge_page_sections + ADD CONSTRAINT ck_knowledge_page_sections_cited_or_unsupported + CHECK (jsonb_array_length(citations) > 0 OR unsupported_reason IS NOT NULL); + +CREATE UNIQUE INDEX IF NOT EXISTS uq_knowledge_page_sections_page_key + ON knowledge_page_sections (page_id, section_key); + +CREATE INDEX IF NOT EXISTS idx_knowledge_page_sections_page_ordinal + ON knowledge_page_sections (page_id, ordinal); diff --git a/sql/tables/037_knowledge_page_source_refs.sql b/sql/tables/037_knowledge_page_source_refs.sql new file mode 100644 index 00000000..d157c563 --- /dev/null +++ b/sql/tables/037_knowledge_page_source_refs.sql @@ -0,0 +1,37 @@ +CREATE TABLE IF NOT EXISTS knowledge_page_source_refs ( + ref_id uuid PRIMARY KEY, + page_id uuid NOT NULL REFERENCES knowledge_pages(page_id) ON DELETE CASCADE, + section_id uuid NULL REFERENCES knowledge_page_sections(section_id) ON DELETE CASCADE, + source_kind text NOT NULL, + source_id uuid NOT NULL, + source_status text NULL, + source_updated_at timestamptz NULL, + source_content_hash text NULL, + source_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb, + citation_metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now() +); + +ALTER TABLE knowledge_page_source_refs + DROP CONSTRAINT IF EXISTS ck_knowledge_page_source_refs_source_kind; +ALTER TABLE knowledge_page_source_refs + ADD CONSTRAINT ck_knowledge_page_source_refs_source_kind + CHECK (source_kind IN ('note', 'event', 'relation', 'proposal')); + +ALTER TABLE knowledge_page_source_refs + DROP CONSTRAINT IF EXISTS ck_knowledge_page_source_refs_source_snapshot; +ALTER TABLE knowledge_page_source_refs + ADD CONSTRAINT ck_knowledge_page_source_refs_source_snapshot + CHECK (jsonb_typeof(source_snapshot) = 'object'); + +ALTER TABLE knowledge_page_source_refs + DROP CONSTRAINT IF EXISTS ck_knowledge_page_source_refs_citation_metadata; +ALTER TABLE knowledge_page_source_refs + ADD CONSTRAINT ck_knowledge_page_source_refs_citation_metadata + CHECK (jsonb_typeof(citation_metadata) = 'object'); + +CREATE INDEX IF NOT EXISTS idx_knowledge_page_source_refs_page + ON knowledge_page_source_refs (page_id, source_kind, source_id); + +CREATE INDEX IF NOT EXISTS idx_knowledge_page_source_refs_source + ON knowledge_page_source_refs (source_kind, source_id); diff --git a/sql/tables/038_knowledge_page_lint_findings.sql b/sql/tables/038_knowledge_page_lint_findings.sql new file mode 100644 index 00000000..e76a5aa2 --- /dev/null +++ b/sql/tables/038_knowledge_page_lint_findings.sql @@ -0,0 +1,33 @@ +CREATE TABLE IF NOT EXISTS knowledge_page_lint_findings ( + finding_id uuid PRIMARY KEY, + page_id uuid NOT NULL REFERENCES knowledge_pages(page_id) ON DELETE CASCADE, + section_id uuid NULL REFERENCES knowledge_page_sections(section_id) ON DELETE SET NULL, + finding_type text NOT NULL, + severity text NOT NULL, + source_kind text NULL, + source_id uuid NULL, + message text NOT NULL, + details jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now() +); + +ALTER TABLE knowledge_page_lint_findings + DROP CONSTRAINT IF EXISTS ck_knowledge_page_lint_findings_severity; +ALTER TABLE knowledge_page_lint_findings + ADD CONSTRAINT ck_knowledge_page_lint_findings_severity + CHECK (severity IN ('info', 'warning', 'error')); + +ALTER TABLE knowledge_page_lint_findings + DROP CONSTRAINT IF EXISTS ck_knowledge_page_lint_findings_source_kind; +ALTER TABLE knowledge_page_lint_findings + ADD CONSTRAINT ck_knowledge_page_lint_findings_source_kind + CHECK (source_kind IS NULL OR source_kind IN ('note', 'event', 'relation', 'proposal')); + +ALTER TABLE knowledge_page_lint_findings + DROP CONSTRAINT IF EXISTS ck_knowledge_page_lint_findings_details; +ALTER TABLE knowledge_page_lint_findings + ADD CONSTRAINT ck_knowledge_page_lint_findings_details + CHECK (jsonb_typeof(details) = 'object'); + +CREATE INDEX IF NOT EXISTS idx_knowledge_page_lint_findings_page + ON knowledge_page_lint_findings (page_id, severity, created_at DESC);