Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,13 @@ SESSION_SECRET=
# Shared secret gating the write endpoints (POST /run, /packs/{id}/decision). Held server-side only
# (read API + the dashboard proxies); never exposed to the browser. Generate: openssl rand -hex 16.
RUN_API_TOKEN=

# ===== Sift Memory (Voyage AI, DBRE-side read-only retrieval) =====
# Used only to retrieve/rerank DBRE guidance for Run Review. It must never be exposed to
# the browser and never participates in approval, mutation, or verification decisions.
VOYAGE_API_KEY=
VOYAGE_EMBED_MODEL=voyage-4-lite
VOYAGE_RERANK_MODEL=rerank-2.5-lite
VOYAGE_MEMORY_TOP_K=3
VOYAGE_MEMORY_PREFILTER_K=8
VOYAGE_MEMORY_MAX_DOCS=12
25 changes: 14 additions & 11 deletions agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,26 @@ class AgentRole(StrEnum):

INSTRUCTIONS = {
AgentRole.FULL: (
"You are a MongoDB performance engineer. Use the native Mongo tools to read the "
"slow query, compare candidates, diagnose the ESR-correct index, and explain the "
"rationale. Return compact JSON with evidence, candidates, experiments, "
"recommended_index, and rationale. Never create or drop an index during diagnosis."
"You are a MongoDB performance engineer. The request gives a captured slow query; pass "
"it verbatim as the `query_json` argument to every tool. Use the native Mongo tools to "
"read the slow query, compare candidates, diagnose the ESR-correct index, and explain "
"the rationale. Return compact JSON with evidence, candidates, recommended_index, and "
"rationale. Never create or drop an index during diagnosis."
),
AgentRole.DIAGNOSE: (
"You are the Diagnose Agent. Run explain_slow_query and diagnose_candidate. "
"Return compact JSON with before evidence, diagnosis, and recommended_index. "
"Never create or drop an index."
"You are the Diagnose Agent. Run explain_slow_query then diagnose_candidate, passing the "
"request's query as the `query_json` argument to each. Return compact JSON with before "
"evidence, diagnosis, and recommended_index. Never create or drop an index."
),
AgentRole.CANDIDATE: (
"You are the Candidate Agent. Run compare_candidate_indexes. Return compact JSON "
"with candidate metrics and the winner. Never create or drop an index."
"You are the Candidate Agent. Run compare_candidate_indexes, passing the request's query "
"as the `query_json` argument. Return compact JSON with candidate metrics and the winner. "
"Never create or drop an index."
),
AgentRole.RATIONALE: (
"You are the Rationale Agent. Run rationalize_recommendation. Return compact JSON "
"with recommended_index and rationale grounded in evidence. Never create or drop an index."
"You are the Rationale Agent. Run rationalize_recommendation, passing the request's query "
"as the `query_json` argument. Return compact JSON with recommended_index and rationale "
"grounded in evidence. Never create or drop an index."
),
}

Expand Down
9 changes: 8 additions & 1 deletion agents/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,14 @@ def teardown(resource_name: str) -> None: # pragma: no cover - live deploy
print(f"TEARDOWN=engine_deleted resource={resource_name}")


# Sample slow query, already serialised as a JSON string, used by the smoke test.
# _smoke_prompt appends it to every role's prompt with the instruction to pass it
# verbatim as the `query_json` tool argument, so it follows the
# {"filter": ..., "sort": [[field, direction]], "limit": N} shape.
_SMOKE_QUERY_JSON = (
'{"filter": {"purchaseMethod": "Phone", "customer.age": {"$gte": 55, "$lte": 75}}, '
'"sort": [["saleDate", -1]], "limit": 15}'
)


def _smoke_prompt(role: AgentRole) -> str:
q = f"Pass this exact JSON string as the `query_json` argument to every tool:\n{_SMOKE_QUERY_JSON}\n"
prompts = {
AgentRole.DIAGNOSE: (
"Run explain_slow_query and diagnose_candidate. Return compact JSON with "
Expand All @@ -151,7 +158,7 @@ def _smoke_prompt(role: AgentRole) -> str:
"and rationale. Do not mutate the database."
),
}
return prompts[role]
return f"{prompts[role]}\n\n{q}"


