Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

323 changes: 312 additions & 11 deletions apps/elf-api/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,22 @@ use axum::{
routing,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_json::{Map, Value};
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use utoipa::{OpenApi, ToSchema};
use utoipa_scalar::{Scalar, Servable};
use uuid::Uuid;

use crate::state::AppState;
use elf_config::{SecurityAuthKey, SecurityAuthRole};
use elf_domain::{english_gate, writegate::WritePolicy};
use elf_domain::{
consolidation::{
ConsolidationInputRef, ConsolidationLineage, ConsolidationReviewAction,
ConsolidationReviewState,
},
english_gate,
writegate::WritePolicy,
};
use elf_service::{
AddEventRequest, AddEventResponse, AddNoteInput, AddNoteRequest, AddNoteResponse,
AdminGraphPredicateAliasAddRequest, AdminGraphPredicateAliasesListRequest,
Expand All @@ -34,15 +41,20 @@ use elf_service::{
AdminIngestionProfileDefaultResponse, AdminIngestionProfileDefaultSetRequest,
AdminIngestionProfileGetRequest, AdminIngestionProfileListRequest,
AdminIngestionProfileResponse, AdminIngestionProfileVersionsListRequest,
AdminIngestionProfileVersionsListResponse, AdminIngestionProfilesListResponse, DeleteRequest,
DeleteResponse, DocType, DocsExcerptResponse, DocsExcerptsGetRequest, DocsGetRequest,
DocsGetResponse, DocsPutRequest, DocsPutResponse, DocsSearchL0Request, DocsSearchL0Response,
Error, EventMessage, GranteeKind, GraphQueryEntityRef, GraphQueryPredicateRef,
GraphQueryRequest, GraphQueryResponse, IngestionProfileSelector, ListRequest, ListResponse,
NoteFetchRequest, NoteFetchResponse, NoteProvenanceBundleResponse, NoteProvenanceGetRequest,
PayloadLevel, PublishNoteRequest, QueryPlan, RankingRequestOverride, RebuildReport,
SearchDetailsRequest, SearchDetailsResult, SearchExplainRequest, SearchExplainResponse,
SearchIndexItem, SearchRequest, SearchResponse, SearchSessionGetRequest, SearchTimelineGroup,
AdminIngestionProfileVersionsListResponse, AdminIngestionProfilesListResponse,
ConsolidationProposalGetRequest, ConsolidationProposalInput, ConsolidationProposalResponse,
ConsolidationProposalReviewRequest, ConsolidationProposalsListRequest,
ConsolidationProposalsListResponse, ConsolidationRunCreateRequest,
ConsolidationRunCreateResponse, ConsolidationRunGetRequest, ConsolidationRunResponse,
ConsolidationRunsListRequest, ConsolidationRunsListResponse, DeleteRequest, DeleteResponse,
DocType, DocsExcerptResponse, DocsExcerptsGetRequest, DocsGetRequest, DocsGetResponse,
DocsPutRequest, DocsPutResponse, DocsSearchL0Request, DocsSearchL0Response, Error,
EventMessage, GranteeKind, GraphQueryEntityRef, GraphQueryPredicateRef, GraphQueryRequest,
GraphQueryResponse, IngestionProfileSelector, ListRequest, ListResponse, NoteFetchRequest,
NoteFetchResponse, NoteProvenanceBundleResponse, NoteProvenanceGetRequest, PayloadLevel,
PublishNoteRequest, QueryPlan, RankingRequestOverride, RebuildReport, SearchDetailsRequest,
SearchDetailsResult, SearchExplainRequest, SearchExplainResponse, SearchIndexItem,
SearchRequest, SearchResponse, SearchSessionGetRequest, SearchTimelineGroup,
SearchTimelineRequest, SearchTrajectoryResponse, SearchTrajectorySummary, ShareScope,
SpaceGrantRevokeRequest, SpaceGrantRevokeResponse, SpaceGrantUpsertRequest,
SpaceGrantsListRequest, TextPositionSelector, TextQuoteSelector, TraceBundleGetRequest,
Expand Down Expand Up @@ -115,6 +127,12 @@ const VIEWER_HTML: &str = include_str!("../static/viewer.html");
admin_ingestion_profile_versions_list,
admin_ingestion_profile_default_get,
admin_ingestion_profile_default_set,
consolidation_run_create,
consolidation_runs_list,
consolidation_run_get,
consolidation_proposals_list,
consolidation_proposal_get,
consolidation_proposal_review,
rebuild_qdrant,
searches_raw,
trace_recent_list,
Expand All @@ -140,6 +158,7 @@ const VIEWER_HTML: &str = include_str!("../static/viewer.html");
(name = "docs", description = "Document extension ingestion, search, and excerpt retrieval."),
(name = "search", description = "Progressive search sessions and raw search diagnostics."),
(name = "graph", description = "Graph query and predicate administration."),
(name = "consolidation", description = "Reviewable derived consolidation proposals."),
(name = "admin", description = "Local admin and operator inspection routes."),
)
)]
Expand Down Expand Up @@ -314,6 +333,35 @@ struct AdminIngestionProfileDefaultSetBody {
version: Option<i32>,
}

