8 changes: 8 additions & 0 deletions .env.example
@@ -18,3 +18,11 @@ MDB_MCP_CONNECTION_STRING=mongodb+srv://USER:PASS@cluster0.xxxxx.mongodb.net/?re
# Atlas Service Account (OAuth client creds) → MCP atlas-* tools (atlas-get-performance-advisor, mongodb-logs)
MDB_MCP_API_CLIENT_ID=
MDB_MCP_API_CLIENT_SECRET=

# ===== Auth & write gate (two-persona console) =====
# Signs/verifies the HS256 session token shared by the read API and the dashboard — must be the
# SAME value in both. Generate with: openssl rand -hex 32. Never commit the real value.
SESSION_SECRET=
# Shared secret gating the write endpoints (POST /run, /packs/{id}/decision). Held server-side only
# (read API + the dashboard proxies); never exposed to the browser. Generate: openssl rand -hex 16.
RUN_API_TOKEN=
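
The two `openssl rand` commands named in the comments above have a standard-library equivalent in Python. This is a minimal sketch, assuming both services only need hex strings of the same byte lengths; the helper name `generate_env_secrets` is hypothetical and not part of this PR.

```python
import secrets


def generate_env_secrets() -> dict[str, str]:
    """Return fresh values shaped like the openssl commands in .env.example."""
    return {
        # 32 random bytes -> 64 hex characters (like: openssl rand -hex 32)
        "SESSION_SECRET": secrets.token_hex(32),
        # 16 random bytes -> 32 hex characters (like: openssl rand -hex 16)
        "RUN_API_TOKEN": secrets.token_hex(16),
    }


if __name__ == "__main__":
    # Print in .env format so the lines can be pasted into a local, uncommitted .env.
    for name, value in generate_env_secrets().items():
        print(f"{name}={value}")
```

Whichever generator is used, the same `SESSION_SECRET` value has to reach both the read API and the dashboard, as the comment above states.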
2 changes: 2 additions & 0 deletions .gitignore
@@ -40,6 +40,8 @@ mongo-build-plan.md
.claude/
.codex/
.gstack/
.agents/
skills-lock.json

# local design reference images (visual specs, not pushed)
design-refs/
86 changes: 58 additions & 28 deletions README.md
@@ -1,47 +1,77 @@
# googlecloudhack — Evidence-Driven DBRE Agent
# Evidence-Driven DBRE Agent

A Gemini-powered MongoDB performance engineer: detects slow queries, proposes ESR-correct
indexes from real `explain` evidence, gates `apply` behind human approval, verifies the
result, and ships a hashed evidence pack plus an internal event ledger for every fix.
Two personas, one MongoDB performance loop:

> **Status:** Day-5 — live Cloud Run demo with split Agent Engine diagnosis roles,
> deterministic validation, human-gated apply/verify, and Evidence Ledger collections.
- **Users** run real query workloads against a live Atlas collection from a guided console. Each
query's real `explain` evidence is captured and attributed to whoever ran it.
- A **DBRE** triages the *actual* slowest captured queries — ranked by explain evidence (blocking
sort, collection scan, over-scan ratio), not wall-clock — diagnoses one, and approves an
ESR-correct index fix. The controller applies it behind a hash-bound human gate, then verifies it.
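
Ranking by explain evidence rather than wall-clock time can be sketched as a sort over the three signals named above. This is an illustration only: the `CapturedEvidence` fields and the strict priority order (blocking sort, then collection scan, then over-scan ratio) are assumptions, not the project's actual queue logic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CapturedEvidence:
    """Hypothetical summary of one captured explain result."""

    query_id: str
    has_blocking_sort: bool
    is_collection_scan: bool
    docs_examined: int
    docs_returned: int

    @property
    def overscan_ratio(self) -> float:
        # Documents examined per document returned; guard against zero results.
        return self.docs_examined / max(self.docs_returned, 1)


def rank_by_evidence(captured: list[CapturedEvidence]) -> list[CapturedEvidence]:
    """Worst first: blocking sort, then collection scan, then over-scan ratio."""
    return sorted(
        captured,
        key=lambda e: (e.has_blocking_sort, e.is_collection_scan, e.overscan_ratio),
        reverse=True,
    )


queue = rank_by_evidence(
    [
        CapturedEvidence("q1", False, False, 20, 20),
        CapturedEvidence("q2", True, True, 300_000, 20),
        CapturedEvidence("q3", False, True, 300_000, 5_000),
    ]
)
print([e.query_id for e in queue])  # ['q2', 'q3', 'q1']
```

Because none of the inputs is a timing measurement, the order is the same on a warm or cold cluster.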

## Architecture

Five demo stages (Detect → Diagnose → Test → Approve → Verify) over a deterministic
three-phase safety engine (DIAGNOSE → APPROVE → VERIFY).
There is no hardcoded demo query: the queries the DBRE fixes are the ones users really ran.

Current production path:
## Flow

```text
Dashboard -> FastAPI Cloud Run (opens approval gate)
-> Diagnose Agent Engine -> Candidate Agent Engine -> Rationale Agent Engine
-> deterministic controller validates + emits DIAGNOSED EvidencePack
-> /packs/{id}/decision hash-bound approval ticket
-> apply + verify
USER ─ login ─> Workload Console ─ guided query ─> read API ─ explain ─> Atlas
└─ capture (attributed) ─> query_log
DBRE ─ login ─> Slow-Query Queue (ranked by evidence)
└─ Diagnose ─> deterministic ESR diagnosis ─> DIAGNOSED EvidencePack
└─ hash-bound Approve ─> apply index + re-explain ─> VERIFIED
```

The three Agent Engine resources perform read-only Mongo diagnosis, candidate testing,
and rationale generation with Python-native tools.
Deterministic Python remains the safety authority: it recomputes the ESR winner, evidence
hash, phase transitions, index apply, and verification. `/run` opens the approval gate
before diagnosis and remains read-only; `/packs/{run_id}/decision` is the only path that
can issue the one-time approval ticket required for mutation.
## Architecture

- **Dashboard** — Next.js (App Router). Seeded role-based login backed by an httpOnly session
cookie; the user persona is confined to the workload console, the DBRE to the triage + review
planes. The read API is the security authority — it re-verifies the session bearer on every data
call, and the approver identity always comes from the verified session, never the browser.
- **Read API** — FastAPI on Cloud Run. Guided, validated, read-only workload queries; evidence
capture; the evidence-ranked queue; and the DIAGNOSE → (human APPROVE) → VERIFY remediation flow.
Index mutation happens only after a matching hash-bound approval.
- **Diagnosis** — a pure, deterministic ESR analyzer derives the correct index key order
(Equality → Sort → Range) from each query's own structure. In production three read-only Vertex AI
Agent Engine roles narrate the diagnosis; locally the controller runs deterministically.
- **State** — MongoDB Atlas. `dbre_state` holds `users`, `query_log`, `evidence_packs`, and the
internal ledger collections; the demo workload runs against `sample_supplies.sales_agent_demo`.
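
The Equality → Sort → Range ordering can be sketched as a pure function over a query's filter and sort. This is a simplified illustration, not the PR's analyzer: the function name `esr_index_keys`, the set of operators treated as range predicates, and the example field names are all assumptions.

```python
# Operators treated as range predicates in this sketch (an assumption; a real
# analyzer would need to handle more operators and nested conditions).
RANGE_OPERATORS = {"$gt", "$gte", "$lt", "$lte"}


def esr_index_keys(
    query_filter: dict, query_sort: list[tuple[str, int]]
) -> list[tuple[str, int]]:
    """Order index keys as Equality fields, then Sort fields, then Range fields."""
    sort_fields = {field for field, _ in query_sort}
    equality: list[str] = []
    ranges: list[str] = []
    for field, condition in query_filter.items():
        is_range = isinstance(condition, dict) and any(
            op in RANGE_OPERATORS for op in condition
        )
        (ranges if is_range else equality).append(field)
    # A field that is also sorted on keeps its sort position and direction.
    keys = [(field, 1) for field in equality if field not in sort_fields]
    keys += list(query_sort)
    keys += [(field, 1) for field in ranges if field not in sort_fields]
    return keys


keys = esr_index_keys(
    {"storeLocation": "Denver", "saleDate": {"$gte": "2017-01-01"}},
    [("customer.age", -1)],
)
print(keys)  # [('storeLocation', 1), ('customer.age', -1), ('saleDate', 1)]
```

The function depends only on the query's own structure, which is what makes the result reproducible and checkable by a deterministic controller.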

The dashboard reads only `EvidencePack` JSON, including `approval_gate` state and
`agent_trace` proof that the gate opened before Agent Engine participation. Internally,
MongoDB persists ledger collections for `slow_queries`, `candidates`, `experiments`,
`decisions`, `evidence_packs`, `approvals`, `applications`, and `verifications`.
The dashboard reads only `EvidencePack` v1 JSON; that contract is frozen in `contracts/`.

## Quickstart

```bash
uv sync --dev
cp .env.example .env # fill GCP + MongoDB values
uv run pytest -q
cp .env.example .env # fill MongoDB + (prod) Vertex values; set SESSION_SECRET + RUN_API_TOKEN

# one-time data + accounts (against your Atlas cluster)
uv run python seed/seed_demo_fixture.py seed # 300k demo docs
uv run python seed/seed_workload.py verify # baseline indexes + prove the trap presets
uv run python seed/seed_users.py # Dev Trivedi, Aakash Singh, DBRE — prints passwords once

uv run pytest -q # unit + contract tests (live integration tests auto-skip without a connection string)
```

Run the full stack locally (deterministic controller, no Vertex needed):

```bash
SS=$(openssl rand -hex 32); RT=$(openssl rand -hex 16)
# read API (reads Atlas via MDB_MCP_CONNECTION_STRING from .env)
SESSION_SECRET=$SS RUN_API_TOKEN=$RT uv run uvicorn api.server:app --port 8000
# dashboard — SAME SESSION_SECRET + RUN_API_TOKEN, API_URL -> the read API
cd dashboard && npm install && \
API_URL=http://127.0.0.1:8000 SESSION_SECRET=$SS RUN_API_TOKEN=$RT npm run dev
```

Re-run `seed/seed_workload.py reset` between demos — an approved fix removes the trap for a whole
store/method class.

## Safety

- Agents and tools are read-only; only the deterministic controller mutates, and only after a
matching hash-bound approval.
- The approver identity comes from the verified DBRE session — never from the browser.
- Secrets live in `.env` (local) / Secret Manager (prod); none are committed.
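
A hash-bound approval can be pictured as follows: the approver submits the hash of the evidence they reviewed, and the server refuses to proceed unless it matches the hash recomputed from the stored evidence. This is a minimal sketch under assumed names (`evidence_hash`, `approve`) and an assumed canonical-JSON hashing scheme; it is not the project's `issue_approval_ticket`.

```python
import hashlib
import hmac
import json


def evidence_hash(evidence: dict) -> str:
    """SHA-256 over a canonical JSON encoding (sorted keys, no extra whitespace)."""
    canonical = json.dumps(evidence, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def approve(evidence: dict, submitted_hash: str, approver: str) -> dict:
    """Issue a ticket only if the approver saw exactly this evidence."""
    expected = evidence_hash(evidence)
    # Compare as bytes in constant time.
    if not hmac.compare_digest(expected.encode(), submitted_hash.encode()):
        raise ValueError("evidence hash mismatch: approval refused")
    return {"approver": approver, "evidence_hash": expected}


evidence = {"index": [["storeLocation", 1]], "docs_examined": 300000}
ticket = approve(evidence, evidence_hash(evidence), approver="DBRE")
```

If the stored evidence changes after the reviewer loaded it, the recomputed hash no longer matches and the approval is rejected rather than applied to something the reviewer never saw.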

## License

Apache-2.0 — see [`LICENSE`](LICENSE).
139 changes: 139 additions & 0 deletions api/auth.py
@@ -0,0 +1,139 @@
"""Web auth: user store, login route, and role guards. Credentials are verified against the
`users` collection, a short-lived HS256 session token is issued, and protected endpoints are
gated by role. SESSION_SECRET (env) signs/verifies tokens — it is never returned to clients
or logged. The dashboard holds the token in an httpOnly cookie and forwards it as a bearer."""

import os
from dataclasses import dataclass
from typing import Annotated, Protocol

from fastapi import APIRouter, Depends, Header, HTTPException
from pydantic import BaseModel

from controller.auth import (
    Identity,
    TokenError,
    hash_password,
    make_session_token,
    read_session_token,
    verify_password,
)

ROLE_USER = "user"
ROLE_DBRE = "dbre"
VALID_ROLES = (ROLE_USER, ROLE_DBRE)

# Equalises login timing whether or not the username exists (reduces enumeration signal).
_DUMMY_HASH = hash_password("\x00 absent-account placeholder \x00")


@dataclass(frozen=True)
class UserRecord:
    username: str
    display_name: str
    role: str
    password_hash: str


class AuthStore(Protocol):
    def get_user(self, username: str) -> UserRecord | None: ...


class MongoUserStore:
    def __init__(self, collection) -> None:
        self._col = collection

    def get_user(self, username: str) -> UserRecord | None:
        doc = self._col.find_one({"username": username}, {"_id": False})
        if not doc:
            return None
        return UserRecord(
            username=doc["username"],
            display_name=doc.get("display_name", doc["username"]),
            role=doc["role"],
            password_hash=doc["password_hash"],
        )


def get_auth_store() -> AuthStore:
    raise HTTPException(status_code=503, detail="authentication is not configured")


AuthStoreDep = Annotated[AuthStore, Depends(get_auth_store)]


def session_secret() -> str:
    secret = os.environ.get("SESSION_SECRET")
    if not secret:
        raise HTTPException(status_code=503, detail="SESSION_SECRET is not configured")
    return secret


class LoginRequest(BaseModel):
    username: str
    password: str


class LoginResponse(BaseModel):
    token: str
    role: str
    username: str
    display_name: str


auth_router = APIRouter()


@auth_router.post("/auth/login", response_model=LoginResponse)
def login(body: LoginRequest, store: AuthStoreDep) -> LoginResponse:
    user = store.get_user(body.username)
    # Always run a hash verification so the response time does not reveal whether the
    # username exists. compare_digest inside verify_password keeps this constant-time.
    if user is None:
        verify_password(body.password, _DUMMY_HASH)
        raise HTTPException(status_code=401, detail="invalid credentials")
    if not verify_password(body.password, user.password_hash):
        raise HTTPException(status_code=401, detail="invalid credentials")
    identity = Identity(username=user.username, display_name=user.display_name, role=user.role)
    token = make_session_token(identity, session_secret())
    return LoginResponse(
        token=token, role=user.role, username=user.username, display_name=user.display_name
    )


def _identity_from_authorization(authorization: str | None) -> Identity:
    if not authorization or not authorization.lower().startswith("bearer "):
        raise HTTPException(status_code=401, detail="missing bearer token")
    token = authorization.split(" ", 1)[1].strip()
    try:
        return read_session_token(token, session_secret())
    except TokenError as exc:
        raise HTTPException(status_code=401, detail="invalid session token") from exc


def require_role(*roles: str):
    """FastAPI dependency factory: returns the verified Identity, or 401 (no/invalid token) /
    403 (role not permitted). With no roles given, any valid session passes."""
    allowed = roles or VALID_ROLES

    def _dependency(authorization: Annotated[str | None, Header()] = None) -> Identity:
        identity = _identity_from_authorization(authorization)
        if identity.role not in allowed:
            raise HTTPException(status_code=403, detail="insufficient role")
        return identity

    return _dependency


def optional_dbre_identity(
    authorization: Annotated[str | None, Header()] = None,
) -> Identity | None:
    """DBRE role guard that is active only when SESSION_SECRET is configured. It mirrors
    require_write_token's conditional gating, so local/CI flows stay open while production
    enforces the role. Returns the verified DBRE identity, or None when auth is unconfigured."""
    if not os.environ.get("SESSION_SECRET"):
        return None
    identity = _identity_from_authorization(authorization)
    if identity.role != ROLE_DBRE:
        raise HTTPException(status_code=403, detail="DBRE role required")
    return identity
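
The diff imports `make_session_token` and `read_session_token` from `controller.auth` without showing them. For readers unfamiliar with HS256 tokens, the sketch below shows the general shape of signing and verifying one with the standard library only. It is an assumption-laden illustration: the names `make_token` and `read_token`, the claim layout, and the 15-minute default lifetime are hypothetical and may differ from the project's implementation.

```python
import base64
import hashlib
import hmac
import json
import time


class TokenError(Exception):
    """Raised when a token is malformed, tampered with, or expired."""


def _b64(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _unb64(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode(), signing_input.encode("ascii"), hashlib.sha256).digest()


def make_token(claims: dict, secret: str, ttl_seconds: int = 900) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = _b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64(json.dumps({**claims, "exp": int(time.time()) + ttl_seconds}).encode())
    return f"{header}.{body}.{_b64(_sign(f'{header}.{body}', secret))}"


def read_token(token: str, secret: str) -> dict:
    """Verify the signature first, then the expiry; return the claims."""
    try:
        header, body, signature = token.split(".")
        if not hmac.compare_digest(_sign(f"{header}.{body}", secret), _unb64(signature)):
            raise TokenError("bad signature")
        claims = json.loads(_unb64(body))
    except (ValueError, UnicodeError) as exc:
        raise TokenError("malformed token") from exc
    if not isinstance(claims, dict) or claims.get("exp", 0) < time.time():
        raise TokenError("expired or invalid claims")
    return claims


token = make_token({"sub": "dbre", "role": "dbre"}, "example-secret")
claims = read_token(token, "example-secret")
```

Because signer and verifier share one secret, the API and the dashboard must be configured with the same `SESSION_SECRET`; a token signed with any other value fails the signature check.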
71 changes: 61 additions & 10 deletions api/routes.py
@@ -1,13 +1,18 @@
import os
import secrets
from dataclasses import dataclass
from typing import Annotated, Literal, Protocol
from uuid import uuid4

from fastapi import APIRouter, Depends, Header, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from api.auth import optional_dbre_identity
from api.workload import WorkloadService, get_workload_service_optional
from controller.auth import Identity
from controller.orchestrator import ApprovalTicket, issue_approval_ticket
from controller.workload import WorkloadSpecError, assert_safe_query
from controller.schemas import ApprovalGateState, EvidencePack, PackStatus

router = APIRouter()
@@ -27,11 +32,20 @@ def get_pack(self, run_id: str) -> EvidencePack | None: ...
    def save_pack(self, pack: EvidencePack) -> None: ...


@dataclass(frozen=True)
class QueryInput:
    """A real captured query to diagnose, in place of the preset demo fixture."""

    query_filter: dict
    query_sort: list[tuple[str, int]]
    limit: int


class Engine(Protocol):
    """Executes the remediation phases against the live target. diagnose() is read-only;
    apply_and_verify() performs the human-approved index mutation; reject() is a no-op record."""

    async def diagnose(self, run_id: str) -> EvidencePack: ...
    async def diagnose(self, run_id: str, query: QueryInput | None = None) -> EvidencePack: ...
    async def apply_and_verify(
        self, pack: EvidencePack, ticket: ApprovalTicket
    ) -> EvidencePack: ...
@@ -72,16 +86,44 @@ def get_pack(run_id: str, store: StoreDep) -> dict:

class RunRequest(BaseModel):
    run_id: str | None = None
    captured_query_id: str | None = None


WorkloadOptionalDep = Annotated[WorkloadService | None, Depends(get_workload_service_optional)]


@router.post("/run", dependencies=[Depends(require_write_token)])
async def trigger_run(store: StoreDep, engine: EngineDep, body: RunRequest | None = None) -> dict:
    """Trigger a DIAGNOSE-only live run over the preset demo fixture (#37). Returns a
    DIAGNOSED pack; NO database mutation happens here. The recommended index is applied only
    after a human approves via POST /packs/{run_id}/decision. Synchronous (a few seconds,
    longer on a cold start); pass {"run_id": "..."} only to pin the id."""
async def trigger_run(
    store: StoreDep,
    engine: EngineDep,
    workload: WorkloadOptionalDep,
    _identity: Annotated[Identity | None, Depends(optional_dbre_identity)],
    body: RunRequest | None = None,
) -> dict:
    """Trigger a DIAGNOSE-only run. With a captured_query_id, diagnoses that real captured query
    against its natural plan; otherwise the preset demo fixture. NO mutation happens here: a human
    approves via POST /packs/{run_id}/decision before any index is applied. The DBRE role is
    enforced when SESSION_SECRET is configured."""
    run_id = (body.run_id if body else None) or f"run-{uuid4().hex[:8]}"
    pack = await engine.diagnose(run_id)
    captured_id = body.captured_query_id if body else None
    query: QueryInput | None = None
    if captured_id is not None:
        if workload is None:
            raise HTTPException(status_code=503, detail="workload service not configured")
        captured = workload.get_captured(captured_id)
        if captured is None:
            raise HTTPException(status_code=404, detail=f"captured query '{captured_id}' not found")
        spec = captured["query"]
        query = QueryInput(
            query_filter=dict(spec["filter"]),
            query_sort=[(field, int(direction)) for field, direction in spec["sort"]],
            limit=int(spec["limit"]),
        )
        try:
            assert_safe_query(query.query_filter, query.query_sort)
        except WorkloadSpecError as exc:
            raise HTTPException(status_code=422, detail=str(exc)) from exc
    pack = await engine.diagnose(run_id, query)
    store.save_pack(pack)
    return pack.model_dump(mode="json")

@@ -94,7 +136,13 @@ class DecisionRequest(BaseModel):


@router.post("/packs/{run_id}/decision", dependencies=[Depends(require_write_token)])
async def decide_pack(run_id: str, body: DecisionRequest, store: StoreDep, engine: EngineDep):
async def decide_pack(
    run_id: str,
    body: DecisionRequest,
    store: StoreDep,
    engine: EngineDep,
    identity: Annotated[Identity | None, Depends(optional_dbre_identity)],
):
    """The dashboard's approve/reject endpoint. On approve the recommended index is applied
    and the fix verified (the human-gated mutation); on reject the decision is recorded with
    no mutation. Returns the updated pack; 404 not_found / 409 already_decided |
@@ -119,12 +167,15 @@ async def decide_pack(run_id: str, body: DecisionRequest, store: StoreDep, engin
            status_code=409,
            content={"error": "approval_gate", "detail": "pending approval gate required"},
        )
    # Authoritative approver: the verified DBRE session when configured, else the request body
    # (local/CI). The dashboard never decides who approved.
    approver = identity.display_name if identity is not None else body.approver
    if body.decision == "approve":
        try:
            ticket = issue_approval_ticket(
                pack,
                evidence_hash=body.evidence_hash,
                approver=body.approver,
                approver=approver,
                note=body.note,
            )
        except ValueError as exc:
@@ -134,7 +185,7 @@ async def decide_pack(run_id: str, body: DecisionRequest, store: StoreDep, engin
        updated = await engine.apply_and_verify(pack, ticket)
    else:
        try:
            updated = engine.reject(pack, approver=body.approver, note=body.note)
            updated = engine.reject(pack, approver=approver, note=body.note)
        except ValueError as exc:
            return JSONResponse(
                status_code=409, content={"error": "approval_gate", "detail": str(exc)}