async def smoke(
Expand Down
187 changes: 124 additions & 63 deletions agents/native_mongo_tools.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
"""Python-native Mongo tools for Agent Engine.

These tools are read-only. They mirror the deterministic controller's demo fixture
query and never create or drop indexes.
These tools are read-only and query-parameterised: each one takes the captured slow query as a
JSON string (`query_json`) and explains/diagnoses THAT query against the live collection. They
never create or drop an index — the ESR-correct index is derived deterministically and is built
only after a human approves the evidence hash.
"""

from __future__ import annotations

from collections.abc import Mapping
import json
from collections.abc import Mapping, Sequence
from typing import Any

from controller.demo_fixture import COLL, DB, LIMIT, QUERY_FILTER, QUERY_SORT
from controller.demo_fixture import COLL, DB
from controller.diagnosis import diagnose
from controller.explain import capture_evidence, get_connection_string
from controller.schemas import Evidence

INDEX_B_NAME = "esr_wrong_B"
INDEX_C_NAME = "esr_right_C"
INDEX_B_KEYS = (("storeLocation", 1), ("customer.age", 1), ("saleDate", -1))
INDEX_C_KEYS = (("storeLocation", 1), ("saleDate", -1), ("customer.age", 1))
NAMESPACE = f"{DB}.{COLL}"


Expand All @@ -31,104 +30,166 @@ def _require_connection_string() -> str:
)


def _capture_with_hint(hint: str | list[tuple[str, int]]) -> Evidence:
def _parse_query(
query_json: str | Mapping[str, Any],
) -> tuple[dict[str, Any], list[tuple[str, int]], int]:
"""Parse the query the orchestrator passes to every tool.

Accepts {"filter": ..., "sort": [["f", -1]], "limit": N} or a {"query": {...}} wrapper.
"""
data = json.loads(query_json) if isinstance(query_json, str) else dict(query_json)
spec = data.get("query", data) if isinstance(data, dict) else {}
query_filter = dict(spec.get("filter", {}))
query_sort = [(str(field), int(direction)) for field, direction in spec.get("sort", [])]
limit = int(spec.get("limit", 20))
return query_filter, query_sort, limit


def _capture(
query_filter: Mapping[str, Any], query_sort: Sequence[tuple[str, int]], limit: int
) -> Evidence:
"""Capture the query's NATURAL explain plan (no index hint), read-only."""
from pymongo import MongoClient

client = MongoClient(_require_connection_string())
try:
return capture_evidence(client[DB][COLL], QUERY_FILTER, QUERY_SORT, LIMIT, hint=hint)
return capture_evidence(client[DB][COLL], query_filter, query_sort, limit)
finally:
client.close()


def _evidence_payload(evidence: Evidence, *, hint: str | list[tuple[str, int]]) -> dict[str, Any]:
def _query_echo(
query_filter: Mapping[str, Any], query_sort: Sequence[tuple[str, int]], limit: int
) -> dict[str, Any]:
return {
"namespace": NAMESPACE,
"hint": hint,
"evidence": evidence.model_dump(mode="json"),
"metrics": evidence.metrics.model_dump(mode="json"),
"filter": dict(query_filter),
"sort": [[field, direction] for field, direction in query_sort],
"limit": limit,
}


def _candidate_payload(
name: str, index_spec: tuple[tuple[str, int], ...], evidence: Evidence
) -> dict[str, Any]:
# Return the diagnosis's recommended index as a list of [field, direction] lists.
# Reads `diagnosis.recommendation.index_spec` and unpacks each entry as a
# (field, direction) pair, emitting two-element lists rather than tuples.
# NOTE(review): `diagnosis` is typed Any here; presumably it is the object returned
# by controller.diagnosis.diagnose — confirm against that module.
def _recommended_spec(diagnosis: Any) -> list[list[Any]]:
return [[field, direction] for field, direction in diagnosis.recommendation.index_spec]


