responses, Object id) throws InterruptedException {
+ long deadline = System.nanoTime() + TIMEOUT.toNanos();
+ while (System.nanoTime() < deadline) {
+ if (responses.stream().anyMatch(response -> id.equals(response.id()))) {
+ return;
+ }
+ Thread.sleep(10);
+ }
+ }
+
+ private static String sessionId(Object params) {
+ if (params instanceof AcpSchema.PromptRequest promptRequest) {
+ return promptRequest.sessionId();
+ }
+ throw new IllegalArgumentException("Expected PromptRequest params but received " + params);
+ }
+
+ private static void allowAgentTransportSubscription() throws InterruptedException {
+ // AcpAgentSession subscribes to the in-memory transport in its constructor.
+ // subscribe() is asynchronous, so give the unicast sink subscriber a short
+ // window to attach before the test sends client messages.
+ Thread.sleep(AGENT_TRANSPORT_SUBSCRIPTION_DELAY_MILLIS);
+ }
+
+ private static void allowClientTransportSubscription() throws InterruptedException {
+ // clientTransport.connect(...).subscribe() also attaches asynchronously. Without
+ // this small wait, an immediate agent response can race the test subscriber.
+ Thread.sleep(CLIENT_TRANSPORT_SUBSCRIPTION_DELAY_MILLIS);
+ }
+
}
diff --git a/acp-streamable-http-jetty/pom.xml b/acp-streamable-http-jetty/pom.xml
new file mode 100644
index 0000000..c294a19
--- /dev/null
+++ b/acp-streamable-http-jetty/pom.xml
@@ -0,0 +1,68 @@
+
+
+ 4.0.0
+
+
+ com.agentclientprotocol
+ acp-java-sdk
+ 0.13.0-SNAPSHOT
+
+
+ acp-streamable-http-jetty
+ jar
+
+ ACP Streamable HTTP Jetty
+ Streamable HTTP agent transport using Jetty for listener-backed remote agents
+
+
+
+ com.agentclientprotocol
+ acp-core
+
+
+
+ org.eclipse.jetty
+ jetty-server
+
+
+ org.eclipse.jetty.ee10
+ jetty-ee10-servlet
+
+
+ org.eclipse.jetty.http2
+ jetty-http2-server
+
+
+ org.eclipse.jetty.websocket
+ jetty-websocket-jetty-server
+
+
+ org.eclipse.jetty.websocket
+ jetty-websocket-jetty-api
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ ch.qos.logback
+ logback-classic
+ test
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+
+
diff --git a/acp-streamable-http-jetty/src/main/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransport.java b/acp-streamable-http-jetty/src/main/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransport.java
new file mode 100644
index 0000000..88263e8
--- /dev/null
+++ b/acp-streamable-http-jetty/src/main/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransport.java
@@ -0,0 +1,1149 @@
+/*
+ * Copyright 2025-2026 the original author or authors.
+ */
+
+package com.agentclientprotocol.sdk.agent.transport;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.channels.ClosedChannelException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.agentclientprotocol.sdk.agent.AcpAgentFactory;
+import com.agentclientprotocol.sdk.error.AcpConnectionException;
+import com.agentclientprotocol.sdk.json.AcpJsonMapper;
+import com.agentclientprotocol.sdk.json.TypeRef;
+import com.agentclientprotocol.sdk.spec.AcpSchema;
+import com.agentclientprotocol.sdk.spec.AcpSchema.JSONRPCMessage;
+import com.agentclientprotocol.sdk.util.Assert;
+import jakarta.servlet.AsyncContext;
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServlet;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
+import org.eclipse.jetty.ee10.servlet.ServletHolder;
+import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.websocket.api.Callback;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.StatusCode;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.server.WebSocketUpgradeHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+/**
+ * Listener-backed ACP Streamable HTTP transport for agents.
+ *
+ *
+ * This transport hosts the ACP Streamable HTTP endpoint on Jetty, including POST/SSE
+ * request handling and WebSocket upgrades on the same path. It creates one fresh agent
+ * runtime per remote ACP connection through {@link AcpAgentFactory}. The accepted
+ * connection then owns its own per-connection {@link RemoteAcpConnection}, while the
+ * listener remains responsible only for wire-level concerns such as headers, SSE
+ * streams, WebSocket frames, and request routing.
+ *
+ *
+ *
+ * WebSocket support is intentionally hosted here instead of as a separate public
+ * listener so one {@code /acp} endpoint can behave like the RFD and the Rust
+ * {@code AcpHttpServer}: HTTP requests fall through to the servlet, while valid
+ * WebSocket upgrade requests are accepted by Jetty's {@link WebSocketUpgradeHandler}.
+ *
+ *
+ * @author Kaiser Dandangi
+ */
+public class StreamableHttpAcpAgentTransport {
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamableHttpAcpAgentTransport.class);
+
+ public static final String DEFAULT_ACP_PATH = "/acp";
+
+ private static final String HEADER_CONNECTION_ID = "Acp-Connection-Id";
+
+ private static final String HEADER_SESSION_ID = "Acp-Session-Id";
+
+ private static final String CONTENT_TYPE_JSON = "application/json";
+
+ private static final String CONTENT_TYPE_EVENT_STREAM = "text/event-stream";
+
+ private static final int MAX_REPLAY_EVENTS = 1024;
+
+ private static final Duration INITIALIZE_TIMEOUT = Duration.ofSeconds(30);
+
+ /**
+ * Controls whether unknown message methods may fall back to shape-based routing.
+ */
+ public enum RoutingMode {
+
+ /**
+ * Prefer explicit ACP routing and fall back to session-id shape inference for
+ * extension methods. Also permits provisional session streams before
+ * {@code session/load} so the currently ambiguous resume flow can work.
+ */
+ COMPATIBLE,
+
+ /**
+ * Require explicit routing rules and reject unknown session streams.
+ */
+ STRICT
+
+ }
+
+ private enum ScopeKind {
+
+ CONNECTION,
+
+ SESSION
+
+ }
+
+ private enum RequestKind {
+
+ INITIALIZE,
+
+ SESSION_NEW,
+
+ SESSION_LOAD,
+
+ GENERIC
+
+ }
+
+ private enum SessionState {
+
+ PENDING_LOAD,
+
+ KNOWN
+
+ }
+
+ private record RouteScope(ScopeKind kind, String sessionId) {
+
+ static RouteScope connection() {
+ return new RouteScope(ScopeKind.CONNECTION, null);
+ }
+
+ static RouteScope session(String sessionId) {
+ return new RouteScope(ScopeKind.SESSION, sessionId);
+ }
+
+ boolean isSession() {
+ return kind == ScopeKind.SESSION;
+ }
+
+ }
+
+ private record ClientRequestRoute(RequestKind kind, RouteScope requestScope, RouteScope responseScope) {
+ }
+
+ private record ResolvedInboundRoute(JSONRPCMessage message, RouteScope requestScope,
+ ClientRequestRoute requestRoute) {
+ }
+
+ private final int configuredPort;
+
+ private final String path;
+
+ private final AcpJsonMapper jsonMapper;
+
+ private final AcpAgentFactory agentFactory;
+
+ private final ConcurrentMap connections = new ConcurrentHashMap<>();
+
+ private final ConcurrentMap webSocketConnections = new ConcurrentHashMap<>();
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private final AtomicBoolean closing = new AtomicBoolean(false);
+
+ private final Sinks.One terminationSink = Sinks.one();
+
+ private volatile RoutingMode routingMode = RoutingMode.COMPATIBLE;
+
+ private volatile Server server;
+
+ private volatile ServerConnector connector;
+
+ /**
+ * Creates a new Streamable HTTP listener on the default ACP path.
+ * @param port port to listen on
+ * @param jsonMapper JSON mapper used for serialization
+ * @param agentFactory factory used to create one agent runtime per connection
+ */
+ public StreamableHttpAcpAgentTransport(int port, AcpJsonMapper jsonMapper, AcpAgentFactory agentFactory) {
+ this(port, DEFAULT_ACP_PATH, jsonMapper, agentFactory);
+ }
+
+ /**
+ * Creates a new Streamable HTTP listener.
+ * @param port port to listen on
+ * @param path endpoint path
+ * @param jsonMapper JSON mapper used for serialization
+ * @param agentFactory factory used to create one agent runtime per connection
+ */
+ public StreamableHttpAcpAgentTransport(int port, String path, AcpJsonMapper jsonMapper,
+ AcpAgentFactory agentFactory) {
+ Assert.isTrue(port > 0, "Port must be positive");
+ Assert.hasText(path, "Path must not be empty");
+ Assert.notNull(jsonMapper, "The JsonMapper can not be null");
+ Assert.notNull(agentFactory, "The agentFactory can not be null");
+ this.configuredPort = port;
+ this.path = path;
+ this.jsonMapper = jsonMapper;
+ this.agentFactory = agentFactory;
+ }
+
+ /**
+ * Sets the routing mode used by the listener.
+ * @param routingMode routing mode to use
+ * @return this transport
+ */
+ public StreamableHttpAcpAgentTransport routingMode(RoutingMode routingMode) {
+ Assert.notNull(routingMode, "The routingMode can not be null");
+ this.routingMode = routingMode;
+ return this;
+ }
+
+ /**
+ * Starts the embedded Jetty server.
+ * @return a mono that completes when the listener is ready
+ */
+ public Mono start() {
+ if (!started.compareAndSet(false, true)) {
+ return Mono.error(new IllegalStateException("Already started"));
+ }
+
+ return Mono.fromCallable(() -> {
+ Server jettyServer = new Server();
+ HttpConfiguration httpConfig = new HttpConfiguration();
+ ServerConnector jettyConnector = new ServerConnector(jettyServer,
+ new HttpConnectionFactory(httpConfig), new HTTP2CServerConnectionFactory(httpConfig));
+ jettyConnector.setPort(configuredPort);
+ jettyServer.addConnector(jettyConnector);
+
+ ServletContextHandler context = new ServletContextHandler();
+ context.setContextPath("/");
+ context.addServlet(new ServletHolder(new AcpServlet()), path);
+
+ WebSocketUpgradeHandler webSocketHandler = WebSocketUpgradeHandler.from(jettyServer, context, container -> {
+ container.setIdleTimeout(Duration.ofMinutes(30));
+ container.addMapping(path, (request, response, callback) -> {
+ WebSocketConnectionState connection = createWebSocketConnection();
+ try {
+ connection.start();
+ webSocketConnections.put(connection.id(), connection);
+ response.getHeaders().put(HEADER_CONNECTION_ID, connection.id());
+ return new AcpWebSocketEndpoint(connection);
+ }
+ catch (Exception e) {
+ connection.close();
+ callback.failed(e);
+ return null;
+ }
+ });
+ });
+ context.insertHandler(webSocketHandler);
+ jettyServer.setHandler(context);
+
+ jettyServer.start();
+ this.server = jettyServer;
+ this.connector = jettyConnector;
+ logger.info("Streamable HTTP agent listener started on port {} at path {}", getPort(), path);
+ return null;
+ }).then();
+ }
+
+ /**
+ * Returns the bound port.
+ * @return listener port
+ */
+ public int getPort() {
+ ServerConnector currentConnector = this.connector;
+ return currentConnector != null ? currentConnector.getLocalPort() : configuredPort;
+ }
+
+ /**
+ * Closes all active connections and stops the listener.
+ * @return a mono that completes when shutdown finishes
+ */
+ public Mono closeGracefully() {
+ return Mono.fromRunnable(() -> {
+ if (!closing.compareAndSet(false, true)) {
+ return;
+ }
+ connections.values().forEach(ConnectionState::close);
+ connections.clear();
+ webSocketConnections.values().forEach(WebSocketConnectionState::close);
+ webSocketConnections.clear();
+ Server currentServer = this.server;
+ if (currentServer != null) {
+ try {
+ currentServer.stop();
+ }
+ catch (Exception e) {
+ throw new AcpConnectionException("Failed to stop Streamable HTTP listener", e);
+ }
+ }
+ terminationSink.tryEmitValue(null);
+ });
+ }
+
+ /**
+ * Returns a mono that completes once the listener terminates.
+ * @return termination mono
+ */
+ public Mono awaitTermination() {
+ return terminationSink.asMono();
+ }
+
+ int activeConnectionCount() {
+ return connections.size() + webSocketConnections.size();
+ }
+
+ private ConnectionState createConnection() {
+ String connectionId = UUID.randomUUID().toString();
+ ConnectionState connection = new ConnectionState(connectionId);
+ connection.start();
+ return connection;
+ }
+
+ private WebSocketConnectionState createWebSocketConnection() {
+ String connectionId = UUID.randomUUID().toString();
+ return new WebSocketConnectionState(connectionId);
+ }
+
+ private boolean isInitializeRequest(JSONRPCMessage message) {
+ return message instanceof AcpSchema.JSONRPCRequest request
+ && AcpSchema.METHOD_INITIALIZE.equals(request.method()) && request.id() != null;
+ }
+
+ private final class AcpServlet extends HttpServlet {
+
+ @Override
+ protected void doPost(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
+ if (!hasContentType(request, CONTENT_TYPE_JSON)) {
+ writeText(response, HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE,
+ "Content-Type must be application/json");
+ return;
+ }
+
+ String body = new String(request.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
+ if (body.stripLeading().startsWith("[")) {
+ writeText(response, HttpServletResponse.SC_NOT_IMPLEMENTED, "JSON-RPC batches are not supported");
+ return;
+ }
+
+ JSONRPCMessage message;
+ try {
+ message = AcpSchema.deserializeJsonRpcMessage(jsonMapper, body);
+ }
+ catch (Exception e) {
+ writeText(response, HttpServletResponse.SC_BAD_REQUEST, "Invalid JSON-RPC");
+ return;
+ }
+
+ if (isInitialize(message)) {
+ handleInitialize(request, response, (AcpSchema.JSONRPCRequest) message);
+ return;
+ }
+
+ String connectionId = header(request, HEADER_CONNECTION_ID).orElse(null);
+ if (connectionId == null) {
+ writeText(response, HttpServletResponse.SC_BAD_REQUEST, HEADER_CONNECTION_ID + " header required");
+ return;
+ }
+ ConnectionState connection = connections.get(connectionId);
+ if (connection == null) {
+ response.setStatus(HttpServletResponse.SC_NOT_FOUND);
+ return;
+ }
+
+ try {
+ connection.acceptClientPost(message, header(request, HEADER_SESSION_ID).orElse(null));
+ response.setStatus(HttpServletResponse.SC_ACCEPTED);
+ }
+ catch (UnknownSessionException e) {
+ writeText(response, HttpServletResponse.SC_NOT_FOUND, e.getMessage());
+ }
+ catch (AcpConnectionException | IllegalArgumentException e) {
+ writeText(response, HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
+ }
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
+ if (!accepts(request, CONTENT_TYPE_EVENT_STREAM)) {
+ writeText(response, HttpServletResponse.SC_NOT_ACCEPTABLE, "client must accept text/event-stream");
+ return;
+ }
+
+ String connectionId = header(request, HEADER_CONNECTION_ID).orElse(null);
+ if (connectionId == null) {
+ writeText(response, HttpServletResponse.SC_BAD_REQUEST, HEADER_CONNECTION_ID + " header required");
+ return;
+ }
+ ConnectionState connection = connections.get(connectionId);
+ if (connection == null) {
+ response.setStatus(HttpServletResponse.SC_NOT_FOUND);
+ return;
+ }
+
+ try {
+ connection.openStream(request, response, header(request, HEADER_SESSION_ID).orElse(null));
+ }
+ catch (UnknownSessionException e) {
+ writeText(response, HttpServletResponse.SC_NOT_FOUND, e.getMessage());
+ }
+ }
+
+ @Override
+ protected void doDelete(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
+ String connectionId = header(request, HEADER_CONNECTION_ID).orElse(null);
+ if (connectionId == null) {
+ writeText(response, HttpServletResponse.SC_BAD_REQUEST, HEADER_CONNECTION_ID + " header required");
+ return;
+ }
+ ConnectionState connection = connections.remove(connectionId);
+ if (connection == null) {
+ response.setStatus(HttpServletResponse.SC_NOT_FOUND);
+ return;
+ }
+ connection.close();
+ response.setStatus(HttpServletResponse.SC_ACCEPTED);
+ }
+
+ private void handleInitialize(HttpServletRequest request, HttpServletResponse response,
+ AcpSchema.JSONRPCRequest initializeRequest) throws IOException {
+ if (header(request, HEADER_CONNECTION_ID).isPresent()) {
+ writeText(response, HttpServletResponse.SC_BAD_REQUEST,
+ "initialize must not include " + HEADER_CONNECTION_ID);
+ return;
+ }
+
+ ConnectionState connection = createConnection();
+ try {
+ JSONRPCMessage initializeResponse = connection.initialize(initializeRequest)
+ .block(INITIALIZE_TIMEOUT);
+ if (!(initializeResponse instanceof AcpSchema.JSONRPCResponse)) {
+ throw new AcpConnectionException("initialize did not produce a JSON-RPC response");
+ }
+ connections.put(connection.id(), connection);
+ response.setStatus(HttpServletResponse.SC_OK);
+ response.setContentType(CONTENT_TYPE_JSON);
+ response.setHeader(HEADER_CONNECTION_ID, connection.id());
+ response.getWriter().write(jsonMapper.writeValueAsString(initializeResponse));
+ }
+ catch (Exception e) {
+ connection.close();
+ writeText(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "initialize failed");
+ }
+ }
+
+ }
+
+ private boolean isInitialize(JSONRPCMessage message) {
+ return message instanceof AcpSchema.JSONRPCRequest request
+ && AcpSchema.METHOD_INITIALIZE.equals(request.method());
+ }
+
+ private boolean hasContentType(HttpServletRequest request, String expected) {
+ return Optional.ofNullable(request.getContentType())
+ .map(String::toLowerCase)
+ .filter(contentType -> contentType.contains(expected))
+ .isPresent();
+ }
+
+ private boolean accepts(HttpServletRequest request, String expected) {
+ return Optional.ofNullable(request.getHeader("Accept"))
+ .map(String::toLowerCase)
+ .filter(accept -> accept.contains(expected))
+ .isPresent();
+ }
+
+ private Optional header(HttpServletRequest request, String name) {
+ return Optional.ofNullable(request.getHeader(name)).filter(value -> !value.isBlank());
+ }
+
+ private void writeText(HttpServletResponse response, int status, String body) throws IOException {
+ response.setStatus(status);
+ response.setContentType("text/plain");
+ response.getWriter().write(body);
+ }
+
+ private final class ConnectionState {
+
+ private final String id;
+
+ private final RemoteAcpConnection connection;
+
+ private final OutboundStream connectionStream = new OutboundStream();
+
+ private final ConcurrentMap sessionStreams = new ConcurrentHashMap<>();
+
+ private final ConcurrentMap sessions = new ConcurrentHashMap<>();
+
+ // Client-originated request id -> route expected for the later agent response.
+ private final ConcurrentMap clientRequestRoutes = new ConcurrentHashMap<>();
+
+ // Agent-originated request id -> route required for the later client response.
+ private final ConcurrentMap agentRequestRoutes = new ConcurrentHashMap<>();
+
+ private final Sinks.One initializeResponse = Sinks.one();
+
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+ private volatile Object initializeRequestId;
+
+ ConnectionState(String id) {
+ this.id = id;
+ this.connection = new RemoteAcpConnection(id, jsonMapper, this::routeAgentMessage);
+ }
+
+ String id() {
+ return id;
+ }
+
+ void start() {
+ this.connection.start(agentFactory).block(INITIALIZE_TIMEOUT);
+ }
+
+ Mono initialize(AcpSchema.JSONRPCRequest request) {
+ this.initializeRequestId = request.id();
+ connection.acceptInbound(request);
+ return initializeResponse.asMono().doOnSuccess(ignored -> initialized.set(true));
+ }
+
+ void acceptClientPost(JSONRPCMessage message, String sessionHeader) {
+ if (message instanceof AcpSchema.JSONRPCResponse response) {
+ validateClientResponseScope(response, sessionHeader);
+ connection.acceptInbound(message);
+ return;
+ }
+
+ ResolvedInboundRoute resolved = resolveInboundRoute(message, sessionHeader);
+ if (resolved.requestScope().isSession()) {
+ prepareSessionForInbound(resolved.requestScope().sessionId(), resolved.requestRoute());
+ }
+ if (message instanceof AcpSchema.JSONRPCRequest request && request.id() != null
+ && resolved.requestRoute() != null) {
+ clientRequestRoutes.put(request.id(), resolved.requestRoute());
+ }
+ connection.acceptInbound(message);
+ }
+
+ void openStream(HttpServletRequest request, HttpServletResponse response, String sessionId)
+ throws IOException {
+ RouteScope scope = sessionId == null ? RouteScope.connection() : RouteScope.session(sessionId);
+ OutboundStream stream;
+ if (scope.isSession()) {
+ stream = openSessionStream(scope.sessionId());
+ }
+ else {
+ stream = connectionStream;
+ }
+
+ response.setStatus(HttpServletResponse.SC_OK);
+ response.setContentType(CONTENT_TYPE_EVENT_STREAM);
+ response.setHeader("Cache-Control", "no-cache");
+ response.setHeader(HEADER_CONNECTION_ID, id);
+ if (scope.isSession()) {
+ response.setHeader(HEADER_SESSION_ID, scope.sessionId());
+ }
+ AsyncContext asyncContext = request.startAsync();
+ asyncContext.setTimeout(0);
+ stream.subscribe(asyncContext, response);
+ }
+
+ void close() {
+ connections.remove(id, this);
+ connectionStream.close();
+ sessionStreams.values().forEach(OutboundStream::close);
+ connection.closeGracefully().subscribe(v -> {
+ }, error -> logger.warn("Error closing Streamable HTTP ACP connection {}", id, error));
+ }
+
+ private void routeAgentMessage(JSONRPCMessage message) {
+ try {
+ if (message instanceof AcpSchema.JSONRPCResponse response
+ && Objects.equals(response.id(), initializeRequestId) && !initialized.get()) {
+ initializeResponse.tryEmitValue(message);
+ return;
+ }
+
+ RouteScope scope = resolveAgentOutboundScope(message);
+ String payload = jsonMapper.writeValueAsString(message);
+ if (scope.isSession()) {
+ sessionStream(scope.sessionId()).push(payload);
+ }
+ else {
+ connectionStream.push(payload);
+ }
+ }
+ catch (Exception e) {
+ connection.signalException(e);
+ close();
+ }
+ }
+
+ private RouteScope resolveAgentOutboundScope(JSONRPCMessage message) {
+ if (message instanceof AcpSchema.JSONRPCResponse response) {
+ ClientRequestRoute route = clientRequestRoutes.remove(response.id());
+ if (route == null) {
+ logger.warn("Agent emitted response for unknown client request id {}; routing to connection stream",
+ response.id());
+ return RouteScope.connection();
+ }
+ if (route.kind() == RequestKind.SESSION_NEW && response.error() == null) {
+ String sessionId = extractSessionIdFromNewSessionResponse(response);
+ markSessionKnown(sessionId);
+ }
+ if (route.kind() == RequestKind.SESSION_LOAD && response.error() == null) {
+ markSessionKnown(route.requestScope().sessionId());
+ }
+ return route.responseScope();
+ }
+
+ String method;
+ Object params;
+ Object id = null;
+ if (message instanceof AcpSchema.JSONRPCRequest request) {
+ method = request.method();
+ params = request.params();
+ id = request.id();
+ }
+ else if (message instanceof AcpSchema.JSONRPCNotification notification) {
+ method = notification.method();
+ params = notification.params();
+ }
+ else {
+ throw new AcpConnectionException("Unsupported outbound JSON-RPC message type: " + message);
+ }
+
+ RouteScope scope = resolveAgentRequestOrNotificationScope(method, params);
+ if (id != null) {
+ agentRequestRoutes.put(id, scope);
+ }
+ return scope;
+ }
+
+ private RouteScope resolveAgentRequestOrNotificationScope(String method, Object params) {
+ switch (method) {
+ case AcpSchema.METHOD_SESSION_REQUEST_PERMISSION:
+ case AcpSchema.METHOD_SESSION_UPDATE:
+ case AcpSchema.METHOD_FS_READ_TEXT_FILE:
+ case AcpSchema.METHOD_FS_WRITE_TEXT_FILE:
+ case AcpSchema.METHOD_TERMINAL_CREATE:
+ case AcpSchema.METHOD_TERMINAL_OUTPUT:
+ case AcpSchema.METHOD_TERMINAL_RELEASE:
+ case AcpSchema.METHOD_TERMINAL_WAIT_FOR_EXIT:
+ case AcpSchema.METHOD_TERMINAL_KILL:
+ return RouteScope.session(requireSessionId(params, method));
+ default:
+ Optional sessionId = extractSessionId(params);
+ if (routingMode == RoutingMode.STRICT) {
+ throw new AcpConnectionException("No explicit routing rule for outbound method " + method);
+ }
+ return sessionId.map(RouteScope::session).orElseGet(RouteScope::connection);
+ }
+ }
+
+ private ResolvedInboundRoute resolveInboundRoute(JSONRPCMessage message, String sessionHeader) {
+ String method;
+ Object params;
+ if (message instanceof AcpSchema.JSONRPCRequest request) {
+ method = request.method();
+ params = request.params();
+ }
+ else if (message instanceof AcpSchema.JSONRPCNotification notification) {
+ method = notification.method();
+ params = notification.params();
+ }
+ else {
+ throw new AcpConnectionException("Unsupported inbound JSON-RPC message type: " + message);
+ }
+
+ RouteScope requestScope;
+ RequestKind kind = RequestKind.GENERIC;
+ RouteScope responseScope;
+
+ switch (method) {
+ case AcpSchema.METHOD_AUTHENTICATE:
+ case AcpSchema.METHOD_SESSION_NEW:
+ requestScope = RouteScope.connection();
+ kind = AcpSchema.METHOD_SESSION_NEW.equals(method) ? RequestKind.SESSION_NEW : RequestKind.GENERIC;
+ responseScope = RouteScope.connection();
+ break;
+ case AcpSchema.METHOD_SESSION_LOAD:
+ requestScope = requireSessionScope(method, params, sessionHeader);
+ kind = RequestKind.SESSION_LOAD;
+ responseScope = RouteScope.connection();
+ break;
+ case AcpSchema.METHOD_SESSION_PROMPT:
+ case AcpSchema.METHOD_SESSION_SET_MODE:
+ case AcpSchema.METHOD_SESSION_SET_MODEL:
+ case AcpSchema.METHOD_SESSION_CANCEL:
+ requestScope = requireSessionScope(method, params, sessionHeader);
+ responseScope = requestScope;
+ break;
+ default:
+ Optional sessionId = extractSessionId(params);
+ if (routingMode == RoutingMode.STRICT) {
+ throw new AcpConnectionException("No explicit routing rule for inbound method " + method);
+ }
+ if (sessionId.isPresent()) {
+ requestScope = requireSessionScope(method, params, sessionHeader);
+ }
+ else {
+ requestScope = RouteScope.connection();
+ }
+ responseScope = requestScope;
+ }
+
+ ClientRequestRoute requestRoute = message instanceof AcpSchema.JSONRPCRequest
+ ? new ClientRequestRoute(kind, requestScope, responseScope) : null;
+ return new ResolvedInboundRoute(message, requestScope, requestRoute);
+ }
+
+ private RouteScope requireSessionScope(String method, Object params, String sessionHeader) {
+ String sessionId = requireSessionId(params, method);
+ if (sessionHeader == null) {
+ throw new AcpConnectionException(HEADER_SESSION_ID + " header required for " + method);
+ }
+ if (!sessionId.equals(sessionHeader)) {
+ throw new AcpConnectionException("Header " + HEADER_SESSION_ID + " does not match params.sessionId");
+ }
+ return RouteScope.session(sessionId);
+ }
+
+ private void prepareSessionForInbound(String sessionId, ClientRequestRoute route) {
+ SessionState current = sessions.get(sessionId);
+ if (route != null && route.kind() == RequestKind.SESSION_LOAD) {
+ if (current == null) {
+ if (routingMode == RoutingMode.STRICT) {
+ throw new UnknownSessionException("Unknown session " + sessionId);
+ }
+ sessions.putIfAbsent(sessionId, SessionState.PENDING_LOAD);
+ sessionStream(sessionId);
+ }
+ return;
+ }
+ if (current != SessionState.KNOWN) {
+ throw new UnknownSessionException("Unknown session " + sessionId);
+ }
+ }
+
+ private void validateClientResponseScope(AcpSchema.JSONRPCResponse response, String sessionHeader) {
+ RouteScope expected = agentRequestRoutes.get(response.id());
+ if (expected == null) {
+ logger.warn("Client posted response for unknown agent request id {}", response.id());
+ return;
+ }
+ RouteScope actual = sessionHeader == null ? RouteScope.connection() : RouteScope.session(sessionHeader);
+ if (!Objects.equals(expected, actual)) {
+ throw new AcpConnectionException(
+ "Response id " + response.id() + " arrived on " + actual + " but expected " + expected);
+ }
+ agentRequestRoutes.remove(response.id(), expected);
+ }
+
+ private OutboundStream openSessionStream(String sessionId) {
+ SessionState current = sessions.get(sessionId);
+ if (current == null) {
+ if (routingMode == RoutingMode.STRICT) {
+ throw new UnknownSessionException("Unknown session " + sessionId);
+ }
+ /*
+ * RFD gap:
+ * The current text says unknown session-scoped GET requests return 404,
+ * but its resume flow also asks clients to open a session stream before
+ * sending session/load. Compatible mode keeps a provisional stream so
+ * practical resume can work while strict mode preserves the literal rule.
+ */
+ sessions.putIfAbsent(sessionId, SessionState.PENDING_LOAD);
+ }
+ return sessionStream(sessionId);
+ }
+
+ private OutboundStream sessionStream(String sessionId) {
+ return sessionStreams.computeIfAbsent(sessionId, ignored -> new OutboundStream());
+ }
+
+ private void markSessionKnown(String sessionId) {
+ sessions.put(sessionId, SessionState.KNOWN);
+ sessionStream(sessionId);
+ }
+
+ private String extractSessionIdFromNewSessionResponse(AcpSchema.JSONRPCResponse response) {
+ AcpSchema.NewSessionResponse sessionResponse = jsonMapper.convertValue(response.result(),
+ new TypeRef() {
+ });
+ if (sessionResponse.sessionId() == null || sessionResponse.sessionId().isBlank()) {
+ throw new AcpConnectionException("session/new response missing sessionId");
+ }
+ return sessionResponse.sessionId();
+ }
+
+ }
+
+ private Optional extractSessionId(Object params) {
+ if (params == null) {
+ return Optional.empty();
+ }
+ Map, ?> paramsMap = jsonMapper.convertValue(params, Map.class);
+ Object sessionId = paramsMap.get("sessionId");
+ return sessionId == null ? Optional.empty() : Optional.of(sessionId.toString());
+ }
+
+ private String requireSessionId(Object params, String method) {
+ return extractSessionId(params)
+ .filter(sessionId -> !sessionId.isBlank())
+ .orElseThrow(() -> new AcpConnectionException("Missing sessionId for method " + method));
+ }
+
+ private final class OutboundStream {
+
+ private final ArrayDeque replay = new ArrayDeque<>();
+
+ private final List subscribers = new CopyOnWriteArrayList<>();
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private boolean replayOpen = true;
+
+ synchronized void push(String payload) {
+ if (closed.get()) {
+ return;
+ }
+ if (replayOpen) {
+ if (replay.size() == MAX_REPLAY_EVENTS) {
+ throw new AcpConnectionException(
+ "Outbound SSE replay buffer exceeded " + MAX_REPLAY_EVENTS + " events");
+ }
+ replay.addLast(payload);
+ return;
+ }
+ subscribers.forEach(subscriber -> subscriber.send(payload));
+ }
+
+ synchronized void subscribe(AsyncContext asyncContext, HttpServletResponse response) throws IOException {
+ if (closed.get()) {
+ throw new IOException("SSE stream is closed");
+ }
+ SseSubscriber subscriber = new SseSubscriber(this, asyncContext, response);
+ subscribers.add(subscriber);
+ if (replayOpen) {
+ for (String payload : new ArrayList<>(replay)) {
+ subscriber.send(payload);
+ }
+ replay.clear();
+ replayOpen = false;
+ }
+ subscriber.flush();
+ }
+
+ void remove(SseSubscriber subscriber) {
+ subscribers.remove(subscriber);
+ }
+
+ void close() {
+ if (closed.compareAndSet(false, true)) {
+ subscribers.forEach(SseSubscriber::close);
+ subscribers.clear();
+ synchronized (this) {
+ replay.clear();
+ }
+ }
+ }
+
+ }
+
+ private final class SseSubscriber {
+
+ private final OutboundStream parent;
+
+ private final AsyncContext asyncContext;
+
+ private final PrintWriter writer;
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ SseSubscriber(OutboundStream parent, AsyncContext asyncContext, HttpServletResponse response) throws IOException {
+ this.parent = parent;
+ this.asyncContext = asyncContext;
+ this.writer = response.getWriter();
+ }
+
+ synchronized void send(String payload) {
+ if (closed.get()) {
+ return;
+ }
+ writer.write("data: ");
+ writer.write(payload);
+ writer.write("\n\n");
+ writer.flush();
+ if (writer.checkError()) {
+ close();
+ }
+ }
+
+ synchronized void flush() {
+ writer.flush();
+ }
+
+ void close() {
+ if (closed.compareAndSet(false, true)) {
+ parent.remove(this);
+ try {
+ asyncContext.complete();
+ }
+ catch (IllegalStateException ignored) {
+ }
+ }
+ }
+
+ }
+
+ private final class WebSocketConnectionState {
+
+ private final String id;
+
+ private final RemoteAcpConnection remoteConnection;
+
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private final SerializedWebSocketSender outboundSender = new SerializedWebSocketSender();
+
+ private volatile Session session;
+
+ WebSocketConnectionState(String id) {
+ this.id = id;
+ this.remoteConnection = new RemoteAcpConnection(id, jsonMapper, this::sendToClient);
+ }
+
+ String id() {
+ return id;
+ }
+
+ void start() {
+ this.remoteConnection.start(agentFactory).block(INITIALIZE_TIMEOUT);
+ }
+
+ void open(Session session) {
+ this.session = session;
+ }
+
+ void acceptFromClient(JSONRPCMessage message) {
+ if (!initialized.get()) {
+ // The WebSocket branch of the streamable endpoint has no POST
+ // initialize response that can create the connection first, so the first
+ // client-originated JSON-RPC message on the socket must be initialize.
+ if (!isInitializeRequest(message)) {
+ close(StatusCode.PROTOCOL, "first ACP WebSocket message must be initialize");
+ return;
+ }
+ initialized.set(true);
+ }
+ remoteConnection.acceptInbound(message);
+ }
+
+ void sendToClient(JSONRPCMessage message) {
+ try {
+ String payload = jsonMapper.writeValueAsString(message);
+ logger.debug("Sending streamable ACP WebSocket message: {}", payload);
+ outboundSender.send(payload);
+ }
+ catch (Exception e) {
+ remoteConnection.signalException(e);
+ close(StatusCode.SERVER_ERROR, "failed to send ACP message");
+ }
+ }
+
+ void close() {
+ close(StatusCode.NORMAL, "server closing");
+ }
+
+ void close(int statusCode, String reason) {
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+ outboundSender.close();
+ webSocketConnections.remove(id, this);
+ Session currentSession = this.session;
+ if (currentSession != null && currentSession.isOpen()) {
+ currentSession.close(statusCode, reason, Callback.NOOP);
+ }
+ remoteConnection.closeGracefully().subscribe(v -> {
+ }, error -> logger.warn("Error closing Streamable ACP WebSocket connection {}", id, error));
+ }
+
+ private final class SerializedWebSocketSender {
+
+ private final Object lock = new Object();
+
+ private final ArrayDeque queue = new ArrayDeque<>();
+
+ private boolean sendInProgress = false;
+
+ void send(String payload) {
+ boolean shouldDrain;
+ synchronized (lock) {
+ if (closed.get()) {
+ throw new AcpConnectionException("Streamable ACP WebSocket connection is closed");
+ }
+ queue.addLast(payload);
+ shouldDrain = !sendInProgress;
+ if (shouldDrain) {
+ sendInProgress = true;
+ }
+ }
+ if (shouldDrain) {
+ drain();
+ }
+ }
+
+ /*
+ * Jetty WebSocket sessions do not allow overlapping writes. Agent messages can
+ * be produced by concurrent prompt handlers, so this per-connection queue sends
+ * exactly one frame at a time and advances only after Jetty completes the
+ * callback for the previous frame.
+ */
+ private void drain() {
+ String payload;
+ Session currentSession;
+ synchronized (lock) {
+ if (closed.get()) {
+ clear();
+ return;
+ }
+ payload = queue.pollFirst();
+ if (payload == null) {
+ sendInProgress = false;
+ return;
+ }
+ currentSession = session;
+ }
+
+ if (currentSession == null || !currentSession.isOpen()) {
+ fail(new AcpConnectionException("Streamable ACP WebSocket connection is closed"));
+ return;
+ }
+
+ try {
+ currentSession.sendText(payload, Callback.from(this::drain, this::fail));
+ }
+ catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ private void fail(Throwable error) {
+ if (!closed.get()) {
+ remoteConnection.signalException(error);
+ WebSocketConnectionState.this.close(StatusCode.SERVER_ERROR, "failed to send ACP message");
+ }
+ }
+
+ void close() {
+ clear();
+ }
+
+ private void clear() {
+ synchronized (lock) {
+ queue.clear();
+ sendInProgress = false;
+ }
+ }
+
+ }
+
+ }
+
+ /**
+ * Jetty WebSocket endpoint for one WebSocket-upgraded ACP connection.
+ */
+ @WebSocket
+ public class AcpWebSocketEndpoint {
+
+ private final WebSocketConnectionState connection;
+
+ AcpWebSocketEndpoint(WebSocketConnectionState connection) {
+ this.connection = connection;
+ }
+
+ @OnWebSocketOpen
+ public void onOpen(Session session) {
+ logger.info("Streamable ACP WebSocket client connected from {}", session.getRemoteSocketAddress());
+ connection.open(session);
+ }
+
+ @OnWebSocketMessage
+ public void onMessage(Session session, String message) {
+ logger.debug("Received streamable ACP WebSocket message: {}", message);
+
+ try {
+ JSONRPCMessage jsonRpcMessage = AcpSchema.deserializeJsonRpcMessage(jsonMapper, message);
+ connection.acceptFromClient(jsonRpcMessage);
+ }
+ catch (Exception e) {
+ logger.warn("Closing streamable ACP WebSocket connection after invalid JSON-RPC frame", e);
+ connection.close(StatusCode.PROTOCOL, "invalid JSON-RPC frame");
+ }
+ }
+
+ @OnWebSocketClose
+ public void onClose(Session session, int statusCode, String reason) {
+ logger.info("Streamable ACP WebSocket client disconnected: {} - {}", statusCode, reason);
+ connection.close(statusCode, reason);
+ }
+
+ @OnWebSocketError
+ public void onError(Session session, Throwable error) {
+ if (error instanceof ClosedChannelException) {
+ logger.debug("Streamable ACP WebSocket channel closed");
+ connection.close(StatusCode.NORMAL, "WebSocket channel closed");
+ return;
+ }
+ logger.error("Streamable ACP WebSocket error", error);
+ connection.remoteConnection.signalException(error);
+ connection.close(StatusCode.SERVER_ERROR, "WebSocket error");
+ }
+
+ }
+
+ private static final class UnknownSessionException extends RuntimeException {
+
+ UnknownSessionException(String message) {
+ super(message);
+ }
+
+ }
+
+}
diff --git a/acp-streamable-http-jetty/src/test/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransportIntegrationTest.java b/acp-streamable-http-jetty/src/test/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransportIntegrationTest.java
new file mode 100644
index 0000000..879d265
--- /dev/null
+++ b/acp-streamable-http-jetty/src/test/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransportIntegrationTest.java
@@ -0,0 +1,428 @@
+/*
+ * Copyright 2025-2026 the original author or authors.
+ */
+
+package com.agentclientprotocol.sdk.agent.transport;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.agentclientprotocol.sdk.agent.AcpAgent;
+import com.agentclientprotocol.sdk.agent.AcpAgentFactory;
+import com.agentclientprotocol.sdk.client.AcpAsyncClient;
+import com.agentclientprotocol.sdk.client.AcpClient;
+import com.agentclientprotocol.sdk.client.transport.StreamableHttpAcpClientTransport;
+import com.agentclientprotocol.sdk.json.AcpJsonMapper;
+import com.agentclientprotocol.sdk.json.TypeRef;
+import com.agentclientprotocol.sdk.spec.AcpSchema;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end tests against the Java Streamable HTTP agent transport.
+ */
+class StreamableHttpAcpAgentTransportIntegrationTest {
+
+ private static final Duration TIMEOUT = Duration.ofSeconds(5);
+
+ private static final AcpJsonMapper JSON_MAPPER = AcpJsonMapper.createDefault();
+
+ @Test
+ void javaClientCanTalkToRunningJavaServer() throws Exception {
+ try (FixtureServer server = FixtureServer.start(StreamableHttpAcpAgentTransport.RoutingMode.COMPATIBLE)) {
+ AcpAsyncClient client = AcpClient
+ .async(new StreamableHttpAcpClientTransport(server.endpoint(), AcpJsonMapper.createDefault()))
+ .requestTimeout(TIMEOUT)
+ .build();
+
+ client.initialize().block(TIMEOUT);
+ AcpSchema.NewSessionResponse session = client
+ .newSession(new AcpSchema.NewSessionRequest("/workspace", List.of(), null))
+ .block(TIMEOUT);
+ AcpSchema.PromptResponse prompt = client
+ .prompt(new AcpSchema.PromptRequest(session.sessionId(),
+ List.of(new AcpSchema.TextContent("hello")), null))
+ .block(TIMEOUT);
+
+ assertThat(session.sessionId()).isEqualTo("sess-1");
+ assertThat(prompt.stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
+
+ client.closeGracefully().block(TIMEOUT);
+ }
+ }
+
+ @Test
+ void permissionRequestRoundTripsOverSessionStream() throws Exception {
+ try (FixtureServer server = FixtureServer.start(StreamableHttpAcpAgentTransport.RoutingMode.COMPATIBLE)) {
+ AtomicInteger permissionRequests = new AtomicInteger();
+ AcpAsyncClient client = AcpClient
+ .async(new StreamableHttpAcpClientTransport(server.endpoint(), AcpJsonMapper.createDefault()))
+ .requestPermissionHandler(request -> {
+ permissionRequests.incrementAndGet();
+ return Mono.just(new AcpSchema.RequestPermissionResponse(
+ new AcpSchema.PermissionSelected("allow")));
+ })
+ .requestTimeout(TIMEOUT)
+ .build();
+
+ client.initialize().block(TIMEOUT);
+ AcpSchema.NewSessionResponse session = client
+ .newSession(new AcpSchema.NewSessionRequest("/workspace", List.of(), null))
+ .block(TIMEOUT);
+ AcpSchema.PromptResponse prompt = client
+ .prompt(new AcpSchema.PromptRequest(session.sessionId(),
+ List.of(new AcpSchema.TextContent("permission please")), null))
+ .block(TIMEOUT);
+
+ assertThat(prompt.stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
+ assertThat(permissionRequests).hasValue(1);
+
+ client.closeGracefully().block(TIMEOUT);
+ }
+ }
+
+ @Test
+ void compatibleModeAllowsSessionLoadPreopen() throws Exception {
+ try (FixtureServer server = FixtureServer.start(StreamableHttpAcpAgentTransport.RoutingMode.COMPATIBLE)) {
+ AcpAsyncClient client = AcpClient
+ .async(new StreamableHttpAcpClientTransport(server.endpoint(), AcpJsonMapper.createDefault()))
+ .requestTimeout(TIMEOUT)
+ .build();
+
+ client.initialize().block(TIMEOUT);
+ AcpSchema.LoadSessionResponse response = client
+ .loadSession(new AcpSchema.LoadSessionRequest("sess-load", "/workspace", List.of()))
+ .block(TIMEOUT);
+
+ assertThat(response).isNotNull();
+
+ client.closeGracefully().block(TIMEOUT);
+ }
+ }
+
+ @Test
+ void supportsTwoLogicalSessions() throws Exception {
+ try (FixtureServer server = FixtureServer.start(StreamableHttpAcpAgentTransport.RoutingMode.COMPATIBLE)) {
+ AcpAsyncClient client = AcpClient
+ .async(new StreamableHttpAcpClientTransport(server.endpoint(), AcpJsonMapper.createDefault()))
+ .requestTimeout(TIMEOUT)
+ .build();
+
+ client.initialize().block(TIMEOUT);
+ AcpSchema.NewSessionResponse first = client
+ .newSession(new AcpSchema.NewSessionRequest("/workspace/one", List.of(), null))
+ .block(TIMEOUT);
+ AcpSchema.NewSessionResponse second = client
+ .newSession(new AcpSchema.NewSessionRequest("/workspace/two", List.of(), null))
+ .block(TIMEOUT);
+ AcpSchema.PromptResponse firstPrompt = client
+ .prompt(new AcpSchema.PromptRequest(first.sessionId(), List.of(new AcpSchema.TextContent("one")), null))
+ .block(TIMEOUT);
+ AcpSchema.PromptResponse secondPrompt = client
+ .prompt(new AcpSchema.PromptRequest(second.sessionId(), List.of(new AcpSchema.TextContent("two")), null))
+ .block(TIMEOUT);
+
+ assertThat(first.sessionId()).isEqualTo("sess-1");
+ assertThat(second.sessionId()).isEqualTo("sess-2");
+ assertThat(firstPrompt.stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
+ assertThat(secondPrompt.stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
+
+ client.closeGracefully().block(TIMEOUT);
+ }
+ }
+
+ @Test
+ void wrongStreamClientResponseIsRejected() throws Exception {
+ try (FixtureServer server = FixtureServer.start(StreamableHttpAcpAgentTransport.RoutingMode.COMPATIBLE)) {
+ HttpClient rawClient = HttpClient.newHttpClient();
+ HttpResponse initialize = rawClient.send(HttpRequest.newBuilder(server.endpoint())
+ .header("Content-Type", "application/json")
+ .header("Accept", "application/json")
+ .POST(HttpRequest.BodyPublishers.ofString("""
+ {"jsonrpc":"2.0","id":"init-1","method":"initialize","params":{"protocolVersion":1,"clientCapabilities":{}}}
+ """))
+ .build(), HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ String connectionId = initialize.headers().firstValue("Acp-Connection-Id").orElseThrow();
+ try (SseReader connectionStream = SseReader.open(rawClient, server.endpoint(), connectionId, null)) {
+ postJson(rawClient, server.endpoint(), connectionId, null,
+ """
+ {"jsonrpc":"2.0","id":"new-1","method":"session/new","params":{"cwd":"/workspace","mcpServers":[]}}
+ """);
+ AcpSchema.JSONRPCResponse newSessionResponse = connectionStream.nextResponse();
+ AcpSchema.NewSessionResponse session = JSON_MAPPER.convertValue(newSessionResponse.result(),
+ new TypeRef() {
+ });
+
+ try (SseReader sessionStream = SseReader.open(rawClient, server.endpoint(), connectionId,
+ session.sessionId())) {
+ postJson(rawClient, server.endpoint(), connectionId, session.sessionId(),
+ """
+ {"jsonrpc":"2.0","id":"prompt-1","method":"session/prompt","params":{"sessionId":"%s","prompt":[{"type":"text","text":"permission please"}]}}
+ """.formatted(session.sessionId()));
+ AcpSchema.JSONRPCRequest permissionRequest = sessionStream.nextRequest();
+ HttpResponse wrongStreamResponse = postJson(rawClient, server.endpoint(), connectionId, null,
+ """
+ {"jsonrpc":"2.0","id":"%s","result":{"outcome":{"outcome":"selected","optionId":"allow"}}}
+ """.formatted(permissionRequest.id()));
+
+ assertThat(wrongStreamResponse.statusCode()).isEqualTo(400);
+ assertThat(wrongStreamResponse.body()).contains("expected RouteScope");
+ }
+ }
+ }
+ }
+
+ @Test
+ void validationFailuresUseHttpStatusCodes() throws Exception {
+ try (FixtureServer server = FixtureServer.start(StreamableHttpAcpAgentTransport.RoutingMode.COMPATIBLE)) {
+ HttpClient rawClient = HttpClient.newHttpClient();
+ HttpResponse unsupportedContentType = rawClient.send(HttpRequest.newBuilder(server.endpoint())
+ .header("Content-Type", "text/plain")
+ .POST(HttpRequest.BodyPublishers.ofString("{}"))
+ .build(), HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ HttpResponse batch = rawClient.send(HttpRequest.newBuilder(server.endpoint())
+ .header("Content-Type", "application/json")
+ .POST(HttpRequest.BodyPublishers.ofString("[]"))
+ .build(), HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ HttpResponse wrongAccept = rawClient.send(HttpRequest.newBuilder(server.endpoint())
+ .header("Accept", "application/json")
+ .GET()
+ .build(), HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ HttpResponse missingConnection = rawClient.send(HttpRequest.newBuilder(server.endpoint())
+ .header("Accept", "text/event-stream")
+ .GET()
+ .build(), HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+
+ assertThat(unsupportedContentType.statusCode()).isEqualTo(415);
+ assertThat(batch.statusCode()).isEqualTo(501);
+ assertThat(wrongAccept.statusCode()).isEqualTo(406);
+ assertThat(missingConnection.statusCode()).isEqualTo(400);
+ }
+ }
+
+ @Test
+ void replayOverflowClosesConnectionInsteadOfDroppingMessages() throws Exception {
+ try (FixtureServer server = FixtureServer.start(StreamableHttpAcpAgentTransport.RoutingMode.COMPATIBLE)) {
+ HttpClient rawClient = HttpClient.newHttpClient();
+ HttpResponse initialize = rawClient.send(HttpRequest.newBuilder(server.endpoint())
+ .header("Content-Type", "application/json")
+ .header("Accept", "application/json")
+ .POST(HttpRequest.BodyPublishers.ofString("""
+ {"jsonrpc":"2.0","id":"init-1","method":"initialize","params":{"protocolVersion":1,"clientCapabilities":{}}}
+ """))
+ .build(), HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ String connectionId = initialize.headers().firstValue("Acp-Connection-Id").orElseThrow();
+
+ for (int i = 0; i < 1025; i++) {
+ HttpResponse response = postJson(rawClient, server.endpoint(), connectionId, null,
+ """
+ {"jsonrpc":"2.0","id":"new-%d","method":"session/new","params":{"cwd":"/workspace","mcpServers":[]}}
+ """.formatted(i));
+ if (response.statusCode() == 404) {
+ break;
+ }
+ assertThat(response.statusCode()).isEqualTo(202);
+ }
+
+ assertEventuallyPostStatus(rawClient, server.endpoint(), connectionId, 404);
+ }
+ }
+
+ @Test
+ void strictModeRejectsUnknownSessionStream() throws Exception {
+ try (FixtureServer server = FixtureServer.start(StreamableHttpAcpAgentTransport.RoutingMode.STRICT)) {
+ HttpResponse response = HttpClient.newHttpClient()
+ .send(HttpRequest.newBuilder(server.endpoint())
+ .header("Content-Type", "application/json")
+ .header("Accept", "application/json")
+ .POST(HttpRequest.BodyPublishers.ofString("""
+ {"jsonrpc":"2.0","id":"init-1","method":"initialize","params":{"protocolVersion":1,"clientCapabilities":{}}}
+ """))
+ .build(), HttpResponse.BodyHandlers.discarding());
+ String connectionId = response.headers().firstValue("Acp-Connection-Id").orElseThrow();
+ HttpResponse unknownSession = HttpClient.newHttpClient()
+ .send(HttpRequest.newBuilder(server.endpoint())
+ .header("Accept", "text/event-stream")
+ .header("Acp-Connection-Id", connectionId)
+ .header("Acp-Session-Id", "unknown")
+ .GET()
+ .build(), HttpResponse.BodyHandlers.discarding());
+
+ assertThat(response.statusCode()).isEqualTo(200);
+ assertThat(unknownSession.statusCode()).isEqualTo(404);
+ }
+ }
+
+ private static HttpResponse postJson(HttpClient client, URI endpoint, String connectionId, String sessionId,
+ String body) throws Exception {
+ HttpRequest.Builder builder = HttpRequest.newBuilder(endpoint)
+ .header("Content-Type", "application/json")
+ .header("Accept", "application/json")
+ .header("Acp-Connection-Id", connectionId)
+ .POST(HttpRequest.BodyPublishers.ofString(body));
+ if (sessionId != null) {
+ builder.header("Acp-Session-Id", sessionId);
+ }
+ return client.send(builder.build(), HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ }
+
+ private static void assertEventuallyPostStatus(HttpClient client, URI endpoint, String connectionId, int expectedStatus)
+ throws Exception {
+ long deadline = System.nanoTime() + TIMEOUT.toNanos();
+ int id = 0;
+ while (System.nanoTime() < deadline) {
+ HttpResponse response = postJson(client, endpoint, connectionId, null,
+ """
+ {"jsonrpc":"2.0","id":"overflow-probe-%d","method":"session/new","params":{"cwd":"/workspace","mcpServers":[]}}
+ """.formatted(id++));
+ if (response.statusCode() == expectedStatus) {
+ return;
+ }
+ Thread.sleep(25);
+ }
+ throw new AssertionError("Timed out waiting for stream status " + expectedStatus);
+ }
+
+ private static final class FixtureServer implements AutoCloseable {
+
+ private final StreamableHttpAcpAgentTransport transport;
+
+ private FixtureServer(StreamableHttpAcpAgentTransport transport) {
+ this.transport = transport;
+ }
+
+ static FixtureServer start(StreamableHttpAcpAgentTransport.RoutingMode routingMode) throws Exception {
+ AtomicInteger sessionCounter = new AtomicInteger();
+ AcpAgentFactory agentFactory = AcpAgentFactory.async(transport -> AcpAgent.async(transport)
+ .initializeHandler(request -> Mono.just(new AcpSchema.InitializeResponse(
+ AcpSchema.LATEST_PROTOCOL_VERSION, new AcpSchema.AgentCapabilities(true, null, null), null)))
+ .newSessionHandler(request -> Mono.just(new AcpSchema.NewSessionResponse(
+ "sess-" + sessionCounter.incrementAndGet(), null, null)))
+ .loadSessionHandler(request -> Mono.just(new AcpSchema.LoadSessionResponse(null, null)))
+ .promptHandler((request, context) -> {
+ Mono work = request.text().contains("permission")
+ ? context.askPermission("fixture permission").then()
+ : Mono.empty();
+ return work.then(context.sendMessage("hello"))
+ .thenReturn(AcpSchema.PromptResponse.endTurn());
+ })
+ .build());
+ StreamableHttpAcpAgentTransport transport = new StreamableHttpAcpAgentTransport(
+ freePort(), AcpJsonMapper.createDefault(), agentFactory).routingMode(routingMode);
+ transport.start().block(TIMEOUT);
+ return new FixtureServer(transport);
+ }
+
+ URI endpoint() {
+ return URI.create("http://127.0.0.1:" + transport.getPort() + "/acp");
+ }
+
+ @Override
+ public void close() {
+ transport.closeGracefully().block(TIMEOUT);
+ }
+
+ private static int freePort() throws IOException {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ return socket.getLocalPort();
+ }
+ }
+
+ }
+
+ private static final class SseReader implements AutoCloseable {
+
+ private final BlockingQueue messages = new LinkedBlockingQueue<>();
+
+ private final InputStream body;
+
+ private final ExecutorService executor;
+
+ private SseReader(InputStream body) {
+ this.body = body;
+ this.executor = Executors.newSingleThreadExecutor(r -> {
+ Thread thread = new Thread(r, "streamable-http-test-sse-reader");
+ thread.setDaemon(true);
+ return thread;
+ });
+ this.executor.submit(this::readLoop);
+ }
+
+ static SseReader open(HttpClient client, URI endpoint, String connectionId, String sessionId) throws Exception {
+ HttpRequest.Builder builder = HttpRequest.newBuilder(endpoint)
+ .header("Accept", "text/event-stream")
+ .header("Acp-Connection-Id", connectionId)
+ .GET();
+ if (sessionId != null) {
+ builder.header("Acp-Session-Id", sessionId);
+ }
+ HttpResponse response = client.send(builder.build(), HttpResponse.BodyHandlers.ofInputStream());
+ assertThat(response.statusCode()).isEqualTo(200);
+ return new SseReader(response.body());
+ }
+
+ AcpSchema.JSONRPCResponse nextResponse() throws Exception {
+ return (AcpSchema.JSONRPCResponse) nextMessage();
+ }
+
+ AcpSchema.JSONRPCRequest nextRequest() throws Exception {
+ return (AcpSchema.JSONRPCRequest) nextMessage();
+ }
+
+ private AcpSchema.JSONRPCMessage nextMessage() throws Exception {
+ AcpSchema.JSONRPCMessage message = messages.poll(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+ assertThat(message).isNotNull();
+ return message;
+ }
+
+ private void readLoop() {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(body, StandardCharsets.UTF_8))) {
+ StringBuilder data = new StringBuilder();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ if (line.isEmpty()) {
+ dispatch(data);
+ data.setLength(0);
+ }
+ else if (line.startsWith("data:")) {
+ data.append(line.substring(5).stripLeading());
+ }
+ }
+ }
+ catch (Exception ignored) {
+ }
+ }
+
+ private void dispatch(StringBuilder data) throws IOException {
+ if (!data.isEmpty()) {
+ messages.add(AcpSchema.deserializeJsonRpcMessage(JSON_MAPPER, data.toString()));
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ body.close();
+ executor.shutdownNow();
+ }
+
+ }
+
+}
diff --git a/acp-streamable-http-jetty/src/test/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransportWebSocketIntegrationTest.java b/acp-streamable-http-jetty/src/test/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransportWebSocketIntegrationTest.java
new file mode 100644
index 0000000..fc5ac86
--- /dev/null
+++ b/acp-streamable-http-jetty/src/test/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransportWebSocketIntegrationTest.java
@@ -0,0 +1,398 @@
+/*
+ * Copyright 2025-2026 the original author or authors.
+ */
+
+package com.agentclientprotocol.sdk.agent.transport;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.WebSocket;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.agentclientprotocol.sdk.agent.AcpAgent;
+import com.agentclientprotocol.sdk.agent.AcpAgentFactory;
+import com.agentclientprotocol.sdk.client.AcpAsyncClient;
+import com.agentclientprotocol.sdk.client.AcpClient;
+import com.agentclientprotocol.sdk.client.transport.WebSocketAcpClientTransport;
+import com.agentclientprotocol.sdk.json.AcpJsonMapper;
+import com.agentclientprotocol.sdk.spec.AcpSchema;
+import org.eclipse.jetty.websocket.api.StatusCode;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * End-to-end tests for the WebSocket upgrade path on the Streamable HTTP transport.
+ */
+class StreamableHttpAcpAgentTransportWebSocketIntegrationTest {
+
+ private static final Duration TIMEOUT = Duration.ofSeconds(5);
+
+ @Test
+ void constructorValidatesRequiredArguments() {
+ AcpJsonMapper jsonMapper = AcpJsonMapper.createDefault();
+ AcpAgentFactory agentFactory = simpleAgentFactory();
+
+ assertThatThrownBy(() -> new StreamableHttpAcpAgentTransport(0, jsonMapper, agentFactory))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Port");
+ assertThatThrownBy(() -> new StreamableHttpAcpAgentTransport(8080, "", jsonMapper, agentFactory))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Path");
+ assertThatThrownBy(() -> new StreamableHttpAcpAgentTransport(8080, null, agentFactory))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("JsonMapper");
+ assertThatThrownBy(() -> new StreamableHttpAcpAgentTransport(8080, jsonMapper, null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("agentFactory");
+ }
+
+ @Test
+ void handshakeReturnsConnectionIdHeader() throws Exception {
+ try (FixtureServer server = FixtureServer.start(simpleAgentFactory());
+ Socket socket = new Socket("127.0.0.1", server.port())) {
+ socket.setSoTimeout((int) TIMEOUT.toMillis());
+
+ String key = Base64.getEncoder()
+ .encodeToString(UUID.randomUUID().toString().substring(0, 16).getBytes(StandardCharsets.UTF_8));
+ PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
+ writer.print("GET /acp HTTP/1.1\r\n");
+ writer.print("Host: 127.0.0.1:" + server.port() + "\r\n");
+ writer.print("Upgrade: websocket\r\n");
+ writer.print("Connection: Upgrade\r\n");
+ writer.print("Sec-WebSocket-Key: " + key + "\r\n");
+ writer.print("Sec-WebSocket-Version: 13\r\n");
+ writer.print("\r\n");
+ writer.flush();
+
+ List responseLines = readHttpHeaders(socket);
+
+ assertThat(responseLines.get(0)).contains("101");
+ assertThat(responseLines.stream()
+ .map(line -> line.toLowerCase(Locale.ROOT))
+ .anyMatch(line -> line.startsWith("acp-connection-id:"))).isTrue();
+ }
+ }
+
+ @Test
+ void javaClientCanTalkToStreamableWebSocketUpgrade() throws Exception {
+ AtomicReference receivedUpdate = new AtomicReference<>();
+
+ try (FixtureServer server = FixtureServer.start(simpleAgentFactory())) {
+ AcpAsyncClient client = AcpClient
+ .async(new WebSocketAcpClientTransport(server.endpoint(), AcpJsonMapper.createDefault()))
+ .sessionUpdateConsumer(update -> {
+ receivedUpdate.set(update);
+ return Mono.empty();
+ })
+ .requestTimeout(TIMEOUT)
+ .build();
+ try {
+ client.initialize(new AcpSchema.InitializeRequest(
+ AcpSchema.LATEST_PROTOCOL_VERSION, new AcpSchema.ClientCapabilities()))
+ .block(TIMEOUT);
+ AcpSchema.NewSessionResponse session = client
+ .newSession(new AcpSchema.NewSessionRequest("/workspace", List.of()))
+ .block(TIMEOUT);
+ AcpSchema.PromptResponse prompt = client
+ .prompt(new AcpSchema.PromptRequest(session.sessionId(),
+ List.of(new AcpSchema.TextContent("hello over ws"))))
+ .block(TIMEOUT);
+
+ assertThat(session.sessionId()).startsWith("sess-");
+ assertThat(prompt.stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
+ assertThat(receivedUpdate.get()).isNotNull();
+ }
+ finally {
+ client.closeGracefully().block(TIMEOUT);
+ }
+ }
+ }
+
+ @Test
+ void permissionRequestRoundTripsOverStreamableWebSocketUpgrade() throws Exception {
+ AtomicInteger permissionRequests = new AtomicInteger();
+ AcpAgentFactory agentFactory = AcpAgentFactory.async(transport -> AcpAgent.async(transport)
+ .initializeHandler(request -> Mono.just(new AcpSchema.InitializeResponse(
+ AcpSchema.LATEST_PROTOCOL_VERSION, new AcpSchema.AgentCapabilities(true, null, null), List.of())))
+ .newSessionHandler(request -> Mono.just(new AcpSchema.NewSessionResponse("permission-session", null, null)))
+ .promptHandler((request, context) -> context.askPermission("streamable websocket edit")
+ .map(allowed -> {
+ assertThat(allowed).isTrue();
+ return AcpSchema.PromptResponse.endTurn();
+ }))
+ .build());
+
+ try (FixtureServer server = FixtureServer.start(agentFactory)) {
+ AcpAsyncClient client = AcpClient
+ .async(new WebSocketAcpClientTransport(server.endpoint(), AcpJsonMapper.createDefault()))
+ .requestPermissionHandler(request -> {
+ permissionRequests.incrementAndGet();
+ return Mono.just(new AcpSchema.RequestPermissionResponse(
+ new AcpSchema.PermissionSelected("allow")));
+ })
+ .requestTimeout(TIMEOUT)
+ .build();
+ try {
+ client.initialize(new AcpSchema.InitializeRequest(
+ AcpSchema.LATEST_PROTOCOL_VERSION, new AcpSchema.ClientCapabilities()))
+ .block(TIMEOUT);
+ client.newSession(new AcpSchema.NewSessionRequest("/workspace", List.of())).block(TIMEOUT);
+ AcpSchema.PromptResponse prompt = client
+ .prompt(new AcpSchema.PromptRequest("permission-session",
+ List.of(new AcpSchema.TextContent("please ask permission"))))
+ .block(TIMEOUT);
+
+ assertThat(prompt.stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
+ assertThat(permissionRequests.get()).isEqualTo(1);
+ }
+ finally {
+ client.closeGracefully().block(TIMEOUT);
+ }
+ }
+ }
+
+ @Test
+ void supportsMultipleConcurrentWebSocketClients() throws Exception {
+ try (FixtureServer server = FixtureServer.start(simpleAgentFactory())) {
+ AcpAsyncClient firstClient = AcpClient
+ .async(new WebSocketAcpClientTransport(server.endpoint(), AcpJsonMapper.createDefault()))
+ .requestTimeout(TIMEOUT)
+ .build();
+ AcpAsyncClient secondClient = AcpClient
+ .async(new WebSocketAcpClientTransport(server.endpoint(), AcpJsonMapper.createDefault()))
+ .requestTimeout(TIMEOUT)
+ .build();
+ try {
+ firstClient.initialize(new AcpSchema.InitializeRequest(
+ AcpSchema.LATEST_PROTOCOL_VERSION, new AcpSchema.ClientCapabilities()))
+ .block(TIMEOUT);
+ secondClient.initialize(new AcpSchema.InitializeRequest(
+ AcpSchema.LATEST_PROTOCOL_VERSION, new AcpSchema.ClientCapabilities()))
+ .block(TIMEOUT);
+
+ AcpSchema.NewSessionResponse firstSession = firstClient
+ .newSession(new AcpSchema.NewSessionRequest("/workspace/one", List.of()))
+ .block(TIMEOUT);
+ AcpSchema.NewSessionResponse secondSession = secondClient
+ .newSession(new AcpSchema.NewSessionRequest("/workspace/two", List.of()))
+ .block(TIMEOUT);
+
+ assertThat(firstSession.sessionId()).isNotEqualTo(secondSession.sessionId());
+ assertThat(server.transport().activeConnectionCount()).isEqualTo(2);
+ }
+ finally {
+ firstClient.closeGracefully().block(TIMEOUT);
+ secondClient.closeGracefully().block(TIMEOUT);
+ }
+ }
+ }
+
+ @Test
+ void serializesConcurrentAgentMessagesOnOneWebSocketConnection() throws Exception {
+ AtomicInteger sessionCounter = new AtomicInteger();
+ AtomicInteger receivedUpdates = new AtomicInteger();
+ AcpAgentFactory agentFactory = AcpAgentFactory.async(transport -> AcpAgent.async(transport)
+ .initializeHandler(request -> Mono.just(new AcpSchema.InitializeResponse(
+ AcpSchema.LATEST_PROTOCOL_VERSION, new AcpSchema.AgentCapabilities(true, null, null), List.of())))
+ .newSessionHandler(request -> Mono.just(new AcpSchema.NewSessionResponse(
+ "sess-" + sessionCounter.incrementAndGet(), null, null)))
+ .promptHandler((request, context) -> Mono.delay(Duration.ofMillis(25))
+ .thenMany(Flux.range(0, 20)
+ .flatMap(i -> context.sendMessage(request.sessionId() + ": update-" + i)
+ .subscribeOn(Schedulers.parallel()), 8))
+ .then(Mono.just(AcpSchema.PromptResponse.endTurn())))
+ .build());
+
+ try (FixtureServer server = FixtureServer.start(agentFactory)) {
+ AcpAsyncClient client = AcpClient
+ .async(new WebSocketAcpClientTransport(server.endpoint(), AcpJsonMapper.createDefault()))
+ .sessionUpdateConsumer(update -> {
+ receivedUpdates.incrementAndGet();
+ return Mono.empty();
+ })
+ .requestTimeout(TIMEOUT)
+ .build();
+ try {
+ client.initialize(new AcpSchema.InitializeRequest(
+ AcpSchema.LATEST_PROTOCOL_VERSION, new AcpSchema.ClientCapabilities()))
+ .block(TIMEOUT);
+ AcpSchema.NewSessionResponse firstSession = client
+ .newSession(new AcpSchema.NewSessionRequest("/workspace/one", List.of()))
+ .block(TIMEOUT);
+ AcpSchema.NewSessionResponse secondSession = client
+ .newSession(new AcpSchema.NewSessionRequest("/workspace/two", List.of()))
+ .block(TIMEOUT);
+
+ var prompts = Mono.zip(
+ client.prompt(new AcpSchema.PromptRequest(firstSession.sessionId(),
+ List.of(new AcpSchema.TextContent("first")))),
+ client.prompt(new AcpSchema.PromptRequest(secondSession.sessionId(),
+ List.of(new AcpSchema.TextContent("second")))))
+ .block(TIMEOUT);
+
+ assertThat(prompts.getT1().stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
+ assertThat(prompts.getT2().stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
+ assertThat(receivedUpdates).hasValue(40);
+ }
+ finally {
+ client.closeGracefully().block(TIMEOUT);
+ }
+ }
+ }
+
+ @Test
+ void rejectsNonInitializeFirstMessage() throws Exception {
+ try (FixtureServer server = FixtureServer.start(simpleAgentFactory())) {
+ CloseRecordingListener listener = new CloseRecordingListener();
+ WebSocket webSocket = HttpClient.newHttpClient()
+ .newWebSocketBuilder()
+ .connectTimeout(TIMEOUT)
+ .buildAsync(server.endpoint(), listener)
+ .get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+
+ assertThat(listener.openLatch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)).isTrue();
+ webSocket.sendText("""
+ {"jsonrpc":"2.0","id":"new-1","method":"session/new","params":{"cwd":"/workspace","mcpServers":[]}}
+ """, true).get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+
+ assertThat(listener.closeLatch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)).isTrue();
+ assertThat(listener.closeCode.get()).isEqualTo(StatusCode.PROTOCOL);
+ assertEventuallyNoConnections(server.transport());
+ }
+ }
+
+ private static AcpAgentFactory simpleAgentFactory() {
+ AtomicInteger sessionCounter = new AtomicInteger();
+ return AcpAgentFactory.async(transport -> AcpAgent.async(transport)
+ .initializeHandler(request -> Mono.just(new AcpSchema.InitializeResponse(
+ AcpSchema.LATEST_PROTOCOL_VERSION, new AcpSchema.AgentCapabilities(true, null, null), List.of())))
+ .newSessionHandler(request -> Mono.just(new AcpSchema.NewSessionResponse(
+ "sess-" + sessionCounter.incrementAndGet(), null, null)))
+ .promptHandler((request, context) -> context.sendMessage("hello from streamable websocket")
+ .thenReturn(AcpSchema.PromptResponse.endTurn()))
+ .build());
+ }
+
+ private static List readHttpHeaders(Socket socket) throws IOException {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
+ List lines = new ArrayList<>();
+ String line;
+ while ((line = reader.readLine()) != null && !line.isEmpty()) {
+ lines.add(line);
+ }
+ return lines;
+ }
+
+ private static void assertEventuallyNoConnections(StreamableHttpAcpAgentTransport transport) throws InterruptedException {
+ long deadline = System.nanoTime() + TIMEOUT.toNanos();
+ while (System.nanoTime() < deadline) {
+ if (transport.activeConnectionCount() == 0) {
+ return;
+ }
+ Thread.sleep(25);
+ }
+ assertThat(transport.activeConnectionCount()).isEqualTo(0);
+ }
+
+ private static int freePort() {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ return socket.getLocalPort();
+ }
+ catch (IOException e) {
+ throw new IllegalStateException("Unable to allocate a free port", e);
+ }
+ }
+
+ private static final class FixtureServer implements AutoCloseable {
+
+ private final StreamableHttpAcpAgentTransport transport;
+
+ private FixtureServer(StreamableHttpAcpAgentTransport transport) {
+ this.transport = transport;
+ }
+
+ static FixtureServer start(AcpAgentFactory agentFactory) {
+ StreamableHttpAcpAgentTransport transport = new StreamableHttpAcpAgentTransport(
+ freePort(), AcpJsonMapper.createDefault(), agentFactory);
+ transport.start().block(TIMEOUT);
+ return new FixtureServer(transport);
+ }
+
+ int port() {
+ return transport.getPort();
+ }
+
+ URI endpoint() {
+ return URI.create("ws://127.0.0.1:" + transport.getPort() + "/acp");
+ }
+
+ StreamableHttpAcpAgentTransport transport() {
+ return transport;
+ }
+
+ @Override
+ public void close() {
+ transport.closeGracefully().block(TIMEOUT);
+ }
+
+ }
+
+ private static final class CloseRecordingListener implements WebSocket.Listener {
+
+ private final CountDownLatch openLatch = new CountDownLatch(1);
+
+ private final CountDownLatch closeLatch = new CountDownLatch(1);
+
+ private final AtomicInteger closeCode = new AtomicInteger();
+
+ @Override
+ public void onOpen(WebSocket webSocket) {
+ openLatch.countDown();
+ webSocket.request(1);
+ }
+
+ @Override
+ public CompletionStage> onText(WebSocket webSocket, CharSequence data, boolean last) {
+ webSocket.request(1);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletionStage> onClose(WebSocket webSocket, int statusCode, String reason) {
+ closeCode.set(statusCode);
+ closeLatch.countDown();
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void onError(WebSocket webSocket, Throwable error) {
+ closeLatch.countDown();
+ }
+
+ }
+
+}
diff --git a/acp-streamable-http-jetty/src/test/resources/logback-test.xml b/acp-streamable-http-jetty/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..5243e19
--- /dev/null
+++ b/acp-streamable-http-jetty/src/test/resources/logback-test.xml
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
+ %d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n
+
+
+
diff --git a/plans/DOCS-ROADMAP.md b/plans/DOCS-ROADMAP.md
deleted file mode 100644
index dc26c67..0000000
--- a/plans/DOCS-ROADMAP.md
+++ /dev/null
@@ -1,329 +0,0 @@
-# Roadmap: ACP Java SDK 0.9.0 Documentation
-
-> **Created**: 2026-02-10
-> **Design version**: 0.9.0
-
-## Overview
-
-Documentation ships in three stages, ordered by launch criticality. Stage 1 creates the Mintlify documentation site (blocks the 0.9.0 blog post). Stage 2 updates tutorial READMEs and SDK metadata for the release. Stage 3 completes remaining pages post-launch. All documentation follows the code-first workflow: verify tutorial code compiles, then write docs based on working code.
-
-## Stage 1: Mintlify Site (Launch-Critical)
-
-### Step 1.1: Navigation and Scaffolding
-
-**Entry criteria**:
-- [ ] Read: Claude Agent SDK Mintlify structure (`~/community/mintlify-docs/claude-agent-sdk/`)
-- [ ] Read: `~/community/mintlify-docs/mint.json`
-
-**Work items**:
-- [ ] UPDATE `~/community/mintlify-docs/mint.json` with ACP Java SDK section under Incubating Projects
-- [ ] CREATE directory: `~/community/mintlify-docs/acp-java-sdk/`
-- [ ] CREATE directory: `~/community/mintlify-docs/acp-java-sdk/reference/`
-- [ ] CREATE directory: `~/community/mintlify-docs/acp-java-sdk/tutorial/`
-
-**Exit criteria**:
-- [ ] mint.json validates and includes ACP Java SDK navigation
-- [ ] Directory structure created
-- [ ] Update `ROADMAP.md` checkboxes
-
-**Deliverables**: Site navigation and directory scaffolding
-
----
-
-### Step 1.2: Index Page
-
-**Entry criteria**:
-- [ ] Step 1.1 complete
-- [ ] Read: `~/community/mintlify-docs/claude-agent-sdk/index.md` (template)
-- [ ] Read: `~/acp/acp-java/README.md` (source material)
-
-**Work items**:
-- [ ] CREATE `~/community/mintlify-docs/acp-java-sdk/index.md` (~120 lines)
-- [ ] Content: Overview, three-API-styles table, quick start (client + annotation-based agent), CardGroup to Reference + Tutorial, resource links
-
-**Exit criteria**:
-- [ ] Index page renders in dev preview
-- [ ] Code examples match SDK README
-- [ ] Update `ROADMAP.md` checkboxes
-
-**Deliverables**: `acp-java-sdk/index.md`
-
----
-
-### Step 1.3: Tutorial Index Page
-
-**Entry criteria**:
-- [ ] Step 1.1 complete
-- [ ] Read: `~/community/mintlify-docs/claude-agent-sdk/tutorial/index.md` (template)
-
-**Work items**:
-- [ ] CREATE `~/community/mintlify-docs/acp-java-sdk/tutorial/index.md` (~60 lines)
-- [ ] Content: Overview, prerequisites, 3-part structure table, getting the code
-
-**Exit criteria**:
-- [ ] Tutorial index renders and links resolve
-- [ ] Update `ROADMAP.md` checkboxes
-
-**Deliverables**: `acp-java-sdk/tutorial/index.md`
-
----
-
-### Step 1.4: Priority Tutorial Pages (10 Pages)
-
-**Entry criteria**:
-- [ ] Step 1.3 complete
-- [ ] Read: `~/community/mintlify-docs/claude-agent-sdk/tutorial/01-hello-world.md` (template)
-- [ ] VERIFY: `cd ~/projects/acp-java-tutorial && ./mvnw compile -pl module-01-first-contact,module-05-streaming-updates,module-12-echo-agent,module-13-agent-handlers,module-14-sending-updates,module-15-agent-requests,module-16-in-memory-testing,module-28-zed-integration,module-29-jetbrains-integration,module-30-vscode-integration -q`
-
-**Work items**:
-- [ ] CREATE `tutorial/01-first-contact.md` — ACP client basics (module-01)
-- [ ] CREATE `tutorial/05-streaming-updates.md` — Receiving real-time updates (module-05)
-- [ ] CREATE `tutorial/12-echo-agent.md` — Building your first agent (module-12)
-- [ ] CREATE `tutorial/13-agent-handlers.md` — All handler types (module-13)
-- [ ] CREATE `tutorial/14-sending-updates.md` — Agent-side streaming (module-14)
-- [ ] CREATE `tutorial/15-agent-requests.md` — File and permission requests (module-15)
-- [ ] CREATE `tutorial/16-in-memory-testing.md` — Testing without subprocesses (module-16)
-- [ ] CREATE `tutorial/28-zed-integration.md` — Running agents in Zed (module-28)
-- [ ] CREATE `tutorial/29-jetbrains-integration.md` — Running agents in JetBrains (module-29)
-- [ ] CREATE `tutorial/30-vscode-integration.md` — Running agents in VS Code (module-30)
-
-Each page follows template structure:
-- What You'll Learn
-- The Code (with explanation)
-- Source Code GitHub link
-- Run Command
-- Next Module
-
-**Exit criteria**:
-- [ ] All 10 pages render in dev preview
-- [ ] Code examples match actual tutorial source
-- [ ] All cross-links resolve
-- [ ] Update `ROADMAP.md` checkboxes
-
-**Deliverables**: 10 tutorial pages in `acp-java-sdk/tutorial/`
-
----
-
-### Step 1.5: API Reference Page
-
-**Entry criteria**:
-- [ ] Step 1.1 complete
-- [ ] Read: `~/community/mintlify-docs/claude-agent-sdk/reference/java.md` (template)
-- [ ] Read: `~/acp/acp-java/README.md` (source material)
-- [ ] Read: `~/acp/acp-java/acp-agent-support/README.md` (source material)
-
-**Work items**:
-- [ ] CREATE `~/community/mintlify-docs/acp-java-sdk/reference/java.md` (~500 lines)
-- [ ] Sections: Installation, Three-API comparison, Client API, Agent API (annotation/sync/async), Protocol types, Capabilities, Transports, Errors, Test utilities
-
-**Exit criteria**:
-- [ ] Reference page renders in dev preview
-- [ ] All code examples verified against SDK
-- [ ] Update `ROADMAP.md` checkboxes
-
-**Deliverables**: `acp-java-sdk/reference/java.md`
-
----
-
-### Step 1.6: Stage 1 Review
-
-**Entry criteria**:
-- [ ] Steps 1.1-1.5 complete
-
-**Work items**:
-- [ ] RUN `~/community/mintlify-docs/dev-preview.sh` to verify all pages render
-- [ ] VERIFY all cross-links work (index → tutorial → reference)
-- [ ] VERIFY code examples match actual tutorial source
-- [ ] CHECK for forbidden marketing language
-- [ ] VERIFY no internal implementation details exposed
-
-**Exit criteria**:
-- [ ] All pages render without errors
-- [ ] Zero forbidden-language violations
-- [ ] All code examples match working tutorial code
-- [ ] Update `ROADMAP.md` checkboxes
-
----
-
-## Stage 2: Tutorial READMEs + SDK Updates
-
-### Step 2.1: Lightweight Module READMEs (10 Priority Modules)
-
-**Entry criteria**:
-- [ ] Stage 1 complete
-- [ ] Read: `~/community/claude-agent-sdk-java-tutorial/module-01-hello-world/README.md` (template)
-
-**Work items**:
-- [ ] CREATE README.md for module-01-first-contact (5-6 lines)
-- [ ] CREATE README.md for module-05-streaming-updates
-- [ ] CREATE README.md for module-12-echo-agent
-- [ ] CREATE README.md for module-13-agent-handlers
-- [ ] CREATE README.md for module-14-sending-updates
-- [ ] CREATE README.md for module-15-agent-requests
-- [ ] CREATE README.md for module-16-in-memory-testing
-- [ ] UPDATE README.md for module-28 — add Mintlify link at top
-- [ ] UPDATE README.md for module-29 — add Mintlify link at top
-- [ ] UPDATE README.md for module-30 — add Mintlify link at top
-
-**Exit criteria**:
-- [ ] All 10 modules have README.md files
-- [ ] Mintlify links point to correct pages
-- [ ] Update `ROADMAP.md` checkboxes
-
-**Deliverables**: 7 new READMEs, 3 updated READMEs
-
----
-
-### Step 2.2: Fix Tutorial README
-
-**Entry criteria**:
-- [ ] Step 2.1 complete
-
-**Work items**:
-- [ ] UPDATE `~/projects/acp-java-tutorial/README.md`
-- [ ] MOVE modules 03, 04, 06, 09, 11 from "Coming Soon" to active (they have source code)
-- [ ] ADD Mintlify docs link at top
-- [ ] REORGANIZE into 3-part structure: Client → Agent → IDE Integration
-
-**Exit criteria**:
-- [ ] No modules with source code listed as "Coming Soon"
-- [ ] Mintlify link works
-- [ ] Update `ROADMAP.md` checkboxes
-
-**Deliverables**: Updated tutorial README
-
----
-
-### Step 2.3: SDK README Updates
-
-**Entry criteria**:
-- [ ] Step 2.2 complete
-
-**Work items**:
-- [ ] UPDATE `~/acp/acp-java/README.md`
-- [ ] ADD Mintlify docs link at top of Overview
-- [ ] UPDATE Installation: change `0.9.0-SNAPSHOT` to `0.9.0`, remove snapshots repository XML
-
-**Exit criteria**:
-- [ ] Version references updated
-- [ ] Mintlify link present
-- [ ] Update `ROADMAP.md` checkboxes
-
-**Deliverables**: Updated SDK README
-
----
-
-### Step 2.4: CHANGELOG for 0.9.0
-
-**Entry criteria**:
-- [ ] Step 2.3 complete
-
-**Work items**:
-- [ ] UPDATE `~/acp/acp-java/CHANGELOG.md`
-- [ ] REPLACE "[Unreleased]" with "[0.9.0] - 2026-02-XX"
-- [ ] EXPAND with full feature list from SDK development
-
-**Exit criteria**:
-- [ ] CHANGELOG reflects 0.9.0 release
-- [ ] All major features listed
-- [ ] Update `ROADMAP.md` checkboxes
-
-**Deliverables**: Updated CHANGELOG
-
----
-
-### Step 2.5: Stage 2 Review
-
-**Entry criteria**:
-- [ ] Steps 2.1-2.4 complete
-
-**Work items**:
-- [ ] VERIFY GitHub rendering of all module READMEs
-- [ ] CLICK all cross-links (SDK README → tutorial modules → Mintlify)
-- [ ] CONFIRM `./mvnw compile` passes for tutorial project
-
-**Exit criteria**:
-- [ ] All links resolve
-- [ ] Tutorial compiles
-- [ ] Update `ROADMAP.md` checkboxes
-
----
-
-## Stage 3: Post-Launch Completion (Not Blocking Release)
-
-### Step 3.1: Remaining Mintlify Tutorial Pages (14 Pages)
-
-**Work items**:
-- [ ] CREATE pages for modules: 02, 03, 04, 06, 07, 08, 09, 10, 11, 17, 18, 19, 21, 22
-- [ ] UPDATE mint.json navigation with expanded tutorial groups
-
----
-
-### Step 3.2: Remaining Tutorial Module READMEs (14 Modules)
-
-**Work items**:
-- [ ] CREATE READMEs for all remaining modules with source code
-
----
-
-### Step 3.3: SDK Module READMEs
-
-**Work items**:
-- [ ] CREATE lightweight READMEs for: acp-core, acp-annotations, acp-test, acp-websocket-jetty
-
----
-
-### Step 3.4: Enhancements
-
-**Work items**:
-- [ ] ADD architecture diagram to Mintlify index
-- [ ] ADD Gradle installation instructions to reference page
-
----
-
-## Execution Order (Stage 1 Priority)
-
-1. mint.json + directory scaffolding (unblocks everything)
-2. Index page + tutorial index (site structure)
-3. Tutorial pages: 12 (echo agent), 28 (Zed), 01 (first contact) — highest impact first
-4. API reference page (largest single item)
-5. Remaining 7 tutorial pages
-6. Stage 1 review
-
-## Verification
-
-- `~/community/mintlify-docs/dev-preview.sh` — all pages render
-- Every code example matches actual tutorial source
-- All cross-links: SDK README → tutorial modules → Mintlify
-- `./mvnw compile` passes for tutorial project
-
-## Writing Agents
-
-| Agent | Role |
-|-------|------|
-| `~/.claude/agents/technical-writer.md` | Primary — writes Mintlify pages and READMEs |
-| `~/.claude/agents/doc-reviewer.md` | Review — validates against style guide |
-| `~/.claude/agents/tutorial-code-sync.md` | Sync — ensures code examples match tutorial source |
-
-## Style Principles
-
-- Direct, plain-spoken, unadorned
-- Assume reader competence
-- Structure: context, mechanism, consequence
-- Forbidden: exciting, game-changing, best-in-class, seamlessly, powerful, intuitive, revolutionary, cutting-edge
-- Short paragraphs (3-4 sentences max), tables for comparisons, code blocks liberally
-- Accuracy over aesthetics
-
-## Conventions
-
-### Commit Convention
-
-```
-Step X.Y: Brief description of what was done
-```
-
-### Code-First Workflow
-
-1. Verify tutorial code compiles: `./mvnw compile -pl module-XX-* -q`
-2. THEN write docs based on working code
-3. Code in docs must match working tutorial code exactly
diff --git a/pom.xml b/pom.xml
index 3441e54..dae9ce9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,6 +18,7 @@
acp-core
acp-agent-support
acp-test
+ acp-streamable-http-jetty
acp-websocket-jetty
@@ -114,6 +115,11 @@
acp-test
${project.version}
+
+ com.agentclientprotocol
+ acp-streamable-http-jetty
+ ${project.version}
+
@@ -133,6 +139,21 @@
jetty-websocket-jetty-api
${jetty.version}
+
+ org.eclipse.jetty
+ jetty-server
+ ${jetty.version}
+
+
+ org.eclipse.jetty.ee10
+ jetty-ee10-servlet
+ ${jetty.version}
+
+
+ org.eclipse.jetty.http2
+ jetty-http2-server
+ ${jetty.version}
+