Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 110 additions & 30 deletions crates/buttplug_server/src/device/device_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use buttplug_core::{
OutputValue,
StopCmdV4,
},
util::{stream::convert_broadcast_receiver_to_stream, task::TaskScope},
util::{async_manager, stream::convert_broadcast_receiver_to_stream, task::TaskScope},
};
use buttplug_server_device_config::{
DeviceConfigurationManager,
Expand All @@ -36,10 +36,13 @@ use buttplug_server_device_config::{
};
use dashmap::DashMap;
use futures::future::{self, BoxFuture, FutureExt};
use tokio::sync::{
broadcast,
mpsc::{Sender, channel},
oneshot,
use tokio::{
select,
sync::{
broadcast,
mpsc::{Sender, channel},
oneshot,
},
};
use tokio_stream::StreamExt;
use uuid::Uuid;
Expand All @@ -58,7 +61,7 @@ use crate::{
use super::{
InternalDeviceEvent,
OutputObservation,
device_task::{DeviceTaskConfig, spawn_device_task},
device_task::{DeviceTaskConfig, DeviceTaskMessage, spawn_device_task},
hardware::{Hardware, HardwareCommand, HardwareConnector, HardwareEvent},
protocol::{ProtocolHandler, ProtocolKeepaliveStrategy, ProtocolSpecializer},
};
Expand Down Expand Up @@ -109,7 +112,7 @@ pub struct DeviceHandle {
legacy_attributes: ServerDeviceAttributes,
last_output_command: Arc<DashMap<Uuid, CheckedOutputCmdV4>>,
stop_commands: Arc<Vec<ButtplugDeviceCommandMessageUnionV4>>,
internal_hw_msg_sender: Sender<Vec<HardwareCommand>>,
internal_hw_msg_sender: Sender<DeviceTaskMessage>,
output_observation_sender: Option<broadcast::Sender<OutputObservation>>,
/// Scope owning this device's tasks. Rides in an Arc since DeviceHandle is
/// Clone; the subtree cancels when the last clone drops.
Expand All @@ -127,7 +130,7 @@ impl DeviceHandle {
definition: ServerDeviceDefinition,
identifier: UserDeviceIdentifier,
stop_commands: Vec<ButtplugDeviceCommandMessageUnionV4>,
internal_hw_msg_sender: Sender<Vec<HardwareCommand>>,
internal_hw_msg_sender: Sender<DeviceTaskMessage>,
output_observation_sender: Option<broadcast::Sender<OutputObservation>>,
task_scope: Arc<TaskScope>,
) -> Self {
Expand Down Expand Up @@ -272,19 +275,39 @@ impl DeviceHandle {
// --- Private command handling methods ---

fn handle_outputcmd_v4(&self, msg: &CheckedOutputCmdV4) -> ButtplugServerResultFuture {
match self.output_cmd_hardware_commands(msg) {
// Deduped (no-op) or empty: nothing to send, succeed immediately.
None => future::ready(Ok(message::OkV0::default().into())).boxed(),
Some(Err(err)) => future::ready(Err(err)).boxed(),
Some(Ok(commands)) => self.handle_hardware_commands(commands, false),
}
}

/// Run the bookkeeping for an output command (dedupe map, output-observation
/// emit) and ask the protocol handler for the hardware commands it produces.
///
/// Returns `None` when the command is a deduped no-op (nothing to send),
/// `Some(Err(..))` when the handler errored, and `Some(Ok(cmds))` otherwise.
/// Shared by `handle_outputcmd_v4` (normal output) and the stop path, so both
/// keep identical dedupe-map and observation behaviour. The stop path
/// accumulates the returned commands across all stop messages and flushes them
/// in a single write-acknowledged batch.
fn output_cmd_hardware_commands(
&self,
msg: &CheckedOutputCmdV4,
) -> Option<Result<Vec<HardwareCommand>, ButtplugError>> {
if let Some(last_msg) = self.last_output_command.get(&msg.feature_id())
&& *last_msg == *msg
{
trace!("No commands generated for incoming device packet, skipping and returning success.");
return future::ready(Ok(message::OkV0::default().into())).boxed();
return None;
}
self
.last_output_command
.insert(msg.feature_id(), msg.clone());

if let Some(sender) = &self.output_observation_sender {
// OutputType derives Display via strum, producing clean names like "Vibrate", "Rotate".
// The design uses format!("{:?}") but to_string() is preferred for clean output.
let _ = sender.send(OutputObservation {
device_index: self.definition.index(),
feature_index: msg.feature_index(),
Expand All @@ -293,37 +316,94 @@ impl DeviceHandle {
});
}

self.handle_generic_command_result(self.handler.handle_output_cmd(msg))
Some(self.handler.handle_output_cmd(msg).map_err(|e| e.into()))
}

fn handle_hardware_commands(&self, commands: Vec<HardwareCommand>) -> ButtplugServerResultFuture {
/// Queue hardware commands for the device io task.
///
/// When `wait_for_write` is false (all normal output paths) this is
/// fire-and-forget: the commands are dropped into the io channel and the
/// future resolves immediately, leaving any batching to the io task.
///
/// When `wait_for_write` is true (the stop path) the commands carry a
/// write-acknowledgement channel: the io task flushes the batch to hardware
/// immediately and fires the ack, and this future only resolves once that
/// write has gone out (or the bounded wait elapses / the device goes away).
fn handle_hardware_commands(
&self,
commands: Vec<HardwareCommand>,
wait_for_write: bool,
) -> ButtplugServerResultFuture {
let sender = self.internal_hw_msg_sender.clone();
async move {
let _ = sender.send(commands).await;
if wait_for_write {
let (ack_sender, ack_receiver) = oneshot::channel();
if sender
.send(DeviceTaskMessage {
commands,
ack: Some(ack_sender),
})
.await
.is_err()
{
debug!("Device io task gone, skipping write acknowledgement wait.");
return Ok(message::OkV0::default().into());
}
// Bounded wait for the write ack. There is no tokio::time on WASM, so
// race the receiver against a runtime-agnostic sleep instead of
// tokio::time::timeout. A wedged or dead device must not hang shutdown
// or error a stop, so any non-success outcome resolves Ok (matching the
// existing swallow semantics; scope cancellation handles stragglers).
select! {
ack = ack_receiver => {
if ack.is_err() {
debug!("Device io task dropped ack; device likely disconnecting.");
}
}
_ = async_manager::sleep(Duration::from_secs(1)) => {
warn!("Timed out waiting for stop command write acknowledgement.");
}
}
} else {
let _ = sender
.send(DeviceTaskMessage {
commands,
ack: None,
})
.await;
}
Ok(message::OkV0::default().into())
}
.boxed()
}

fn handle_generic_command_result(
&self,
command_result: Result<Vec<HardwareCommand>, ButtplugDeviceError>,
) -> ButtplugServerResultFuture {
let hardware_commands = match command_result {
Ok(commands) => commands,
Err(err) => return future::ready(Err(err.into())).boxed(),
};

self.handle_hardware_commands(hardware_commands)
}

fn handle_stop_device_cmd(&self, msg: &StopCmdV4) -> ButtplugServerResultFuture {
let mut fut_vec = vec![];
if msg.outputs() {
self
.stop_commands
.iter()
.for_each(|msg| fut_vec.push(self.parse_message(msg.clone())));
// Accumulate the hardware commands from every stop output into a single
// write-acknowledged batch. A multi-feature device produces one stop
// OutputCmd per feature, but the protocol handler folds them into one
// accumulated write; sending each individually with its own ack would
// emit one write per feature instead. Collecting them and issuing a
// single acked flush keeps the on-wire output identical to normal output
// batching while still resolving only once the stop write has landed.
//
// Stop commands are always OutputCmds (see build_device_handle); any
// other shape falls back to the normal fire-and-forget parse path.
let mut stop_hardware_commands = vec![];
for stop_msg in self.stop_commands.iter() {
match stop_msg {
ButtplugDeviceCommandMessageUnionV4::OutputCmd(output_msg) => {
match self.output_cmd_hardware_commands(output_msg) {
None => {}
Some(Ok(commands)) => stop_hardware_commands.extend(commands),
Some(Err(err)) => fut_vec.push(future::ready(Err(err)).boxed()),
}
}
other => fut_vec.push(self.parse_message(other.clone())),
}
}
fut_vec.push(self.handle_hardware_commands(stop_hardware_commands, true));
}
if msg.inputs() {
self.definition.features().iter().for_each(|(i, f)| {
Expand Down Expand Up @@ -536,7 +616,7 @@ pub(super) async fn build_device_handle(
let strategy = handler.keepalive_strategy();

// Create the hardware command channel and spawn the device task
let (internal_hw_msg_sender, internal_hw_msg_recv) = channel::<Vec<HardwareCommand>>(1024);
let (internal_hw_msg_sender, internal_hw_msg_recv) = channel::<DeviceTaskMessage>(1024);

let device_wait_duration = if let Some(gap) = definition.message_gap_ms() {
Some(Duration::from_millis(gap as u64))
Expand Down
104 changes: 84 additions & 20 deletions crates/buttplug_server/src/device/device_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ pub struct DeviceTaskConfig {
pub keepalive_strategy: ProtocolKeepaliveStrategy,
}

/// A batch of hardware commands for the device io task, optionally carrying a
/// write acknowledgement channel. When `ack` is present the batch is urgent:
/// the io task flushes it (and any pending batched commands) to hardware
/// immediately, then fires the ack.
pub struct DeviceTaskMessage {
pub commands: Vec<HardwareCommand>,
pub ack: Option<tokio::sync::oneshot::Sender<()>>,
}

/// Spawn the device communication task.
///
/// This task handles:
Expand All @@ -48,7 +57,7 @@ pub fn spawn_device_task(
hardware: Arc<Hardware>,
_handler: Arc<dyn ProtocolHandler>,
config: DeviceTaskConfig,
mut command_receiver: Receiver<Vec<HardwareCommand>>,
mut command_receiver: Receiver<DeviceTaskMessage>,
) {
task_scope.spawn("io", move |token| async move {
run_device_task(hardware, config, &mut command_receiver, token).await;
Expand All @@ -62,7 +71,7 @@ pub fn spawn_device_task(
async fn run_device_task(
hardware: Arc<Hardware>,
config: DeviceTaskConfig,
command_receiver: &mut Receiver<Vec<HardwareCommand>>,
command_receiver: &mut Receiver<DeviceTaskMessage>,
token: CancellationToken,
) {
let mut hardware_events = hardware.event_stream();
Expand Down Expand Up @@ -93,6 +102,23 @@ async fn run_device_task(
let mut pending_commands: VecDeque<HardwareCommand> = VecDeque::new();
let mut batch_deadline: Option<Instant> = None;

// Write every pending command to hardware in order, updating the keepalive
// packet as writes go out. Shared by the receive arm (urgent acked flush),
// the batch-deadline arm, and the flush-on-exit hardening below.
async fn flush_pending(
hardware: &Hardware,
pending_commands: &mut VecDeque<HardwareCommand>,
track_keepalive: bool,
keepalive_packet: &mut Option<HardwareWriteCmd>,
) {
while let Some(cmd) = pending_commands.pop_front() {
let _ = hardware.parse_message(&cmd).await;
if track_keepalive && let HardwareCommand::Write(ref write_cmd) = cmd {
*keepalive_packet = Some(write_cmd.clone());
}
}
}

loop {
// Calculate keepalive timeout
let keepalive_fut = async {
Expand Down Expand Up @@ -121,17 +147,54 @@ async fn run_device_task(
// Priority 0: Cooperative cancellation - wins over new work.
_ = token.cancelled() => {
info!("Device task cancelled, shutting down");
// Best-effort flush of any batched writes (e.g. a stop command still
// sitting in the batch window) before exiting. The hardware is still
// present here, so the writes can land; write errors are swallowed.
flush_pending(
&hardware,
&mut pending_commands,
track_keepalive,
&mut keepalive_packet,
)
.await;
return;
}

// Priority 1: Incoming commands
msg = command_receiver.recv() => {
let Some(commands) = msg else {
let Some(DeviceTaskMessage { commands, ack }) = msg else {
info!("No longer receiving messages from device parent, breaking");
// The command channel closed (all DeviceHandles dropped). Hardware is
// still present, so best-effort flush any batched writes before exit.
flush_pending(
&hardware,
&mut pending_commands,
track_keepalive,
&mut keepalive_packet,
)
.await;
break;
};

if let Some(device_wait_duration) = device_wait_duration {
if let Some(ack) = ack {
// Urgent write-acknowledged batch (stop path). Merge with any pending
// batch using the existing dedupe, flush everything to hardware now
// regardless of the batch deadline, then fire the ack so the caller
// only resolves once the writes have actually gone out.
for command in commands {
pending_commands.retain(|existing| !command.overlaps(existing));
pending_commands.push_back(command);
}
flush_pending(
&hardware,
&mut pending_commands,
track_keepalive,
&mut keepalive_packet,
)
.await;
batch_deadline = None;
let _ = ack.send(());
} else if let Some(device_wait_duration) = device_wait_duration {
// Batching enabled
if pending_commands.is_empty() {
// First batch - add directly without deduplication (matches old behavior)
Expand All @@ -147,28 +210,27 @@ async fn run_device_task(
} else {
// No batching - send immediately
trace!("No wait duration, sending commands immediately: {:?}", commands);
for cmd in commands {
let _ = hardware.parse_message(&cmd).await;
if track_keepalive
&& let HardwareCommand::Write(ref write_cmd) = cmd
{
keepalive_packet = Some(write_cmd.clone());
}
}
pending_commands.extend(commands);
flush_pending(
&hardware,
&mut pending_commands,
track_keepalive,
&mut keepalive_packet,
)
.await;
}
}

// Priority 2: Batch deadline reached - flush pending commands
_ = batch_fut => {
trace!("Batch deadline reached, sending {} commands", pending_commands.len());
while let Some(cmd) = pending_commands.pop_front() {
let _ = hardware.parse_message(&cmd).await;
if track_keepalive
&& let HardwareCommand::Write(ref write_cmd) = cmd
{
keepalive_packet = Some(write_cmd.clone());
}
}
flush_pending(
&hardware,
&mut pending_commands,
track_keepalive,
&mut keepalive_packet,
)
.await;
batch_deadline = None;
}

Expand Down Expand Up @@ -208,6 +270,8 @@ async fn run_device_task(
hw_event = hardware_events.recv() => {
if matches!(hw_event, Ok(HardwareEvent::Disconnected(_))) || hw_event.is_err() {
info!("Hardware disconnected, shutting down task");
// Do NOT flush pending_commands here: the hardware is gone, so writes
// would fail and any pending stop is moot.
return;
}
}
Expand Down
Loading