def explain_slow_query(query_json: str) -> dict[str, Any]:
"""Explain the given slow query's natural plan and return its before-evidence.

Args:
query_json: JSON string of the query, e.g.
{"filter": {"purchaseMethod": "Phone", "customer.age": {"$gte": 55, "$lte": 75}},
"sort": [["saleDate", -1]], "limit": 15}
"""
query_filter, query_sort, limit = _parse_query(query_json)
evidence = _capture(query_filter, query_sort, limit)
return {
"name": name,
"index_spec": [[field, direction] for field, direction in index_spec],
"has_blocking_sort": evidence.metrics.has_blocking_sort,
"total_keys_examined": evidence.metrics.total_keys_examined,
"docs_examined": evidence.metrics.docs_examined,
"docs_returned": evidence.metrics.docs_returned,
"stages": list(evidence.metrics.stages),
"namespace": NAMESPACE,
"query": _query_echo(query_filter, query_sort, limit),
"evidence": evidence.model_dump(mode="json"),
"metrics": evidence.metrics.model_dump(mode="json"),
}


def explain_slow_query() -> dict[str, Any]:
"""Capture the canonical slow-query evidence by hinting the known wrong ESR index."""
evidence = _capture_with_hint(INDEX_B_NAME)
return _evidence_payload(evidence, hint=INDEX_B_NAME)
def compare_candidate_indexes(query_json: str) -> dict[str, Any]:
"""Compare the query's current plan against the ESR-recommended index (read-only).

The current plan is measured; the recommended index is derived deterministically and is NOT
created here — diagnosis never mutates. The recommendation is the winner.

def compare_candidate_indexes() -> dict[str, Any]:
"""Compare wrong-vs-correct ESR candidates using read-only hinted explains."""
wrong = _capture_with_hint(INDEX_B_NAME)
correct = _capture_with_hint(list(INDEX_C_KEYS))
Args:
query_json: JSON string {"filter": ..., "sort": ..., "limit": ...}.
"""
query_filter, query_sort, limit = _parse_query(query_json)
current = _capture(query_filter, query_sort, limit)
diagnosis = diagnose(
query_filter,
query_sort,
has_blocking_sort=current.metrics.has_blocking_sort,
current_index=None,
)
return {
"namespace": NAMESPACE,
"query": {"filter": QUERY_FILTER, "sort": QUERY_SORT, "limit": LIMIT},
"query": _query_echo(query_filter, query_sort, limit),
"candidates": [
_candidate_payload(INDEX_B_NAME, INDEX_B_KEYS, wrong),
_candidate_payload(INDEX_C_NAME, INDEX_C_KEYS, correct),
{
"name": "current_plan",
"index_spec": None,
"has_blocking_sort": current.metrics.has_blocking_sort,
"total_keys_examined": current.metrics.total_keys_examined,
"docs_examined": current.metrics.docs_examined,
"docs_returned": current.metrics.docs_returned,
"stages": list(current.metrics.stages),
},
{
"name": "esr_recommended",
"index_spec": _recommended_spec(diagnosis),
},
],
"winner": INDEX_C_NAME
if correct.metrics.total_keys_examined < wrong.metrics.total_keys_examined
else INDEX_B_NAME,
"winner": "esr_recommended",
}


def diagnose_candidate() -> dict[str, Any]:
"""Run the deterministic ESR diagnosis from live slow-query evidence."""
evidence = _capture_with_hint(INDEX_B_NAME)
def diagnose_candidate(query_json: str) -> dict[str, Any]:
"""Capture natural evidence for the query and run the deterministic ESR diagnosis.

Args:
query_json: JSON string {"filter": ..., "sort": ..., "limit": ...}.
"""
query_filter, query_sort, limit = _parse_query(query_json)
evidence = _capture(query_filter, query_sort, limit)
diagnosis = diagnose(
QUERY_FILTER,
QUERY_SORT,
query_filter,
query_sort,
has_blocking_sort=evidence.metrics.has_blocking_sort,
current_index=INDEX_B_NAME,
current_index=None,
)
return {
"namespace": NAMESPACE,
"source": "deterministic_esr",
"before": evidence.model_dump(mode="json"),
"diagnosis": diagnosis.model_dump(mode="json"),
"recommended_index": _recommended_spec(diagnosis),
}


