From ba327af780e1cdb3cd45e1576dea6ddcb824c721 Mon Sep 17 00:00:00 2001
From: Jared Lunde
Date: Sun, 21 Jun 2026 12:04:37 -0700
Subject: [PATCH] beyond-pg-init: report PSI memory pressure to the host over vsock
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The host memory controller (instd) right-sizes each VM's balloon/hotplug
from a distress signal. Its sharpest signal is Linux PSI
(/proc/pressure/memory), which measures memory-stall time directly and
catches the buffered read() cache misses a database generates — exactly
the thrash a Postgres VM hits when its page cache is squeezed. But this
primitive never reported any guest resource stats, so the controller was
flying blind on the workload that needs it most.

Add a periodic (30s) GuestResourceStats (0xA2) report carrying PSI
some/full avg10, multiplexed onto the existing substrate connection
alongside heartbeats and log relay. We send PSI only (disk_total omitted)
so the host skips disk billing for this report. The frame is
byte-compatible with instd's vsock_protocol::GuestResourceStatsPayload
decoder; resource_stats_frame_is_stable pins the wire format the same way
ready_frame_is_stable does, so it can't drift.

Requires the guest kernel booted with psi=1 (instd sets this on the
cmdline); if PSI is unavailable the reporter simply sends nothing and the
controller falls back to its balloon-stat signals.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 beyond-pg-init/src/substrate.rs | 116 ++++++++++++++++++++++++++++++++
 1 file changed, 116 insertions(+)

diff --git a/beyond-pg-init/src/substrate.rs b/beyond-pg-init/src/substrate.rs
index 024c46f..128a8b0 100644
--- a/beyond-pg-init/src/substrate.rs
+++ b/beyond-pg-init/src/substrate.rs
@@ -34,8 +34,11 @@ const MSG_READY: u8 = 0x81; // Agent → host: ready after boot.
 const MSG_HEARTBEAT: u8 = 0x02; // Host → agent: liveness probe.
 const MSG_HEARTBEAT_RESP: u8 = 0x82; // Agent → host: heartbeat reply.
 const MSG_SHUTDOWN: u8 = 0x04; // Host → agent: shutdown requested.
+const MSG_GUEST_RESOURCE_STATS: u8 = 0xA2; // Agent → host: periodic resource stats.
 /// Frame length ceiling — sanity bound so a corrupt length can't allocate wild.
 const MAX_FRAME: u32 = 16 * 1024 * 1024;
+/// How often to report guest memory pressure to the host.
+const RESOURCE_STATS_PERIOD: std::time::Duration = std::time::Duration::from_secs(30);
 
 /// Agent → host "ready after boot" payload. A field-compatible subset of
 /// `vsock_protocol::ReadyPayload` — only the always-present fields; the rest are
@@ -53,6 +56,62 @@ struct HeartbeatPayload {
     timestamp: u64,
 }
 
