diff --git a/crates/buttplug_server/src/device/device_handle.rs b/crates/buttplug_server/src/device/device_handle.rs index 759dc4824..d994ae60b 100644 --- a/crates/buttplug_server/src/device/device_handle.rs +++ b/crates/buttplug_server/src/device/device_handle.rs @@ -26,7 +26,7 @@ use buttplug_core::{ OutputValue, StopCmdV4, }, - util::{stream::convert_broadcast_receiver_to_stream, task::TaskScope}, + util::{async_manager, stream::convert_broadcast_receiver_to_stream, task::TaskScope}, }; use buttplug_server_device_config::{ DeviceConfigurationManager, @@ -36,10 +36,13 @@ use buttplug_server_device_config::{ }; use dashmap::DashMap; use futures::future::{self, BoxFuture, FutureExt}; -use tokio::sync::{ - broadcast, - mpsc::{Sender, channel}, - oneshot, +use tokio::{ + select, + sync::{ + broadcast, + mpsc::{Sender, channel}, + oneshot, + }, }; use tokio_stream::StreamExt; use uuid::Uuid; @@ -58,7 +61,7 @@ use crate::{ use super::{ InternalDeviceEvent, OutputObservation, - device_task::{DeviceTaskConfig, spawn_device_task}, + device_task::{DeviceTaskConfig, DeviceTaskMessage, spawn_device_task}, hardware::{Hardware, HardwareCommand, HardwareConnector, HardwareEvent}, protocol::{ProtocolHandler, ProtocolKeepaliveStrategy, ProtocolSpecializer}, }; @@ -109,7 +112,7 @@ pub struct DeviceHandle { legacy_attributes: ServerDeviceAttributes, last_output_command: Arc>, stop_commands: Arc>, - internal_hw_msg_sender: Sender>, + internal_hw_msg_sender: Sender, output_observation_sender: Option>, /// Scope owning this device's tasks. Rides in an Arc since DeviceHandle is /// Clone; the subtree cancels when the last clone drops. @@ -127,7 +130,7 @@ impl DeviceHandle { definition: ServerDeviceDefinition, identifier: UserDeviceIdentifier, stop_commands: Vec, - internal_hw_msg_sender: Sender>, + internal_hw_msg_sender: Sender, output_observation_sender: Option>, task_scope: Arc, ) -> Self { @@ -272,11 +275,32 @@ impl DeviceHandle { // --- Private command handling methods --- fn handle_outputcmd_v4(&self, msg: &CheckedOutputCmdV4) -> ButtplugServerResultFuture { + match self.output_cmd_hardware_commands(msg) { + // Deduped (no-op) or empty: nothing to send, succeed immediately. + None => future::ready(Ok(message::OkV0::default().into())).boxed(), + Some(Err(err)) => future::ready(Err(err)).boxed(), + Some(Ok(commands)) => self.handle_hardware_commands(commands, false), + } + } + + /// Run the bookkeeping for an output command (dedupe map, output-observation + /// emit) and ask the protocol handler for the hardware commands it produces. + /// + /// Returns `None` when the command is a deduped no-op (nothing to send), + /// `Some(Err(..))` when the handler errored, and `Some(Ok(cmds))` otherwise. + /// Shared by `handle_outputcmd_v4` (normal output) and the stop path, so both + /// keep identical dedupe-map and observation behaviour. The stop path + /// accumulates the returned commands across all stop messages and flushes them + /// in a single write-acknowledged batch. + fn output_cmd_hardware_commands( + &self, + msg: &CheckedOutputCmdV4, + ) -> Option, ButtplugError>> { if let Some(last_msg) = self.last_output_command.get(&msg.feature_id()) && *last_msg == *msg { trace!("No commands generated for incoming device packet, skipping and returning success."); - return future::ready(Ok(message::OkV0::default().into())).boxed(); + return None; } self .last_output_command @@ -284,7 +308,6 @@ impl DeviceHandle { if let Some(sender) = &self.output_observation_sender { // OutputType derives Display via strum, producing clean names like "Vibrate", "Rotate". - // The design uses format!("{:?}") but to_string() is preferred for clean output. let _ = sender.send(OutputObservation { device_index: self.definition.index(), feature_index: msg.feature_index(), @@ -293,37 +316,94 @@ impl DeviceHandle { }); } - self.handle_generic_command_result(self.handler.handle_output_cmd(msg)) + Some(self.handler.handle_output_cmd(msg).map_err(|e| e.into())) } - fn handle_hardware_commands(&self, commands: Vec) -> ButtplugServerResultFuture { + /// Queue hardware commands for the device io task. + /// + /// When `wait_for_write` is false (all normal output paths) this is + /// fire-and-forget: the commands are dropped into the io channel and the + /// future resolves immediately, leaving any batching to the io task. + /// + /// When `wait_for_write` is true (the stop path) the commands carry a + /// write-acknowledgement channel: the io task flushes the batch to hardware + /// immediately and fires the ack, and this future only resolves once that + /// write has gone out (or the bounded wait elapses / the device goes away). + fn handle_hardware_commands( + &self, + commands: Vec, + wait_for_write: bool, + ) -> ButtplugServerResultFuture { let sender = self.internal_hw_msg_sender.clone(); async move { - let _ = sender.send(commands).await; + if wait_for_write { + let (ack_sender, ack_receiver) = oneshot::channel(); + if sender + .send(DeviceTaskMessage { + commands, + ack: Some(ack_sender), + }) + .await + .is_err() + { + debug!("Device io task gone, skipping write acknowledgement wait."); + return Ok(message::OkV0::default().into()); + } + // Bounded wait for the write ack. There is no tokio::time on WASM, so + // race the receiver against a runtime-agnostic sleep instead of + // tokio::time::timeout. A wedged or dead device must not hang shutdown + // or error a stop, so any non-success outcome resolves Ok (matching the + // existing swallow semantics; scope cancellation handles stragglers). + select! { + ack = ack_receiver => { + if ack.is_err() { + debug!("Device io task dropped ack; device likely disconnecting."); + } + } + _ = async_manager::sleep(Duration::from_secs(1)) => { + warn!("Timed out waiting for stop command write acknowledgement."); + } + } + } else { + let _ = sender + .send(DeviceTaskMessage { + commands, + ack: None, + }) + .await; + } Ok(message::OkV0::default().into()) } .boxed() } - fn handle_generic_command_result( - &self, - command_result: Result, ButtplugDeviceError>, - ) -> ButtplugServerResultFuture { - let hardware_commands = match command_result { - Ok(commands) => commands, - Err(err) => return future::ready(Err(err.into())).boxed(), - }; - - self.handle_hardware_commands(hardware_commands) - } - fn handle_stop_device_cmd(&self, msg: &StopCmdV4) -> ButtplugServerResultFuture { let mut fut_vec = vec![]; if msg.outputs() { - self - .stop_commands - .iter() - .for_each(|msg| fut_vec.push(self.parse_message(msg.clone()))); + // Accumulate the hardware commands from every stop output into a single + // write-acknowledged batch. A multi-feature device produces one stop + // OutputCmd per feature, but the protocol handler folds them into one + // accumulated write; sending each individually with its own ack would + // emit one write per feature instead. Collecting them and issuing a + // single acked flush keeps the on-wire output identical to normal output + // batching while still resolving only once the stop write has landed. + // + // Stop commands are always OutputCmds (see build_device_handle); any + // other shape falls back to the normal fire-and-forget parse path. + let mut stop_hardware_commands = vec![]; + for stop_msg in self.stop_commands.iter() { + match stop_msg { + ButtplugDeviceCommandMessageUnionV4::OutputCmd(output_msg) => { + match self.output_cmd_hardware_commands(output_msg) { + None => {} + Some(Ok(commands)) => stop_hardware_commands.extend(commands), + Some(Err(err)) => fut_vec.push(future::ready(Err(err)).boxed()), + } + } + other => fut_vec.push(self.parse_message(other.clone())), + } + } + fut_vec.push(self.handle_hardware_commands(stop_hardware_commands, true)); } if msg.inputs() { self.definition.features().iter().for_each(|(i, f)| { @@ -536,7 +616,7 @@ pub(super) async fn build_device_handle( let strategy = handler.keepalive_strategy(); // Create the hardware command channel and spawn the device task - let (internal_hw_msg_sender, internal_hw_msg_recv) = channel::>(1024); + let (internal_hw_msg_sender, internal_hw_msg_recv) = channel::(1024); let device_wait_duration = if let Some(gap) = definition.message_gap_ms() { Some(Duration::from_millis(gap as u64)) diff --git a/crates/buttplug_server/src/device/device_task.rs b/crates/buttplug_server/src/device/device_task.rs index 8664ba35d..15e2fb1cf 100644 --- a/crates/buttplug_server/src/device/device_task.rs +++ b/crates/buttplug_server/src/device/device_task.rs @@ -34,6 +34,15 @@ pub struct DeviceTaskConfig { pub keepalive_strategy: ProtocolKeepaliveStrategy, } +/// A batch of hardware commands for the device io task, optionally carrying a +/// write acknowledgement channel. When `ack` is present the batch is urgent: +/// the io task flushes it (and any pending batched commands) to hardware +/// immediately, then fires the ack. +pub struct DeviceTaskMessage { + pub commands: Vec, + pub ack: Option>, +} + /// Spawn the device communication task. /// /// This task handles: @@ -48,7 +57,7 @@ pub fn spawn_device_task( hardware: Arc, _handler: Arc, config: DeviceTaskConfig, - mut command_receiver: Receiver>, + mut command_receiver: Receiver, ) { task_scope.spawn("io", move |token| async move { run_device_task(hardware, config, &mut command_receiver, token).await; @@ -62,7 +71,7 @@ pub fn spawn_device_task( async fn run_device_task( hardware: Arc, config: DeviceTaskConfig, - command_receiver: &mut Receiver>, + command_receiver: &mut Receiver, token: CancellationToken, ) { let mut hardware_events = hardware.event_stream(); @@ -93,6 +102,23 @@ async fn run_device_task( let mut pending_commands: VecDeque = VecDeque::new(); let mut batch_deadline: Option = None; + // Write every pending command to hardware in order, updating the keepalive + // packet as writes go out. Shared by the receive arm (urgent acked flush), + // the batch-deadline arm, and the flush-on-exit hardening below. + async fn flush_pending( + hardware: &Hardware, + pending_commands: &mut VecDeque, + track_keepalive: bool, + keepalive_packet: &mut Option, + ) { + while let Some(cmd) = pending_commands.pop_front() { + let _ = hardware.parse_message(&cmd).await; + if track_keepalive && let HardwareCommand::Write(ref write_cmd) = cmd { + *keepalive_packet = Some(write_cmd.clone()); + } + } + } + loop { // Calculate keepalive timeout let keepalive_fut = async { @@ -121,17 +147,54 @@ async fn run_device_task( // Priority 0: Cooperative cancellation - wins over new work. _ = token.cancelled() => { info!("Device task cancelled, shutting down"); + // Best-effort flush of any batched writes (e.g. a stop command still + // sitting in the batch window) before exiting. The hardware is still + // present here, so the writes can land; write errors are swallowed. + flush_pending( + &hardware, + &mut pending_commands, + track_keepalive, + &mut keepalive_packet, + ) + .await; return; } // Priority 1: Incoming commands msg = command_receiver.recv() => { - let Some(commands) = msg else { + let Some(DeviceTaskMessage { commands, ack }) = msg else { info!("No longer receiving messages from device parent, breaking"); + // The command channel closed (all DeviceHandles dropped). Hardware is + // still present, so best-effort flush any batched writes before exit. + flush_pending( + &hardware, + &mut pending_commands, + track_keepalive, + &mut keepalive_packet, + ) + .await; break; }; - if let Some(device_wait_duration) = device_wait_duration { + if let Some(ack) = ack { + // Urgent write-acknowledged batch (stop path). Merge with any pending + // batch using the existing dedupe, flush everything to hardware now + // regardless of the batch deadline, then fire the ack so the caller + // only resolves once the writes have actually gone out. + for command in commands { + pending_commands.retain(|existing| !command.overlaps(existing)); + pending_commands.push_back(command); + } + flush_pending( + &hardware, + &mut pending_commands, + track_keepalive, + &mut keepalive_packet, + ) + .await; + batch_deadline = None; + let _ = ack.send(()); + } else if let Some(device_wait_duration) = device_wait_duration { // Batching enabled if pending_commands.is_empty() { // First batch - add directly without deduplication (matches old behavior) @@ -147,28 +210,27 @@ async fn run_device_task( } else { // No batching - send immediately trace!("No wait duration, sending commands immediately: {:?}", commands); - for cmd in commands { - let _ = hardware.parse_message(&cmd).await; - if track_keepalive - && let HardwareCommand::Write(ref write_cmd) = cmd - { - keepalive_packet = Some(write_cmd.clone()); - } - } + pending_commands.extend(commands); + flush_pending( + &hardware, + &mut pending_commands, + track_keepalive, + &mut keepalive_packet, + ) + .await; } } // Priority 2: Batch deadline reached - flush pending commands _ = batch_fut => { trace!("Batch deadline reached, sending {} commands", pending_commands.len()); - while let Some(cmd) = pending_commands.pop_front() { - let _ = hardware.parse_message(&cmd).await; - if track_keepalive - && let HardwareCommand::Write(ref write_cmd) = cmd - { - keepalive_packet = Some(write_cmd.clone()); - } - } + flush_pending( + &hardware, + &mut pending_commands, + track_keepalive, + &mut keepalive_packet, + ) + .await; batch_deadline = None; } @@ -208,6 +270,8 @@ async fn run_device_task( hw_event = hardware_events.recv() => { if matches!(hw_event, Ok(HardwareEvent::Disconnected(_))) || hw_event.is_err() { info!("Hardware disconnected, shutting down task"); + // Do NOT flush pending_commands here: the hardware is gone, so writes + // would fail and any pending stop is moot. return; } } diff --git a/crates/buttplug_tests/tests/test_task_lifecycle.rs b/crates/buttplug_tests/tests/test_task_lifecycle.rs index 4425c4443..251e17df2 100644 --- a/crates/buttplug_tests/tests/test_task_lifecycle.rs +++ b/crates/buttplug_tests/tests/test_task_lifecycle.rs @@ -17,14 +17,78 @@ use buttplug_core::{ OutputValue, RequestServerInfoV4, StartScanningV0, + StopCmdV4, }, util::task::registry, }; -use buttplug_server::message::ButtplugClientMessageVariant; +use buttplug_server::{device::hardware::HardwareCommand, message::ButtplugClientMessageVariant}; use futures::{StreamExt, pin_mut}; use std::time::Duration; +use util::TestDeviceChannelHost; use util::stalling_device_communication_manager::StallingDeviceCommunicationManagerBuilder; -use util::{test_server_with_comm_manager, test_server_with_device}; +use util::{ + test_server_with_comm_manager, + test_server_with_device, + test_server_with_device_and_message_gap, +}; + +/// The Aneros "Massage Demo" stop write for feature 0: `[0xF1, 0x00]`. A vibrate +/// on the same feature writes `[0xF1, 0x40]`; the stop resets it to zero. See +/// `device_test_case/test_aneros_protocol.yaml` for the full sequence. +const ANEROS_STOP_WRITE_FEATURE_0: [u8; 2] = [0xF1, 0x00]; + +/// Non-blockingly drain every hardware write the test device has recorded so +/// far and return whether any of them is the feature-0 stop write. We use +/// `try_recv` so the check reflects exactly what was on the wire *at the moment +/// stop/shutdown resolved* — a write still sitting in the device io task's batch +/// window has not reached the host channel yet and will not be counted. +fn recorded_a_stop_write(host: &mut TestDeviceChannelHost) -> bool { + let mut saw_stop = false; + while let Ok(command) = host.receiver.try_recv() { + if let HardwareCommand::Write(write) = command + && write.data().as_slice() == ANEROS_STOP_WRITE_FEATURE_0 + { + saw_stop = true; + } + } + saw_stop +} + +/// Bring a "Massage Demo" device online under `server`, returning its device +/// index. Mirrors the handshake/scan/connect dance the other lifecycle tests do. +async fn bring_device_online(server: &buttplug_server::ButtplugServer, client_name: &str) -> u32 { + let recv = server.server_version_event_stream(); + pin_mut!(recv); + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + client_name, + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .expect("server info request should succeed"); + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .expect("start scanning should succeed"); + tokio::time::timeout(Duration::from_secs(5), async { + while let Some(msg) = recv.next().await { + if let ButtplugServerMessageV4::DeviceList(list) = msg + && let Some((&idx, _)) = list.devices().iter().next() + { + return idx; + } + } + panic!("device event stream ended before a device connected"); + }) + .await + .expect("timed out waiting for device to connect") +} /// Bring a real (test-harness) device online and confirm that, once the server /// is shut down and dropped, no tasks remain registered under the scope tree. @@ -165,6 +229,13 @@ async fn test_server_shutdown_leaves_no_tasks() { /// ordering bug, so a write-observation assertion is inherently flaky here. An /// instrumented-ordering variant was also attempted and found inherently flaky /// with this harness, so it is deliberately not pursued. +/// +/// UPDATE: with the stop-command write acknowledgement (DeviceTaskMessage ack) +/// and a configurable per-device message_gap, the write observation IS now +/// possible — see `test_stop_resolves_only_after_stop_write_reaches_hardware` +/// and `test_shutdown_writes_stop_before_resolving` below, which retire the +/// limitation described in the NOTE above for the acked stop path. This test +/// remains as the coarse hang/leak smoke test. #[tokio::test] async fn test_shutdown_under_load_drains_subtree() { // Hold the channel so the device stays connected through shutdown. @@ -296,3 +367,109 @@ async fn test_shutdown_resolves_with_stalled_bringup() { .expect("shutdown hung with a stalled device bringup — bringup is not honoring its cancellation token") .expect("server shutdown errored"); } + +/// Test A — the stop-write-acknowledgement contract: a `StopCmd` must not +/// resolve until the resulting stop write has actually reached the hardware, +/// even when the device io task is batching commands over a long message gap. +/// +/// This retires the limitation documented on `test_shutdown_under_load_drains_subtree` +/// (the 1ms harness gap made write observation flaky). Here we deliberately give +/// the device a 500ms message gap so the batching window is large and the race +/// is deterministic: an active vibrate write lands, the stop write is queued +/// into that 500ms batch, and we assert the stop write is on the wire *by the +/// time the stop message resolves*. +/// +/// RED (pre-fix, `git stash` Tasks 1-2): `handle_hardware_commands` fire-and-forgets +/// the stop write into the io channel and `parse_message` for the StopCmd resolves +/// immediately, while the write sits unflushed in the 500ms batch. `try_recv` +/// finds no stop write and this assertion fails deterministically. +/// +/// GREEN (with the ack-on-write fix): the stop path requests a write +/// acknowledgement; the io task urgent-flushes the batch and fires the ack, so +/// StopCmd only resolves after the write is on the wire. +#[tokio::test] +async fn test_stop_resolves_only_after_stop_write_reaches_hardware() { + // 500ms gap: large enough that, pre-fix, the stop write provably has not been + // flushed by the time StopCmd resolves (which is ~immediate). + let (server, mut device) = + test_server_with_device_and_message_gap("Massage Demo", Duration::from_millis(500)); + + let device_index = bring_device_online(&server, "Stop Write Ack Test").await; + + // Put the device into an actively-running state so the stop has real work to + // flush. This vibrate write itself enters the batch window. + server + .parse_message(ButtplugClientMessageVariant::V4( + OutputCmdV4::new( + device_index, + 0, + OutputCommand::Vibrate(OutputValue::new(50)), + ) + .into(), + )) + .await + .expect("vibrate command should succeed"); + + // Stop. With ack-on-write, parse_message for StopCmd resolves only after the + // stop write has been flushed to hardware. + server + .parse_message(ButtplugClientMessageVariant::V4( + StopCmdV4::new(Some(device_index), None, true, true).into(), + )) + .await + .expect("stop command should succeed"); + + // The instant stop resolves, the stop write must already be on the wire. No + // sleep here: that is the whole point — we are asserting ordering, not + // eventual delivery. + assert!( + recorded_a_stop_write(&mut device), + "StopCmd resolved but the stop write was not yet recorded on the hardware channel — \ + it is still sitting in the device io task's batch window" + ); +} + +/// Test B — the original incident: with a batched device in an active output +/// state, `server.shutdown()` must drive the per-device stop write all the way +/// to hardware before it resolves. This is the assertion the task-scope work +/// could not make with the 1ms harness gap (the disconnect tore the io task down +/// inside the batch window, dropping the pending stop write). With ack-on-write, +/// stop_devices waits for the write before shutdown proceeds to disconnect. +/// +/// RED (pre-fix): shutdown's stop_devices fire-and-forgets the stop write, then +/// disconnect tears down the io task mid-batch and the write is dropped — no stop +/// write is ever recorded. +/// +/// GREEN (with the fix): the stop write is acknowledged before disconnect, so it +/// is recorded before shutdown resolves. +#[tokio::test] +async fn test_shutdown_writes_stop_before_resolving() { + let (server, mut device) = + test_server_with_device_and_message_gap("Massage Demo", Duration::from_millis(500)); + + let device_index = bring_device_online(&server, "Shutdown Stop Write Test").await; + + server + .parse_message(ButtplugClientMessageVariant::V4( + OutputCmdV4::new( + device_index, + 0, + OutputCommand::Vibrate(OutputValue::new(50)), + ) + .into(), + )) + .await + .expect("vibrate command should succeed"); + + tokio::time::timeout(Duration::from_secs(10), server.shutdown()) + .await + .expect("shutdown did not resolve in time") + .expect("server shutdown errored"); + + // By the time shutdown resolved, the stop write must have reached hardware. + assert!( + recorded_a_stop_write(&mut device), + "shutdown() resolved but the device's stop write was never recorded — \ + the pending stop write was dropped when the io task was torn down mid-batch" + ); +} diff --git a/crates/buttplug_tests/tests/util/mod.rs b/crates/buttplug_tests/tests/util/mod.rs index 9f4e07e8c..a82460353 100644 --- a/crates/buttplug_tests/tests/util/mod.rs +++ b/crates/buttplug_tests/tests/util/mod.rs @@ -170,6 +170,22 @@ pub fn test_server_with_device(device_type: &str) -> (ButtplugServer, TestDevice (test_server_with_comm_manager(builder), device) } +/// Like `test_server_with_device`, but the device's io task batches commands +/// over `message_gap`. A large gap widens the stop-write batching window so +/// tests can deterministically observe whether stop commands are flushed to +/// hardware before stop/shutdown resolve. +#[allow(dead_code)] +pub fn test_server_with_device_and_message_gap( + device_type: &str, + message_gap: std::time::Duration, +) -> (ButtplugServer, TestDeviceChannelHost) { + let mut builder = TestDeviceCommunicationManagerBuilder::default(); + let device = builder + .add_test_device_with_message_gap(&TestDeviceIdentifier::new(device_type, None), message_gap); + + (test_server_with_comm_manager(builder), device) +} + #[allow(dead_code)] pub fn test_server_v4_with_device(device_type: &str) -> (ButtplugServer, TestDeviceChannelHost) { let mut builder = TestDeviceCommunicationManagerBuilder::default(); diff --git a/crates/buttplug_tests/tests/util/test_device_manager/test_device.rs b/crates/buttplug_tests/tests/util/test_device_manager/test_device.rs index 91916c916..5e4c56654 100644 --- a/crates/buttplug_tests/tests/util/test_device_manager/test_device.rs +++ b/crates/buttplug_tests/tests/util/test_device_manager/test_device.rs @@ -115,12 +115,16 @@ impl HardwareSpecializer for TestHardwareSpecializer { } } } + // The device carries its own message gap (default 1ms for the slight delay + // multi-message protocols expect). Tests that need a large batching window + // to make the stop-write race deterministic override it via + // `add_test_device_with_message_gap`. + let message_gap = device.message_gap(); let hardware = Hardware::new( &device.name(), &device.address(), &endpoints, - // Add slight delay for protocols with multiple messages. - &Some(Duration::from_millis(1)), + &Some(message_gap), false, Box::new(device), ); @@ -153,10 +157,15 @@ pub fn new_device_channel() -> (TestDeviceChannelHost, TestDeviceChannelDevice) ) } +/// Default inter-message gap for test hardware. A slight delay multi-message +/// protocols expect; small enough that ordinary tests are unaffected. +const DEFAULT_MESSAGE_GAP: Duration = Duration::from_millis(1); + pub struct TestDevice { name: String, address: String, endpoints: HashSet, + message_gap: Duration, test_device_channel: mpsc::Sender, event_sender: broadcast::Sender, subscribed_endpoints: Arc>, @@ -166,6 +175,16 @@ pub struct TestDevice { impl TestDevice { #[allow(dead_code)] pub fn new(name: &str, address: &str, test_device_channel: TestDeviceChannelDevice) -> Self { + Self::new_with_message_gap(name, address, DEFAULT_MESSAGE_GAP, test_device_channel) + } + + #[allow(dead_code)] + pub fn new_with_message_gap( + name: &str, + address: &str, + message_gap: Duration, + test_device_channel: TestDeviceChannelDevice, + ) -> Self { let (event_sender, _) = broadcast::channel(256); let event_sender_clone = event_sender.clone(); @@ -210,6 +229,7 @@ impl TestDevice { name: name.to_owned(), address: address.to_owned(), endpoints: HashSet::new(), + message_gap, test_device_channel: command_sender, event_sender, subscribed_endpoints, @@ -221,6 +241,10 @@ impl TestDevice { self.endpoints.insert(*endpoint); } + pub fn message_gap(&self) -> Duration { + self.message_gap + } + pub fn name(&self) -> String { self.name.clone() } diff --git a/crates/buttplug_tests/tests/util/test_device_manager/test_device_comm_manager.rs b/crates/buttplug_tests/tests/util/test_device_manager/test_device_comm_manager.rs index 6e0d18a70..81fe9c276 100644 --- a/crates/buttplug_tests/tests/util/test_device_manager/test_device_comm_manager.rs +++ b/crates/buttplug_tests/tests/util/test_device_manager/test_device_comm_manager.rs @@ -30,7 +30,7 @@ use std::{ Arc, atomic::{AtomicBool, Ordering}, }, - time::{SystemTime, UNIX_EPOCH}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use tokio::sync::mpsc::Sender; @@ -74,8 +74,11 @@ impl TestDeviceIdentifier { } } +/// Default inter-message gap for test hardware, matching `TestDevice`'s default. +const DEFAULT_MESSAGE_GAP: Duration = Duration::from_millis(1); + pub struct TestDeviceCommunicationManagerBuilder { - devices: Option>, + devices: Option>, } impl Default for TestDeviceCommunicationManagerBuilder { @@ -88,12 +91,24 @@ impl Default for TestDeviceCommunicationManagerBuilder { impl TestDeviceCommunicationManagerBuilder { pub fn add_test_device(&mut self, device: &TestDeviceIdentifier) -> TestDeviceChannelHost { + self.add_test_device_with_message_gap(device, DEFAULT_MESSAGE_GAP) + } + + /// Register a test device whose io task batches commands over `message_gap`. + /// A large gap (e.g. 500ms) makes the stop-write batching race deterministic + /// for tests asserting that stop commands are flushed before stop resolves. + #[allow(dead_code)] + pub fn add_test_device_with_message_gap( + &mut self, + device: &TestDeviceIdentifier, + message_gap: Duration, + ) -> TestDeviceChannelHost { let (host_channel, device_channel) = new_device_channel(); self .devices .as_mut() .expect("Devices vec does not exist, is this running twice?") - .push((device.clone(), device_channel)); + .push((device.clone(), message_gap, device_channel)); host_channel } } @@ -115,26 +130,28 @@ impl HardwareCommunicationManagerBuilder for TestDeviceCommunicationManagerBuild fn new_uninitialized_ble_test_device( identifier: &TestDeviceIdentifier, + message_gap: Duration, device_channel: TestDeviceChannelDevice, ) -> TestHardwareConnector { let address = identifier.address.clone(); let specifier = ProtocolCommunicationSpecifier::BluetoothLE( BluetoothLESpecifier::new_from_device(&identifier.name, &HashMap::new(), &[]), ); - let hardware = TestDevice::new(&identifier.name, &address, device_channel); + let hardware = + TestDevice::new_with_message_gap(&identifier.name, &address, message_gap, device_channel); TestHardwareConnector::new(specifier, hardware) } pub struct TestDeviceCommunicationManager { device_sender: Sender, - devices: Vec<(TestDeviceIdentifier, TestDeviceChannelDevice)>, + devices: Vec<(TestDeviceIdentifier, Duration, TestDeviceChannelDevice)>, is_scanning: Arc, } impl TestDeviceCommunicationManager { pub fn new( device_sender: Sender, - devices: Vec<(TestDeviceIdentifier, TestDeviceChannelDevice)>, + devices: Vec<(TestDeviceIdentifier, Duration, TestDeviceChannelDevice)>, ) -> Self { Self { device_sender, @@ -156,8 +173,8 @@ impl HardwareCommunicationManager for TestDeviceCommunicationManager { let mut events = vec![]; - while let Some((device, test_channel)) = self.devices.pop() { - let device_creator = new_uninitialized_ble_test_device(&device, test_channel); + while let Some((device, message_gap, test_channel)) = self.devices.pop() { + let device_creator = new_uninitialized_ble_test_device(&device, message_gap, test_channel); events.push(HardwareCommunicationManagerEvent::DeviceFound { name: device.name.clone(),