From 5f6e9ceed385787432eaee1af51842aa395dab4e Mon Sep 17 00:00:00 2001 From: Ryu <114303361+ryuapp@users.noreply.github.com> Date: Fri, 5 Jun 2026 20:29:30 +0900 Subject: [PATCH] fix: stream external pipelines --- src/runtime.rs | 179 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 173 insertions(+), 6 deletions(-) diff --git a/src/runtime.rs b/src/runtime.rs index 32907a0..e3a17ad 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -7,7 +7,7 @@ use std::env; use std::fs::{File, OpenOptions}; use std::io::{self, Write}; use std::path::{Path, PathBuf}; -use std::process::{Command, Stdio}; +use std::process::{Child, Command, Stdio}; #[derive(Debug, Clone, Default)] pub struct RunOptions {} @@ -106,6 +106,10 @@ impl Shell { return self.run_command(&pipeline.commands[0], None, capture_stdout); } + if let Some(output) = self.try_run_external_pipeline(pipeline, capture_stdout)? { + return Ok(output); + } + let mut input = None; let mut status = 0; let mut stdout = Vec::new(); @@ -129,6 +133,121 @@ impl Shell { }) } + fn try_run_external_pipeline( + &self, + pipeline: &Pipeline, + capture_stdout: bool, + ) -> Result> { + let mut specs = Vec::new(); + let last_idx = pipeline.commands.len() - 1; + for (idx, command) in pipeline.commands.iter().enumerate() { + if idx != last_idx && has_stdout_redirect(command) { + return Ok(None); + } + + let spec = match self.external_command_spec(command)? { + Some(spec) => spec, + None => return Ok(None), + }; + specs.push(spec); + } + + self.run_external_pipeline(&specs, capture_stdout).map(Some) + } + + fn external_command_spec(&self, command: &AstCommand) -> Result> { + let mut env_overlay = HashMap::new(); + for assignment in &command.assignments { + env_overlay.insert( + assignment.name.clone(), + self.expand_word(&assignment.value)?, + ); + } + + if command.args.is_empty() { + return Ok(None); + } + + let args = self.expand_words(&command.args, &env_overlay)?; + let name = args.first().context("missing command name")?; + if commands::is_builtin(name) { + return Ok(None); + } + + Ok(Some(ExternalCommandSpec { + name: name.clone(), + argv: args[1..].to_vec(), + env_overlay, + ast: command.clone(), + })) + } + + fn run_external_pipeline( + &self, + specs: &[ExternalCommandSpec], + capture_stdout: bool, + ) -> Result { + let mut children = Vec::new(); + let mut previous_stdout = None; + let mut last_child = None; + let last_idx = specs.len() - 1; + + for (idx, spec) in specs.iter().enumerate() { + let is_last = idx == last_idx; + let mut command = self.external_command(spec)?; + + if let Some(path) = redirect_path(&spec.ast, RedirectKind::Stdin, self)? { + command.stdin(File::open(path)?); + let _ = previous_stdout.take(); + } else if let Some(stdout) = previous_stdout.take() { + command.stdin(Stdio::from(stdout)); + } + + if !is_last { + if let Some((path, append)) = stdout_redirect(&spec.ast, self)? { + command.stdout(open_output(path, append)?); + } else { + command.stdout(Stdio::piped()); + } + } else if capture_stdout { + command.stdout(Stdio::piped()); + } else if let Some((path, append)) = stdout_redirect(&spec.ast, self)? { + command.stdout(open_output(path, append)?); + } + + if let Some((path, append)) = stderr_redirect(&spec.ast, self)? { + command.stderr(open_output(path, append)?); + } + + let mut child = command + .spawn() + .with_context(|| format!("failed to run {}", spec.name))?; + + previous_stdout = if is_last { None } else { child.stdout.take() }; + if is_last { + last_child = Some(child); + } else { + children.push(child); + } + } + + let last_child = last_child.context("missing last pipeline command")?; + let output = if capture_stdout { + let output = last_child.wait_with_output()?; + CommandOutput { + status: output.status.code().unwrap_or(1), + stdout: output.stdout, + exit: false, + } + } else { + let status = wait_status(last_child)?; + CommandOutput::status(status) + }; + + wait_children(children)?; + Ok(output) + } + fn run_command( &mut self, command: &AstCommand, @@ -198,11 +317,12 @@ impl Shell { stdin_bytes: Option>, capture_stdout: bool, ) -> Result { - let program = resolve_program(name, &self.vars)?; - let mut command = Command::new(program); - command.args(argv); - command.envs(&self.vars); - command.envs(env_overlay); + let mut command = self.external_command(&ExternalCommandSpec { + name: name.to_string(), + argv: argv.to_vec(), + env_overlay: env_overlay.clone(), + ast: ast.clone(), + })?; if stdin_bytes.is_some() { command.stdin(Stdio::piped()); @@ -237,6 +357,15 @@ impl Shell { }) } + fn external_command(&self, spec: &ExternalCommandSpec) -> Result { + let program = resolve_program(&spec.name, &self.vars)?; + let mut command = Command::new(program); + command.args(&spec.argv); + command.envs(&self.vars); + command.envs(&spec.env_overlay); + Ok(command) + } + fn expand_words( &self, words: &[Word], @@ -323,6 +452,25 @@ impl Shell { } } +#[derive(Debug)] +struct ExternalCommandSpec { + name: String, + argv: Vec, + env_overlay: HashMap, + ast: AstCommand, +} + +fn wait_status(child: Child) -> Result { + Ok(child.wait_with_output()?.status.code().unwrap_or(1)) +} + +fn wait_children(children: Vec) -> Result<()> { + for child in children { + wait_status(child)?; + } + Ok(()) +} + fn read_command_substitution( chars: &mut std::iter::Peekable>, ) -> Result { @@ -411,6 +559,15 @@ fn write_builtin_streams( Ok(()) } +fn has_stdout_redirect(command: &AstCommand) -> bool { + command.redirects.iter().any(|redirect| { + matches!( + redirect.kind, + RedirectKind::StdoutTruncate | RedirectKind::StdoutAppend + ) + }) +} + fn stdout_redirect(command: &AstCommand, shell: &Shell) -> Result> { for redirect in command.redirects.iter().rev() { match redirect.kind { @@ -632,6 +789,16 @@ mod tests { assert!(stdout.is_empty()); } + #[cfg(not(windows))] + #[test] + fn streams_external_pipeline_without_buffering_all_output() { + let mut shell = Shell::new(); + let (status, stdout) = shell.run_script_capture("yes | head -n 1").unwrap(); + + assert_eq!(status, 0); + assert_eq!(String::from_utf8_lossy(&stdout), "y\n"); + } + #[cfg(windows)] fn null_device() -> &'static str { "NUL"