+/// Agent → host periodic resource stats. A field-compatible subset of
+/// `vsock_protocol::GuestResourceStatsPayload`: we report only PSI memory
+/// pressure, so the host's memory controller can right-size this VM. `seq` and
+/// `disk_used_bytes` are required by the host struct (sent as 0); we omit
+/// `disk_total_bytes` so the host skips disk billing for this report (Postgres
+/// disk usage is not tracked here). Keys must match the host decoder exactly.
+#[derive(Serialize)]
+struct GuestResourceStatsPayload {
+    seq: u64,
+    disk_used_bytes: u64,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    psi_mem_some_avg10: Option<f64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    psi_mem_full_avg10: Option<f64>,
+}
+
+/// Read Linux PSI memory pressure `(some.avg10, full.avg10)` from
+/// `/proc/pressure/memory`. `None` if PSI is unavailable (kernel without
+/// `CONFIG_PSI` / not booted with `psi=1`) or the file can't be read.
+fn read_memory_pressure() -> Option<(f64, f64)> {
+    let raw = std::fs::read_to_string("/proc/pressure/memory").ok()?;
+    parse_memory_pressure(&raw)
+}
+
+/// Parse `some.avg10` / `full.avg10` from `/proc/pressure/memory` text. Split
+/// out from the file read so it's testable without `/proc`.
+fn parse_memory_pressure(raw: &str) -> Option<(f64, f64)> {
+    let mut some = None;
+    let mut full = None;
+    for line in raw.lines() {
+        let mut fields = line.split_ascii_whitespace();
+        let kind = fields.next();
+        let avg10 = fields.find_map(|f| f.strip_prefix("avg10=")?.parse::<f64>().ok());
+        match kind {
+            Some("some") => some = avg10,
+            Some("full") => full = avg10,
+            _ => {}
+        }
+    }
+    Some((some?, full.unwrap_or(0.0)))
+}
+
+/// Encode a `GuestResourceStats` frame carrying current PSI memory pressure, or
+/// `None` if PSI is unavailable this tick.
+fn encode_resource_stats_frame() -> Option<Vec<u8>> {
+    let (some, full) = read_memory_pressure()?;
+    let payload = GuestResourceStatsPayload {
+        seq: 0,
+        disk_used_bytes: 0,
+        psi_mem_some_avg10: Some(some),
+        psi_mem_full_avg10: Some(full),
+    };
+    let body = rmp_serde::to_vec_named(&payload).ok()?;
+    Some(encode_frame(MSG_GUEST_RESOURCE_STATS, &body))
+}
+
 /// Frame a message: `[len: u32 BE = 1 + payload][type][MessagePack payload]`.
 fn encode_frame(msg_type: u8, body: &[u8]) -> Vec<u8> {
     let mut frame = ((body.len() as u32) + 1).to_be_bytes().to_vec();
@@ -164,9 +223,24 @@ where
     let (log_tx, mut log_rx) = tokio::sync::mpsc::channel::(1024);
     spawn_log_sink(log_tx);
 
+    // Periodic guest memory-pressure (PSI) report for the host memory
+    // controller. Fire-and-forget like heartbeats; a write error breaks the
+    // loop and the thread exits (same as any vsock failure).
+    let mut psi_interval = tokio::time::interval(RESOURCE_STATS_PERIOD);
+    psi_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
     let mut len_buf = [0u8; 4];
     loop {
         tokio::select! {
+            // Periodic PSI report.
+            _ = psi_interval.tick() => {
+                if let Some(frame) = encode_resource_stats_frame() {
+                    if conn.write_all(&frame).await.is_err() {
+                        break;
+                    }
+                    let _ = conn.flush().await;
+                }
+            }
             // Inbound substrate frames (heartbeat / shutdown / ignored).
             read = conn.read_exact(&mut len_buf) => {
                 if read.is_err() {
@@ -325,4 +399,46 @@ mod tests {
         assert!(obj.contains_key("boot_time_ms"));
         assert!(obj.contains_key("reconnect"));
     }
+
+    /// Pins the `GuestResourceStats` (0xA2) frame so it can't drift from instd's
+    /// `vsock_protocol::GuestResourceStatsPayload` decoder: BE length, the type
+    /// byte, then a MessagePack map whose keys match the host struct. `seq` and
+    /// `disk_used_bytes` are required host fields; `disk_total_bytes` is omitted
+    /// so the host skips disk billing; the PSI keys carry the signal.
+    #[test]
+    fn resource_stats_frame_is_stable() {
+        let payload = GuestResourceStatsPayload {
+            seq: 0,
+            disk_used_bytes: 0,
+            psi_mem_some_avg10: Some(12.34),
+            psi_mem_full_avg10: Some(0.5),
+        };
+        let body = rmp_serde::to_vec_named(&payload).unwrap();
+        let frame = encode_frame(MSG_GUEST_RESOURCE_STATS, &body);
+
+        let len = u32::from_be_bytes(frame[0..4].try_into().unwrap()) as usize;
+        assert_eq!(len, frame.len() - 4, "length covers type + payload");
+        assert_eq!(frame[4], MSG_GUEST_RESOURCE_STATS, "type byte is 0xA2");
+
+        let v: serde_json::Value = rmp_serde::from_slice(&frame[5..]).unwrap();
+        let obj = v.as_object().expect("payload must be a map");
+        assert!(obj.contains_key("seq"));
+        assert!(obj.contains_key("disk_used_bytes"));
+        assert!(obj.contains_key("psi_mem_some_avg10"));
+        assert!(obj.contains_key("psi_mem_full_avg10"));
+        assert!(
+            !obj.contains_key("disk_total_bytes"),
+            "disk_total omitted so the host skips disk billing"
+        );
+    }
+
+    #[test]
+    fn parses_psi_memory() {
+        let raw = "\
+some avg10=12.34 avg60=5.00 avg300=1.20 total=461476658
+full avg10=0.50 avg60=0.10 avg300=0.00 total=422631474
+";
+        assert_eq!(parse_memory_pressure(raw), Some((12.34, 0.50)));
+        assert_eq!(parse_memory_pressure(""), None);
+    }
 }