#[derive(Clone, Debug, Deserialize)]
struct ConsolidationRunCreateBody {
job_kind: String,
input_refs: Vec<ConsolidationInputRef>,
#[serde(default = "empty_json_object")]
source_snapshot: Value,
lineage: ConsolidationLineage,
#[serde(default)]
proposals: Vec<ConsolidationProposalInput>,
}

#[derive(Clone, Debug, Deserialize)]
struct ConsolidationRunsListQuery {
limit: Option<u32>,
}

#[derive(Clone, Debug, Deserialize)]
struct ConsolidationProposalsListQuery {
run_id: Option<Uuid>,
review_state: Option<ConsolidationReviewState>,
limit: Option<u32>,
}

#[derive(Clone, Debug, Deserialize)]
struct ConsolidationProposalReviewBody {
action: ConsolidationReviewAction,
review_comment: Option<String>,
}

#[derive(Clone, Debug, Serialize, ToSchema)]
struct AdminIngestionProfileDefaultResponseV2 {
profile_id: String,
Expand Down Expand Up @@ -583,6 +631,20 @@ pub fn admin_router(state: AppState) -> Router {
"/v2/admin/events/ingestion-profiles",
routing::get(admin_ingestion_profiles_list).post(admin_ingestion_profile_create),
)
.route(
"/v2/admin/consolidation/runs",
routing::get(consolidation_runs_list).post(consolidation_run_create),
)
.route("/v2/admin/consolidation/runs/{run_id}", routing::get(consolidation_run_get))
.route("/v2/admin/consolidation/proposals", routing::get(consolidation_proposals_list))
.route(
"/v2/admin/consolidation/proposals/{proposal_id}",
routing::get(consolidation_proposal_get),
)
.route(
"/v2/admin/consolidation/proposals/{proposal_id}/review",
routing::post(consolidation_proposal_review),
)
.route("/v2/admin/qdrant/rebuild", routing::post(rebuild_qdrant))
.route("/v2/admin/searches/raw", routing::post(searches_raw))
.route("/v2/admin/traces/recent", routing::get(trace_recent_list))
Expand Down Expand Up @@ -620,6 +682,10 @@ where
.merge(Scalar::with_url(SCALAR_DOCS_PATH, <ApiDoc as OpenApi>::openapi()))
}

fn empty_json_object() -> Value {
Value::Object(Map::new())
}

