From 6790237fb5dbd784896aa5ca9f16e0166cbfd29a Mon Sep 17 00:00:00 2001 From: Nivesh353 Date: Wed, 24 Jun 2026 15:23:22 +0530 Subject: [PATCH 1/3] feat(a2a): add outbound A2A client (CLI + SDK) Configure remote A2A agents under `a2a_agents` in agent.yaml; each remote skill becomes a `__` tool. Blocking + streaming, header auth, per-agent timeout. Opt-in, non-fatal, no server started. Wired into both the CLI and the programmatic query() SDK path. --- Documentation.md | 66 +++++++++ README.md | 17 +++ package-lock.json | 42 ++++++ package.json | 1 + src/a2a/manager.ts | 329 +++++++++++++++++++++++++++++++++++++++++++++ src/a2a/types.ts | 28 ++++ src/exports.ts | 2 + src/index.ts | 27 +++- src/loader.ts | 3 + src/sdk.ts | 18 +++ test/a2a.test.ts | 198 +++++++++++++++++++++++++++ 11 files changed, 729 insertions(+), 2 deletions(-) create mode 100644 src/a2a/manager.ts create mode 100644 src/a2a/types.ts create mode 100644 test/a2a.test.ts diff --git a/Documentation.md b/Documentation.md index c795966..8bf8ad9 100644 --- a/Documentation.md +++ b/Documentation.md @@ -19,6 +19,7 @@ - [Workflows & SkillFlows](#workflows--skillflows) - [Hooks](#hooks) - [Plugins](#plugins) +- [A2A Client](#a2a-client) - [Memory System](#memory-system) - [Schedules & Cron](#schedules--cron) - [Integrations](#integrations) @@ -215,6 +216,11 @@ plugins: config: api_key: "${MY_PLUGIN_KEY}" +# A2A agents (optional) — remote agents gitagent may call (see "A2A Client") +a2a_agents: + research-agent: + url: https://research.example.com + # Compliance (optional — for enterprise) compliance: risk_level: high @@ -730,6 +736,66 @@ gitagent plugin init my-plugin # scaffold a new plugin --- +## A2A Client + +The **A2A client** lets gitagent call **other AI agents** that speak the +[A2A (Agent2Agent) protocol](https://a2a-protocol.org) — the open interop standard maintained +by the Linux Foundation. This is how gitagent talks to agents built on other frameworks +(LangGraph, CrewAI, Google ADK, …): it delegates a task to a remote agent and folds the result +back into its own reasoning. + +**It is outbound-only — gitagent does not run a server.** It connects out to a remote agent's +Agent Card, just like any HTTP client. With no `a2a_agents` configured, nothing happens and no +network calls are made; the default CLI behavior is unchanged. + +> A2A is complementary to local tools: tools give the agent *capabilities*, A2A gives it +> *peers*. Each remote skill is exposed as a normal tool, so delegation looks identical to any +> other tool call. + +### Configuration + +Add an `a2a_agents` map to `agent.yaml`: + +```yaml +a2a_agents: + research-agent: + url: https://research.example.com # base URL; Agent Card resolved from + # /.well-known/agent-card.json + headers: # optional auth/headers; ${VAR} interpolated + Authorization: "Bearer ${RESEARCH_TOKEN}" + timeoutMs: 30000 # optional; connect + per-call timeout (default 30000) + stream: true # optional; use SSE streaming when supported (default true) + cardPath: /.well-known/agent-card.json # optional; override the Agent Card path +``` + +### How it works + +1. **Discovery (startup).** For each configured agent, gitagent fetches its **Agent Card** and + reads the agent's name, description, and skills. +2. **Tools (startup).** Each skill becomes one tool named `__` + (e.g. `research-agent__web_search`). An agent that declares no skills becomes a single tool + named after the agent. Names that collide with an existing tool are skipped with a warning. +3. **Delegation (runtime).** When the model calls one of these tools with a `message`, gitagent + sends it to the remote agent. If the agent supports streaming, partial output is shown live + (`message/stream` over SSE); otherwise it blocks until the task completes (`message/send`). + The remote agent's reply is returned as the tool result. + +### Behavior & errors + +- **Opt-in:** no `a2a_agents` → feature is inert, zero overhead. +- **Non-fatal connections:** if an agent can't be reached at startup, it is skipped with a + warning and the rest of the session works normally. +- **Per-call errors** are returned to the model as text (so it can react), not thrown. +- **Auth:** put credentials in `headers` and reference environment variables with `${VAR}`. + +### Security note + +The A2A client makes **outbound** calls only — it opens no ports and exposes no server. Treat +remote agents like any third-party service: only configure agents you trust, and supply +credentials via environment variables rather than committing them to `agent.yaml`. + +--- + ## Memory System GitAgent's memory is git-native — all memory changes are committed, versioned, and auditable. diff --git a/README.md b/README.md index 526cd8d..8329845 100644 --- a/README.md +++ b/README.md @@ -607,6 +607,23 @@ my-plugin/ └── index.ts # Programmatic entry point ``` +## A2A Client + +Let gitagent **call other AI agents** that speak the [A2A (Agent2Agent) protocol](https://a2a-protocol.org) — the Linux Foundation interop standard. gitagent delegates a task to a remote agent (LangGraph, CrewAI, Google ADK, …) and uses the result. It's **outbound-only — no server runs** — and fully opt-in. + +Add remote agents to `agent.yaml`: + +```yaml +a2a_agents: + research-agent: + url: https://research.example.com + headers: + Authorization: "Bearer ${RESEARCH_TOKEN}" # ${VAR} interpolated + stream: true # SSE streaming when supported (default) +``` + +At startup gitagent fetches each agent's **Agent Card** and exposes every skill as a tool named `__`. The model calls it like any other tool; failed connections are skipped with a warning. See [Documentation.md → A2A Client](Documentation.md#a2a-client) for details. + ## Multi-Model Support Gitagent works with any LLM provider supported by [pi-ai](https://github.com/badlogic/pi-mono/tree/main/packages/ai): diff --git a/package-lock.json b/package-lock.json index 2316271..b4eb7ff 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "2.0.0", "license": "MIT", "dependencies": { + "@a2a-js/sdk": "^0.3.13", "@mariozechner/pi-agent-core": "^0.70.2", "@mariozechner/pi-ai": "^0.70.2", "@opentelemetry/api": "^1.9.0", @@ -47,6 +48,47 @@ } } }, + "node_modules/@a2a-js/sdk": { + "version": "0.3.13", + "resolved": "https://registry.npmjs.org/@a2a-js/sdk/-/sdk-0.3.13.tgz", + "integrity": "sha512-BZr0f9JVNQs3GKOM9xINWCh6OKIJWZFPyqqVqTym5mxO2Eemc6I/0zL7zWnljHzGdaf5aZQyQN5xa6PSH62q+A==", + "license": "Apache-2.0", + "dependencies": { + "uuid": "^11.1.0" + }, + "engines": { + "node": ">=18" + }, + "peerDependencies": { + "@bufbuild/protobuf": "^2.10.2", + "@grpc/grpc-js": "^1.11.0", + "express": "^4.21.2 || ^5.1.0" + }, + "peerDependenciesMeta": { + "@bufbuild/protobuf": { + "optional": true + }, + "@grpc/grpc-js": { + "optional": true + }, + "express": { + "optional": true + } + } + }, + "node_modules/@a2a-js/sdk/node_modules/uuid": { + "version": "11.1.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-11.1.1.tgz", + "integrity": "sha512-vIYxrBCC/N/K+Js3qSN88go7kIfNPssr/hHCesKCQNAjmgvYS2oqr69kIufEG+O4+PfezOH4EbIeHCfFov8ZgQ==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "license": "MIT", + "bin": { + "uuid": "dist/esm/bin/uuid" + } + }, "node_modules/@anthropic-ai/sdk": { "version": "0.90.0", "resolved": "https://registry.npmjs.org/@anthropic-ai/sdk/-/sdk-0.90.0.tgz", diff --git a/package.json b/package.json index 00aa77c..7c0033e 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "node": ">=20" }, "dependencies": { + "@a2a-js/sdk": "^0.3.13", "@mariozechner/pi-agent-core": "^0.70.2", "@mariozechner/pi-ai": "^0.70.2", "@opentelemetry/api": "^1.9.0", diff --git a/src/a2a/manager.ts b/src/a2a/manager.ts new file mode 100644 index 0000000..fc3d1ed --- /dev/null +++ b/src/a2a/manager.ts @@ -0,0 +1,329 @@ +import { randomUUID } from "crypto"; +import { A2AClient } from "@a2a-js/sdk/client"; +import type { + AgentCard, + AgentSkill, + Message, + MessageSendParams, + Part, + Task, + TaskArtifactUpdateEvent, + TaskStatusUpdateEvent, +} from "@a2a-js/sdk"; +import type { AgentTool } from "@mariozechner/pi-agent-core"; +import { Type } from "@sinclair/typebox"; +import type { A2AAgentConfig, A2ASetupResult } from "./types.js"; + +const DEFAULT_TIMEOUT_MS = 30000; + +// ANSI helpers (kept local so the module has no dependency on index.ts). +const dim = (s: string) => `\x1b[2m${s}\x1b[0m`; +const red = (s: string) => `\x1b[31m${s}\x1b[0m`; + +/** A live connection to one remote A2A agent. */ +interface A2AConnection { + name: string; + client: A2AClient; + card: AgentCard; +} + +function withTimeout(op: Promise, ms: number, label: string): Promise { + return Promise.race([ + op, + new Promise((_, reject) => + setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms), + ), + ]); +} + +/** Replace `${VAR}` with process.env values (same syntax as plugin config). */ +function interpolateEnv(value: string): string { + return value.replace(/\$\{(\w+)\}/g, (_, name) => process.env[name] || ""); +} + +function interpolateHeaders( + headers: Record | undefined, +): Record | undefined { + if (!headers) return undefined; + const out: Record = {}; + for (const [k, v] of Object.entries(headers)) out[k] = interpolateEnv(v); + return out; +} + +/** + * A fetch wrapper that injects static headers (auth etc.) into every request. + * This is how the A2A SDK supports custom headers — via a custom fetch impl. + */ +function headerFetch(headers: Record): typeof fetch { + return (input: any, init: any = {}) => { + const merged = new Headers(init.headers || {}); + for (const [k, v] of Object.entries(headers)) merged.set(k, v); + return fetch(input, { ...init, headers: merged }); + }; +} + +/** Flatten an array of A2A content parts into plain text. Binary parts are + * summarized rather than inlined, to protect the token budget. */ +export function partsToText(parts: Part[] | undefined): string { + if (!Array.isArray(parts)) return ""; + const out: string[] = []; + for (const p of parts) { + switch (p?.kind) { + case "text": + out.push(p.text ?? ""); + break; + case "file": { + const f: any = (p as any).file ?? {}; + out.push(`[file: ${f.name || f.uri || f.mimeType || "binary"}]`); + break; + } + case "data": + try { + out.push("```json\n" + JSON.stringify((p as any).data, null, 2) + "\n```"); + } catch { + out.push("[data]"); + } + break; + } + } + return out.join("\n"); +} + +/** Extract the best text from a terminal Message or Task result. */ +function resultToText(result: Message | Task): string { + if (result.kind === "message") return partsToText(result.parts); + // Task: prefer artifacts, then the status message. + const fromArtifacts = (result.artifacts ?? []) + .map((a) => partsToText(a.parts)) + .filter(Boolean) + .join("\n"); + if (fromArtifacts) return fromArtifacts; + const statusMsg = result.status?.message; + if (statusMsg) return partsToText(statusMsg.parts); + return `[task ${result.id}: ${result.status?.state ?? "unknown"}]`; +} + +function buildSendParams(message: string): MessageSendParams { + return { + message: { + kind: "message", + messageId: randomUUID(), + role: "user", + parts: [{ kind: "text", text: message }], + }, + configuration: { + acceptedOutputModes: ["text/plain"], + blocking: true, + }, + }; +} + +/** + * Run one streaming call. Accumulates text from message / status-update / + * artifact-update events, pushing partial output via onUpdate, and returns the + * final accumulated text. + */ +async function runStreaming( + client: A2AClient, + message: string, + signal: AbortSignal | undefined, + onUpdate?: (text: string) => void, +): Promise { + let acc = ""; + const append = (text: string) => { + if (!text) return; + acc += (acc ? "\n" : "") + text; + onUpdate?.(acc); + }; + const stream = client.sendMessageStream(buildSendParams(message)); + for await (const event of stream) { + if (signal?.aborted) break; + const e = event as Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent; + switch (e.kind) { + case "message": + append(partsToText(e.parts)); + break; + case "status-update": + if (e.status?.message) append(partsToText(e.status.message.parts)); + if (e.final) return acc; + break; + case "artifact-update": + append(partsToText(e.artifact?.parts)); + break; + case "task": + // Initial/echoed task snapshot — capture any status text. + if (e.status?.message) append(partsToText(e.status.message.parts)); + break; + } + } + return acc; +} + +/** Run one blocking call and return the final text. */ +async function runBlocking(client: A2AClient, message: string): Promise { + const resp = await client.sendMessage(buildSendParams(message)); + if ("error" in resp && resp.error) { + throw new Error(resp.error.message || "A2A agent returned an error"); + } + const result = (resp as any).result as Message | Task; + return resultToText(result); +} + +/** Build the tool description from a skill (or the card, when skill-less). */ +function describeSkill(card: AgentCard, skill?: AgentSkill): string { + if (!skill) { + return `Delegate a task to the "${card.name}" agent. ${card.description ?? ""}`.trim(); + } + const parts = [skill.description || skill.name]; + if (skill.tags?.length) parts.push(`Tags: ${skill.tags.join(", ")}.`); + if (skill.examples?.length) parts.push(`Examples: ${skill.examples.slice(0, 3).join(" | ")}.`); + return `[${card.name}] ${parts.join(" ")}`.trim(); +} + +/** Sanitize a name so the assembled tool name is a safe identifier. */ +function sanitize(name: string): string { + return name.replace(/[^a-zA-Z0-9_]+/g, "_").replace(/^_+|_+$/g, ""); +} + +/** + * Build one AgentTool for a remote agent/skill. Constructed directly (not via + * buildTool) so streaming can push partial deltas through onUpdate. + */ +function buildA2ATool( + conn: A2AConnection, + config: A2AAgentConfig, + skill: AgentSkill | undefined, + toolName: string, +): AgentTool { + const timeoutMs = config.timeoutMs ?? DEFAULT_TIMEOUT_MS; + const streaming = config.stream !== false && conn.card.capabilities?.streaming === true; + + return { + name: toolName, + label: toolName, + description: describeSkill(conn.card, skill), + parameters: Type.Object({ + message: Type.String({ + description: + "The task or question to send to the remote agent, in natural language.", + }), + }), + execute: async (_toolCallId, params, signal, onUpdate) => { + const message = String((params as any)?.message ?? "").trim(); + if (!message) { + return { + content: [{ type: "text" as const, text: "Error: 'message' is required." }], + details: undefined, + }; + } + const onText = onUpdate + ? (text: string) => + onUpdate({ content: [{ type: "text" as const, text }], details: undefined }) + : undefined; + try { + const call = streaming + ? runStreaming(conn.client, message, signal, onText) + : runBlocking(conn.client, message); + const text = await withTimeout(call, timeoutMs, `a2a ${conn.name}`); + return { + content: [{ type: "text" as const, text: text || "[no content returned]" }], + details: undefined, + }; + } catch (err: any) { + const msg = err?.message ?? String(err); + return { + content: [ + { type: "text" as const, text: `A2A call to "${conn.name}" failed: ${msg}` }, + ], + details: undefined, + }; + } + }, + }; +} + +/** + * Connect to every configured A2A agent, expose each skill as a tool, and + * return the merged tools plus an idempotent cleanup. A connection failure is + * non-fatal: that agent is skipped with a warning and the rest still load. + */ +export async function setupA2A( + agents: Record | undefined, + existingToolNames: Set, +): Promise { + const tools: AgentTool[] = []; + if (!agents || Object.keys(agents).length === 0) { + return { tools, cleanup: async () => {} }; + } + + const taken = new Set(existingToolNames); + + for (const [rawName, rawConfig] of Object.entries(agents)) { + const name = sanitize(rawName); + if (!rawConfig?.url) { + console.warn(red(`[a2a:${rawName}] missing "url" — skipping`)); + continue; + } + const config: A2AAgentConfig = { + ...rawConfig, + url: interpolateEnv(rawConfig.url), + headers: interpolateHeaders(rawConfig.headers), + }; + const timeoutMs = config.timeoutMs ?? DEFAULT_TIMEOUT_MS; + + try { + const cardUrl = config.cardPath + ? new URL(config.cardPath, config.url).toString() + : new URL("/.well-known/agent-card.json", config.url).toString(); + const options = config.headers + ? { fetchImpl: headerFetch(config.headers) } + : undefined; + const client = await withTimeout( + A2AClient.fromCardUrl(cardUrl, options), + timeoutMs, + `a2a ${name} connect`, + ); + const card = await withTimeout( + client.getAgentCard(), + timeoutMs, + `a2a ${name} card`, + ); + const conn: A2AConnection = { name, client, card }; + + const skills = Array.isArray(card.skills) ? card.skills : []; + const built: AgentTool[] = + skills.length > 0 + ? skills.map((skill) => + buildA2ATool(conn, config, skill, `${name}__${sanitize(skill.id)}`), + ) + : [buildA2ATool(conn, config, undefined, name)]; + + let added = 0; + for (const tool of built) { + if (taken.has(tool.name)) { + console.warn( + dim(`[a2a:${rawName}] tool "${tool.name}" collides — skipping`), + ); + continue; + } + taken.add(tool.name); + tools.push(tool); + added++; + } + console.log( + dim(`[a2a] ${rawName} connected (${added} skill${added === 1 ? "" : "s"})`), + ); + } catch (err: any) { + console.warn( + red(`[a2a:${rawName}] connection failed: ${err?.message ?? String(err)} — skipping`), + ); + } + } + + // v1 holds no persistent connection between calls (each call opens its own + // request/stream), so cleanup is a no-op placeholder kept for symmetry and + // future streaming-connection reuse. + const cleanup = async () => {}; + + return { tools, cleanup }; +} diff --git a/src/a2a/types.ts b/src/a2a/types.ts new file mode 100644 index 0000000..ebc06c7 --- /dev/null +++ b/src/a2a/types.ts @@ -0,0 +1,28 @@ +import type { AgentTool } from "@mariozechner/pi-agent-core"; + +/** + * A remote A2A (Agent2Agent) agent gitagent is allowed to call. Outbound only — + * gitagent connects out to the agent's Agent Card; it never runs a server. + */ +export interface A2AAgentConfig { + /** Base URL of the remote agent. The Agent Card is resolved from here + * (`/.well-known/agent-card.json` unless `cardPath` overrides it). */ + url: string; + /** Explicit Agent Card path/URL, if the agent doesn't use the well-known path. */ + cardPath?: string; + /** Extra HTTP headers (e.g. auth) sent on every request. `${VAR}` is interpolated. */ + headers?: Record; + /** Connect + per-call timeout in ms. Default 30000. */ + timeoutMs?: number; + /** Use SSE streaming (`message/stream`) when the agent supports it. Default true. + * Set false to force blocking `message/send`. */ + stream?: boolean; +} + +/** Result of wiring up all configured A2A agents for a session. */ +export interface A2ASetupResult { + /** Tools discovered across all agents, one per skill, namespaced `__`. */ + tools: AgentTool[]; + /** Idempotent teardown — aborts any open streams. Safe to call repeatedly. */ + cleanup: () => Promise; +} diff --git a/src/exports.ts b/src/exports.ts index 787d60f..4a84d68 100644 --- a/src/exports.ts +++ b/src/exports.ts @@ -23,6 +23,8 @@ export type { // Internal types (for advanced usage) export type { AgentManifest, LoadedAgent } from "./loader.js"; +export type { A2AAgentConfig, A2ASetupResult } from "./a2a/types.js"; +export { setupA2A } from "./a2a/manager.js"; export type { SkillMetadata } from "./skills.js"; export type { WorkflowMetadata } from "./workflows.js"; export type { SubAgentMetadata } from "./agents.js"; diff --git a/src/index.ts b/src/index.ts index 68ea4ca..6a58d53 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,6 +12,7 @@ import { loadHooksConfig, runHooks, wrapToolWithHooks } from "./hooks.js"; import type { HooksConfig } from "./hooks.js"; import { loadDeclarativeTools } from "./tool-loader.js"; import { toAgentTool } from "./tool-utils.js"; +import { setupA2A } from "./a2a/manager.js"; import { AuditLogger, isAuditEnabled } from "./audit.js"; import { formatComplianceWarnings } from "./compliance.js"; import { readFile, mkdir, writeFile, stat, access } from "fs/promises"; @@ -40,6 +41,18 @@ const bold = (s: string) => `\x1b[1m${s}\x1b[0m`; const red = (s: string) => `\x1b[31m${s}\x1b[0m`; const green = (s: string) => `\x1b[32m${s}\x1b[0m`; +// Teardown for A2A connections, set during agent assembly. Module-scoped so the +// SIGTERM handler (outside main()) can reach it. No-op until A2A is configured. +let a2aCleanup: (() => Promise) | null = null; +const runA2ACleanup = async () => { + if (!a2aCleanup) return; + try { + await a2aCleanup(); + } catch { + /* best-effort — never block exit */ + } +}; + interface ParsedArgs { model?: string; dir: string; @@ -542,6 +555,12 @@ async function main(): Promise { const declarativeTools = await loadDeclarativeTools(agentDir); tools = [...tools, ...declarativeTools]; + // A2A remote-agent tools (manifest-declared) — outbound only, no server. + // Each remote skill becomes a tool; failed connections warn and are skipped. + const a2aSetup = await setupA2A(manifest.a2a_agents, new Set(tools.map((t) => t.name))); + tools = [...tools, ...a2aSetup.tools]; + a2aCleanup = a2aSetup.cleanup; + // Plugin tools (declarative + programmatic) — check for collisions with existing tools const existingToolNames = new Set(tools.map((t) => t.name)); for (const plugin of loaded.plugins) { @@ -651,6 +670,7 @@ async function main(): Promise { } throw err; } finally { + await runA2ACleanup(); if (localSession) { console.log(dim("Finalizing session...")); localSession.finalize(); @@ -685,6 +705,7 @@ async function main(): Promise { if (trimmed === "/quit" || trimmed === "/exit") { rl.close(); + await runA2ACleanup(); if (localSession) { console.log(dim("Finalizing session...")); localSession.finalize(); @@ -836,7 +857,7 @@ async function main(): Promise { try { _session.end({ "gitagent.cost_usd": _totalCostUsd }); } catch { /* ignore */ } - stopSandbox().finally(() => process.exit(0)); + runA2ACleanup().finally(() => stopSandbox().finally(() => process.exit(0))); } }); @@ -845,7 +866,9 @@ async function main(): Promise { // Flush OpenTelemetry exporters on SIGTERM. No-op when telemetry is disabled. process.on("SIGTERM", () => { - shutdownTelemetry().catch(() => {}).finally(() => process.exit(0)); + runA2ACleanup() + .finally(() => shutdownTelemetry().catch(() => {})) + .finally(() => process.exit(0)); }); main() diff --git a/src/loader.ts b/src/loader.ts index b8d193e..3c030bb 100644 --- a/src/loader.ts +++ b/src/loader.ts @@ -22,6 +22,7 @@ import type { ComplianceWarning } from "./compliance.js"; import { discoverAndLoadPlugins } from "./plugins.js"; import type { LoadedPlugin } from "./plugin-types.js"; import type { PluginConfig } from "./plugin-types.js"; +import type { A2AAgentConfig } from "./a2a/types.js"; export interface AgentManifest { spec_version: string; @@ -55,6 +56,8 @@ export interface AgentManifest { delegation?: { mode: "auto" | "explicit" | "router"; router?: string }; compliance?: Record; plugins?: Record; + /** Remote A2A agents gitagent may call (outbound only — no server). */ + a2a_agents?: Record; } async function readFileOr(path: string, fallback: string): Promise { diff --git a/src/sdk.ts b/src/sdk.ts index dfa9a80..befda55 100644 --- a/src/sdk.ts +++ b/src/sdk.ts @@ -9,6 +9,7 @@ import type { SandboxContext } from "./sandbox.js"; import { loadHooksConfig, runHooks, wrapToolWithHooks } from "./hooks.js"; import { loadDeclarativeTools } from "./tool-loader.js"; import { toAgentTool } from "./tool-utils.js"; +import { setupA2A } from "./a2a/manager.js"; import { wrapToolWithProgrammaticHooks } from "./sdk-hooks.js"; import { mergeHooksConfigs } from "./plugins.js"; import { initLocalSession } from "./session.js"; @@ -111,6 +112,12 @@ export function query(options: QueryOptions): Query { let sandboxCtx: SandboxContext | undefined; // Local session (hoisted for cleanup in catch) let localSession: LocalSession | undefined; + // A2A connection teardown (hoisted for cleanup on success + error paths) + let a2aCleanup: (() => Promise) | null = null; + const runA2ACleanup = async () => { + if (!a2aCleanup) return; + try { await a2aCleanup(); } catch { /* best-effort */ } + }; // OpenTelemetry session span — opened immediately so it covers agent // load + prompt + cleanup. Closed in the IIFE's finally so every exit @@ -189,6 +196,11 @@ export function query(options: QueryOptions): Query { const declarativeTools = await loadDeclarativeTools(loaded.agentDir); tools = [...tools, ...declarativeTools]; + // A2A remote-agent tools (manifest-declared) — outbound only, opt-in. + const a2aSetup = await setupA2A(loaded.manifest.a2a_agents, new Set(tools.map((t) => t.name))); + tools = [...tools, ...a2aSetup.tools]; + a2aCleanup = a2aSetup.cleanup; + // Plugin tools (declarative + programmatic) — check for collisions with existing tools const existingToolNames = new Set(tools.map((t) => t.name)); for (const plugin of loaded.plugins) { @@ -537,6 +549,9 @@ export function query(options: QueryOptions): Query { } } + // Tear down A2A connections + await runA2ACleanup(); + // Finalize local session if active if (localSession) { try { localSession.finalize(); } catch { /* best-effort */ } @@ -560,6 +575,9 @@ export function query(options: QueryOptions): Query { } } })().catch(async (err) => { + // Tear down A2A connections on error + await runA2ACleanup(); + // Finalize local session on error if (localSession) { try { localSession.finalize(); } catch { /* best-effort */ } diff --git a/test/a2a.test.ts b/test/a2a.test.ts new file mode 100644 index 0000000..ca963db --- /dev/null +++ b/test/a2a.test.ts @@ -0,0 +1,198 @@ +import { describe, it, before, beforeEach, afterEach, mock } from "node:test"; +import assert from "node:assert/strict"; + +// Dynamic imports since the project is ESM and we test the built output. +let setupA2A: typeof import("../dist/a2a/manager.js").setupA2A; +let partsToText: typeof import("../dist/a2a/manager.js").partsToText; +let A2AClient: typeof import("@a2a-js/sdk/client").A2AClient; + +before(async () => { + const mgr = await import("../dist/a2a/manager.js"); + setupA2A = mgr.setupA2A; + partsToText = mgr.partsToText; + A2AClient = (await import("@a2a-js/sdk/client")).A2AClient; +}); + +// ── Test doubles ───────────────────────────────────────────────────────── + +function fakeCard(skills: any[], streaming = false): any { + return { + protocolVersion: "0.3.0", + name: "Research", + description: "A research agent", + url: "http://example.test", + version: "1.0.0", + capabilities: { streaming }, + defaultInputModes: ["text/plain"], + defaultOutputModes: ["text/plain"], + skills, + }; +} + +function textMessage(text: string): any { + return { kind: "message", messageId: "m1", role: "agent", parts: [{ kind: "text", text }] }; +} + +function installFakeClient(client: any) { + mock.method(A2AClient, "fromCardUrl", async () => client); +} + +afterEach(() => mock.restoreAll()); + +// Silence the manager's console output during tests. +beforeEach(() => { + mock.method(console, "log", () => {}); + mock.method(console, "warn", () => {}); +}); + +// ── partsToText ────────────────────────────────────────────────────────── + +describe("partsToText", () => { + it("joins text parts", () => { + const out = partsToText([ + { kind: "text", text: "hello" }, + { kind: "text", text: "world" }, + ] as any); + assert.equal(out, "hello\nworld"); + }); + + it("summarizes file parts instead of inlining bytes", () => { + const out = partsToText([{ kind: "file", file: { name: "report.pdf" } }] as any); + assert.match(out, /\[file: report\.pdf\]/); + }); + + it("renders data parts as JSON", () => { + const out = partsToText([{ kind: "data", data: { a: 1 } }] as any); + assert.match(out, /"a": 1/); + }); + + it("returns empty string for non-array", () => { + assert.equal(partsToText(undefined), ""); + }); +}); + +// ── setupA2A: discovery & mapping ───────────────────────────────────────── + +describe("setupA2A", () => { + it("returns no tools and a no-op cleanup when nothing is configured", async () => { + const r1 = await setupA2A(undefined, new Set()); + const r2 = await setupA2A({}, new Set()); + assert.equal(r1.tools.length, 0); + assert.equal(r2.tools.length, 0); + await r1.cleanup(); // does not throw + }); + + it("maps one tool per skill, namespaced __", async () => { + installFakeClient({ + getAgentCard: async () => + fakeCard([ + { id: "web_search", name: "Web Search", description: "Search", tags: [] }, + { id: "fact_check", name: "Fact Check", description: "Check", tags: [] }, + ]), + }); + const { tools } = await setupA2A( + { research: { url: "http://example.test" } }, + new Set(), + ); + const names = tools.map((t) => t.name).sort(); + assert.deepEqual(names, ["research__fact_check", "research__web_search"]); + }); + + it("falls back to a single agent-named tool when no skills are declared", async () => { + installFakeClient({ getAgentCard: async () => fakeCard([]) }); + const { tools } = await setupA2A( + { helper: { url: "http://example.test" } }, + new Set(), + ); + assert.equal(tools.length, 1); + assert.equal(tools[0].name, "helper"); + }); + + it("skips tools that collide with existing tool names", async () => { + installFakeClient({ + getAgentCard: async () => + fakeCard([{ id: "read", name: "Read", description: "x", tags: [] }]), + }); + const { tools } = await setupA2A( + { fs: { url: "http://example.test" } }, + new Set(["fs__read"]), + ); + assert.equal(tools.length, 0); + }); + + it("skips an agent with a missing url but keeps others", async () => { + installFakeClient({ + getAgentCard: async () => + fakeCard([{ id: "go", name: "Go", description: "x", tags: [] }]), + }); + const { tools } = await setupA2A( + { bad: {} as any, good: { url: "http://example.test" } }, + new Set(), + ); + assert.deepEqual(tools.map((t) => t.name), ["good__go"]); + }); + + it("is non-fatal when an agent connection fails", async () => { + mock.method(A2AClient, "fromCardUrl", async () => { + throw new Error("ECONNREFUSED"); + }); + const { tools } = await setupA2A( + { down: { url: "http://example.test", timeoutMs: 200 } }, + new Set(), + ); + assert.equal(tools.length, 0); // skipped, no throw + }); +}); + +// ── tool execution ──────────────────────────────────────────────────────── + +describe("A2A tool execution", () => { + it("blocking call returns the remote agent's text", async () => { + installFakeClient({ + getAgentCard: async () => fakeCard([{ id: "ask", name: "Ask", description: "x", tags: [] }], false), + sendMessage: async () => ({ result: textMessage("the answer is 42") }), + }); + const { tools } = await setupA2A({ q: { url: "http://example.test" } }, new Set()); + const res = await tools[0].execute("call1", { message: "what is the answer?" }); + assert.equal(res.content[0].text, "the answer is 42"); + }); + + it("propagates a remote error into the tool result instead of throwing", async () => { + installFakeClient({ + getAgentCard: async () => fakeCard([{ id: "ask", name: "Ask", description: "x", tags: [] }], false), + sendMessage: async () => ({ error: { code: -32000, message: "boom" } }), + }); + const { tools } = await setupA2A({ q: { url: "http://example.test" } }, new Set()); + const res = await tools[0].execute("call1", { message: "hi" }); + assert.match(res.content[0].text, /failed: boom/); + }); + + it("rejects an empty message", async () => { + installFakeClient({ + getAgentCard: async () => fakeCard([{ id: "ask", name: "Ask", description: "x", tags: [] }], false), + }); + const { tools } = await setupA2A({ q: { url: "http://example.test" } }, new Set()); + const res = await tools[0].execute("call1", { message: " " }); + assert.match(res.content[0].text, /required/); + }); + + it("streaming call accumulates events and pushes partial updates", async () => { + async function* stream() { + yield { kind: "status-update", status: { state: "working", message: textMessage("partial") }, final: false }; + yield { kind: "artifact-update", artifact: { artifactId: "a1", parts: [{ kind: "text", text: "final answer" }] } }; + yield { kind: "status-update", status: { state: "completed" }, final: true }; + } + installFakeClient({ + getAgentCard: async () => fakeCard([{ id: "ask", name: "Ask", description: "x", tags: [] }], true), + sendMessageStream: () => stream(), + }); + const { tools } = await setupA2A({ q: { url: "http://example.test" } }, new Set()); + const updates: string[] = []; + const res = await tools[0].execute("call1", { message: "go" }, undefined, (p: any) => + updates.push(p.content[0].text), + ); + assert.match(res.content[0].text, /partial/); + assert.match(res.content[0].text, /final answer/); + assert.ok(updates.length >= 1, "expected at least one partial update"); + }); +}); From 0066a4d78846cf385fa0fa5f906a5b01bd7f5b35 Mon Sep 17 00:00:00 2001 From: Nivesh353 Date: Wed, 24 Jun 2026 16:51:15 +0530 Subject: [PATCH 2/3] feat(a2a): support defining A2A agents in code via SDK New `a2aAgents` option on query(), merged on top of agent.yaml's `a2a_agents` (code wins on collision). Lets SDK users configure remote agents without a yaml. --- src/sdk-types.ts | 7 +++++++ src/sdk.ts | 10 ++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/sdk-types.ts b/src/sdk-types.ts index 20dc060..b0cf529 100644 --- a/src/sdk-types.ts +++ b/src/sdk-types.ts @@ -1,5 +1,6 @@ import type { AgentManifest } from "./loader.js"; import type { SessionCosts } from "./cost-tracker.js"; +import type { A2AAgentConfig } from "./a2a/types.js"; // ── Message types ────────────────────────────────────────────────────── @@ -136,6 +137,12 @@ export interface QueryOptions { systemPrompt?: string; systemPromptSuffix?: string; tools?: GCToolDefinition[]; + /** + * Remote A2A agents to connect to, defined in code. Merged on top of the + * agent.yaml `a2a_agents` map (these win on key collision). Each remote skill + * becomes a `__` tool, same as the manifest-declared ones. + */ + a2aAgents?: Record; replaceBuiltinTools?: boolean; allowedTools?: string[]; disallowedTools?: string[]; diff --git a/src/sdk.ts b/src/sdk.ts index befda55..5bf55ec 100644 --- a/src/sdk.ts +++ b/src/sdk.ts @@ -196,8 +196,14 @@ export function query(options: QueryOptions): Query { const declarativeTools = await loadDeclarativeTools(loaded.agentDir); tools = [...tools, ...declarativeTools]; - // A2A remote-agent tools (manifest-declared) — outbound only, opt-in. - const a2aSetup = await setupA2A(loaded.manifest.a2a_agents, new Set(tools.map((t) => t.name))); + // A2A remote-agent tools — outbound only, opt-in. Sourced from the + // manifest's `a2a_agents` and/or the `a2aAgents` option (option wins on + // key collision), so SDK users can define agents purely in code. + const a2aAgents = + loaded.manifest.a2a_agents || options.a2aAgents + ? { ...loaded.manifest.a2a_agents, ...options.a2aAgents } + : undefined; + const a2aSetup = await setupA2A(a2aAgents, new Set(tools.map((t) => t.name))); tools = [...tools, ...a2aSetup.tools]; a2aCleanup = a2aSetup.cleanup; From 1449b501d93616e007d50882261d981cc2b9a4d2 Mon Sep 17 00:00:00 2001 From: Nivesh353 Date: Fri, 26 Jun 2026 10:51:23 +0530 Subject: [PATCH 3/3] fix(a2a): operator precedence, cleanup reset, and consistent SIGINT shutdown --- src/index.ts | 8 ++++++-- src/sdk.ts | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/index.ts b/src/index.ts index 6a58d53..754c660 100644 --- a/src/index.ts +++ b/src/index.ts @@ -46,8 +46,10 @@ const green = (s: string) => `\x1b[32m${s}\x1b[0m`; let a2aCleanup: (() => Promise) | null = null; const runA2ACleanup = async () => { if (!a2aCleanup) return; + const fn = a2aCleanup; + a2aCleanup = null; try { - await a2aCleanup(); + await fn(); } catch { /* best-effort — never block exit */ } @@ -857,7 +859,9 @@ async function main(): Promise { try { _session.end({ "gitagent.cost_usd": _totalCostUsd }); } catch { /* ignore */ } - runA2ACleanup().finally(() => stopSandbox().finally(() => process.exit(0))); + runA2ACleanup() + .finally(() => shutdownTelemetry().catch(() => {})) + .finally(() => stopSandbox().finally(() => process.exit(0))); } }); diff --git a/src/sdk.ts b/src/sdk.ts index 5bf55ec..bb46c84 100644 --- a/src/sdk.ts +++ b/src/sdk.ts @@ -200,7 +200,7 @@ export function query(options: QueryOptions): Query { // manifest's `a2a_agents` and/or the `a2aAgents` option (option wins on // key collision), so SDK users can define agents purely in code. const a2aAgents = - loaded.manifest.a2a_agents || options.a2aAgents + (loaded.manifest.a2a_agents || options.a2aAgents) ? { ...loaded.manifest.a2a_agents, ...options.a2aAgents } : undefined; const a2aSetup = await setupA2A(a2aAgents, new Set(tools.map((t) => t.name)));