Skip to content
18 changes: 18 additions & 0 deletions rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ With the default `CliProgram::Resolve`, `Client::start()` resolves the CLI in th

Created via `Client::create_session` or `Client::resume_session`. Owns an internal event loop that dispatches CLI callbacks to the focused handler traits you install on `SessionConfig`, and broadcasts session events through `subscribe()`.

#### Cloud sessions

`Client::create_session` creates a Mission Control–backed cloud session when the config is built with `SessionConfig::with_cloud(...)`. The runtime owns the session ID: do **not** set `session_id` or `provider` on the config (the SDK rejects both with `Error::InvalidConfig`).

```rust,ignore
use github_copilot_sdk::types::{CloudSessionOptions, CloudSessionRepository, SessionConfig};

let cloud = CloudSessionOptions::with_repository(
CloudSessionRepository::new("github", "copilot-sdk").with_branch("main"),
);
let session = client
.create_session(SessionConfig::default().with_cloud(cloud))
.await?;
println!("cloud session id: {}", session.id());
```

The SDK buffers any `session.event` notifications or inbound JSON-RPC requests that arrive before the `session.create` response (bounded, drop-oldest) and replays them once the runtime-assigned session ID is registered.

```rust,ignore
use github_copilot_sdk::MessageOptions;

Expand Down
54 changes: 53 additions & 1 deletion rust/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ pub mod error_codes {
/// Invalid method parameters (-32602).
pub const INVALID_PARAMS: i32 = -32602;
/// Internal server error (-32603).
#[allow(dead_code, reason = "standard JSON-RPC code, reserved for future use")]
pub const INTERNAL_ERROR: i32 = -32603;
}

Expand Down Expand Up @@ -490,6 +489,59 @@ impl JsonRpcClient {
))),
}
}

/// Clone a sync handle onto the outbound writer for fire-and-forget
/// frames. Use only for paths that cannot `.await` (currently the
/// session router, which holds a `parking_lot::Mutex` while deciding
/// to discard a buffered request).
pub(crate) fn writer_handle(&self) -> WriterHandle {
WriterHandle {
write_tx: self.write_tx.clone(),
}
}
}

/// Sync, fire-and-forget handle onto the JSON-RPC writer actor. Cloned
/// from [`JsonRpcClient::writer_handle`]; serializes the message on the
/// caller's thread and enqueues it without awaiting an ack. Loss of the
/// ack means we'll never observe a write error here, which is acceptable
/// for the one current caller (error responses to dropped pending
/// requests): if the wire is broken, the runtime will time out the
/// request on its own.
pub(crate) struct WriterHandle {
write_tx: mpsc::UnboundedSender<WriteCommand>,
}

impl Clone for WriterHandle {
fn clone(&self) -> Self {
Self {
write_tx: self.write_tx.clone(),
}
}
}

impl WriterHandle {
/// Serialize and enqueue a JSON-RPC message without waiting for the
/// writer actor to flush it. Drops silently if serialization fails or
/// the writer actor has shut down — both indicate the transport is
/// already unusable.
pub(crate) fn send_fire_and_forget<T: serde::Serialize>(&self, message: &T) {
let body = match serde_json::to_vec(message) {
Ok(body) => body,
Err(e) => {
warn!(error = %e, "WriterHandle failed to serialize fire-and-forget message");
return;
}
};
let mut frame = Vec::with_capacity(CONTENT_LENGTH_HEADER.len() + 16 + body.len() + 4);
frame.extend_from_slice(CONTENT_LENGTH_HEADER.as_bytes());
frame.extend_from_slice(body.len().to_string().as_bytes());
frame.extend_from_slice(b"\r\n\r\n");
frame.extend_from_slice(&body);

let (ack_tx, _ack_rx) = oneshot::channel();
let _ = self.write_tx.send(WriteCommand { frame, ack: ack_tx });
}
}

/// RAII guard that removes a pending-request entry from the map if the
Expand Down
20 changes: 16 additions & 4 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1204,14 +1204,16 @@ impl Client {
let pid = child.as_ref().and_then(|c| c.id());
info!(pid = ?pid, "copilot CLI client ready");

let client_rpc_writer_handle = rpc.writer_handle();

let client = Self {
inner: Arc::new(ClientInner {
child: parking_lot::Mutex::new(child),
rpc,
cwd,
request_rx: parking_lot::Mutex::new(Some(request_rx)),
notification_tx: notification_broadcast_tx,
router: router::SessionRouter::new(),
router: router::SessionRouter::with_writer(client_rpc_writer_handle),
negotiated_protocol_version: OnceLock::new(),
state: parking_lot::Mutex::new(ConnectionState::Connected),
lifecycle_tx: broadcast::channel(256).0,
Expand All @@ -1224,6 +1226,10 @@ impl Client {
}),
};
client.spawn_lifecycle_dispatcher();
client
.inner
.router
.start(&client.inner.notification_tx, &client.inner.request_rx);
debug!(
elapsed_ms = setup_start.elapsed().as_millis(),
pid = ?pid,
Expand Down Expand Up @@ -1580,12 +1586,18 @@ impl Client {
&self,
session_id: &SessionId,
) -> crate::router::SessionChannels {
self.inner
.router
.ensure_started(&self.inner.notification_tx, &self.inner.request_rx);
self.inner.router.register(session_id)
}

/// Enter pending-routing mode on the router. While the returned guard is
/// alive, notifications and requests addressed to session ids that are
/// not yet registered are buffered instead of being dropped. Used when
/// creating cloud sessions so the SDK can receive events that the
/// runtime emits between `session.create` and the response.
pub(crate) fn begin_pending_session_routing(&self) -> crate::router::PendingSessionRouting {
self.inner.router.begin_pending_session_routing()
}

/// Unregister a session, dropping its per-session channels.
pub(crate) fn unregister_session(&self, session_id: &SessionId) {
self.inner.router.unregister(session_id);
Expand Down
Loading
Loading