fn json_error(
status: StatusCode,
code: &str,
Expand Down Expand Up @@ -2370,6 +2436,241 @@ async fn admin_note_provenance_get(
Ok(Json(response))
}

#[utoipa::path(
post,
path = "/v2/admin/consolidation/runs",
tag = "consolidation",
request_body = Value,
responses(
(status = 200, description = "Consolidation run was created.", body = Value),
(status = 400, description = "Invalid request.", body = ErrorBody),
(status = 401, description = "Authentication required.", body = ErrorBody),
(status = 403, description = "Admin access required.", body = ErrorBody),
(status = 500, description = "Internal error.", body = ErrorBody),
)
)]
async fn consolidation_run_create(
State(state): State<AppState>,
headers: HeaderMap,
payload: Result<Json<ConsolidationRunCreateBody>, JsonRejection>,
) -> Result<Json<ConsolidationRunCreateResponse>, ApiError> {
let ctx = RequestContext::from_headers(&headers)?;
let Json(payload) = payload.map_err(|err| {
tracing::warn!(error = %err, "Invalid request payload.");

json_error(StatusCode::BAD_REQUEST, "INVALID_REQUEST", "Invalid request payload.", None)
})?;
let response = state
.service
.consolidation_run_create(ConsolidationRunCreateRequest {
tenant_id: ctx.tenant_id,
project_id: ctx.project_id,
agent_id: ctx.agent_id,
job_kind: payload.job_kind,
input_refs: payload.input_refs,
source_snapshot: payload.source_snapshot,
lineage: payload.lineage,
proposals: payload.proposals,
})
.await?;

Ok(Json(response))
}

#[utoipa::path(
get,
path = "/v2/admin/consolidation/runs",
tag = "consolidation",
params(("limit" = Option<u32>, Query, description = "Maximum runs to return.")),
responses(
(status = 200, description = "Consolidation runs.", body = Value),
(status = 400, description = "Invalid request.", body = ErrorBody),
(status = 401, description = "Authentication required.", body = ErrorBody),
(status = 403, description = "Admin access required.", body = ErrorBody),
(status = 500, description = "Internal error.", body = ErrorBody),
)
)]
async fn consolidation_runs_list(
State(state): State<AppState>,
headers: HeaderMap,
query: Result<Query<ConsolidationRunsListQuery>, QueryRejection>,
) -> Result<Json<ConsolidationRunsListResponse>, ApiError> {
let ctx = RequestContext::from_headers(&headers)?;
let Query(query) = query.map_err(|err| {
tracing::warn!(error = %err, "Invalid query parameters.");

json_error(
StatusCode::BAD_REQUEST,
"INVALID_REQUEST",
"Invalid query parameters.".to_string(),
None,
)
})?;
let response = state
.service
.consolidation_runs_list(ConsolidationRunsListRequest {
tenant_id: ctx.tenant_id,
project_id: ctx.project_id,
limit: query.limit,
})
.await?;

Ok(Json(response))
}

#[utoipa::path(
get,
path = "/v2/admin/consolidation/runs/{run_id}",
tag = "consolidation",
params(("run_id" = Uuid, Path, description = "Consolidation run ID.")),
responses(
(status = 200, description = "Consolidation run.", body = Value),
(status = 400, description = "Invalid request.", body = ErrorBody),
(status = 401, description = "Authentication required.", body = ErrorBody),
(status = 403, description = "Admin access required.", body = ErrorBody),
(status = 404, description = "Consolidation run was not found.", body = ErrorBody),
(status = 500, description = "Internal error.", body = ErrorBody),
)
)]
async fn consolidation_run_get(
State(state): State<AppState>,
headers: HeaderMap,
Path(run_id): Path<Uuid>,
) -> Result<Json<ConsolidationRunResponse>, ApiError> {
let ctx = RequestContext::from_headers(&headers)?;
let response = state
.service
.consolidation_run_get(ConsolidationRunGetRequest {
tenant_id: ctx.tenant_id,
project_id: ctx.project_id,
run_id,
})
.await?;

Ok(Json(response))
}

