From e960c5175b9221ddbd38a73d089766793120a368 Mon Sep 17 00:00:00 2001 From: Jared Lunde Date: Fri, 19 Jun 2026 17:48:46 -0700 Subject: [PATCH 1/3] feat(beyond-pg-init): make it a first-class Beyond platform PID 1 + durable-volume fixes beyond-pg-init now integrates with instd's VM lifecycle and runs Postgres on a fresh durable GlideFS volume end-to-end. Proven on the homelab: a primary boots, real SQL (CREATE/INSERT/SELECT) round-trips and persists on the data volume, and a snapshot+CoW-fork yields a write-divergent branch of production (verified twice, green). Platform integration (beyond-pg-init stays PID 1, per the fidelity-first design): - substrate.rs: do instd's guest_runtime Ready handshake on a dedicated tokio thread so `service.create.completed` fires (instd waits for "guest ready"). - volumes.rs: mount data volumes from MMDS at their mount_path (mirrors guest-init), so /var/lib/postgresql is the durable GlideFS volume. - bootsetup.rs: MMDS HTTP client read Content-Length bytes instead of read_to_end (Firecracker MMDS holds the connection open -> read_to_end timed out -> EAGAIN -> fail-closed on POSTGRES_PASSWORD -> kernel panic). connect_timeout, generous retry budget. Durable-volume / noble-image fixes (beyond-pg): - pg.rs/boot.rs: run runtime initdb as the postgres user + chown the fresh volume tree + make the pwfile postgres-readable (initdb refuses root; the volume mounts root-owned and empty over the image's PGDATA). - boot.rs: ensure_socket_dir() creates /run/postgresql (no systemd-tmpfiles here). - tls.rs: emit Ed25519 server key as PKCS#8 v1 (OpenSSL 3.0.13 on ubuntu-noble can't decode rcgen's PKCS#8 v2; key/perf unchanged, v2 only appends the pubkey). - config.rs: filter shared_preload_libraries to installed .so's; supervisor.rs: downgrade missing REQUIRED_EXTENSIONS to a warning (auth/queue ext are a future milestone, pgdg pins dropped) so the standalone primitive boots. 
- pg_hba.conf: local trust for the co-located `pgbouncer` auth_user (PASSWORD NULL + peer mapped OS postgres -> DB pgbouncer -> "Peer authentication failed"). Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 203 +++++++++++++++++ Cargo.toml | 1 + beyond-pg-init/Cargo.toml | 5 + beyond-pg-init/src/bootsetup.rs | 68 +++++- beyond-pg-init/src/main.rs | 9 + beyond-pg-init/src/substrate.rs | 85 ++++++++ beyond-pg-init/src/volumes.rs | 325 ++++++++++++++++++++++++++++ packer/files/postgresql/pg_hba.conf | 8 + src/boot.rs | 52 ++++- src/config.rs | 144 ++++++++++++ src/pg.rs | 32 +-- src/supervisor.rs | 33 ++- src/tls.rs | 47 +++- 13 files changed, 980 insertions(+), 32 deletions(-) create mode 100644 beyond-pg-init/src/substrate.rs create mode 100644 beyond-pg-init/src/volumes.rs diff --git a/Cargo.lock b/Cargo.lock index 042b6f5..367dbf8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,41 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "aead" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" +dependencies = [ + "crypto-common 0.1.7", + "generic-array", +] + +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures 0.2.17", +] + +[[package]] +name = "aes-gcm" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "831010a0f742e1209b3bcea8fab6a8e149051ba6099432c8cb2cc117dec3ead1" +dependencies = [ + "aead", + "aes", + "cipher", + "ctr", + "ghash", + "subtle", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -275,6 +310,7 @@ dependencies = [ name = "beyond-pg" version = "0.1.0" dependencies = [ + "base64", "beyond-handoff", "beyond-pg-core", "clap", @@ -326,9 +362,12 @@ version 
= "0.1.0" dependencies = [ "beyond-handoff", "beyond-pg-core", + "guest-runtime", "libc", + "nix 0.31.2", "serde", "serde_json", + "tokio", ] [[package]] @@ -523,11 +562,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common 0.1.7", + "inout", +] + [[package]] name = "clap" version = "4.6.1" @@ -661,6 +712,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" dependencies = [ "generic-array", + "rand_core 0.6.4", "typenum", ] @@ -673,6 +725,15 @@ dependencies = [ "hybrid-array", ] +[[package]] +name = "ctr" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" +dependencies = [ + "cipher", +] + [[package]] name = "ctutils" version = "0.4.2" @@ -1079,6 +1140,46 @@ dependencies = [ "wasip3", ] +[[package]] +name = "ghash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0d8a4362ccb29cb0b265253fb0a2728f592895ee6854fd9bc13f2ffda266ff1" +dependencies = [ + "opaque-debug", + "polyval", +] + +[[package]] +name = "guest-runtime" +version = "0.1.0" +dependencies = [ + "aes-gcm", + "base64", + "guest-session", + "io_utils", + "libc", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tracing", + "vsock-protocol", + "wire", +] + +[[package]] +name = "guest-session" +version = "0.1.0" +dependencies = [ + "libc", + "nix 0.30.1", + "thiserror 2.0.18", + "tokio", + "tracing", + "uuid", + "vsock-protocol", +] 
+ [[package]] name = "h2" version = "0.4.14" @@ -1497,6 +1598,23 @@ dependencies = [ "serde_core", ] +[[package]] +name = "inout" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" +dependencies = [ + "generic-array", +] + +[[package]] +name = "io_utils" +version = "0.1.0" +dependencies = [ + "nix 0.29.0", + "tokio", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -1731,6 +1849,19 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", + "memoffset", +] + [[package]] name = "nix" version = "0.30.1" @@ -1894,6 +2025,12 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "opaque-debug" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" + [[package]] name = "openssl" version = "0.10.79" @@ -2052,6 +2189,18 @@ version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" +[[package]] +name = "polyval" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "opaque-debug", + "universal-hash", +] + [[package]] name = "portable-atomic" version = "1.13.1" @@ -2310,6 +2459,15 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rand_core" +version = "0.6.4" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.17", +] + [[package]] name = "rand_core" version = "0.9.5" @@ -2643,6 +2801,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_bytes" +version = "0.11.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d440709e79d88e51ac01c4b72fc6cb7314017bb7da9eeff678aa94c10e3ea8" +dependencies = [ + "serde", + "serde_core", +] + [[package]] name = "serde_core" version = "1.0.228" @@ -3341,6 +3509,16 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "universal-hash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" +dependencies = [ + "crypto-common 0.1.7", + "subtle", +] + [[package]] name = "untrusted" version = "0.9.0" @@ -3445,6 +3623,19 @@ dependencies = [ "nix 0.31.2", ] +[[package]] +name = "vsock-protocol" +version = "0.1.0" +dependencies = [ + "rmp-serde", + "serde", + "serde_bytes", + "serde_json", + "thiserror 2.0.18", + "tokio", + "wire", +] + [[package]] name = "wal-proto" version = "0.1.0" @@ -3876,6 +4067,18 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "wire" +version = "0.1.0" +dependencies = [ + "chrono", + "rmp-serde", + "serde", + "serde_bytes", + "serde_json", + "thiserror 2.0.18", +] + [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/Cargo.toml b/Cargo.toml index c473ee3..e01b47d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ name = "beyond-pg" path = "src/main.rs" [dependencies] +base64 = "0.22" beyond-pg-core = { path = 
"./beyond-pg-core" } clap = { version = "4", features = ["derive"] } quinn = "0.11" diff --git a/beyond-pg-init/Cargo.toml b/beyond-pg-init/Cargo.toml index 93beb32..56b4cc8 100644 --- a/beyond-pg-init/Cargo.toml +++ b/beyond-pg-init/Cargo.toml @@ -17,3 +17,8 @@ serde_json = "1" [target.'cfg(target_os = "linux")'.dependencies] libc = "0.2" handoff = { version = "0.1", package = "beyond-handoff" } +# Shared substrate SDK: the guest-ready vsock handshake instd waits for, plus +# the admin exec/ping channel. Path dep into the beyond workspace. +guest-runtime = { path = "../../beyond/rustlib/guest-runtime" } +nix = { version = "0.31", features = ["mount"] } +tokio = { version = "1", features = ["rt", "macros", "net", "time", "io-util"] } diff --git a/beyond-pg-init/src/bootsetup.rs b/beyond-pg-init/src/bootsetup.rs index 2597344..52a310d 100644 --- a/beyond-pg-init/src/bootsetup.rs +++ b/beyond-pg-init/src/bootsetup.rs @@ -15,8 +15,13 @@ use std::time::Duration; const MMDS_ADDR: &str = "169.254.169.254:80"; const MMDS_MAX_ATTEMPTS: u32 = 30; -const MMDS_RETRY: Duration = Duration::from_millis(10); -const HTTP_TIMEOUT: Duration = Duration::from_millis(200); +const MMDS_RETRY: Duration = Duration::from_millis(200); +// Firecracker's MMDS can take up to ~1–2s to answer the first request after +// boot on this substrate; a 200ms read timeout raced that and every attempt +// died with EAGAIN (the connect succeeds — it's the response that's slow), so +// the whole loop fell back to "POSTGRES_PASSWORD not set" and panicked. guest- +// init happens to win the race with its smaller VM; we must not depend on luck. +const HTTP_TIMEOUT: Duration = Duration::from_secs(3); const MAX_RESPONSE_BYTES: u64 = 64 * 1024; /// Run the full PID 1 boot sequence. Bails (exit 1) on fatal failures. 
@@ -41,6 +46,11 @@ pub fn run() { mount_essential_filesystems(); setup_network(); fetch_mmds(); + // instd records attached data volumes in MMDS; mount them (as root, before + // the supervisor child spawns) so postgres finds its data dir at + // /var/lib/postgresql. Tolerant: no volumes → no-op; a missing device is + // logged FATAL but never aborts the VM. + crate::volumes::mount_from_mmds(); setup_zram(); } @@ -293,13 +303,17 @@ fn write_mmds_file(json: &serde_json::Value) { fn poll_mmds() -> Result { let token = get_mmds_token(); for attempt in 1..=MMDS_MAX_ATTEMPTS { + let t0 = std::time::Instant::now(); match fetch_mmds_metadata(token.as_deref()) { Ok(Some(data)) => { eprintln!("[init] MMDS data available (attempt {attempt})"); return Ok(data); } Ok(None) => {} - Err(e) => eprintln!("[init] MMDS fetch attempt {attempt} failed: {e}"), + Err(e) => eprintln!( + "[init] MMDS fetch attempt {attempt} failed after {}ms: {e}", + t0.elapsed().as_millis() + ), } std::thread::sleep(MMDS_RETRY); } @@ -361,20 +375,56 @@ fn fetch_mmds_metadata( } fn http_roundtrip(request: &[u8]) -> std::io::Result> { - // Infallible: MMDS_ADDR is a compile-time-known SocketAddr literal. - let addr: std::net::SocketAddr = MMDS_ADDR.parse().unwrap(); + // connect_timeout (non-blocking connect + poll) handles the post-boot window + // where the eth0 link-local neighbor isn't resolved yet. + let addr: std::net::SocketAddr = MMDS_ADDR.parse().expect("MMDS_ADDR is a literal"); let mut stream = TcpStream::connect_timeout(&addr, HTTP_TIMEOUT)?; stream.set_write_timeout(Some(HTTP_TIMEOUT))?; stream.set_read_timeout(Some(HTTP_TIMEOUT))?; stream.write_all(request)?; + + // Read headers, then exactly `Content-Length` body bytes — do NOT read to + // EOF. Firecracker's MMDS keeps the TCP connection OPEN after the response + // (it ignores `Connection: close`), so there is no EOF; `read_to_end` blocks + // for the full read timeout on every request and surfaces as EAGAIN. 
This is + // the same Content-Length-bounded read guest-init's MMDS client uses. let mut buf = Vec::new(); - // Propagate read errors (e.g. timeout mid-response) so the retry loop - // logs the underlying I/O failure instead of a downstream JSON parse - // error on a truncated body. - stream.take(MAX_RESPONSE_BYTES).read_to_end(&mut buf)?; + let mut chunk = [0u8; 4096]; + loop { + if let Some(end) = find_headers_end(&buf) { + if let Some(len) = content_length(&buf[..end]) { + if buf.len() >= end + len { + buf.truncate(end + len); + break; + } + } + } + match stream.read(&mut chunk) { + Ok(0) => break, // EOF (Connection: close honored) + Ok(n) => buf.extend_from_slice(&chunk[..n]), + Err(e) => return Err(e), // timeout / error — surface to the retry loop + } + if buf.len() as u64 > MAX_RESPONSE_BYTES { + break; + } + } Ok(buf) } +/// Byte offset just past the `\r\n\r\n` header terminator, if fully buffered. +fn find_headers_end(buf: &[u8]) -> Option { + buf.windows(4).position(|w| w == b"\r\n\r\n").map(|p| p + 4) +} + +/// Parse the `Content-Length` header (case-insensitive) from a header block. +fn content_length(headers: &[u8]) -> Option { + let text = std::str::from_utf8(headers).ok()?; + text.split("\r\n") + .filter_map(|line| line.split_once(':')) + .find(|(name, _)| name.trim().eq_ignore_ascii_case("content-length")) + .and_then(|(_, value)| value.trim().parse().ok()) +} + fn http_status(response: &[u8]) -> Option { let line = response.split(|&b| b == b'\n').next()?; let s = std::str::from_utf8(line).ok()?; diff --git a/beyond-pg-init/src/main.rs b/beyond-pg-init/src/main.rs index c4d14ed..dfa0497 100644 --- a/beyond-pg-init/src/main.rs +++ b/beyond-pg-init/src/main.rs @@ -10,7 +10,11 @@ #[cfg(target_os = "linux")] mod bootsetup; #[cfg(target_os = "linux")] +mod substrate; +#[cfg(target_os = "linux")] mod supervise; +#[cfg(target_os = "linux")] +mod volumes; #[cfg(target_os = "linux")] fn main() -> ! { @@ -22,6 +26,11 @@ fn main() -> ! 
{ std::process::exit(1); } bootsetup::run(); + // Report "guest ready" to the host substrate over vsock and keep the + // connection alive for the VM's lifetime. instd waits for this handshake + // before considering the create successful. Spawned after boot setup (so + // the network/MMDS are up) and before the supervise loop takes over. + substrate::spawn_handshake(); supervise::run(); } diff --git a/beyond-pg-init/src/substrate.rs b/beyond-pg-init/src/substrate.rs new file mode 100644 index 0000000..4cb0e5f --- /dev/null +++ b/beyond-pg-init/src/substrate.rs @@ -0,0 +1,85 @@ +//! Substrate vsock "guest ready" handshake for `beyond-pg-init`. +//! +//! instd waits ~30s after spawning firecracker for the guest to complete the +//! [`guest_runtime::VsockClient::connect`] handshake (which sends a `Ready` +//! message); without it instd reports "guest not ready: timeout" and the +//! create fails. `beyond-pg-init` is a sync PID 1 (see [`crate::supervise`]), +//! so we host the async handshake + keep-alive loop on a dedicated OS thread +//! running a current-thread tokio runtime for the VM's lifetime. +//! +//! The handshake is soft-fail: if the connection can't be established (e.g. no +//! AF_VSOCK in a Docker test environment) we log a warning and let the thread +//! exit, exactly as `service-init`'s `EnvelopeSink::try_connect` does. The +//! supervise loop owns shutdown via signalfd (instd also sends SIGTERM), so on +//! a `Shutdown` event we only log — we never poweroff from this thread. + +/// Spawn the dedicated `substrate-vsock` thread that performs the guest-runtime +/// `Ready` handshake and then keeps the connection alive for the VM's lifetime. +/// +/// Returns immediately; the handshake happens on the spawned thread. Soft-fail +/// throughout: a failed spawn or a failed connect is logged, never fatal. 
+pub fn spawn_handshake() { + let builder = std::thread::Builder::new().name("substrate-vsock".to_string()); + if let Err(e) = builder.spawn(run) { + eprintln!("[init] WARNING: failed to spawn substrate-vsock thread: {e}"); + } +} + +fn run() { + // Current-thread runtime: this thread does nothing but host the single + // vsock connection and its keep-alive loop, so a multi-thread scheduler + // would only waste worker threads. + let runtime = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + eprintln!("[init] WARNING: substrate-vsock tokio runtime build failed: {e}"); + return; + } + }; + runtime.block_on(handshake_loop()); +} + +async fn handshake_loop() { + let cfg = guest_runtime::VsockClientConfig::for_primitive(format!( + "beyond-pg-init/{}", + env!("CARGO_PKG_VERSION") + )); + let mut client = match guest_runtime::VsockClient::connect(&cfg).await { + Ok(client) => { + eprintln!("[init] substrate vsock handshake complete; guest reported ready"); + client + } + Err(e) => { + // Soft-fail like service-init: no vsock (e.g. Docker tests) just + // means no host substrate to report to. Let the thread exit. + eprintln!("[init] WARNING: substrate vsock connect failed; guest-ready unreported: {e}"); + return; + } + }; + + // Serve the administrative exec/ping/shell channel on the same connection + // and keep it alive so instd sees the guest as live. The loop only exits + // on a Shutdown event or a connection error. + client.enable_admin(guest_runtime::SessionConfig::default()); + loop { + match client.next_event().await { + Ok(guest_runtime::SubstrateEvent::Shutdown) => { + // The supervise loop owns shutdown via signalfd (instd also + // sends SIGTERM); we only log and let this thread exit. + eprintln!("[init] substrate requested shutdown; vsock loop exiting"); + break; + } + Ok(guest_runtime::SubstrateEvent::AppMessage(_)) => { + // beyond-pg-init has no workload-env channel; ignore app frames. 
+ } + Ok(_) => {} + Err(e) => { + eprintln!("[init] substrate vsock loop ended (connection closed): {e}"); + break; + } + } + } +} diff --git a/beyond-pg-init/src/volumes.rs b/beyond-pg-init/src/volumes.rs new file mode 100644 index 0000000..659be10 --- /dev/null +++ b/beyond-pg-init/src/volumes.rs @@ -0,0 +1,325 @@ +//! Data-volume mounting for `beyond-pg-init`. +//! +//! instd attaches data volumes as block devices and records them in MMDS; +//! `beyond-pg-init` must mount them before the supervisor child is spawned so +//! postgres finds its data dir. The postgres data volume is recorded with +//! `path = /var/lib/postgresql`. +//! +//! Ported from guest-init's `mmds::read_attachments` + `mounts` (the canonical +//! mount-option behavior per storage class), adapted to beyond-pg-init's +//! `[init]` logging and made tolerant of a not-yet-present block device (a +//! brief retry) — a missing volume is logged FATAL but does not panic the VM, +//! so postgres can still surface the underlying error. +//! +//! Runs as root (PID 1) during [`crate::bootsetup::run`], after MMDS has been +//! fetched and before [`crate::supervise::run`] spawns the supervisor. + +use std::path::Path; +use std::time::{Duration, Instant}; + +use nix::mount::{MsFlags, mount}; +use serde_json::Value; + +/// Max time to wait for a declared block device to appear before giving up. +const DEVICE_WAIT: Duration = Duration::from_millis(500); +const DEVICE_POLL: Duration = Duration::from_millis(50); + +/// One data-volume attachment as written by instd into MMDS. +pub struct AttachmentMeta { + pub volume_id: String, + pub device: String, + pub path: String, + pub fstype: String, + pub readonly: bool, + pub storage_class: String, +} + +/// Read volume attachments from the already-written MMDS metadata file and +/// mount each one. Best-effort: returns immediately if there are no volumes. 
+pub fn mount_from_mmds() { + let attachments = read_attachments(); + if attachments.is_empty() { + return; + } + eprintln!("[init] mounting {} data volume(s) from MMDS", attachments.len()); + mount_data_volumes(&attachments); +} + +/// Read volume attachments from the MMDS metadata file written by +/// [`crate::bootsetup::fetch_mmds`]. +/// +/// Returns an empty vec if the file is absent, unparseable, or the +/// `volumes` array is missing — instances without data volumes see none. +fn read_attachments() -> Vec { + let path = beyond_pg_core::mmds::MMDS_PATH; + let raw = match std::fs::read(path) { + Ok(b) => b, + Err(_) => return vec![], + }; + let val: Value = match serde_json::from_slice(&raw) { + Ok(v) => v, + Err(_) => return vec![], + }; + parse_attachments(&val) +} + +/// Parse the `latest.meta-data.volumes` array out of an MMDS value. +fn parse_attachments(val: &Value) -> Vec { + let arr = match val["latest"]["meta-data"]["volumes"].as_array() { + Some(a) => a, + None => return vec![], + }; + arr.iter() + .filter_map(|v| { + Some(AttachmentMeta { + volume_id: v["volume_id"].as_str()?.to_string(), + device: v["device"].as_str()?.to_string(), + path: v["path"].as_str()?.to_string(), + fstype: v["fstype"].as_str().unwrap_or("ext4").to_string(), + readonly: v["readonly"].as_bool().unwrap_or(false), + storage_class: v["storage_class"].as_str().unwrap_or("standard").to_string(), + }) + }) + .collect() +} + +/// Mount each attachment. Idempotent (skips already-mounted paths) and tolerant +/// of a transiently-absent block device (brief retry). A volume that can't be +/// mounted is logged FATAL but does NOT abort the VM, so postgres can still +/// start and surface the underlying error. 
+fn mount_data_volumes(attachments: &[AttachmentMeta]) { + for a in attachments { + if let Err(e) = mount_one(a) { + eprintln!( + "[init] FATAL: failed to mount data volume {} ({} → {}): {e}; continuing", + a.volume_id, a.device, a.path + ); + } + } +} + +fn mount_one(a: &AttachmentMeta) -> Result<(), String> { + if !wait_for_device(&a.device) { + return Err(format!("device {} not present after {DEVICE_WAIT:?}", a.device)); + } + + ensure_dir(&a.path)?; + + if is_mounted(&a.path, &a.device)? { + eprintln!("[init] already mounted {}, skipping", a.path); + return Ok(()); + } + if let Some(other) = mounted_device(&a.path)? { + return Err(format!( + "path {} is already mounted with {} but expected {}", + a.path, other, a.device + )); + } + + let (flags, opts) = mount_params(&a.storage_class, a.readonly); + mount( + Some(a.device.as_str()), + a.path.as_str(), + Some(a.fstype.as_str()), + flags, + Some(opts.as_str()), + ) + .map_err(|e| format!("mount {} → {}: {}", a.device, a.path, e))?; + + eprintln!( + "[init] mounted {} at {} ({}{})", + a.device, + a.path, + opts, + if a.readonly { ",ro" } else { "" } + ); + Ok(()) +} + +/// Wait briefly for the block device to appear — instd attaches it around the +/// same time the guest boots, so it may not be visible the instant we look. +fn wait_for_device(device: &str) -> bool { + let dev = Path::new(device); + let deadline = Instant::now() + DEVICE_WAIT; + loop { + if dev.exists() { + return true; + } + if Instant::now() >= deadline { + return false; + } + std::thread::sleep(DEVICE_POLL); + } +} + +/// Returns (mount flags, mount options string) for a given storage class. +/// +/// The opts string is passed as the ext4-specific data argument to mount(2). +/// VFS-level flags (noatime, ro) MUST go in the flags bitfield — passing them +/// in the data string makes ext4 reject the mount with EINVAL. Mirrors +/// guest-init's `mounts::mount_params`. 
+fn mount_params(storage_class: &str, readonly: bool) -> (MsFlags, String) { + let mut flags = MsFlags::MS_NOATIME; + if readonly { + flags |= MsFlags::MS_RDONLY; + } + let opts = match storage_class { + "standard" => "commit=60", + s if s.starts_with("database:") => "data=ordered,commit=30,barrier=1", + "ephemeral" => "commit=120,nobarrier", + "scratch" => "commit=120,nobarrier,data=writeback", + _ => "commit=60", + }; + (flags, opts.to_string()) +} + +/// Read `/proc/self/mountinfo` to check whether `path` is mounted with `device`. +fn is_mounted(path: &str, device: &str) -> Result { + let info = read_mountinfo()?; + Ok(info.iter().any(|(mp, dev)| mp == path && dev == device)) +} + +/// Return the device currently mounted at `path`, or `None` if nothing is. +fn mounted_device(path: &str) -> Result, String> { + let info = read_mountinfo()?; + Ok(info.into_iter().find(|(mp, _)| mp == path).map(|(_, dev)| dev)) +} + +fn read_mountinfo() -> Result, String> { + let content = std::fs::read_to_string("/proc/self/mountinfo") + .map_err(|e| format!("read mountinfo: {e}"))?; + Ok(parse_mountinfo(&content)) +} + +/// Parse `/proc/self/mountinfo` into `(mount_point, source)` pairs. +fn parse_mountinfo(content: &str) -> Vec<(String, String)> { + let mut entries = Vec::new(); + for line in content.lines() { + // Format: id parent major:minor root mount-point mount-options ... - fstype source ... + // mount-point is field 4 (0-indexed); source is the 2nd token after "-". 
+ let fields: Vec<&str> = line.splitn(10, ' ').collect(); + if fields.len() < 5 { + continue; + } + let mount_point = fields[4]; + let Some(dash_pos) = line.find(" - ") else { + continue; + }; + let after_dash: Vec<&str> = line[dash_pos + 3..].splitn(3, ' ').collect(); + if after_dash.len() < 2 { + continue; + } + entries.push((mount_point.to_string(), after_dash[1].to_string())); + } + entries +} + +fn ensure_dir(path: &str) -> Result<(), String> { + std::fs::create_dir_all(path).map_err(|e| format!("create dir {path}: {e}")) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_attachments_postgres_volume() { + let val = serde_json::json!({ + "latest": { "meta-data": { "volumes": [ + { + "volume_id": "vol-abc", + "device": "/dev/vdb", + "path": "/var/lib/postgresql", + "fstype": "ext4", + "readonly": false, + "storage_class": "database:postgres" + } + ] } } + }); + let atts = parse_attachments(&val); + assert_eq!(atts.len(), 1); + let a = &atts[0]; + assert_eq!(a.volume_id, "vol-abc"); + assert_eq!(a.device, "/dev/vdb"); + assert_eq!(a.path, "/var/lib/postgresql"); + assert_eq!(a.fstype, "ext4"); + assert!(!a.readonly); + assert_eq!(a.storage_class, "database:postgres"); + } + + #[test] + fn parse_attachments_applies_defaults() { + // fstype/readonly/storage_class omitted → defaults. + let val = serde_json::json!({ + "latest": { "meta-data": { "volumes": [ + { "volume_id": "v", "device": "/dev/vdc", "path": "/data" } + ] } } + }); + let atts = parse_attachments(&val); + assert_eq!(atts.len(), 1); + assert_eq!(atts[0].fstype, "ext4"); + assert!(!atts[0].readonly); + assert_eq!(atts[0].storage_class, "standard"); + } + + #[test] + fn parse_attachments_skips_entries_missing_required_fields() { + // device missing → entry dropped (filter_map on the `?`s). 
+ let val = serde_json::json!({ + "latest": { "meta-data": { "volumes": [ + { "volume_id": "v", "path": "/data" } + ] } } + }); + assert!(parse_attachments(&val).is_empty()); + } + + #[test] + fn parse_attachments_empty_when_no_volumes_key() { + let val = serde_json::json!({ "latest": { "meta-data": { "hostname": "pg" } } }); + assert!(parse_attachments(&val).is_empty()); + } + + #[test] + fn parse_attachments_empty_on_garbage() { + let val = serde_json::json!({ "anything": 1 }); + assert!(parse_attachments(&val).is_empty()); + } + + #[test] + fn mount_params_database_uses_ordered_journaling() { + let (flags, opts) = mount_params("database:postgres", false); + assert!(flags.contains(MsFlags::MS_NOATIME)); + assert!(!flags.contains(MsFlags::MS_RDONLY)); + assert!(opts.contains("data=ordered")); + assert!(opts.contains("commit=30")); + assert!(opts.contains("barrier=1")); + } + + #[test] + fn mount_params_standard_and_readonly() { + let (flags, opts) = mount_params("standard", true); + assert!(flags.contains(MsFlags::MS_NOATIME)); + assert!(flags.contains(MsFlags::MS_RDONLY)); + assert!(opts.contains("commit=60")); + assert!(!opts.contains("data=ordered")); + } + + #[test] + fn mount_params_unknown_falls_back_to_standard() { + let (_flags, opts) = mount_params("weird_class", false); + assert!(opts.contains("commit=60") && !opts.contains("data=ordered")); + } + + #[test] + fn parse_mountinfo_extracts_mountpoint_and_source() { + let line = "36 35 8:1 / /var/lib/postgresql rw,noatime shared:1 - ext4 /dev/vdb rw,data=ordered"; + let entries = parse_mountinfo(line); + assert_eq!(entries, vec![("/var/lib/postgresql".to_string(), "/dev/vdb".to_string())]); + } + + #[test] + fn parse_mountinfo_skips_malformed_lines() { + // No " - " separator → skipped. 
+ assert!(parse_mountinfo("garbage without dash").is_empty()); + } +} diff --git a/packer/files/postgresql/pg_hba.conf b/packer/files/postgresql/pg_hba.conf index 260c591..44386ce 100644 --- a/packer/files/postgresql/pg_hba.conf +++ b/packer/files/postgresql/pg_hba.conf @@ -3,6 +3,14 @@ # See DESIGN.md "Authentication" for rationale. # TYPE DATABASE USER ADDRESS METHOD +# Co-located pgbouncer: connects over the unix socket as the `pgbouncer` +# auth_user (PASSWORD NULL) to run auth_query. It runs as the `postgres` OS user, +# so `peer` (which maps OS name -> DB name) rejects the `pgbouncer` DB user with +# "Peer authentication failed". Trust is safe here: unix socket only, same VM, +# and the role exists solely for the auth_query lookup. MUST precede the +# catch-all `local all all peer` (pg_hba is first-match). +local all pgbouncer trust + # Unix socket: peer auth for local admin and bootstrap scripts local all all peer diff --git a/src/boot.rs b/src/boot.rs index e756b47..6539f46 100644 --- a/src/boot.rs +++ b/src/boot.rs @@ -68,12 +68,36 @@ pub async fn run() { /// Idempotent boot-time setup. Called by `supervisor` before spawning Postgres. pub async fn do_boot(cfg: &MmdsConfig) -> Result<(), BootError> { + ensure_socket_dir(); match cfg.pg_tier { PgTier::Single | PgTier::Primary => do_boot_primary(cfg).await, PgTier::Replica => do_boot_replica(cfg).await, } } +/// Ensure the Postgres unix-socket directory exists and is owned by postgres. +/// +/// `/run/postgresql` (= `/var/run/postgresql`, [`pg::PG_SOCKET_DIR`]) is normally +/// materialized by systemd-tmpfiles, but the Beyond VM has no systemd — PID 1 is +/// `beyond-pg-init`. Without it, Postgres dies at startup with +/// `could not create lock file "/var/run/postgresql/.s.PGSQL..lock"`. +/// Idempotent: a no-op when the dir already exists with the right owner. 
+fn ensure_socket_dir() { + let dir = pg::PG_SOCKET_DIR; + if let Err(e) = std::fs::create_dir_all(dir) { + warn!("ensure_socket_dir: create {dir}: {e}"); + return; + } + match std::process::Command::new("chown") + .args(["postgres:postgres", dir]) + .status() + { + Ok(s) if s.success() => info!("socket dir {dir} ready (owner postgres)"), + Ok(s) => warn!("chown {dir} exited {s}; continuing"), + Err(e) => warn!("chown {dir} failed to spawn: {e}; continuing"), + } +} + async fn do_boot_primary(cfg: &MmdsConfig) -> Result<(), BootError> { info!("boot: step 1/8 maybe_initdb"); maybe_initdb(cfg).await?; @@ -186,6 +210,19 @@ async fn maybe_initdb(cfg: &MmdsConfig) -> Result<(), BootError> { std::fs::create_dir_all(PGDATA)?; } + // A fresh durable volume mounts root-owned and empty over the image's + // /var/lib/postgresql; chown the tree to postgres so the postgres-user + // initdb (and the cluster it creates) can write it. Idempotent / harmless + // when already postgres-owned (the image-baked ephemeral case). + match std::process::Command::new("chown") + .args(["-R", "postgres:postgres", "/var/lib/postgresql"]) + .status() + { + Ok(s) if s.success() => info!("chowned /var/lib/postgresql → postgres"), + Ok(s) => warn!("chown /var/lib/postgresql exited {s}; continuing"), + Err(e) => warn!("chown /var/lib/postgresql failed to spawn: {e}; continuing"), + } + run_initdb(&cfg.postgres_password).await?; Ok(()) } @@ -201,6 +238,11 @@ async fn run_initdb(password: &str) -> Result<(), BootError> { // Set 0600 before writing the password. std::fs::set_permissions(pwfile.path(), std::fs::Permissions::from_mode(0o600))?; std::fs::write(pwfile.path(), password)?; + // initdb runs as the postgres user (see pg::initdb); the pwfile is created + // root-owned 0600, so chown it to postgres so initdb can read it. 
+ let _ = std::process::Command::new("chown") + .args(["postgres:postgres", pwfile.path().to_str().unwrap_or("")]) + .status(); let path_str = pwfile .path() @@ -707,8 +749,14 @@ async fn http_get(url: &str) -> Result, String> { fn write_config_files(cfg: &MmdsConfig, tls: &crate::tls::TlsConfig) -> Result<(), BootError> { use config::write_atomic; - // 00-beyond.conf — image opinions, overwritten every boot - write_atomic(Path::new(&config::beyond_conf_path()), config::BEYOND_CONF)?; + // 00-beyond.conf — image opinions, overwritten every boot. The + // shared_preload_libraries list is filtered to the extensions actually + // installed in this image so a missing module can't crash postgres at + // startup (see config::beyond_conf). + write_atomic( + Path::new(&config::beyond_conf_path()), + &config::beyond_conf(), + )?; // 05-tls.conf — resolved cert paths, overrides 00-beyond.conf's defaults // via alpha order under conf.d/. Numbered 05 so it lands after 04-replica. diff --git a/src/config.rs b/src/config.rs index c6ef753..c622421 100644 --- a/src/config.rs +++ b/src/config.rs @@ -24,6 +24,80 @@ use crate::pg::PGDATA; pub const BEYOND_CONF: &str = include_str!("../packer/files/postgresql/00-beyond.conf"); +/// Directory holding the PostgreSQL shared-object extension modules. Matches +/// the PG18 Debian-derived layout (`pg_config --pkglibdir`) used everywhere in +/// this image — sibling to the `/var/lib/postgresql/18/...` paths in `pg.rs`. +const PKGLIBDIR: &str = "/usr/lib/postgresql/18/lib"; + +/// `00-beyond.conf` with `shared_preload_libraries` filtered down to the +/// libraries actually installed in this image. +/// +/// `00-beyond.conf` lists every extension the platform *wants* preloaded, but +/// the standalone postgres primitive ships without the auth/queue milestone +/// (`beyond_auth`, `beyond_queue`) or pgdg's `pg_cron` (dropped on a version +/// pin). 
preloading a missing module makes postgres die at startup with +/// `FATAL: could not access file "": No such file or directory`. +/// +/// This makes the supervisor self-adapting: with the extensions installed it +/// preloads them; without, it drops them and postgres boots. `pg_stat_statements` +/// and `auto_explain` ship with core postgres so they always survive the filter. +pub fn beyond_conf() -> String { + filter_shared_preload_libraries(BEYOND_CONF, PKGLIBDIR) +} + +/// Returns true iff `{pkglibdir}/{lib}.so` exists. Core-postgres libraries +/// (`pg_stat_statements`, `auto_explain`) are present in any standard install. +fn library_installed(pkglibdir: &str, lib: &str) -> bool { + Path::new(pkglibdir).join(format!("{lib}.so")).exists() +} + +/// Post-process a `postgresql.conf` body: rewrite the +/// `shared_preload_libraries = '...'` line, keeping only libraries whose shared +/// object exists under `pkglibdir`. Lines without that key pass through +/// untouched. If every listed library is missing, the key is emitted empty +/// (`shared_preload_libraries = ''`) rather than dropped, so an operator can +/// still see the (now-empty) setting. +fn filter_shared_preload_libraries(conf: &str, pkglibdir: &str) -> String { + const KEY: &str = "shared_preload_libraries"; + let mut out = String::with_capacity(conf.len()); + for line in conf.lines() { + if let Some(filtered) = filter_preload_line(line, KEY, pkglibdir) { + out.push_str(&filtered); + } else { + out.push_str(line); + } + out.push('\n'); + } + out +} + +/// If `line` is a `shared_preload_libraries = '...'` assignment, return the +/// rewritten line with only installed libraries kept; otherwise `None`. +fn filter_preload_line(line: &str, key: &str, pkglibdir: &str) -> Option<String> { + let trimmed = line.trim_start(); + // Don't touch comments. + if trimmed.starts_with('#') { + return None; + } + let rest = trimmed.strip_prefix(key)?; + // The next non-space char after the key must be '=' (avoid matching e.g.
+ // `shared_preload_libraries.foo`). + let after_key = rest.trim_start(); + let value_part = after_key.strip_prefix('=')?; + // Extract the single-quoted list value. + let value = value_part.trim(); + let inner = value.strip_prefix('\'')?.strip_suffix('\'')?; + + let kept: Vec<&str> = inner + .split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .filter(|lib| library_installed(pkglibdir, lib)) + .collect(); + + Some(format!("{key} = '{}'", kept.join(","))) +} + pub const PG_HBA_CONF: &str = include_str!("../packer/files/postgresql/pg_hba.conf"); pub const PGBOUNCER_INI_BASE: &str = include_str!("../packer/files/pgbouncer/pgbouncer.ini"); @@ -453,6 +527,76 @@ mod tests { use super::*; use crate::tls::{TlsConfig, TlsSource}; + #[test] + fn filter_preload_keeps_only_installed_libs() { + // Fake pkglibdir with only pg_stat_statements + auto_explain present. + let dir = tempfile::tempdir().unwrap(); + for lib in ["pg_stat_statements", "auto_explain"] { + std::fs::write(dir.path().join(format!("{lib}.so")), b"").unwrap(); + } + let pkglibdir = dir.path().to_str().unwrap(); + let conf = "shared_preload_libraries = 'pg_stat_statements,auto_explain,pg_cron,beyond_auth,beyond_queue'\nfoo = 1\n"; + let out = filter_shared_preload_libraries(conf, pkglibdir); + assert!( + out.contains("shared_preload_libraries = 'pg_stat_statements,auto_explain'"), + "missing libs should be dropped: {out}" + ); + assert!(!out.contains("pg_cron"), "pg_cron not installed: {out}"); + assert!(!out.contains("beyond_auth"), "beyond_auth not installed: {out}"); + assert!(!out.contains("beyond_queue"), "beyond_queue not installed: {out}"); + // Other lines untouched. 
+ assert!(out.contains("foo = 1")); + } + + #[test] + fn filter_preload_all_present_is_unchanged() { + let dir = tempfile::tempdir().unwrap(); + for lib in ["pg_stat_statements", "auto_explain", "pg_cron"] { + std::fs::write(dir.path().join(format!("{lib}.so")), b"").unwrap(); + } + let pkglibdir = dir.path().to_str().unwrap(); + let conf = "shared_preload_libraries = 'pg_stat_statements,auto_explain,pg_cron'\n"; + let out = filter_shared_preload_libraries(conf, pkglibdir); + assert!(out.contains("shared_preload_libraries = 'pg_stat_statements,auto_explain,pg_cron'")); + } + + #[test] + fn filter_preload_all_missing_emits_empty_value() { + let dir = tempfile::tempdir().unwrap(); + let pkglibdir = dir.path().to_str().unwrap(); + let conf = "shared_preload_libraries = 'pg_cron,beyond_auth'\n"; + let out = filter_shared_preload_libraries(conf, pkglibdir); + assert!( + out.contains("shared_preload_libraries = ''"), + "all missing → empty value (key retained): {out}" + ); + } + + #[test] + fn filter_preload_ignores_comments_and_other_keys() { + let dir = tempfile::tempdir().unwrap(); + let pkglibdir = dir.path().to_str().unwrap(); + let conf = "# shared_preload_libraries = 'pg_cron'\nshared_preload_libraries.foo = 'bar'\n"; + let out = filter_shared_preload_libraries(conf, pkglibdir); + // Commented line passes through verbatim. + assert!(out.contains("# shared_preload_libraries = 'pg_cron'")); + // A different key (dotted) is not the assignment we rewrite. + assert!(out.contains("shared_preload_libraries.foo = 'bar'")); + } + + #[test] + fn filter_preload_handles_real_embedded_conf() { + // The embedded 00-beyond.conf must contain the key; filtering against a + // dir with only core libs must drop the milestone extensions. 
+ let dir = tempfile::tempdir().unwrap(); + for lib in ["pg_stat_statements", "auto_explain"] { + std::fs::write(dir.path().join(format!("{lib}.so")), b"").unwrap(); + } + let out = filter_shared_preload_libraries(BEYOND_CONF, dir.path().to_str().unwrap()); + assert!(out.contains("shared_preload_libraries = 'pg_stat_statements,auto_explain'")); + assert!(!out.contains("'pg_stat_statements,auto_explain,pg_cron")); + } + #[test] fn tls_conf_platform_includes_ca() { let tls = TlsConfig { diff --git a/src/pg.rs b/src/pg.rs index 9a58449..0c7fd88 100644 --- a/src/pg.rs +++ b/src/pg.rs @@ -159,20 +159,24 @@ pub async fn reload() -> Result<(), PgError> { /// password (created by the caller as a `tempfile::NamedTempFile`). pub async fn initdb(pgdata: &str, pwfile_path: &str) -> Result<(), PgError> { debug!("running initdb in {pgdata}"); - let out = Command::new("initdb") - .args([ - "-D", - pgdata, - "--waldir", - "/var/lib/postgresql/18/wal", - "--auth=scram-sha-256", - "--encoding=UTF8", - "--locale=en_US.UTF-8", - &format!("--pwfile={pwfile_path}"), - ]) - .stderr(std::process::Stdio::piped()) - .output() - .await?; + let mut cmd = Command::new("initdb"); + cmd.args([ + "-D", + pgdata, + "--waldir", + "/var/lib/postgresql/18/wal", + "--auth=scram-sha-256", + "--encoding=UTF8", + "--locale=en_US.UTF-8", + &format!("--pwfile={pwfile_path}"), + ]) + .stderr(std::process::Stdio::piped()); + // initdb refuses to run as root; run it as the postgres OS user (the same + // user that will own the cluster and that postgres/psql drop to). Required + // when PGDATA is a fresh durable volume initialized at runtime (not baked + // into the image as ephemeral data). The caller chowns the tree first. 
+ drop_to_postgres_user(&mut cmd); + let out = cmd.output().await?; if out.status.success() { debug!("initdb complete"); diff --git a/src/supervisor.rs b/src/supervisor.rs index bce762d..f1fd516 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -1722,9 +1722,24 @@ async fn post_start(cfg: &MmdsConfig) -> Result<(), Box> .await .map_err(|e| format!("failed to set up pgbouncer auth: {e}"))?; - // Required extensions — fail the supervisor if any are missing; the process - // manager will restart and retry rather than running in a degraded state. + // Required extensions — fail the supervisor if CREATE EXTENSION fails for + // an extension whose shared object IS installed; the process manager will + // restart and retry rather than running in a degraded state. + // + // But the standalone postgres primitive ships without the auth/queue + // milestone (beyond_auth/beyond_queue) and pgdg's pg_cron (version pin + // drift). When an extension's .so isn't installed, treat it as a warning, + // not a fatal — the same self-adapting posture as the + // shared_preload_libraries filter (see config::beyond_conf). With the + // extensions present, the behavior is unchanged (still hard-required). for ext in REQUIRED_EXTENSIONS { + if !extension_installed(ext) { + warn!( + "required extension {ext} not installed (no {ext}.so in {EXTENSION_PKGLIBDIR}); \ + skipping (auth/queue milestone not in this image)" + ); + continue; + } pg::psql(&format!("CREATE EXTENSION IF NOT EXISTS {ext}")) .await .map_err(|e| format!("required extension {ext} failed: {e}"))?; @@ -1798,6 +1813,20 @@ async fn setup_pgbouncer_auth() -> Result<(), crate::pg::PgError> { const REQUIRED_EXTENSIONS: &[&str] = &["beyond_auth", "beyond_queue", "pg_cron"]; +/// Directory holding PostgreSQL extension shared objects (PG18 Debian layout, +/// `pg_config --pkglibdir`). Mirrors `config`'s PKGLIBDIR. 
+const EXTENSION_PKGLIBDIR: &str = "/usr/lib/postgresql/18/lib"; + +/// True iff the extension's shared object is present in the image. An extension +/// listed in [`REQUIRED_EXTENSIONS`] but with no installed `.so` (e.g. the +/// future auth/queue milestone, or a dropped pgdg `pg_cron`) is downgraded from +/// fatal to a warning so the standalone primitive still boots. +fn extension_installed(ext: &str) -> bool { + std::path::Path::new(EXTENSION_PKGLIBDIR) + .join(format!("{ext}.so")) + .exists() +} + const OPTIONAL_EXTENSIONS: &[&str] = &[ "pg_stat_statements", "auto_explain", diff --git a/src/tls.rs b/src/tls.rs index 941c13d..25cd818 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -222,6 +222,9 @@ fn now_secs() -> u64 { // --------------------------------------------------------------------------- fn generate_cert(cert_path: &Path, key_path: &Path) -> Result<(), TlsError> { + // Ed25519: fast signing on every TLS handshake (the reason we picked it over + // P-256). The rootfs must ship an OpenSSL that can load Ed25519 keys — see + // the image build; Postgres loads the server key through OpenSSL. let key_pair = rcgen::KeyPair::generate_for(&rcgen::PKCS_ED25519)?; let mut params = CertificateParams::default(); @@ -246,15 +249,49 @@ fn generate_cert(cert_path: &Path, key_path: &Path) -> Result<(), TlsError> { // Key is written with mode 0o600 set *before* content. This closes the // window in which a chmod-after-write approach would leave the private // key briefly readable under a wider umask. - crate::config::write_atomic_bytes_with_mode( - key_path, - key_pair.serialize_pem().as_bytes(), - 0o600, - )?; + // + // rcgen (aws-lc-rs) serializes Ed25519 keys as PKCS#8 *v2* (OneAsymmetricKey + // with the trailing public-key field). OpenSSL 3.0.13 — the version on the + // ubuntu-noble base image, which Postgres links against — cannot decode v2 + // Ed25519 keys and rejects them with `unsupported`, so Postgres can't load + // its server key. 
Re-emit as standard PKCS#8 *v1* (identical Ed25519 key and + // performance; v2 only appends the public key, which the cert already + // carries). Falls back to rcgen's PEM if the key isn't the expected Ed25519 + // shape (e.g. a future algorithm change). + let key_pem = ed25519_pkcs8_v1_pem(&key_pair.serialize_der()) + .unwrap_or_else(|| key_pair.serialize_pem()); + crate::config::write_atomic_bytes_with_mode(key_path, key_pem.as_bytes(), 0o600)?; Ok(()) } +/// Re-encode an Ed25519 private key from rcgen's PKCS#8 v2 DER to the +/// universally-accepted PKCS#8 v1 PEM. Returns `None` if `der_v2` is not a +/// recognizable Ed25519 PKCS#8 (so the caller keeps rcgen's own encoding). +fn ed25519_pkcs8_v1_pem(der_v2: &[u8]) -> Option<String> { + use base64::Engine as _; + // The privateKey field encodes the 32-byte seed identically in v1 and v2 as + // the byte run `04 22 04 20 ` (OCTET STRING { OCTET STRING(32) }). + const PRIV_PREFIX: [u8; 4] = [0x04, 0x22, 0x04, 0x20]; + let pos = der_v2.windows(4).position(|w| w == PRIV_PREFIX)?; + let seed = der_v2.get(pos + 4..pos + 4 + 32)?; + // Canonical 48-byte Ed25519 PKCS#8 v1: + // SEQ { INTEGER 0, SEQ { OID 1.3.101.112 }, OCTET STRING { OCTET STRING(32) seed } } + let mut der_v1 = vec![ + 0x30, 0x2e, 0x02, 0x01, 0x00, 0x30, 0x05, 0x06, 0x03, 0x2b, 0x65, 0x70, 0x04, 0x22, 0x04, + 0x20, + ]; + der_v1.extend_from_slice(seed); + let b64 = base64::engine::general_purpose::STANDARD.encode(&der_v1); + let mut pem = String::from("-----BEGIN PRIVATE KEY-----\n"); + for line in b64.as_bytes().chunks(64) { + pem.push_str(std::str::from_utf8(line).expect("base64 is ASCII")); + pem.push('\n'); + } + pem.push_str("-----END PRIVATE KEY-----\n"); + Some(pem) +} + /// Hostname for the cert CN. Reads `/etc/hostname` (written by `init::run()` /// from MMDS), falls back to `gethostname(2)`, then to `"localhost"`.
fn hostname() -> String { From 43be4e242d3dfaeba7e81ec37aa3a111528349c5 Mon Sep 17 00:00:00 2001 From: Jared Lunde Date: Fri, 19 Jun 2026 18:00:36 -0700 Subject: [PATCH 2/3] fix(beyond-pg-sink): de-flake wal_gap_stalls_replica e2e test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test wrote pre-backup rows + pg_switch_wal() guarded only by a non-fatal 30s wait for the sink to appear in pg_stat_replication. Under CI load the sink connected slowly, the wait fell through silently, and the pre-backup rows were written before the sink began streaming — so it started from a later LSN and never archived the pre-backup segment, panicking with the misleading "pre-backup segment never archived". Make the connect-wait fatal and extend it to 60s so the sink is provably streaming before the pre-backup rows are written, closing the race. Bump the seal deadline 30s -> 60s to match the sibling wal_sink_crash test. Test-only change; no product code touched. Co-Authored-By: Claude Opus 4.8 (1M context) --- beyond-pg-sink/tests/e2e.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/beyond-pg-sink/tests/e2e.rs b/beyond-pg-sink/tests/e2e.rs index 707e78a..828dd46 100644 --- a/beyond-pg-sink/tests/e2e.rs +++ b/beyond-pg-sink/tests/e2e.rs @@ -2857,7 +2857,14 @@ fn wal_gap_stalls_replica() { }; wait_http_ready(sink_port); - for _ in 0..60 { + // The sink streams from the consistent point it captures the moment it + // connects. We MUST confirm it is streaming (appears in + // pg_stat_replication) before writing the pre-backup rows below — otherwise + // a slow connect under CI load leaves the sink starting from a later LSN + // and the pre-backup segment is never archived. This wait is therefore + // fatal, and long enough (60s) to absorb a loaded runner. 
+ let mut streaming = false; + for _ in 0..120 { if primary_client .query_opt( "SELECT 1 FROM pg_stat_replication WHERE application_name='wal_sink_gap'", @@ -2866,10 +2873,15 @@ fn wal_gap_stalls_replica() { .unwrap() .is_some() { + streaming = true; break; } std::thread::sleep(Duration::from_millis(500)); } + assert!( + streaming, + "sink never connected as a streaming standby (application_name='wal_sink_gap')" + ); // ── 3. pre-backup rows — archived ──────────────────────────────────────── primary_client @@ -2878,12 +2890,16 @@ fn wal_gap_stalls_replica() { INSERT INTO gap_test (v) SELECT 'pre-' || g FROM generate_series(1,50) g;", ) .unwrap(); + // pg_switch_wal pads the current segment to the boundary; the walsender + // streams the full padded segment to the (already-streaming) sink, which + // seals it once the write reaches 16 MiB. primary_client .execute("SELECT pg_switch_wal()", &[]) .unwrap(); - // Wait for the pre-backup segment to land in the sink. - let deadline = Instant::now() + Duration::from_secs(30); + // Wait for the pre-backup segment to land in the sink (60s to match the + // sibling crash test and absorb CI load). + let deadline = Instant::now() + Duration::from_secs(60); loop { let count = std::fs::read_dir(&sink_dir) .unwrap() From e389bc4b63dc8b97ec37b52980f6266de9bb9b77 Mon Sep 17 00:00:00 2001 From: Jared Lunde Date: Fri, 19 Jun 2026 18:14:42 -0700 Subject: [PATCH 3/3] fix(beyond-pg-init): speak the guest-ready vsock frame directly; drop beyond path dep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `guest-runtime = { path = "../../beyond/rustlib/guest-runtime" }` dep pointed at a sibling Beyond checkout — fine locally, but CI clones only beyondoss/postgres so cargo couldn't resolve it (+ transitive vsock-protocol/guest-session/wire) and the build failed. 
It never needed Beyond's code: the guest-ready contract is one self-described vsock frame ([len u32 BE][type 0x81 Ready][msgpack ReadyPayload]) on cid=2 port=52. substrate.rs now sends it directly using crates this repo already has (tokio-vsock, rmp-serde) — zero Beyond deps, builds standalone. The wire layout is pinned by a fixture test (ready_frame_is_stable) so it can't silently drift from instd's rustlib/vsock-protocol decoder. Verified end-to-end on the homelab: create.completed fires and the full smoke-postgres proof (primary SQL + fork-branch) passes green. Cargo.lock no longer references the beyond workspace. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 202 +------------------------------- beyond-pg-init/Cargo.toml | 9 +- beyond-pg-init/src/substrate.rs | 184 +++++++++++++++++++++-------- 3 files changed, 144 insertions(+), 251 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 367dbf8..0ff590a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,41 +2,6 @@ # It is not intended for manual editing. 
version = 4 -[[package]] -name = "aead" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" -dependencies = [ - "crypto-common 0.1.7", - "generic-array", -] - -[[package]] -name = "aes" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" -dependencies = [ - "cfg-if", - "cipher", - "cpufeatures 0.2.17", -] - -[[package]] -name = "aes-gcm" -version = "0.10.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "831010a0f742e1209b3bcea8fab6a8e149051ba6099432c8cb2cc117dec3ead1" -dependencies = [ - "aead", - "aes", - "cipher", - "ctr", - "ghash", - "subtle", -] - [[package]] name = "aho-corasick" version = "1.1.4" @@ -362,12 +327,13 @@ version = "0.1.0" dependencies = [ "beyond-handoff", "beyond-pg-core", - "guest-runtime", "libc", "nix 0.31.2", + "rmp-serde", "serde", "serde_json", "tokio", + "tokio-vsock", ] [[package]] @@ -562,23 +528,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", - "js-sys", "num-traits", "serde", - "wasm-bindgen", "windows-link", ] -[[package]] -name = "cipher" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" -dependencies = [ - "crypto-common 0.1.7", - "inout", -] - [[package]] name = "clap" version = "4.6.1" @@ -712,7 +666,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" dependencies = [ "generic-array", - "rand_core 0.6.4", "typenum", ] @@ -725,15 +678,6 @@ dependencies = [ "hybrid-array", ] -[[package]] -name = "ctr" -version = "0.9.2" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" -dependencies = [ - "cipher", -] - [[package]] name = "ctutils" version = "0.4.2" @@ -1140,46 +1084,6 @@ dependencies = [ "wasip3", ] -[[package]] -name = "ghash" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0d8a4362ccb29cb0b265253fb0a2728f592895ee6854fd9bc13f2ffda266ff1" -dependencies = [ - "opaque-debug", - "polyval", -] - -[[package]] -name = "guest-runtime" -version = "0.1.0" -dependencies = [ - "aes-gcm", - "base64", - "guest-session", - "io_utils", - "libc", - "serde_json", - "thiserror 2.0.18", - "tokio", - "tracing", - "vsock-protocol", - "wire", -] - -[[package]] -name = "guest-session" -version = "0.1.0" -dependencies = [ - "libc", - "nix 0.30.1", - "thiserror 2.0.18", - "tokio", - "tracing", - "uuid", - "vsock-protocol", -] - [[package]] name = "h2" version = "0.4.14" @@ -1598,23 +1502,6 @@ dependencies = [ "serde_core", ] -[[package]] -name = "inout" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" -dependencies = [ - "generic-array", -] - -[[package]] -name = "io_utils" -version = "0.1.0" -dependencies = [ - "nix 0.29.0", - "tokio", -] - [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -1849,19 +1736,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "nix" -version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" -dependencies = [ - "bitflags", - "cfg-if", - "cfg_aliases", - "libc", - "memoffset", -] - [[package]] name = "nix" version = "0.30.1" @@ -2025,12 +1899,6 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" 
-[[package]] -name = "opaque-debug" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" - [[package]] name = "openssl" version = "0.10.79" @@ -2189,18 +2057,6 @@ version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" -[[package]] -name = "polyval" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25" -dependencies = [ - "cfg-if", - "cpufeatures 0.2.17", - "opaque-debug", - "universal-hash", -] - [[package]] name = "portable-atomic" version = "1.13.1" @@ -2459,15 +2315,6 @@ dependencies = [ "rand_core 0.9.5", ] -[[package]] -name = "rand_core" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom 0.2.17", -] - [[package]] name = "rand_core" version = "0.9.5" @@ -2801,16 +2648,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "serde_bytes" -version = "0.11.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5d440709e79d88e51ac01c4b72fc6cb7314017bb7da9eeff678aa94c10e3ea8" -dependencies = [ - "serde", - "serde_core", -] - [[package]] name = "serde_core" version = "1.0.228" @@ -3509,16 +3346,6 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" -[[package]] -name = "universal-hash" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" -dependencies = [ - "crypto-common 0.1.7", - "subtle", -] - [[package]] name = "untrusted" version = "0.9.0" @@ 
-3623,19 +3450,6 @@ dependencies = [ "nix 0.31.2", ] -[[package]] -name = "vsock-protocol" -version = "0.1.0" -dependencies = [ - "rmp-serde", - "serde", - "serde_bytes", - "serde_json", - "thiserror 2.0.18", - "tokio", - "wire", -] - [[package]] name = "wal-proto" version = "0.1.0" @@ -4067,18 +3881,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "wire" -version = "0.1.0" -dependencies = [ - "chrono", - "rmp-serde", - "serde", - "serde_bytes", - "serde_json", - "thiserror 2.0.18", -] - [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/beyond-pg-init/Cargo.toml b/beyond-pg-init/Cargo.toml index 56b4cc8..a70553f 100644 --- a/beyond-pg-init/Cargo.toml +++ b/beyond-pg-init/Cargo.toml @@ -17,8 +17,11 @@ serde_json = "1" [target.'cfg(target_os = "linux")'.dependencies] libc = "0.2" handoff = { version = "0.1", package = "beyond-handoff" } -# Shared substrate SDK: the guest-ready vsock handshake instd waits for, plus -# the admin exec/ping channel. Path dep into the beyond workspace. -guest-runtime = { path = "../../beyond/rustlib/guest-runtime" } +# The guest-ready handshake instd waits for is a single self-described vsock +# frame; we speak it directly (see src/substrate.rs) rather than depend on the +# Beyond workspace, so this repo builds standalone in CI. The wire contract is +# pinned by a fixture test. nix = { version = "0.31", features = ["mount"] } tokio = { version = "1", features = ["rt", "macros", "net", "time", "io-util"] } +tokio-vsock = "0.7" +rmp-serde = "1" diff --git a/beyond-pg-init/src/substrate.rs b/beyond-pg-init/src/substrate.rs index 4cb0e5f..f711352 100644 --- a/beyond-pg-init/src/substrate.rs +++ b/beyond-pg-init/src/substrate.rs @@ -1,22 +1,61 @@ //! Substrate vsock "guest ready" handshake for `beyond-pg-init`. //! -//! 
instd waits ~30s after spawning firecracker for the guest to complete the -//! [`guest_runtime::VsockClient::connect`] handshake (which sends a `Ready` -//! message); without it instd reports "guest not ready: timeout" and the -//! create fails. `beyond-pg-init` is a sync PID 1 (see [`crate::supervise`]), -//! so we host the async handshake + keep-alive loop on a dedicated OS thread -//! running a current-thread tokio runtime for the VM's lifetime. +//! After spawning Firecracker, instd waits ~30s for the guest to report ready +//! over vsock; without it the VM's create never completes ("guest not ready: +//! timeout"). The contract is a single self-described frame on the host vsock +//! channel — we speak it directly here so this repo builds standalone (no +//! dependency on the Beyond workspace): //! -//! The handshake is soft-fail: if the connection can't be established (e.g. no -//! AF_VSOCK in a Docker test environment) we log a warning and let the thread -//! exit, exactly as `service-init`'s `EnvelopeSink::try_connect` does. The -//! supervise loop owns shutdown via signalfd (instd also sends SIGTERM), so on -//! a `Shutdown` event we only log — we never poweroff from this thread. +//! ```text +//! connect: AF_VSOCK cid=2 (host) port=52 +//! frame: [len: u32 BE = 1 + payload][type: u8 = 0x81 Ready][payload: MessagePack] +//! payload = rmp_serde::to_vec_named(ReadyPayload) // string keys +//! ``` +//! +//! instd marks the guest ready the moment it reads the `Ready` frame; we then +//! hold the connection open for the VM's lifetime so the host keeps seeing the +//! guest as present. The frame layout mirrors `rustlib/vsock-protocol` in the +//! Beyond repo — kept honest by `tests::ready_frame_is_stable` below; if instd +//! ever changes the wire, that fixture must change in lockstep. +//! +//! Soft-fail throughout: no AF_VSOCK (e.g. a Docker test box) or a failed +//! connect is logged and the thread exits — never fatal. The supervise loop +//! 
owns shutdown via signalfd (instd also sends SIGTERM), so this thread never +//! powers the VM off. + +use serde::Serialize; + +/// Host vsock context id (`VMADDR_CID_HOST`). +const HOST_CID: u32 = 2; +/// Substrate vsock port instd listens on (`vsock_protocol::VSOCK_PORT`). +const SUBSTRATE_PORT: u32 = 52; +/// `Ready` message discriminator (`vsock_protocol::MessageType::Ready`). +const MSG_READY: u8 = 0x81; + +/// Agent → host "ready after boot" payload. A field-compatible subset of +/// `vsock_protocol::ReadyPayload` — only the always-present fields; the rest are +/// `skip_serializing_if`/`default` on the host side and so may be omitted. +#[derive(Serialize)] +struct ReadyPayload { + agent_version: String, + boot_time_ms: u64, + reconnect: bool, +} -/// Spawn the dedicated `substrate-vsock` thread that performs the guest-runtime -/// `Ready` handshake and then keeps the connection alive for the VM's lifetime. +/// Encode the `Ready` frame: `[len: u32 BE][type][MessagePack payload]`. +/// Length covers the type byte + payload (not the 4 length bytes themselves). +fn encode_ready_frame(payload: &ReadyPayload) -> Result<Vec<u8>, rmp_serde::encode::Error> { + let body = rmp_serde::to_vec_named(payload)?; + let mut frame = ((body.len() as u32) + 1).to_be_bytes().to_vec(); + frame.push(MSG_READY); + frame.extend_from_slice(&body); + Ok(frame) +} + +/// Spawn the dedicated `substrate-vsock` thread that performs the guest-ready +/// handshake and then keeps the connection alive for the VM's lifetime. /// -/// Returns immediately; the handshake happens on the spawned thread. Soft-fail +/// Returns immediately; everything happens on the spawned thread. Soft-fail /// throughout: a failed spawn or a failed connect is logged, never fatal.
pub fn spawn_handshake() { let builder = std::thread::Builder::new().name("substrate-vsock".to_string()); @@ -26,9 +65,9 @@ pub fn spawn_handshake() { } fn run() { - // Current-thread runtime: this thread does nothing but host the single - // vsock connection and its keep-alive loop, so a multi-thread scheduler - // would only waste worker threads. + // Current-thread runtime: this thread only hosts the single vsock + // connection + keep-alive read, so a multi-thread scheduler would just + // waste workers. let runtime = match tokio::runtime::Builder::new_current_thread() .enable_all() .build() @@ -39,47 +78,96 @@ fn run() { return; } }; - runtime.block_on(handshake_loop()); + runtime.block_on(handshake()); } -async fn handshake_loop() { - let cfg = guest_runtime::VsockClientConfig::for_primitive(format!( - "beyond-pg-init/{}", - env!("CARGO_PKG_VERSION") - )); - let mut client = match guest_runtime::VsockClient::connect(&cfg).await { - Ok(client) => { - eprintln!("[init] substrate vsock handshake complete; guest reported ready"); - client +async fn handshake() { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio_vsock::{VsockAddr, VsockStream}; + + let payload = ReadyPayload { + agent_version: format!("beyond-pg-init/{}", env!("CARGO_PKG_VERSION")), + boot_time_ms: read_uptime_ms(), + reconnect: false, + }; + let frame = match encode_ready_frame(&payload) { + Ok(f) => f, + Err(e) => { + eprintln!("[init] WARNING: encode Ready frame failed: {e}"); + return; } + }; + + let mut conn = match VsockStream::connect(VsockAddr::new(HOST_CID, SUBSTRATE_PORT)).await { + Ok(c) => c, Err(e) => { - // Soft-fail like service-init: no vsock (e.g. Docker tests) just - // means no host substrate to report to. Let the thread exit. - eprintln!("[init] WARNING: substrate vsock connect failed; guest-ready unreported: {e}"); + // Soft-fail: no AF_VSOCK (Docker tests) or no host listener just + // means there's no substrate to report to. Let the thread exit. 
+ eprintln!( + "[init] WARNING: substrate vsock connect failed; guest-ready unreported: {e}" + ); return; } }; + if let Err(e) = conn.write_all(&frame).await { + eprintln!("[init] WARNING: substrate Ready write failed: {e}"); + return; + } + let _ = conn.flush().await; + eprintln!("[init] substrate vsock handshake complete; guest reported ready"); - // Serve the administrative exec/ping/shell channel on the same connection - // and keep it alive so instd sees the guest as live. The loop only exits - // on a Shutdown event or a connection error. - client.enable_admin(guest_runtime::SessionConfig::default()); + // Hold the connection open for the VM's lifetime so the host keeps seeing + // the guest as present. We don't serve admin exec/ping here (the Postgres VM + // is reached over the VPC via pgbouncer, not the admin channel); inbound + // bytes (ReadyAck, heartbeats) are read and ignored. EOF/err → thread exits. + let mut buf = [0u8; 512]; loop { - match client.next_event().await { - Ok(guest_runtime::SubstrateEvent::Shutdown) => { - // The supervise loop owns shutdown via signalfd (instd also - // sends SIGTERM); we only log and let this thread exit. - eprintln!("[init] substrate requested shutdown; vsock loop exiting"); - break; - } - Ok(guest_runtime::SubstrateEvent::AppMessage(_)) => { - // beyond-pg-init has no workload-env channel; ignore app frames. - } + match conn.read(&mut buf).await { + Ok(0) => break, Ok(_) => {} - Err(e) => { - eprintln!("[init] substrate vsock loop ended (connection closed): {e}"); - break; - } + Err(_) => break, } } } + +/// Milliseconds since kernel boot, from `/proc/uptime`. Best-effort (0 on any +/// read/parse failure) — it's only telemetry in the Ready payload. 
+fn read_uptime_ms() -> u64 { + std::fs::read_to_string("/proc/uptime") + .ok() + .and_then(|s| s.split_whitespace().next().map(str::to_owned)) + .and_then(|s| s.parse::<f64>().ok()) + .map(|secs| (secs * 1000.0) as u64) + .unwrap_or(0) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Pins the on-wire `Ready` frame so it can't silently drift from instd's + /// `rustlib/vsock-protocol` decoder: a 4-byte BE length covering type + + /// payload, the `0x81` type byte, then a MessagePack *map* (string keys) + /// carrying the payload fields. + #[test] + fn ready_frame_is_stable() { + let p = ReadyPayload { + agent_version: "beyond-pg-init/0.1.0".to_string(), + boot_time_ms: 1234, + reconnect: false, + }; + let frame = encode_ready_frame(&p).unwrap(); + + let len = u32::from_be_bytes(frame[0..4].try_into().unwrap()) as usize; + assert_eq!(len, frame.len() - 4, "length covers type + payload"); + assert_eq!(frame[4], MSG_READY, "type byte is Ready (0x81)"); + + // Body round-trips to the named fields (string-keyed map = to_vec_named). + let body = &frame[5..]; + let v: serde_json::Value = rmp_serde::from_slice(body).unwrap(); + let obj = v.as_object().expect("Ready payload must be a map"); + assert!(obj.contains_key("agent_version")); + assert!(obj.contains_key("boot_time_ms")); + assert!(obj.contains_key("reconnect")); + } +}