def rationalize_recommendation() -> dict[str, Any]:
"""Return a concise evidence-grounded rationale for the ESR recommendation."""
slow = _capture_with_hint(INDEX_B_NAME)
fast = _capture_with_hint(list(INDEX_C_KEYS))
def rationalize_recommendation(query_json: str) -> dict[str, Any]:
"""Return an evidence-grounded rationale for the ESR recommendation (read-only).

Args:
query_json: JSON string {"filter": ..., "sort": ..., "limit": ...}.
"""
query_filter, query_sort, limit = _parse_query(query_json)
current = _capture(query_filter, query_sort, limit)
diagnosis = diagnose(
QUERY_FILTER,
QUERY_SORT,
has_blocking_sort=slow.metrics.has_blocking_sort,
current_index=INDEX_B_NAME,
query_filter,
query_sort,
has_blocking_sort=current.metrics.has_blocking_sort,
current_index=None,
)
spec = _recommended_spec(diagnosis)
keys = current.metrics.total_keys_examined
docs = current.metrics.docs_examined
returned = current.metrics.docs_returned
order = ", ".join(f"{field}:{direction}" for field, direction in spec)
problem = (
"performs a blocking in-memory SORT"
if current.metrics.has_blocking_sort
else "over-scans the collection"
)
before_keys = slow.metrics.total_keys_examined
after_keys = fast.metrics.total_keys_examined
return {
"namespace": NAMESPACE,
"recommended_index": diagnosis.recommendation.model_dump(mode="json")["index_spec"],
"recommended_index": spec,
"rationale": (
"Index B puts the range field before the sort field, causing a blocking SORT "
f"and {before_keys} keys examined. ESR index C moves saleDate before "
f"customer.age, removes the SORT stage, and examines {after_keys} keys."
f"The query examines {keys} keys and {docs} documents to return {returned}, and "
f"{problem}. The ESR-ordered index ({order}) places equality fields first, the sort "
"field next, and range fields last, so the index supplies the requested order and "
"removes the blocking SORT."
),
"evidence": {
"before_keys_examined": before_keys,
"after_keys_examined": after_keys,
"before_has_blocking_sort": slow.metrics.has_blocking_sort,
"after_has_blocking_sort": fast.metrics.has_blocking_sort,
"keys_examined": keys,
"docs_examined": docs,
"docs_returned": returned,
"has_blocking_sort": current.metrics.has_blocking_sort,
},
}

Expand Down
37 changes: 13 additions & 24 deletions api/agent_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,31 +265,20 @@ def _role_prompt(
query_sort: list[tuple[str, int]],
limit: int,
) -> str:
payload = {
"run_id": run_id,
"namespace": namespace,
"query": {"filter": query_filter, "sort": query_sort, "limit": limit},
}
prompts = {
"explain_slow_query": (
"Run exactly one tool: explain_slow_query. Return compact JSON containing "
"`before` evidence and the tool name. Do not call any mutation tools."
),
"compare_candidate_indexes": (
"Run exactly one tool: compare_candidate_indexes. Return compact JSON containing "
"candidate metrics, winner, and the tool name. Do not call any mutation tools."
),
"diagnose_candidate": (
"Run exactly one tool: diagnose_candidate. Return compact JSON containing "
"`before`, `diagnosis`, `recommended_index`, and the tool name. Do not call "
"any mutation tools."
),
"rationalize_recommendation": (
"Run exactly one tool: rationalize_recommendation. Return compact JSON containing "
"`recommended_index`, `rationale`, and the tool name. Do not call any mutation tools."
),
query_json = json.dumps(
{"filter": query_filter, "sort": [[f, d] for f, d in query_sort], "limit": limit}
)
returns = {
"explain_slow_query": "`before` evidence and the tool name",
"compare_candidate_indexes": "candidate metrics, winner, and the tool name",
"diagnose_candidate": "`before`, `diagnosis`, `recommended_index`, and the tool name",
"rationalize_recommendation": "`recommended_index`, `rationale`, and the tool name",
}
return f"{prompts[tool_name]}\n\n{json.dumps(payload, sort_keys=True)}"
return (
f"Run exactly one tool: {tool_name}. Pass this exact JSON string as its `query_json` "
f"argument:\n{query_json}\n(run_id={run_id}, namespace={namespace}). Return compact JSON "
f"containing {returns[tool_name]}. Do not call any other tool or any mutation tool."
)


@dataclass(frozen=True)
Expand Down
Loading
Loading