From 35fc26f96c35c19d76dd3881a97d668edc4191e9 Mon Sep 17 00:00:00 2001 From: David Cameron Date: Mon, 6 Apr 2026 13:27:29 -0400 Subject: [PATCH 1/4] Add batch mode for processing multiple inputs via JSONL --- src/main.rs | 191 +++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 174 insertions(+), 17 deletions(-) diff --git a/src/main.rs b/src/main.rs index cd5f41a1..06675203 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ use wasmtime::Module; use std::{ fs::File, - io::{stdin, BufReader, Read}, + io::{stdin, BufRead, BufReader, Read}, path::PathBuf, }; @@ -61,6 +61,14 @@ struct Opts { /// Path to graphql file containing Function input query; if omitted, defaults will be used to calculate limits. #[clap(short = 'q', long)] query_path: Option, + + /// Enable batch mode - read multiple JSON inputs (one per line) from stdin/file + #[clap(short, long)] + batch: bool, + + /// In batch mode, continue processing on individual input errors (default: false) + #[clap(long)] + batch_continue_on_error: bool, } impl Opts { @@ -114,6 +122,31 @@ fn read_file_to_string(file_path: &PathBuf) -> Result { fn main() -> Result<()> { let opts: Opts = Opts::parse(); + // Create engine and module once (expensive operations - amortize across all inputs) + let engine = function_runner::engine::new_engine()?; + let module = Module::from_file(&engine, &opts.function) + .map_err(|e| anyhow!("Couldn't load the Function {:?}: {}", &opts.function, e))?; + + // Infer codec from the module based on imported modules + let codec = if function_runner::engine::uses_msgpack_provider(&module) { + Codec::Messagepack + } else { + Codec::Json + }; + + if opts.batch { + run_batch_mode(&opts, &engine, &module, codec) + } else { + run_single_mode(&opts, &engine, &module, codec) + } +} + +fn run_single_mode( + opts: &Opts, + engine: &wasmtime::Engine, + module: &Module, + codec: Codec, +) -> Result<()> { let mut input: Box = if let Some(ref input) = opts.input { Box::new(BufReader::new(File::open(input).map_err(|e| { anyhow!("Couldn't load input {:?}: {}", input, e) @@ -130,20 +163,8 @@ fn main() -> Result<()> { input.read_to_end(&mut buffer)?; let schema_string = opts.read_schema_to_string().transpose()?; - let query_string = opts.read_query_to_string().transpose()?; - let engine = function_runner::engine::new_engine()?; - let module = Module::from_file(&engine, &opts.function) - .map_err(|e| anyhow!("Couldn't load the Function {:?}: {}", &opts.function, e))?; - - // Infer codec from the module based on imported modules - let codec = if function_runner::engine::uses_msgpack_provider(&module) { - Codec::Messagepack - } else { - Codec::Json - }; - let input = BytesContainer::new(BytesContainerType::Input, codec, buffer)?; let scale_factor = if let (Some(schema_string), Some(query_string), Some(json_value)) = (schema_string, query_string, input.json_value.clone()) @@ -156,19 +177,19 @@ fn main() -> Result<()> { &json_value, )? } else { - DEFAULT_SCALE_FACTOR // Use default scale factor when schema or query is missing + DEFAULT_SCALE_FACTOR }; let profile_opts = opts.profile_opts(); let function_run_result = run(FunctionRunParams { - function_path: opts.function, + function_path: opts.function.clone(), input, export: opts.export.as_ref(), profile_opts: profile_opts.as_ref(), scale_factor, - module, - engine, + module: module.clone(), + engine: engine.clone(), })?; if opts.json { @@ -187,3 +208,139 @@ fn main() -> Result<()> { anyhow::bail!("The Function execution failed. Review the logs for more information.") } } + +fn run_batch_mode( + opts: &Opts, + engine: &wasmtime::Engine, + module: &Module, + codec: Codec, +) -> Result<()> { + let input_reader: Box = if let Some(ref input) = opts.input { + Box::new(BufReader::new(File::open(input).map_err(|e| { + anyhow!("Couldn't load input {:?}: {}", input, e) + })?)) + } else if !std::io::stdin().is_terminal() { + Box::new(BufReader::new(stdin())) + } else { + return Err(anyhow!( + "You must provide input via the --input flag or piped via stdin." + )); + }; + + // Pre-calculate scale factor (constant across all inputs in batch) + let schema_string = opts.read_schema_to_string().transpose()?; + let query_string = opts.read_query_to_string().transpose()?; + + // Disable profiling in batch mode for performance + let profile_opts = None; + + let mut line_num = 0; + let mut success_count = 0; + let mut error_count = 0; + + for line_result in input_reader.lines() { + line_num += 1; + + let line = match line_result { + Ok(l) => l, + Err(e) => { + error_count += 1; + if opts.batch_continue_on_error { + eprintln!("Error reading line {}: {}", line_num, e); + println!(r#"{{"success":false,"error":"Error reading input: {}"}}"#, e); + continue; + } else { + return Err(e.into()); + } + } + }; + + // Skip empty lines + if line.trim().is_empty() { + continue; + } + + // Parse input + let input = match BytesContainer::new( + BytesContainerType::Input, + codec, + line.into_bytes(), + ) { + Ok(i) => i, + Err(e) => { + error_count += 1; + if opts.batch_continue_on_error { + eprintln!("Error parsing line {}: {}", line_num, e); + println!(r#"{{"success":false,"error":"Invalid JSON input: {}"}}"#, e); + continue; + } else { + return Err(e); + } + } + }; + + // Calculate scale factor for this input + let scale_factor = if let (Some(ref schema_string), Some(ref query_string), Some(ref json_value)) = + (&schema_string, &query_string, &input.json_value) + { + match BluejaySchemaAnalyzer::analyze_schema_definition( + schema_string, + opts.schema_path.as_ref().and_then(|p| p.to_str()), + query_string, + opts.query_path.as_ref().and_then(|p| p.to_str()), + json_value, + ) { + Ok(sf) => sf, + Err(e) => { + error_count += 1; + if opts.batch_continue_on_error { + eprintln!("Error analyzing schema for line {}: {}", line_num, e); + println!(r#"{{"success":false,"error":"Schema analysis failed: {}"}}"#, e); + continue; + } else { + return Err(e); + } + } + } + } else { + DEFAULT_SCALE_FACTOR + }; + + // Run function (reusing engine/module!) + let result = run(FunctionRunParams { + function_path: opts.function.clone(), + input, + export: opts.export.as_ref(), + profile_opts, + scale_factor, + module: module.clone(), + engine: engine.clone(), + }); + + // Output result immediately (streaming JSONL - compact format for line-by-line parsing) + match result { + Ok(function_result) => { + success_count += 1; + // Use compact JSON (not pretty-printed) for JSONL format + let compact_json = serde_json::to_string(&function_result) + .unwrap_or_else(|error| error.to_string()); + println!("{}", compact_json); + } + Err(e) => { + error_count += 1; + if opts.batch_continue_on_error { + eprintln!("Error executing line {}: {}", line_num, e); + println!(r#"{{"success":false,"error":"Execution failed: {}"}}"#, e); + } else { + return Err(e); + } + } + } + } + + // Log summary to stderr (so it doesn't interfere with JSONL output on stdout) + eprintln!("Batch complete: {} inputs processed, {} successful, {} errors", + line_num, success_count, error_count); + + Ok(()) +} From bd5a5992245c06e6c88c297236c3763a04da528c Mon Sep 17 00:00:00 2001 From: Dave Nagoda Date: Tue, 30 Jun 2026 15:07:14 -0700 Subject: [PATCH 2/4] Preserve JSON input object order Function-runner parses JSON inputs into serde_json::Value and reserializes them before passing bytes to the WASM. With serde_json's default map implementation, this sorts object keys lexicographically, which can change JS behavior that depends on Object.keys() ordering for fallback logic.\n\nEnable serde_json's preserve_order feature so JSON input object key order is retained, and add a regression test covering nested metafield-like message objects. This keeps function-rerunner parity closer to production inputs for Checkout Blocks discount functions.\n\nVerified with:\n- cargo test json_input_preserves_object_key_order_in_raw_bytes\n- cargo build --release\n- cargo test\n- Rerunning the 17 discount localized-message mismatch rows with the fixed release runner produced full semantic matches. Assisted-By: devx/c659e918-9568-4750-b122-e3890447348a --- Cargo.lock | 1 + Cargo.toml | 2 +- src/container.rs | 15 +++++++++++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index c514bc61..38210465 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2390,6 +2390,7 @@ version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ + "indexmap", "itoa", "memchr", "ryu", diff --git a/Cargo.toml b/Cargo.toml index 713ff3d7..6b785287 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,7 @@ wasmtime-wasi = "=31.0.0" deterministic-wasi-ctx = "=1.0.0" anyhow = "1.0" clap = { version = "4.5", features = ["derive"] } -serde_json = "1.0" +serde_json = { version = "1.0", features = ["preserve_order"] } colored = "3.0" serde = "1.0" rust-embed = "8.9.0" diff --git a/src/container.rs b/src/container.rs index c3c1fd1b..435bf729 100644 --- a/src/container.rs +++ b/src/container.rs @@ -136,3 +136,18 @@ impl BytesContainer { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn json_input_preserves_object_key_order_in_raw_bytes() { + let raw = br#"{"msg":{"sq-XK":"5.00 EUR Zbritje","en":"5.00 EUR Discount"}}"#.to_vec(); + + let input = BytesContainer::new(BytesContainerType::Input, Codec::Json, raw.clone()) + .expect("valid JSON input"); + + assert_eq!(String::from_utf8(input.raw).unwrap(), String::from_utf8(raw).unwrap()); + } +} From abfd3e02ae37c56dad4009c8b63d91cb2f566d17 Mon Sep 17 00:00:00 2001 From: Dave Nagoda Date: Tue, 30 Jun 2026 15:57:01 -0700 Subject: [PATCH 3/4] Refine batch failure handling Make batch mode continue by default and add --batch-fail-on-error for callers that want fail-fast behavior. Count actual function successes and failures based on FunctionRunResult.success instead of treating every successful runner invocation as a successful function run.\n\nAdd integration coverage for default continue behavior, fail-fast behavior, and accurate batch summaries. Also update integration tests to use assert_cmd::cargo::cargo_bin! instead of the deprecated Command::cargo_bin helper.\n\nVerified with:\n- cargo test batch_\n- cargo test Assisted-By: devx/c659e918-9568-4750-b122-e3890447348a --- src/container.rs | 5 +- src/main.rs | 117 ++++++++++++++++++++++--------------- tests/integration_tests.rs | 116 +++++++++++++++++++++++++++++------- 3 files changed, 168 insertions(+), 70 deletions(-) diff --git a/src/container.rs b/src/container.rs index 435bf729..a27a6e83 100644 --- a/src/container.rs +++ b/src/container.rs @@ -148,6 +148,9 @@ mod tests { let input = BytesContainer::new(BytesContainerType::Input, Codec::Json, raw.clone()) .expect("valid JSON input"); - assert_eq!(String::from_utf8(input.raw).unwrap(), String::from_utf8(raw).unwrap()); + assert_eq!( + String::from_utf8(input.raw).unwrap(), + String::from_utf8(raw).unwrap() + ); } } diff --git a/src/main.rs b/src/main.rs index 06675203..6da0b4ac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,9 +66,9 @@ struct Opts { #[clap(short, long)] batch: bool, - /// In batch mode, continue processing on individual input errors (default: false) + /// In batch mode, fail fast on individual input errors (default: false) #[clap(long)] - batch_continue_on_error: bool, + batch_fail_on_error: bool, } impl Opts { @@ -227,7 +227,7 @@ fn run_batch_mode( )); }; - // Pre-calculate scale factor (constant across all inputs in batch) + // Load schema/query once; scale factor is computed per input. let schema_string = opts.read_schema_to_string().transpose()?; let query_string = opts.read_query_to_string().transpose()?; @@ -235,8 +235,9 @@ fn run_batch_mode( let profile_opts = None; let mut line_num = 0; + let mut processed_count = 0; let mut success_count = 0; - let mut error_count = 0; + let mut failed_count = 0; for line_result in input_reader.lines() { line_num += 1; @@ -244,13 +245,16 @@ fn run_batch_mode( let line = match line_result { Ok(l) => l, Err(e) => { - error_count += 1; - if opts.batch_continue_on_error { + failed_count += 1; + if opts.batch_fail_on_error { + return Err(e.into()); + } else { eprintln!("Error reading line {}: {}", line_num, e); - println!(r#"{{"success":false,"error":"Error reading input: {}"}}"#, e); + println!( + r#"{{"success":false,"error":"Error reading input: {}"}}"#, + e + ); continue; - } else { - return Err(e.into()); } } }; @@ -260,51 +264,53 @@ fn run_batch_mode( continue; } + processed_count += 1; + // Parse input - let input = match BytesContainer::new( - BytesContainerType::Input, - codec, - line.into_bytes(), - ) { + let input = match BytesContainer::new(BytesContainerType::Input, codec, line.into_bytes()) { Ok(i) => i, Err(e) => { - error_count += 1; - if opts.batch_continue_on_error { + failed_count += 1; + if opts.batch_fail_on_error { + return Err(e); + } else { eprintln!("Error parsing line {}: {}", line_num, e); println!(r#"{{"success":false,"error":"Invalid JSON input: {}"}}"#, e); continue; - } else { - return Err(e); } } }; // Calculate scale factor for this input - let scale_factor = if let (Some(ref schema_string), Some(ref query_string), Some(ref json_value)) = - (&schema_string, &query_string, &input.json_value) - { - match BluejaySchemaAnalyzer::analyze_schema_definition( - schema_string, - opts.schema_path.as_ref().and_then(|p| p.to_str()), - query_string, - opts.query_path.as_ref().and_then(|p| p.to_str()), - json_value, - ) { - Ok(sf) => sf, - Err(e) => { - error_count += 1; - if opts.batch_continue_on_error { - eprintln!("Error analyzing schema for line {}: {}", line_num, e); - println!(r#"{{"success":false,"error":"Schema analysis failed: {}"}}"#, e); - continue; - } else { - return Err(e); + let scale_factor = + if let (Some(ref schema_string), Some(ref query_string), Some(ref json_value)) = + (&schema_string, &query_string, &input.json_value) + { + match BluejaySchemaAnalyzer::analyze_schema_definition( + schema_string, + opts.schema_path.as_ref().and_then(|p| p.to_str()), + query_string, + opts.query_path.as_ref().and_then(|p| p.to_str()), + json_value, + ) { + Ok(sf) => sf, + Err(e) => { + failed_count += 1; + if opts.batch_fail_on_error { + return Err(e); + } else { + eprintln!("Error analyzing schema for line {}: {}", line_num, e); + println!( + r#"{{"success":false,"error":"Schema analysis failed: {}"}}"#, + e + ); + continue; + } } } - } - } else { - DEFAULT_SCALE_FACTOR - }; + } else { + DEFAULT_SCALE_FACTOR + }; // Run function (reusing engine/module!) let result = run(FunctionRunParams { @@ -320,27 +326,42 @@ fn run_batch_mode( // Output result immediately (streaming JSONL - compact format for line-by-line parsing) match result { Ok(function_result) => { - success_count += 1; + let function_succeeded = function_result.success; + if function_succeeded { + success_count += 1; + } else { + failed_count += 1; + } + // Use compact JSON (not pretty-printed) for JSONL format let compact_json = serde_json::to_string(&function_result) .unwrap_or_else(|error| error.to_string()); println!("{}", compact_json); + + if !function_succeeded && opts.batch_fail_on_error { + anyhow::bail!( + "Function execution failed on line {}. Review the logs for more information.", + line_num + ); + } } Err(e) => { - error_count += 1; - if opts.batch_continue_on_error { + failed_count += 1; + if opts.batch_fail_on_error { + return Err(e); + } else { eprintln!("Error executing line {}: {}", line_num, e); println!(r#"{{"success":false,"error":"Execution failed: {}"}}"#, e); - } else { - return Err(e); } } } } // Log summary to stderr (so it doesn't interfere with JSONL output on stdout) - eprintln!("Batch complete: {} inputs processed, {} successful, {} errors", - line_num, success_count, error_count); + eprintln!( + "Batch complete: {} inputs processed, {} successful, {} failed", + processed_count, success_count, failed_count + ); Ok(()) } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 12e3c9e2..c390e4a5 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -15,7 +15,7 @@ mod tests { #[test] fn run() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input_file = temp_input(json!({"count": 0}))?; cmd.args(["--function", "tests/fixtures/build/noop.wasm"]) @@ -28,7 +28,7 @@ mod tests { #[test] fn invalid_json_input() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); cmd.args(["--function", "tests/fixtures/build/exit_code.wasm"]) .arg("--json") @@ -42,7 +42,7 @@ mod tests { #[test] fn run_stdin() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input_file = temp_input(json!({"exit_code": 0}))?; let file = File::open(input_file.path())?; @@ -63,9 +63,72 @@ mod tests { Ok(()) } + #[test] + fn batch_continues_by_default_and_counts_function_failures() -> Result<()> { + let mut cmd = function_runner_command(); + let input_file = temp_batch_input("{\"code\":0}\n{\"code\":1}\n{\"code\":0}\n")?; + + cmd.args(["--function", "tests/fixtures/build/exit_code.wasm"]) + .arg("--batch") + .arg("--input") + .arg(input_file.as_os_str()); + + let output = cmd.output()?; + + assert!(output.status.success()); + + let stdout = String::from_utf8(output.stdout)?; + let results = stdout + .lines() + .map(serde_json::from_str::) + .collect::, _>>()?; + assert_eq!(results.len(), 3); + assert_eq!(results[0]["success"], true); + assert_eq!(results[1]["success"], false); + assert_eq!(results[2]["success"], true); + + assert_eq!( + String::from_utf8(output.stderr)?, + "Batch complete: 3 inputs processed, 2 successful, 1 failed\n" + ); + + Ok(()) + } + + #[test] + fn batch_fail_on_error_stops_after_function_failure() -> Result<()> { + let mut cmd = function_runner_command(); + let input_file = temp_batch_input("{\"code\":0}\n{\"code\":1}\n{\"code\":0}\n")?; + + cmd.args(["--function", "tests/fixtures/build/exit_code.wasm"]) + .arg("--batch") + .arg("--batch-fail-on-error") + .arg("--input") + .arg(input_file.as_os_str()); + + let output = cmd.output()?; + + assert!(!output.status.success()); + + let stdout = String::from_utf8(output.stdout)?; + let results = stdout + .lines() + .map(serde_json::from_str::) + .collect::, _>>()?; + assert_eq!(results.len(), 2); + assert_eq!(results[0]["success"], true); + assert_eq!(results[1]["success"], false); + + assert!(String::from_utf8(output.stderr)?.contains( + "Function execution failed on line 2. Review the logs for more information." + )); + + Ok(()) + } + #[test] fn run_no_opts() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let output = cmd .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -88,7 +151,7 @@ mod tests { #[test] #[ignore = "This test hangs on CI but runs locally, is_terminal is likely returning false in CI"] fn run_function_no_input() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); cmd.args(["--function", "tests/fixtures/build/exit_code.wasm"]); cmd.assert() @@ -100,7 +163,7 @@ mod tests { #[test] fn run_json() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input_file = temp_input(json!({"count": 0}))?; cmd.args(["--function", "tests/fixtures/build/noop.wasm"]) @@ -117,7 +180,7 @@ mod tests { #[test] fn wasm_file_doesnt_exist() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input_file = temp_input(json!({"exit_code": 0}))?; cmd.args(["--function", "test/file/doesnt/exist"]) @@ -132,7 +195,7 @@ mod tests { #[test] fn input_file_doesnt_exist() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); cmd.args(["--function", "tests/fixtures/build/exit_code.wasm"]) .args(["--input", "test/file/doesnt/exist.json"]); @@ -174,7 +237,7 @@ mod tests { #[test] fn incorrect_input() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input_file = temp_input(json!({}))?; cmd.args(["--function", "tests/fixtures/build/exit_code.wasm"]) @@ -196,7 +259,7 @@ mod tests { #[test] fn exports() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input_file = temp_input(json!({}))?; cmd.args(["--function", "tests/fixtures/build/exports.wasm"]) .args(["--export", "export1"]) @@ -210,7 +273,7 @@ mod tests { #[test] fn missing_export() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input_file = temp_input(json!({}))?; cmd.args(["--function", "tests/fixtures/build/exports.wasm"]) .arg("--input") @@ -225,7 +288,7 @@ mod tests { #[test] fn failing_function_returns_non_zero_exit_code_for_module_errors() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input_file = temp_input(json!({}))?; cmd.args([ "--function", @@ -242,7 +305,7 @@ mod tests { } fn profile_base_cmd_in_temp_dir() -> Result<(Command, assert_fs::TempDir)> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let cwd = std::env::current_dir()?; let temp = assert_fs::TempDir::new()?; let input_file = temp.child("input.json"); @@ -257,6 +320,10 @@ mod tests { Ok((cmd, temp)) } + fn function_runner_command() -> Command { + Command::new(assert_cmd::cargo::cargo_bin!("function-runner")) + } + fn temp_input(json: serde_json::Value) -> Result { let file = assert_fs::NamedTempFile::new("input.json")?; file.write_str(json.to_string().as_str())?; @@ -264,9 +331,16 @@ mod tests { Ok(file) } + fn temp_batch_input(jsonl: &str) -> Result { + let file = assert_fs::NamedTempFile::new("input.jsonl")?; + file.write_str(jsonl)?; + + Ok(file) + } + #[test] fn test_scale_limits_analyzer_use_defaults_when_query_and_schema_not_provided() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input_file = temp_input(json!({"cart": { "lines": [ {"quantity": 2} @@ -289,7 +363,7 @@ mod tests { #[test] fn test_scale_limits_analyzer_use_defaults_when_query_or_schema_not_provided() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input_file = temp_input(json!({"cart": { "lines": [ {"quantity": 2} @@ -314,7 +388,7 @@ mod tests { #[test] fn test_scale_limits_analyzer_with_scaled_limits() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input_data = vec![json!({"quantity": 2}); 400]; let json_data = json!({ @@ -349,7 +423,7 @@ mod tests { #[test] fn run_javy_plugin_v2() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input = temp_input(json!({"hello": "world"}))?; cmd.args([ @@ -386,7 +460,7 @@ mod tests { &trampolined_module, )?; - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input_file = temp_input(json!({"hello": "world"}))?; cmd.arg("--function") @@ -412,7 +486,7 @@ mod tests { &trampolined_module, )?; - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input_file = temp_input(json!({"hello": "world"}))?; cmd.arg("--function") @@ -437,7 +511,7 @@ mod tests { #[test] fn run_javy_plugin_v3() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input = temp_input(json!({"hello": "world"}))?; cmd.args([ @@ -466,7 +540,7 @@ mod tests { #[test] fn invalid_import_combination() -> Result<()> { - let mut cmd = Command::cargo_bin("function-runner")?; + let mut cmd = function_runner_command(); let input = temp_input(json!({"hello": "world"}))?; cmd.args([ From 184eaab1fa79e31e3295034b29ec83626135891d Mon Sep 17 00:00:00 2001 From: Dave Nagoda Date: Tue, 30 Jun 2026 16:38:23 -0700 Subject: [PATCH 4/4] Precompile providers for batch runs Batch mode is intended to process large JSONL input sets efficiently. Javy/provider functions were still compiling the embedded standard provider module for every input row, so provider setup dominated runtime even though the function module itself was reused.\n\nCompile the standard provider once before the batch loop and pass the compiled provider into each row execution. IOHandler now instantiates the precompiled provider module when it matches the function's standard import, falling back to the old Module::from_binary path otherwise.\n\nAdd batch coverage for a Javy v3 function to exercise the provider path.\n\nMeasured locally on 5,000 js_function_javy_plugin_v3 rows:\n- Before: median 52.31s\n- After: median 0.20s warm run\n\nMeasured on discount-order's 250,003-row parity dataset:\n- Before: 549.85s, 454.68 rows/sec\n- After: 119.87s, 2085.58 rows/sec\n\nVerified with:\n- cargo test batch_\n- cargo test Assisted-By: devx/c659e918-9568-4750-b122-e3890447348a --- src/engine.rs | 21 ++++++++++++++++++- src/io.rs | 39 +++++++++++++++++++++++++++------- src/main.rs | 25 +++++++++++++--------- src/validated_module.rs | 43 +++++++++++++++++++++++++++++++++++++- tests/integration_tests.rs | 32 ++++++++++++++++++++++++++++ 5 files changed, 141 insertions(+), 19 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index af1d6f11..92f2b452 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -7,6 +7,7 @@ use wasmtime_wasi::I32Exit; use crate::function_run_result::FunctionRunResult; use crate::io::{IOHandler, OutputAndLogs}; +pub use crate::validated_module::CompiledProvider; use crate::validated_module::ValidatedModule; use crate::{BytesContainer, BytesContainerType}; @@ -88,7 +89,21 @@ impl ResourceLimiter for MemoryLimiter { } } +pub fn compile_standard_provider( + module: &Module, + engine: &Engine, +) -> Result> { + ValidatedModule::compile_standard_provider(module, engine) +} + pub fn run(params: FunctionRunParams) -> Result { + run_with_compiled_provider(params, None) +} + +pub fn run_with_compiled_provider( + params: FunctionRunParams, + compiled_provider: Option<&CompiledProvider>, +) -> Result { let FunctionRunParams { function_path, input, @@ -99,7 +114,11 @@ pub fn run(params: FunctionRunParams) -> Result { module, } = params; - let mut io_handler = IOHandler::new(ValidatedModule::new(module)?, input.clone()); + let mut io_handler = IOHandler::new_with_compiled_provider( + ValidatedModule::new(module)?, + input.clone(), + compiled_provider.cloned(), + ); let mut error_logs: String = String::new(); diff --git a/src/io.rs b/src/io.rs index f18968ea..60552d4e 100644 --- a/src/io.rs +++ b/src/io.rs @@ -7,7 +7,9 @@ use wasmtime_wasi::{ }; use crate::{ - function_run_result::FUNCTION_LOG_LIMIT, validated_module::ValidatedModule, BytesContainer, + function_run_result::FUNCTION_LOG_LIMIT, + validated_module::{CompiledProvider, ValidatedModule}, + BytesContainer, }; pub(crate) struct OutputAndLogs { @@ -29,10 +31,15 @@ pub(crate) struct IOHandler { strategy: IOStrategy, module: ValidatedModule, input: BytesContainer, + compiled_provider: Option, } impl IOHandler { - pub(crate) fn new(module: ValidatedModule, input: BytesContainer) -> Self { + pub(crate) fn new_with_compiled_provider( + module: ValidatedModule, + input: BytesContainer, + compiled_provider: Option, + ) -> Self { Self { strategy: if module.uses_mem_io() { IOStrategy::Memory(None) @@ -44,6 +51,7 @@ impl IOHandler { }, module, input, + compiled_provider, } } @@ -75,7 +83,13 @@ impl IOHandler { store.set_epoch_deadline(1); // Need to make sure we don't timeout during initialization. let old_fuel = store.get_fuel()?; store.set_fuel(u64::MAX)?; // Make sure we have fuel for initialization. - let mem_io_instance = instantiate_imports(&self.module, engine, linker, store); + let mem_io_instance = instantiate_imports( + &self.module, + self.compiled_provider.as_ref(), + engine, + linker, + store, + ); if let IOStrategy::Memory(ref mut instance) = self.strategy { *instance = mem_io_instance; } @@ -157,6 +171,7 @@ impl IOHandler { fn instantiate_imports( module: &ValidatedModule, + compiled_provider: Option<&CompiledProvider>, engine: &Engine, linker: &mut Linker, mut store: &mut Store, @@ -164,14 +179,24 @@ fn instantiate_imports( let mut mem_io_instance = None; if let Some(std_import) = module.std_import() { - let imported_module = Module::from_binary(engine, &std_import.bytes) - .unwrap_or_else(|_| panic!("Failed to load module {}", std_import.name)); + let fallback_module; + let (imported_module, is_mem_io_provider) = match compiled_provider { + Some(compiled_provider) if compiled_provider.name() == std_import.name => ( + compiled_provider.module(), + compiled_provider.is_mem_io_provider(), + ), + _ => { + fallback_module = Module::from_binary(engine, &std_import.bytes) + .unwrap_or_else(|_| panic!("Failed to load module {}", std_import.name)); + (&fallback_module, std_import.is_mem_io_provider()) + } + }; let imported_module_instance = linker - .instantiate(&mut store, &imported_module) + .instantiate(&mut store, imported_module) .expect("Failed to instantiate imported instance"); - if std_import.is_mem_io_provider() { + if is_mem_io_provider { mem_io_instance = Some(imported_module_instance); } diff --git a/src/main.rs b/src/main.rs index 6da0b4ac..9f941591 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,7 @@ use anyhow::{anyhow, Result}; use clap::Parser; use function_runner::{ bluejay_schema_analyzer::BluejaySchemaAnalyzer, - engine::{run, FunctionRunParams, ProfileOpts}, + engine::{run, run_with_compiled_provider, FunctionRunParams, ProfileOpts}, }; use is_terminal::IsTerminal; @@ -231,6 +231,8 @@ fn run_batch_mode( let schema_string = opts.read_schema_to_string().transpose()?; let query_string = opts.read_query_to_string().transpose()?; + let compiled_provider = function_runner::engine::compile_standard_provider(module, engine)?; + // Disable profiling in batch mode for performance let profile_opts = None; @@ -313,15 +315,18 @@ fn run_batch_mode( }; // Run function (reusing engine/module!) - let result = run(FunctionRunParams { - function_path: opts.function.clone(), - input, - export: opts.export.as_ref(), - profile_opts, - scale_factor, - module: module.clone(), - engine: engine.clone(), - }); + let result = run_with_compiled_provider( + FunctionRunParams { + function_path: opts.function.clone(), + input, + export: opts.export.as_ref(), + profile_opts, + scale_factor, + module: module.clone(), + engine: engine.clone(), + }, + compiled_provider.as_ref(), + ); // Output result immediately (streaming JSONL - compact format for line-by-line parsing) match result { diff --git a/src/validated_module.rs b/src/validated_module.rs index 2d4b40e9..8c12e471 100644 --- a/src/validated_module.rs +++ b/src/validated_module.rs @@ -2,12 +2,33 @@ use std::borrow::Cow; use anyhow::{bail, Result}; use rust_embed::RustEmbed; -use wasmtime::Module; +use wasmtime::{Engine, Module}; #[derive(RustEmbed)] #[folder = "providers/"] struct StandardProviders; +#[derive(Debug, Clone)] +pub struct CompiledProvider { + name: String, + is_mem_io: bool, + module: Module, +} + +impl CompiledProvider { + pub(crate) fn name(&self) -> &str { + &self.name + } + + pub(crate) fn is_mem_io_provider(&self) -> bool { + self.is_mem_io + } + + pub(crate) fn module(&self) -> &Module { + &self.module + } +} + #[derive(Debug)] pub(crate) struct Provider { pub(crate) bytes: Cow<'static, [u8]>, @@ -15,6 +36,14 @@ pub(crate) struct Provider { } impl Provider { + pub(crate) fn compile(&self, engine: &Engine) -> Result { + Ok(CompiledProvider { + name: self.name.clone(), + is_mem_io: self.is_mem_io_provider(), + module: Module::from_binary(engine, &self.bytes)?, + }) + } + pub(crate) fn is_mem_io_provider(&self) -> bool { let javy_plugin_version = self .name @@ -45,6 +74,18 @@ pub(crate) struct ValidatedModule { } impl ValidatedModule { + pub(crate) fn compile_standard_provider( + module: &Module, + engine: &Engine, + ) -> Result> { + let validated_module = Self::new(module.clone())?; + validated_module + .std_import + .as_ref() + .map(|provider| provider.compile(engine)) + .transpose() + } + pub(crate) fn new(module: Module) -> Result { // Need to track with deterministic order so don't use a hash let mut imports = vec![]; diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index c390e4a5..c5e299e6 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -126,6 +126,38 @@ mod tests { Ok(()) } + #[test] + fn batch_javy_plugin_v3_uses_provider() -> Result<()> { + let mut cmd = function_runner_command(); + let input_file = temp_batch_input("{\"hello\":\"world\"}\n{\"hello\":\"world\"}\n")?; + + cmd.args([ + "--function", + "tests/fixtures/build/js_function_javy_plugin_v3.wasm", + ]) + .arg("--json") + .arg("--batch") + .arg("--input") + .arg(input_file.as_os_str()); + + let output = cmd.output()?; + + assert!(output.status.success()); + + let stdout = String::from_utf8(output.stdout)?; + let results = stdout + .lines() + .map(serde_json::from_str::) + .collect::, _>>()?; + assert_eq!(results.len(), 2); + assert!(results.iter().all(|result| result["success"] == true)); + assert!(results + .iter() + .all(|result| result["output"].to_string().contains("world output"))); + + Ok(()) + } + #[test] fn run_no_opts() -> Result<()> { let mut cmd = function_runner_command();