From ace5d45d3abbb0fa9451f5d90f22da66f9ad2309 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Wed, 10 Jun 2026 13:55:57 -0700 Subject: [PATCH 1/3] fix(server): stop commands flush batches and resolve on hardware write Stop commands previously fire-and-forgot into the device io channel, where message_gap batching could leave the stop write sitting in pending_commands until the batch deadline. Since shutdown order is stop then disconnect, the disconnect routinely beat the deadline and dropped the pending stop write unflushed, leaving the device running. The io-channel payload is now DeviceTaskMessage, carrying the commands plus an optional oneshot write-acknowledgement sender. A message with an ack is urgent: the io task merges it into any pending batch (existing dedupe), flushes everything to hardware immediately regardless of the batch deadline, then fires the ack. Messages without an ack keep the exact prior batching behaviour, so normal output is unchanged. The stop path accumulates the hardware commands from every per-feature stop OutputCmd into a single write-acknowledged batch and awaits the ack, so stop() (and therefore stop_devices()/shutdown) resolves only once the stop write has reached hardware. The wait is bounded by a runtime-agnostic 1s timeout (select! against async_manager::sleep, not tokio::time, for WASM parity); a wedged or dead device resolves Ok rather than hanging shutdown. The io task's token-cancel and channel-close exits now best-effort flush pending_commands first so a stop in the batch window still lands; the hardware-Disconnected exit deliberately does not flush, since the hardware is gone. Co-Authored-By: Claude Fable 5 --- .../src/device/device_handle.rs | 139 ++++++++++++++---- .../buttplug_server/src/device/device_task.rs | 104 ++++++++++--- 2 files changed, 194 insertions(+), 49 deletions(-) diff --git a/crates/buttplug_server/src/device/device_handle.rs b/crates/buttplug_server/src/device/device_handle.rs index 759dc482..16b139cf 100644 --- a/crates/buttplug_server/src/device/device_handle.rs +++ b/crates/buttplug_server/src/device/device_handle.rs @@ -26,7 +26,7 @@ use buttplug_core::{ OutputValue, StopCmdV4, }, - util::{stream::convert_broadcast_receiver_to_stream, task::TaskScope}, + util::{async_manager, stream::convert_broadcast_receiver_to_stream, task::TaskScope}, }; use buttplug_server_device_config::{ DeviceConfigurationManager, @@ -36,10 +36,13 @@ use buttplug_server_device_config::{ }; use dashmap::DashMap; use futures::future::{self, BoxFuture, FutureExt}; -use tokio::sync::{ - broadcast, - mpsc::{Sender, channel}, - oneshot, +use tokio::{ + select, + sync::{ + broadcast, + mpsc::{Sender, channel}, + oneshot, + }, }; use tokio_stream::StreamExt; use uuid::Uuid; @@ -58,7 +61,7 @@ use crate::{ use super::{ InternalDeviceEvent, OutputObservation, - device_task::{DeviceTaskConfig, spawn_device_task}, + device_task::{DeviceTaskConfig, DeviceTaskMessage, spawn_device_task}, hardware::{Hardware, HardwareCommand, HardwareConnector, HardwareEvent}, protocol::{ProtocolHandler, ProtocolKeepaliveStrategy, ProtocolSpecializer}, }; @@ -109,7 +112,7 @@ pub struct DeviceHandle { legacy_attributes: ServerDeviceAttributes, last_output_command: Arc>, stop_commands: Arc>, - internal_hw_msg_sender: Sender>, + internal_hw_msg_sender: Sender, output_observation_sender: Option>, /// Scope owning this device's tasks. Rides in an Arc since DeviceHandle is /// Clone; the subtree cancels when the last clone drops. @@ -127,7 +130,7 @@ impl DeviceHandle { definition: ServerDeviceDefinition, identifier: UserDeviceIdentifier, stop_commands: Vec, - internal_hw_msg_sender: Sender>, + internal_hw_msg_sender: Sender, output_observation_sender: Option>, task_scope: Arc, ) -> Self { @@ -272,11 +275,32 @@ impl DeviceHandle { // --- Private command handling methods --- fn handle_outputcmd_v4(&self, msg: &CheckedOutputCmdV4) -> ButtplugServerResultFuture { + match self.output_cmd_hardware_commands(msg) { + // Deduped (no-op) or empty: nothing to send, succeed immediately. + None => future::ready(Ok(message::OkV0::default().into())).boxed(), + Some(Err(err)) => future::ready(Err(err)).boxed(), + Some(Ok(commands)) => self.handle_hardware_commands(commands, false), + } + } + + /// Run the bookkeeping for an output command (dedupe map, output-observation + /// emit) and ask the protocol handler for the hardware commands it produces. + /// + /// Returns `None` when the command is a deduped no-op (nothing to send), + /// `Some(Err(..))` when the handler errored, and `Some(Ok(cmds))` otherwise. + /// Shared by `handle_outputcmd_v4` (normal output) and the stop path, so both + /// keep identical dedupe-map and observation behaviour. The stop path + /// accumulates the returned commands across all stop messages and flushes them + /// in a single write-acknowledged batch. + fn output_cmd_hardware_commands( + &self, + msg: &CheckedOutputCmdV4, + ) -> Option, ButtplugError>> { if let Some(last_msg) = self.last_output_command.get(&msg.feature_id()) && *last_msg == *msg { trace!("No commands generated for incoming device packet, skipping and returning success."); - return future::ready(Ok(message::OkV0::default().into())).boxed(); + return None; } self .last_output_command @@ -293,37 +317,94 @@ impl DeviceHandle { }); } - self.handle_generic_command_result(self.handler.handle_output_cmd(msg)) + Some(self.handler.handle_output_cmd(msg).map_err(|e| e.into())) } - fn handle_hardware_commands(&self, commands: Vec) -> ButtplugServerResultFuture { + /// Queue hardware commands for the device io task. + /// + /// When `wait_for_write` is false (all normal output paths) this is + /// fire-and-forget: the commands are dropped into the io channel and the + /// future resolves immediately, leaving any batching to the io task. + /// + /// When `wait_for_write` is true (the stop path) the commands carry a + /// write-acknowledgement channel: the io task flushes the batch to hardware + /// immediately and fires the ack, and this future only resolves once that + /// write has gone out (or the bounded wait elapses / the device goes away). + fn handle_hardware_commands( + &self, + commands: Vec, + wait_for_write: bool, + ) -> ButtplugServerResultFuture { let sender = self.internal_hw_msg_sender.clone(); async move { - let _ = sender.send(commands).await; + if wait_for_write { + let (ack_sender, ack_receiver) = oneshot::channel(); + if sender + .send(DeviceTaskMessage { + commands, + ack: Some(ack_sender), + }) + .await + .is_err() + { + debug!("Device io task gone, skipping write acknowledgement wait."); + return Ok(message::OkV0::default().into()); + } + // Bounded wait for the write ack. There is no tokio::time on WASM, so + // race the receiver against a runtime-agnostic sleep instead of + // tokio::time::timeout. A wedged or dead device must not hang shutdown + // or error a stop, so any non-success outcome resolves Ok (matching the + // existing swallow semantics; scope cancellation handles stragglers). + select! { + ack = ack_receiver => { + if ack.is_err() { + debug!("Device io task dropped ack; device likely disconnecting."); + } + } + _ = async_manager::sleep(Duration::from_secs(1)) => { + warn!("Timed out waiting for stop command write acknowledgement."); + } + } + } else { + let _ = sender + .send(DeviceTaskMessage { + commands, + ack: None, + }) + .await; + } Ok(message::OkV0::default().into()) } .boxed() } - fn handle_generic_command_result( - &self, - command_result: Result, ButtplugDeviceError>, - ) -> ButtplugServerResultFuture { - let hardware_commands = match command_result { - Ok(commands) => commands, - Err(err) => return future::ready(Err(err.into())).boxed(), - }; - - self.handle_hardware_commands(hardware_commands) - } - fn handle_stop_device_cmd(&self, msg: &StopCmdV4) -> ButtplugServerResultFuture { let mut fut_vec = vec![]; if msg.outputs() { - self - .stop_commands - .iter() - .for_each(|msg| fut_vec.push(self.parse_message(msg.clone()))); + // Accumulate the hardware commands from every stop output into a single + // write-acknowledged batch. A multi-feature device produces one stop + // OutputCmd per feature, but the protocol handler folds them into one + // accumulated write; sending each individually with its own ack would + // emit one write per feature instead. Collecting them and issuing a + // single acked flush keeps the on-wire output identical to normal output + // batching while still resolving only once the stop write has landed. + // + // Stop commands are always OutputCmds (see build_device_handle); any + // other shape falls back to the normal fire-and-forget parse path. + let mut stop_hardware_commands = vec![]; + for stop_msg in self.stop_commands.iter() { + match stop_msg { + ButtplugDeviceCommandMessageUnionV4::OutputCmd(output_msg) => { + match self.output_cmd_hardware_commands(output_msg) { + None => {} + Some(Ok(commands)) => stop_hardware_commands.extend(commands), + Some(Err(err)) => fut_vec.push(future::ready(Err(err)).boxed()), + } + } + other => fut_vec.push(self.parse_message(other.clone())), + } + } + fut_vec.push(self.handle_hardware_commands(stop_hardware_commands, true)); } if msg.inputs() { self.definition.features().iter().for_each(|(i, f)| { @@ -536,7 +617,7 @@ pub(super) async fn build_device_handle( let strategy = handler.keepalive_strategy(); // Create the hardware command channel and spawn the device task - let (internal_hw_msg_sender, internal_hw_msg_recv) = channel::>(1024); + let (internal_hw_msg_sender, internal_hw_msg_recv) = channel::(1024); let device_wait_duration = if let Some(gap) = definition.message_gap_ms() { Some(Duration::from_millis(gap as u64)) diff --git a/crates/buttplug_server/src/device/device_task.rs b/crates/buttplug_server/src/device/device_task.rs index 8664ba35..15e2fb1c 100644 --- a/crates/buttplug_server/src/device/device_task.rs +++ b/crates/buttplug_server/src/device/device_task.rs @@ -34,6 +34,15 @@ pub struct DeviceTaskConfig { pub keepalive_strategy: ProtocolKeepaliveStrategy, } +/// A batch of hardware commands for the device io task, optionally carrying a +/// write acknowledgement channel. When `ack` is present the batch is urgent: +/// the io task flushes it (and any pending batched commands) to hardware +/// immediately, then fires the ack. +pub struct DeviceTaskMessage { + pub commands: Vec, + pub ack: Option>, +} + /// Spawn the device communication task. /// /// This task handles: @@ -48,7 +57,7 @@ pub fn spawn_device_task( hardware: Arc, _handler: Arc, config: DeviceTaskConfig, - mut command_receiver: Receiver>, + mut command_receiver: Receiver, ) { task_scope.spawn("io", move |token| async move { run_device_task(hardware, config, &mut command_receiver, token).await; @@ -62,7 +71,7 @@ pub fn spawn_device_task( async fn run_device_task( hardware: Arc, config: DeviceTaskConfig, - command_receiver: &mut Receiver>, + command_receiver: &mut Receiver, token: CancellationToken, ) { let mut hardware_events = hardware.event_stream(); @@ -93,6 +102,23 @@ async fn run_device_task( let mut pending_commands: VecDeque = VecDeque::new(); let mut batch_deadline: Option = None; + // Write every pending command to hardware in order, updating the keepalive + // packet as writes go out. Shared by the receive arm (urgent acked flush), + // the batch-deadline arm, and the flush-on-exit hardening below. + async fn flush_pending( + hardware: &Hardware, + pending_commands: &mut VecDeque, + track_keepalive: bool, + keepalive_packet: &mut Option, + ) { + while let Some(cmd) = pending_commands.pop_front() { + let _ = hardware.parse_message(&cmd).await; + if track_keepalive && let HardwareCommand::Write(ref write_cmd) = cmd { + *keepalive_packet = Some(write_cmd.clone()); + } + } + } + loop { // Calculate keepalive timeout let keepalive_fut = async { @@ -121,17 +147,54 @@ async fn run_device_task( // Priority 0: Cooperative cancellation - wins over new work. _ = token.cancelled() => { info!("Device task cancelled, shutting down"); + // Best-effort flush of any batched writes (e.g. a stop command still + // sitting in the batch window) before exiting. The hardware is still + // present here, so the writes can land; write errors are swallowed. + flush_pending( + &hardware, + &mut pending_commands, + track_keepalive, + &mut keepalive_packet, + ) + .await; return; } // Priority 1: Incoming commands msg = command_receiver.recv() => { - let Some(commands) = msg else { + let Some(DeviceTaskMessage { commands, ack }) = msg else { info!("No longer receiving messages from device parent, breaking"); + // The command channel closed (all DeviceHandles dropped). Hardware is + // still present, so best-effort flush any batched writes before exit. + flush_pending( + &hardware, + &mut pending_commands, + track_keepalive, + &mut keepalive_packet, + ) + .await; break; }; - if let Some(device_wait_duration) = device_wait_duration { + if let Some(ack) = ack { + // Urgent write-acknowledged batch (stop path). Merge with any pending + // batch using the existing dedupe, flush everything to hardware now + // regardless of the batch deadline, then fire the ack so the caller + // only resolves once the writes have actually gone out. + for command in commands { + pending_commands.retain(|existing| !command.overlaps(existing)); + pending_commands.push_back(command); + } + flush_pending( + &hardware, + &mut pending_commands, + track_keepalive, + &mut keepalive_packet, + ) + .await; + batch_deadline = None; + let _ = ack.send(()); + } else if let Some(device_wait_duration) = device_wait_duration { // Batching enabled if pending_commands.is_empty() { // First batch - add directly without deduplication (matches old behavior) @@ -147,28 +210,27 @@ async fn run_device_task( } else { // No batching - send immediately trace!("No wait duration, sending commands immediately: {:?}", commands); - for cmd in commands { - let _ = hardware.parse_message(&cmd).await; - if track_keepalive - && let HardwareCommand::Write(ref write_cmd) = cmd - { - keepalive_packet = Some(write_cmd.clone()); - } - } + pending_commands.extend(commands); + flush_pending( + &hardware, + &mut pending_commands, + track_keepalive, + &mut keepalive_packet, + ) + .await; } } // Priority 2: Batch deadline reached - flush pending commands _ = batch_fut => { trace!("Batch deadline reached, sending {} commands", pending_commands.len()); - while let Some(cmd) = pending_commands.pop_front() { - let _ = hardware.parse_message(&cmd).await; - if track_keepalive - && let HardwareCommand::Write(ref write_cmd) = cmd - { - keepalive_packet = Some(write_cmd.clone()); - } - } + flush_pending( + &hardware, + &mut pending_commands, + track_keepalive, + &mut keepalive_packet, + ) + .await; batch_deadline = None; } @@ -208,6 +270,8 @@ async fn run_device_task( hw_event = hardware_events.recv() => { if matches!(hw_event, Ok(HardwareEvent::Disconnected(_))) || hw_event.is_err() { info!("Hardware disconnected, shutting down task"); + // Do NOT flush pending_commands here: the hardware is gone, so writes + // would fail and any pending stop is moot. return; } } From f10dbe166dbfadd5ab0bd782f7560db7de5935f8 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Wed, 10 Jun 2026 14:07:54 -0700 Subject: [PATCH 2/3] test: stop commands verified written before stop/shutdown resolve MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two RED-verified lifecycle tests pinning the stop-command write-ack contract, plus the minimal test-util change they need. Test util: TestDevice now carries a configurable message_gap (default 1ms, unchanged for all existing tests). add_test_device_with_message_gap and test_server_with_device_and_message_gap let a test widen the device io task's batch window so the stop-write race is deterministic rather than relying on the 1ms default. test_stop_resolves_only_after_stop_write_reaches_hardware (Test A): with a 500ms message gap and an active vibrate, a StopCmd must not resolve until the resulting stop write ([0xF1, 0x00]) has reached the hardware channel. Pre-fix this fails deterministically — StopCmd fire-and-forgets and resolves while the write sits in the batch window. test_shutdown_writes_stop_before_resolving (Test B): the original incident. With the same batched device in an active output state, server.shutdown() must record the device's stop write before it resolves. Pre-fix the disconnect tore the io task down mid-batch and dropped the pending stop write. Both retire the write-observation limitation documented on test_shutdown_under_load_drains_subtree (1ms harness gap made it flaky). RED verified by reverting the two production files to HEAD~1 (2 failed / 0 passed across three runs); restored exactly (empty production diff); GREEN and stable across five runs. Full buttplug_tests suite green incl. 808 conformance. Co-Authored-By: Claude Fable 5 --- .../tests/test_task_lifecycle.rs | 174 +++++++++++++++++- crates/buttplug_tests/tests/util/mod.rs | 16 ++ .../util/test_device_manager/test_device.rs | 28 ++- .../test_device_comm_manager.rs | 33 +++- 4 files changed, 239 insertions(+), 12 deletions(-) diff --git a/crates/buttplug_tests/tests/test_task_lifecycle.rs b/crates/buttplug_tests/tests/test_task_lifecycle.rs index 4425c444..86d296f6 100644 --- a/crates/buttplug_tests/tests/test_task_lifecycle.rs +++ b/crates/buttplug_tests/tests/test_task_lifecycle.rs @@ -17,14 +17,78 @@ use buttplug_core::{ OutputValue, RequestServerInfoV4, StartScanningV0, + StopCmdV4, }, util::task::registry, }; -use buttplug_server::message::ButtplugClientMessageVariant; +use buttplug_server::{device::hardware::HardwareCommand, message::ButtplugClientMessageVariant}; use futures::{StreamExt, pin_mut}; use std::time::Duration; +use util::TestDeviceChannelHost; use util::stalling_device_communication_manager::StallingDeviceCommunicationManagerBuilder; -use util::{test_server_with_comm_manager, test_server_with_device}; +use util::{ + test_server_with_comm_manager, + test_server_with_device, + test_server_with_device_and_message_gap, +}; + +/// The Aneros "Massage Demo" stop write for feature 0: `[0xF1, 0x00]`. A vibrate +/// on the same feature writes `[0xF1, 0x40]`; the stop resets it to zero. See +/// `device_test_case/test_aneros_protocol.yaml` for the full sequence. +const ANEROS_STOP_WRITE_FEATURE_0: [u8; 2] = [0xF1, 0x00]; + +/// Non-blockingly drain every hardware write the test device has recorded so +/// far and return whether any of them is the feature-0 stop write. We use +/// `try_recv` so the check reflects exactly what was on the wire *at the moment +/// stop/shutdown resolved* — a write still sitting in the device io task's batch +/// window has not reached the host channel yet and will not be counted. +fn recorded_a_stop_write(host: &mut TestDeviceChannelHost) -> bool { + let mut saw_stop = false; + while let Ok(command) = host.receiver.try_recv() { + if let HardwareCommand::Write(write) = command + && write.data().as_slice() == ANEROS_STOP_WRITE_FEATURE_0 + { + saw_stop = true; + } + } + saw_stop +} + +/// Bring a "Massage Demo" device online under `server`, returning its device +/// index. Mirrors the handshake/scan/connect dance the other lifecycle tests do. +async fn bring_device_online(server: &buttplug_server::ButtplugServer, client_name: &str) -> u32 { + let recv = server.server_version_event_stream(); + pin_mut!(recv); + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + client_name, + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .expect("server info request should succeed"); + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .expect("start scanning should succeed"); + tokio::time::timeout(Duration::from_secs(5), async { + while let Some(msg) = recv.next().await { + if let ButtplugServerMessageV4::DeviceList(list) = msg + && let Some((&idx, _)) = list.devices().iter().next() + { + return idx; + } + } + panic!("device event stream ended before a device connected"); + }) + .await + .expect("timed out waiting for device to connect") +} /// Bring a real (test-harness) device online and confirm that, once the server /// is shut down and dropped, no tasks remain registered under the scope tree. @@ -296,3 +360,109 @@ async fn test_shutdown_resolves_with_stalled_bringup() { .expect("shutdown hung with a stalled device bringup — bringup is not honoring its cancellation token") .expect("server shutdown errored"); } + +/// Test A — the stop-write-acknowledgement contract: a `StopCmd` must not +/// resolve until the resulting stop write has actually reached the hardware, +/// even when the device io task is batching commands over a long message gap. +/// +/// This retires the limitation documented on `test_shutdown_under_load_drains_subtree` +/// (the 1ms harness gap made write observation flaky). Here we deliberately give +/// the device a 500ms message gap so the batching window is large and the race +/// is deterministic: an active vibrate write lands, the stop write is queued +/// into that 500ms batch, and we assert the stop write is on the wire *by the +/// time the stop message resolves*. +/// +/// RED (pre-fix, `git stash` Tasks 1-2): `handle_hardware_commands` fire-and-forgets +/// the stop write into the io channel and `parse_message` for the StopCmd resolves +/// immediately, while the write sits unflushed in the 500ms batch. `try_recv` +/// finds no stop write and this assertion fails deterministically. +/// +/// GREEN (with the ack-on-write fix): the stop path requests a write +/// acknowledgement; the io task urgent-flushes the batch and fires the ack, so +/// StopCmd only resolves after the write is on the wire. +#[tokio::test] +async fn test_stop_resolves_only_after_stop_write_reaches_hardware() { + // 500ms gap: large enough that, pre-fix, the stop write provably has not been + // flushed by the time StopCmd resolves (which is ~immediate). + let (server, mut device) = + test_server_with_device_and_message_gap("Massage Demo", Duration::from_millis(500)); + + let device_index = bring_device_online(&server, "Stop Write Ack Test").await; + + // Put the device into an actively-running state so the stop has real work to + // flush. This vibrate write itself enters the batch window. + server + .parse_message(ButtplugClientMessageVariant::V4( + OutputCmdV4::new( + device_index, + 0, + OutputCommand::Vibrate(OutputValue::new(50)), + ) + .into(), + )) + .await + .expect("vibrate command should succeed"); + + // Stop. With ack-on-write, parse_message for StopCmd resolves only after the + // stop write has been flushed to hardware. + server + .parse_message(ButtplugClientMessageVariant::V4( + StopCmdV4::new(Some(device_index), None, true, true).into(), + )) + .await + .expect("stop command should succeed"); + + // The instant stop resolves, the stop write must already be on the wire. No + // sleep here: that is the whole point — we are asserting ordering, not + // eventual delivery. + assert!( + recorded_a_stop_write(&mut device), + "StopCmd resolved but the stop write was not yet recorded on the hardware channel — \ + it is still sitting in the device io task's batch window" + ); +} + +/// Test B — the original incident: with a batched device in an active output +/// state, `server.shutdown()` must drive the per-device stop write all the way +/// to hardware before it resolves. This is the assertion the task-scope work +/// could not make with the 1ms harness gap (the disconnect tore the io task down +/// inside the batch window, dropping the pending stop write). With ack-on-write, +/// stop_devices waits for the write before shutdown proceeds to disconnect. +/// +/// RED (pre-fix): shutdown's stop_devices fire-and-forgets the stop write, then +/// disconnect tears down the io task mid-batch and the write is dropped — no stop +/// write is ever recorded. +/// +/// GREEN (with the fix): the stop write is acknowledged before disconnect, so it +/// is recorded before shutdown resolves. +#[tokio::test] +async fn test_shutdown_writes_stop_before_resolving() { + let (server, mut device) = + test_server_with_device_and_message_gap("Massage Demo", Duration::from_millis(500)); + + let device_index = bring_device_online(&server, "Shutdown Stop Write Test").await; + + server + .parse_message(ButtplugClientMessageVariant::V4( + OutputCmdV4::new( + device_index, + 0, + OutputCommand::Vibrate(OutputValue::new(50)), + ) + .into(), + )) + .await + .expect("vibrate command should succeed"); + + tokio::time::timeout(Duration::from_secs(10), server.shutdown()) + .await + .expect("shutdown did not resolve in time") + .expect("server shutdown errored"); + + // By the time shutdown resolved, the stop write must have reached hardware. + assert!( + recorded_a_stop_write(&mut device), + "shutdown() resolved but the device's stop write was never recorded — \ + the pending stop write was dropped when the io task was torn down mid-batch" + ); +} diff --git a/crates/buttplug_tests/tests/util/mod.rs b/crates/buttplug_tests/tests/util/mod.rs index 9f4e07e8..a8246035 100644 --- a/crates/buttplug_tests/tests/util/mod.rs +++ b/crates/buttplug_tests/tests/util/mod.rs @@ -170,6 +170,22 @@ pub fn test_server_with_device(device_type: &str) -> (ButtplugServer, TestDevice (test_server_with_comm_manager(builder), device) } +/// Like `test_server_with_device`, but the device's io task batches commands +/// over `message_gap`. A large gap widens the stop-write batching window so +/// tests can deterministically observe whether stop commands are flushed to +/// hardware before stop/shutdown resolve. +#[allow(dead_code)] +pub fn test_server_with_device_and_message_gap( + device_type: &str, + message_gap: std::time::Duration, +) -> (ButtplugServer, TestDeviceChannelHost) { + let mut builder = TestDeviceCommunicationManagerBuilder::default(); + let device = builder + .add_test_device_with_message_gap(&TestDeviceIdentifier::new(device_type, None), message_gap); + + (test_server_with_comm_manager(builder), device) +} + #[allow(dead_code)] pub fn test_server_v4_with_device(device_type: &str) -> (ButtplugServer, TestDeviceChannelHost) { let mut builder = TestDeviceCommunicationManagerBuilder::default(); diff --git a/crates/buttplug_tests/tests/util/test_device_manager/test_device.rs b/crates/buttplug_tests/tests/util/test_device_manager/test_device.rs index 91916c91..5e4c5665 100644 --- a/crates/buttplug_tests/tests/util/test_device_manager/test_device.rs +++ b/crates/buttplug_tests/tests/util/test_device_manager/test_device.rs @@ -115,12 +115,16 @@ impl HardwareSpecializer for TestHardwareSpecializer { } } } + // The device carries its own message gap (default 1ms for the slight delay + // multi-message protocols expect). Tests that need a large batching window + // to make the stop-write race deterministic override it via + // `add_test_device_with_message_gap`. + let message_gap = device.message_gap(); let hardware = Hardware::new( &device.name(), &device.address(), &endpoints, - // Add slight delay for protocols with multiple messages. - &Some(Duration::from_millis(1)), + &Some(message_gap), false, Box::new(device), ); @@ -153,10 +157,15 @@ pub fn new_device_channel() -> (TestDeviceChannelHost, TestDeviceChannelDevice) ) } +/// Default inter-message gap for test hardware. A slight delay multi-message +/// protocols expect; small enough that ordinary tests are unaffected. +const DEFAULT_MESSAGE_GAP: Duration = Duration::from_millis(1); + pub struct TestDevice { name: String, address: String, endpoints: HashSet, + message_gap: Duration, test_device_channel: mpsc::Sender, event_sender: broadcast::Sender, subscribed_endpoints: Arc>, @@ -166,6 +175,16 @@ pub struct TestDevice { impl TestDevice { #[allow(dead_code)] pub fn new(name: &str, address: &str, test_device_channel: TestDeviceChannelDevice) -> Self { + Self::new_with_message_gap(name, address, DEFAULT_MESSAGE_GAP, test_device_channel) + } + + #[allow(dead_code)] + pub fn new_with_message_gap( + name: &str, + address: &str, + message_gap: Duration, + test_device_channel: TestDeviceChannelDevice, + ) -> Self { let (event_sender, _) = broadcast::channel(256); let event_sender_clone = event_sender.clone(); @@ -210,6 +229,7 @@ impl TestDevice { name: name.to_owned(), address: address.to_owned(), endpoints: HashSet::new(), + message_gap, test_device_channel: command_sender, event_sender, subscribed_endpoints, @@ -221,6 +241,10 @@ impl TestDevice { self.endpoints.insert(*endpoint); } + pub fn message_gap(&self) -> Duration { + self.message_gap + } + pub fn name(&self) -> String { self.name.clone() } diff --git a/crates/buttplug_tests/tests/util/test_device_manager/test_device_comm_manager.rs b/crates/buttplug_tests/tests/util/test_device_manager/test_device_comm_manager.rs index 6e0d18a7..81fe9c27 100644 --- a/crates/buttplug_tests/tests/util/test_device_manager/test_device_comm_manager.rs +++ b/crates/buttplug_tests/tests/util/test_device_manager/test_device_comm_manager.rs @@ -30,7 +30,7 @@ use std::{ Arc, atomic::{AtomicBool, Ordering}, }, - time::{SystemTime, UNIX_EPOCH}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use tokio::sync::mpsc::Sender; @@ -74,8 +74,11 @@ impl TestDeviceIdentifier { } } +/// Default inter-message gap for test hardware, matching `TestDevice`'s default. +const DEFAULT_MESSAGE_GAP: Duration = Duration::from_millis(1); + pub struct TestDeviceCommunicationManagerBuilder { - devices: Option>, + devices: Option>, } impl Default for TestDeviceCommunicationManagerBuilder { @@ -88,12 +91,24 @@ impl Default for TestDeviceCommunicationManagerBuilder { impl TestDeviceCommunicationManagerBuilder { pub fn add_test_device(&mut self, device: &TestDeviceIdentifier) -> TestDeviceChannelHost { + self.add_test_device_with_message_gap(device, DEFAULT_MESSAGE_GAP) + } + + /// Register a test device whose io task batches commands over `message_gap`. + /// A large gap (e.g. 500ms) makes the stop-write batching race deterministic + /// for tests asserting that stop commands are flushed before stop resolves. + #[allow(dead_code)] + pub fn add_test_device_with_message_gap( + &mut self, + device: &TestDeviceIdentifier, + message_gap: Duration, + ) -> TestDeviceChannelHost { let (host_channel, device_channel) = new_device_channel(); self .devices .as_mut() .expect("Devices vec does not exist, is this running twice?") - .push((device.clone(), device_channel)); + .push((device.clone(), message_gap, device_channel)); host_channel } } @@ -115,26 +130,28 @@ impl HardwareCommunicationManagerBuilder for TestDeviceCommunicationManagerBuild fn new_uninitialized_ble_test_device( identifier: &TestDeviceIdentifier, + message_gap: Duration, device_channel: TestDeviceChannelDevice, ) -> TestHardwareConnector { let address = identifier.address.clone(); let specifier = ProtocolCommunicationSpecifier::BluetoothLE( BluetoothLESpecifier::new_from_device(&identifier.name, &HashMap::new(), &[]), ); - let hardware = TestDevice::new(&identifier.name, &address, device_channel); + let hardware = + TestDevice::new_with_message_gap(&identifier.name, &address, message_gap, device_channel); TestHardwareConnector::new(specifier, hardware) } pub struct TestDeviceCommunicationManager { device_sender: Sender, - devices: Vec<(TestDeviceIdentifier, TestDeviceChannelDevice)>, + devices: Vec<(TestDeviceIdentifier, Duration, TestDeviceChannelDevice)>, is_scanning: Arc, } impl TestDeviceCommunicationManager { pub fn new( device_sender: Sender, - devices: Vec<(TestDeviceIdentifier, TestDeviceChannelDevice)>, + devices: Vec<(TestDeviceIdentifier, Duration, TestDeviceChannelDevice)>, ) -> Self { Self { device_sender, @@ -156,8 +173,8 @@ impl HardwareCommunicationManager for TestDeviceCommunicationManager { let mut events = vec![]; - while let Some((device, test_channel)) = self.devices.pop() { - let device_creator = new_uninitialized_ble_test_device(&device, test_channel); + while let Some((device, message_gap, test_channel)) = self.devices.pop() { + let device_creator = new_uninitialized_ble_test_device(&device, message_gap, test_channel); events.push(HardwareCommunicationManagerEvent::DeviceFound { name: device.name.clone(), From e7a431f55dd6cc2e366978bb2499864de83a9fbc Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Wed, 10 Jun 2026 14:18:37 -0700 Subject: [PATCH 3/3] docs: clean stale comments flagged in stop-ack review Co-Authored-By: Claude Fable 5 --- crates/buttplug_server/src/device/device_handle.rs | 1 - crates/buttplug_tests/tests/test_task_lifecycle.rs | 7 +++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/buttplug_server/src/device/device_handle.rs b/crates/buttplug_server/src/device/device_handle.rs index 16b139cf..d994ae60 100644 --- a/crates/buttplug_server/src/device/device_handle.rs +++ b/crates/buttplug_server/src/device/device_handle.rs @@ -308,7 +308,6 @@ impl DeviceHandle { if let Some(sender) = &self.output_observation_sender { // OutputType derives Display via strum, producing clean names like "Vibrate", "Rotate". - // The design uses format!("{:?}") but to_string() is preferred for clean output. let _ = sender.send(OutputObservation { device_index: self.definition.index(), feature_index: msg.feature_index(), diff --git a/crates/buttplug_tests/tests/test_task_lifecycle.rs b/crates/buttplug_tests/tests/test_task_lifecycle.rs index 86d296f6..251e17df 100644 --- a/crates/buttplug_tests/tests/test_task_lifecycle.rs +++ b/crates/buttplug_tests/tests/test_task_lifecycle.rs @@ -229,6 +229,13 @@ async fn test_server_shutdown_leaves_no_tasks() { /// ordering bug, so a write-observation assertion is inherently flaky here. An /// instrumented-ordering variant was also attempted and found inherently flaky /// with this harness, so it is deliberately not pursued. +/// +/// UPDATE: with the stop-command write acknowledgement (DeviceTaskMessage ack) +/// and a configurable per-device message_gap, the write observation IS now +/// possible — see `test_stop_resolves_only_after_stop_write_reaches_hardware` +/// and `test_shutdown_writes_stop_before_resolving` below, which retire the +/// limitation described in the NOTE above for the acked stop path. This test +/// remains as the coarse hang/leak smoke test. #[tokio::test] async fn test_shutdown_under_load_drains_subtree() { // Hold the channel so the device stays connected through shutdown.