#[utoipa::path(
get,
path = "/v2/admin/consolidation/proposals",
tag = "consolidation",
params(
("run_id" = Option<Uuid>, Query, description = "Optional run filter."),
("review_state" = Option<String>, Query, description = "Optional review-state filter."),
("limit" = Option<u32>, Query, description = "Maximum proposals to return."),
),
responses(
(status = 200, description = "Consolidation proposals.", body = Value),
(status = 400, description = "Invalid request.", body = ErrorBody),
(status = 401, description = "Authentication required.", body = ErrorBody),
(status = 403, description = "Admin access required.", body = ErrorBody),
(status = 500, description = "Internal error.", body = ErrorBody),
)
)]
async fn consolidation_proposals_list(
State(state): State<AppState>,
headers: HeaderMap,
query: Result<Query<ConsolidationProposalsListQuery>, QueryRejection>,
) -> Result<Json<ConsolidationProposalsListResponse>, ApiError> {
let ctx = RequestContext::from_headers(&headers)?;
let Query(query) = query.map_err(|err| {
tracing::warn!(error = %err, "Invalid query parameters.");

json_error(
StatusCode::BAD_REQUEST,
"INVALID_REQUEST",
"Invalid query parameters.".to_string(),
None,
)
})?;
let response = state
.service
.consolidation_proposals_list(ConsolidationProposalsListRequest {
tenant_id: ctx.tenant_id,
project_id: ctx.project_id,
run_id: query.run_id,
review_state: query.review_state,
limit: query.limit,
})
.await?;

Ok(Json(response))
}

#[utoipa::path(
get,
path = "/v2/admin/consolidation/proposals/{proposal_id}",
tag = "consolidation",
params(("proposal_id" = Uuid, Path, description = "Consolidation proposal ID.")),
responses(
(status = 200, description = "Consolidation proposal.", body = Value),
(status = 400, description = "Invalid request.", body = ErrorBody),
(status = 401, description = "Authentication required.", body = ErrorBody),
(status = 403, description = "Admin access required.", body = ErrorBody),
(status = 404, description = "Consolidation proposal was not found.", body = ErrorBody),
(status = 500, description = "Internal error.", body = ErrorBody),
)
)]
async fn consolidation_proposal_get(
State(state): State<AppState>,
headers: HeaderMap,
Path(proposal_id): Path<Uuid>,
) -> Result<Json<ConsolidationProposalResponse>, ApiError> {
let ctx = RequestContext::from_headers(&headers)?;
let response = state
.service
.consolidation_proposal_get(ConsolidationProposalGetRequest {
tenant_id: ctx.tenant_id,
project_id: ctx.project_id,
proposal_id,
})
.await?;

Ok(Json(response))
}

#[utoipa::path(
post,
path = "/v2/admin/consolidation/proposals/{proposal_id}/review",
tag = "consolidation",
params(("proposal_id" = Uuid, Path, description = "Consolidation proposal ID.")),
request_body = Value,
responses(
(status = 200, description = "Consolidation proposal review action was applied.", body = Value),
(status = 400, description = "Invalid request.", body = ErrorBody),
(status = 401, description = "Authentication required.", body = ErrorBody),
(status = 403, description = "Admin access required.", body = ErrorBody),
(status = 404, description = "Consolidation proposal was not found.", body = ErrorBody),
(status = 500, description = "Internal error.", body = ErrorBody),
)
)]
async fn consolidation_proposal_review(
State(state): State<AppState>,
headers: HeaderMap,
Path(proposal_id): Path<Uuid>,
payload: Result<Json<ConsolidationProposalReviewBody>, JsonRejection>,
) -> Result<Json<ConsolidationProposalResponse>, ApiError> {
let ctx = RequestContext::from_headers(&headers)?;
let Json(payload) = payload.map_err(|err| {
tracing::warn!(error = %err, "Invalid request payload.");

json_error(StatusCode::BAD_REQUEST, "INVALID_REQUEST", "Invalid request payload.", None)
})?;
let response = state
.service
.consolidation_proposal_review(ConsolidationProposalReviewRequest {
tenant_id: ctx.tenant_id,
project_id: ctx.project_id,
reviewer_agent_id: ctx.agent_id,
proposal_id,
review_action: payload.action,
review_comment: payload.review_comment,
})
.await?;

Ok(Json(response))
}

#[utoipa::path(
get,
path = "/v2/admin/events/ingestion-profiles",
Expand Down
Loading