feat: Native execution via Ballista [experimental] #4800
Draft
andygrove wants to merge 42 commits into
Add an FFI export that decodes a Comet `Operator` protobuf, builds the native plan with the existing `PhysicalPlanner`, and wraps the root as an `FFI_ExecutionPlan`. A process compiled against a different DataFusion version (for example a Ballista executor) can then execute Comet's native operators without linking Comet's Rust crates. The plan must use `NativeScan` leaves so no JVM-fed input sources are required. Relates to apache#4796
Add a new native workspace member that ports the comet-ffi-consumer PoC's CometScanExec, CometPhysicalCodec/CometLogicalCodec, and CometTableProvider so Comet native scans can run as leaves inside Ballista distributed plans. Wire the Ballista dependency as a git dep pinned to DF54 rev a8b3c79c, and depend on Comet's core crate via a workspace path dep instead of the PoC's path deps into sibling checkouts.
Adapt the c2a PoC binary into an integration test: build a CoalesceBatchesExec(CometScanExec) plan, encode it with CometPhysicalCodec exactly as Ballista's scheduler would, decode it in a fresh SessionContext to simulate an executor, and execute it. Asserts the Comet leaf survives Ballista's physical-plan serialization and still produces the expected 5 rows. Relates to apache#4796
…, tidy deps/docs Relates to apache#4796
…ow round-trip
Add a driver-side "offload to Ballista" submission entry that lets the JVM run a serialized Comet Operator proto on an in-process standalone Ballista engine (no Spark executors) and receive the result back over the Arrow C Data Interface.
- Build datafusion-comet-ballista as a cdylib (plus rlib) carrying a JNI entry Java_org_apache_comet_ballista_NativeBallista_executeQuery. It reuses the existing proto -> standalone Ballista -> RecordBatches recipe and exports each result column into JVM-allocated FFI_ArrowArray/FFI_ArrowSchema structs, mirroring jni_api::prepare_output. A buildTestProto entry returns the fixed test proto so the JVM side needs no generated proto classes.
- Add a Scala NativeBallista binding and a ScalaTest that loads the new lib, runs the proto, imports the result via Arrow Java, and asserts 5 rows come back with no Spark cluster.
- Add a Rust integration test that exercises the same C Data export/import boundary end to end.
Relates to apache#4796.
Overview, architecture, components, config, and a roadmap with task tracking for the driver-side Comet->Ballista offload work. Relates to apache#4796
…llista
Add a driver-side seam so a `collect()` on a single-stage Comet query runs on the Spark driver via an in-process Ballista engine, launching no Spark executor tasks. Gated by `spark.comet.exec.ballista.enabled` (default false).
- CometConf: add COMET_EXEC_BALLISTA_ENABLED.
- CometExec.executeCollect and the Comet columnar-to-row nodes (the real collect roots) dispatch to CometExec.executeCollectViaBallista when the flag is on. It locates the single CometNativeExec boundary carrying the serialized plan, injects each NativeScan's file partitions into the template proto (merging all partitions into one native scan leaf), submits it via NativeBallista, imports the exported Arrow batch on the driver, and materializes the rows. Plans with more than one native block (an exchange) throw UnsupportedOperationException.
- NativeBallista: promote from the test tree to main and load libdatafusion_comet_ballista after libcomet so core JNI symbols bind to libcomet (the ballista cdylib re-exports them) rather than a divergent copy.
- ffi_jni: derive the result schema from the built plan's schema() instead of the NativeScan proto, so plans with operators above the scan report their true output schema.
- Tests: add CometBallistaOffloadSuite proving identical rows flag-on vs -off and zero executor task starts (SparkListener) for the offloaded collect.
Relates to apache#4796.
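The single-stage guard and the partition merge described in this commit can be pictured with a minimal Rust sketch. It is only a model of the logic, not the Scala code in the PR: `plan_single_stage`, its parameters, and the use of plain file-name strings are all hypothetical stand-ins.

```rust
/// Hypothetical model of the driver-side guard: accept only a plan with exactly
/// one native block and no exchange, then merge every scan partition's files
/// into the single list that one native scan leaf would read.
fn plan_single_stage(
    native_blocks: usize,
    exchanges: usize,
    scan_partitions: Vec<Vec<String>>,
) -> Result<Vec<String>, String> {
    if native_blocks != 1 || exchanges != 0 {
        // The PR throws UnsupportedOperationException here; an Err models that.
        return Err(format!(
            "offload needs one native block and no exchange, found {} block(s) and {} exchange(s)",
            native_blocks, exchanges
        ));
    }
    // Flatten the per-partition file lists into one leaf-level list, keeping order.
    Ok(scan_partitions.into_iter().flatten().collect())
}
```

Returning an error for any unexpected shape, rather than attempting a partial offload, mirrors the commit's choice to reject multi-block plans outright.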
… caveat
Relates to apache#4796.
- Add a negative test that runs a multi-partition GROUP BY (forces an exchange, disabling AQE for determinism) with the offload flag on and asserts the single-stage guard in executeCollectViaBallista throws UnsupportedOperationException, after confirming the plan actually contains an exchange.
- Add a positive control to the existing zero-task test: run the flag-off baseline collect() through the same SparkListener/waitUntilEmpty apparatus and assert task starts > 0, proving the listener genuinely observes executor tasks so the == 0 assertion for the offloaded collect is meaningful.
- Document in COMET_EXEC_BALLISTA_ENABLED that R1 targets single-stage queries without dynamic partition pruning or correlated scalar subqueries, since resolving those via waitForSubqueries()/updateResult() can still launch Spark executor tasks under offload.
… Ballista Add CometBallistaQ1Suite, the milestone demonstration for the R1 driver-side Ballista offload using TPC-H Q1 lineitem data and per-row semantics. A synthetic single-file lineitem (correct decimal(12,2)/date/string types, rows straddling the Q1 date cutoff) is queried with spark.comet.exec.ballista.enabled=true and its collected rows are asserted identical to the flag-off Comet baseline, with zero Spark executor tasks (SparkListener), reusing CometBallistaOffloadSuite's apparatus and positive control. Full Q1 cannot be offloaded under R1's single-serialized-block machinery: the GROUP BY forces either a partial->exchange->final shuffle (two blocks) or, with coalesce(1), a CometCoalesce sink boundary (still two blocks); the file scan's UnknownPartitioning never satisfies the aggregate's ClusteredDistribution, so the exchange is unavoidable. The multi-block/aggregate case is R2. The suite therefore offloads the largest single-block subset of Q1 -- scan + WHERE date filter + Q1's decimal arithmetic projections (disc_price, charge) -- as one exchange-free CometNativeExec, exercising the native Parquet scan, date filtering, and decimal multiplication that matter for offload correctness. The plan is asserted single-block before offloading. Relates to apache#4796.
… full Q1 needs R2) Relates to apache#4796
Introduce InputBatchStream, a small dyn-compatible trait (next_batch) that abstracts ScanExec's input source. AlignedArrowStreamReader (the JVM/FFI path via planner.rs) implements it unchanged; a new NativeBatchStream wraps a DataFusion SendableRecordBatchStream and implements it by blocking each pull on futures::executor::block_on, which is safe outside a Tokio runtime and composes with Comet's executor threads. ScanExec.input_source now holds Arc<Mutex<dyn InputBatchStream>> instead of being hard-wired to AlignedArrowStreamReader, and ScanExec::new_native gives a constructor for the native case. The JVM path in planner.rs's OpStruct::Scan handling is otherwise unchanged, just wraps its reader in the trait object. This is the enabling change for a later Comet fragment Scan/ShuffleScan leaf to be fed by a Ballista shuffle-reader stream with no JVM in the data path. Relates to apache#4796.
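A minimal sketch of the trait-object shape this commit describes, using only the standard library. The trait name `InputBatchStream`, the method `next_batch`, the type `NativeBatchStream`, and the `input_source` field come from the commit message; everything else (`Batch` as a `Vec<i64>`, `QueueReader`, `jvm_scan`, `native_scan`, `drain_row_count`) is a hypothetical stand-in, and the native source here is a plain iterator rather than a DataFusion stream pulled with `block_on`.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Stand-in for an Arrow record batch (assumption: the real type is not modeled).
type Batch = Vec<i64>;

/// Dyn-compatible input abstraction: one pull method, no generics on the method.
trait InputBatchStream: Send {
    /// Returns the next batch, or `None` once the source is exhausted.
    fn next_batch(&mut self) -> Option<Batch>;
}

/// Hypothetical stand-in for the JVM/FFI-fed reader.
struct QueueReader {
    pending: VecDeque<Batch>,
}

impl InputBatchStream for QueueReader {
    fn next_batch(&mut self) -> Option<Batch> {
        self.pending.pop_front()
    }
}

/// Stand-in for the native path: wraps any iterator of batches.
struct NativeBatchStream<I: Iterator<Item = Batch> + Send> {
    inner: I,
}

impl<I: Iterator<Item = Batch> + Send> InputBatchStream for NativeBatchStream<I> {
    fn next_batch(&mut self) -> Option<Batch> {
        self.inner.next()
    }
}

/// The scan holds a trait object, so it no longer cares which source feeds it.
struct ScanExec {
    input_source: Arc<Mutex<dyn InputBatchStream>>,
}

impl ScanExec {
    /// Pulls every batch from the source and returns the total row count.
    fn drain_row_count(&self) -> usize {
        let mut source = self.input_source.lock().unwrap();
        let mut rows = 0;
        while let Some(batch) = source.next_batch() {
            rows += batch.len();
        }
        rows
    }
}

/// Builds a scan fed by the queue-backed (JVM-like) reader.
fn jvm_scan(batches: Vec<Batch>) -> ScanExec {
    let reader = QueueReader { pending: VecDeque::from(batches) };
    ScanExec { input_source: Arc::new(Mutex::new(reader)) }
}

/// Builds a scan fed by the iterator-backed (native-like) stream.
fn native_scan(batches: Vec<Batch>) -> ScanExec {
    let stream = NativeBatchStream { inner: batches.into_iter() };
    ScanExec { input_source: Arc::new(Mutex::new(stream)) }
}
```

Both constructors produce the same `ScanExec` type, which is the point of the change: the scan's pull loop is identical whichever source sits behind the `Arc<Mutex<dyn InputBatchStream>>`.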
Add a CometFragmentExec that runs a Comet plan fragment (serialized Operator proto) whose input-leaf Scan operators are fed by the node's DataFusion children, reusing T2's native ScanExec input path. A childless fragment behaves like CometScanExec. Core gains a native (non-FFI) fragment builder in execution::fragment: it builds the plan via the planner, injects each child stream into the returned Scan handle through a new ScanExec::set_native_input setter (the handle shares its batch slot with the executable leaf), and drives get_next_batch while streaming the root, mirroring jni_api's busy-poll. CometPhysicalCodec (de)serializes the fragment via a distinct COMET_FRAGMENT_MAGIC tag, with children round-tripping through datafusion-proto. Relates to apache#4796.
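The tag-based dispatch in the codec can be sketched as follows. Only the name `COMET_FRAGMENT_MAGIC` appears in the commit message; the byte values, `COMET_SCAN_MAGIC`, the `CometNode` enum, and the `encode`/`decode` functions are assumptions made for illustration, and the round-trip of children through datafusion-proto is left out.

```rust
// Hypothetical tag values; the real constants are not shown in the commit message.
const COMET_SCAN_MAGIC: &[u8] = b"CSCN";
const COMET_FRAGMENT_MAGIC: &[u8] = b"CFRG";

/// Simplified model of the two Comet node kinds the codec must tell apart.
#[derive(Debug, PartialEq)]
enum CometNode {
    Scan { proto: Vec<u8> },
    Fragment { proto: Vec<u8> },
}

/// Prefixes the serialized Operator proto with the tag for its node kind.
fn encode(node: &CometNode) -> Vec<u8> {
    let (magic, proto) = match node {
        CometNode::Scan { proto } => (COMET_SCAN_MAGIC, proto),
        CometNode::Fragment { proto } => (COMET_FRAGMENT_MAGIC, proto),
    };
    let mut out = Vec::with_capacity(magic.len() + proto.len());
    out.extend_from_slice(magic);
    out.extend_from_slice(proto);
    out
}

/// Dispatches on the leading tag; unknown bytes are not a Comet node.
fn decode(bytes: &[u8]) -> Option<CometNode> {
    if let Some(rest) = bytes.strip_prefix(COMET_FRAGMENT_MAGIC) {
        Some(CometNode::Fragment { proto: rest.to_vec() })
    } else if let Some(rest) = bytes.strip_prefix(COMET_SCAN_MAGIC) {
        Some(CometNode::Scan { proto: rest.to_vec() })
    } else {
        None
    }
}
```

A distinct tag per node kind lets the decoder pick the right reconstruction path without first parsing the payload.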
…shuffle Offload a two-stage Comet GROUP BY from the Spark driver to an in-process Ballista engine, running it distributed across a hash shuffle with Comet native fragments on both sides: partial aggregate -> Ballista IPC shuffle -> final aggregate, results returned at the driver with zero Spark-executor tasks. Native side (executeQueryDistributed): assemble CometFragmentExec(block2, [Hash-Repartition(CometFragmentExec(block1))]), serialize with CometPhysicalCodec, and submit via execute_physical_plan to a standalone cluster started from a SessionState carrying the Comet codecs (so both scheduler and executor can rebuild the fragments). Ballista splits at the RepartitionExec(Hash) into two stages and the push-mode fetch returns all final partitions, concatenated and exported over Arrow-FFI. Fix CometFragmentExec partitioning: a fragment with children is a per-partition transform, so it must report its children's partition count (not the block's internal single partition). Without this only output partition 0 was executed, silently dropping the other hash buckets' groups. JVM driver: relax the offload guard to dispatch on plan shape -- one native block + no exchange stays the R1 single-stage path; two native blocks + one CometShuffleExchangeExec become the R2 distributed path, reading the group-key count and shuffle-partition count from the exchange's HashPartitioning. R2 requires shuffle directRead disabled so the final block's leaf serializes as a plain Scan the shuffle-fed fragment can consume. The partial-aggregate count state composes across the IPC shuffle: block2's Scan leaf schema is derived from the exchange output (block1's schema), so the shuffle write and final-aggregate read schemas match by construction. Point the Ballista dependency at the local worktree (physical-plan submission branch) and add ballista-scheduler/ballista-executor for the standalone start. Relates to apache#4796.
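The partitioning fix in this commit can be illustrated with a small model. The sketch assumes a hypothetical `Fragment` struct and a modulo hash; neither is the real `CometFragmentExec` nor Ballista's hash partitioner. It shows why reporting a single partition for a child-fed fragment would leave the other hash buckets unexecuted.

```rust
/// Hypothetical stand-in for a Comet fragment node.
struct Fragment {
    /// Output partition count of the child feeding the fragment,
    /// or `None` for a childless fragment.
    child_partitions: Option<usize>,
}

impl Fragment {
    /// A child-fed fragment is a per-partition transform, so it reports the
    /// child's partition count; a childless fragment reports one partition.
    fn output_partitions(&self) -> usize {
        self.child_partitions.unwrap_or(1)
    }
}

/// Toy hash partitioner (assumption: plain modulo, for illustration only).
fn hash_bucket(key: u64, shuffle_partitions: usize) -> usize {
    (key % shuffle_partitions as u64) as usize
}

/// Group keys that are actually processed when only the first
/// `executed_partitions` output partitions are run.
fn groups_reached(keys: &[u64], shuffle_partitions: usize, executed_partitions: usize) -> Vec<u64> {
    keys.iter()
        .copied()
        .filter(|key| hash_bucket(*key, shuffle_partitions) < executed_partitions)
        .collect()
}
```

With four shuffle partitions, running only partition 0 reaches just the keys that hash to bucket 0, while running the child's full partition count reaches every group.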
…shuffle Add the R2 milestone test: TPC-H Q1's full aggregate (sum x4, avg x3, count over decimals, grouped by the two keys l_returnflag/l_linestatus, no ORDER BY) offloaded from the driver and distributed across the Ballista hash shuffle as the 2-block shape (Comet partial-agg -> CometShuffleExchangeExec -> Comet final-agg), with 0 Spark-executor tasks. Reuses the T4 two-block driver path and R1's synthetic lineitem generator. This verifies the composition risks the count(*) test left unchecked: avg's partial (sum + count) state and decimal partial sums round-trip through Ballista's Arrow IPC shuffle and compose in the Comet final aggregate. Results match Spark's own Q1 row-for-row, including decimal scale (sum scale 4/6, avg scale 6) and avg values. The reference oracle runs with Comet fully disabled (pure Spark): a Comet-native baseline uses the native tokio engine which, once an in-process Ballista offload has run in the JVM, resolves with_env to a second, uninitialized JAVA_VM OnceCell in libdatafusion_comet_ballista and panics. Pure Spark is immune and is the correct oracle. Documented as an infrastructure limitation of the offload spike. Relates to apache#4796.
…ence limitation Relates to apache#4796
…anch Replace the local path deps with a git-rev pin on the experimental Ballista branch (apache/datafusion-ballista#1924) so the branch builds without a local checkout. Relates to apache#4796
Move the `datafusion-comet-ballista` crate into `datafusion-comet` core as a gated `execution::ballista` module and delete the separate crate, so the offload is compiled into the single `libcomet` cdylib only when built with the new default-off `ballista` Cargo feature. This removes the second, statically-linked copy of Comet core (and its distinct `JAVA_VM` static) that made a Comet-on-executor query and an in-process Ballista offload panic with "JAVA_VM not initialized" when run in the same JVM.
- Make the ballista deps optional and add the `ballista` feature (default off); fix module paths to `crate::execution::…` / `super::…` and reuse core's existing `execution::ffi` and `execution::fragment` helpers.
- Re-gate the moved integration tests with `#![cfg(feature = "ballista")]`.
- Load the offload JNI entries from `libcomet` (built with the feature) instead of a separate `libdatafusion_comet_ballista`; `NativeBallista.isAvailable` probes symbol presence so offload suites skip on a feature-less build.
- Add a `make core-ballista` convenience target; the default build stays lean.
- Flip the Q1 suite's second-test oracle to Comet-on-executor (enabled) to assert coexistence with a prior offload in one JVM.
Relates to apache#4796.
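As a rough illustration of the feature gating, the sketch below compiles an offload module only when a `ballista` Cargo feature is enabled and exposes a probe for whether it was compiled in. The feature name matches the commit; the module contents and both function names are hypothetical, and the real availability check is done from the JVM by probing for a JNI symbol.

```rust
/// Compiled only when the crate is built with `--features ballista`.
#[cfg(feature = "ballista")]
pub mod ballista {
    /// Hypothetical no-op entry point whose presence signals availability.
    pub fn probe_available() -> bool {
        true
    }
}

/// Reports at compile time whether the gated module exists in this build.
/// In a default (feature-less) build this evaluates to `false`.
pub fn offload_compiled_in() -> bool {
    cfg!(feature = "ballista")
}
```

Because the module is removed entirely in a default build, its optional dependencies never need to be resolved or linked unless the feature is requested.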
…ster
Add Comet-flavored scheduler/executor binaries and a remote submission path so a distributed Comet plan can run on a genuinely external Ballista cluster (separate scheduler + executor processes), not just the in-process standalone engine.
- Add `comet-ballista-scheduler` / `comet-ballista-executor` binaries (`required-features = ["ballista"]`) that construct the Ballista config in Rust and inject Comet's `CometLogicalCodec` / `CometPhysicalCodec` via the config `override_*_codec` fields (which the stock Ballista CLIs hardcode to None) so shipped Comet plan nodes decode on the scheduler and executor.
- Generalize `execute_two_stage` with a `scheduler_url`: empty keeps the in-process standalone path; non-empty submits the built 2-stage plan to that external scheduler. Thread the URL through `submit_and_export_distributed` and the `executeQueryDistributed` JNI entry.
- Wire the JVM side: `NativeBallista.executeQueryDistributed` gains a `schedulerUrl`; new `spark.comet.exec.ballista.scheduler.url` config; read and passed through in `operators.scala`.
- Add an ignored integration test that spawns the two binaries as child processes and submits `CometFragment(NativeScan) -> hash-shuffle -> CometFragment(Filter)` to the external scheduler, asserting correct results. Verifies the Comet fragments execute in the separate, JVM-less executor process (only libjvm present, JAVA_VM uninitialized) with no "JAVA_VM not initialized" panic.
Relates to apache#4796.
…river Add CometBallistaExternalClusterQ1Suite (-Pspark-4.0), which spawns the feature-built comet-scheduler and comet-executor as child processes on non-default ports (libjvm on the loader path, port readiness + registration grace, teardown that kills them even on failure), points spark.comet.exec.ballista.scheduler.url at the live scheduler, and offloads full TPC-H Q1's aggregate to it. Asserts the collected rows match a Spark oracle row-for-row (including decimal scale) and that 0 Spark-executor tasks run, closing the R1b gap where the scheduler.url path was validated only by compilation. This is the first time the full Q1 aggregate fragment (partial-agg NativeScan leaf -> hash shuffle -> final-agg over a Scan leaf) runs on a separate, JVM-less executor process; it runs cleanly with no JAVA_VM panic. Relates to apache#4796. No Rust changed; the default (no-feature) build is unaffected.
Fixes the RAT license check on the offload test files. Relates to apache#4796
They use Spark 4.0+ APIs (withSQLConf returning a value), so compiling them under Spark 3.4/3.5 fails. Gate them to the 4.x-only test source dir. Relates to apache#4796
…guard
Apply whole-branch landing-review fixes to the experimental Comet -> Ballista offload feature so the default build stays Ballista-free and the offload path is correct.
- Keep the default `libcomet` build Ballista-free: `execution::ffi` and `execution::fragment` are used only by the feature-gated `ballista` module, so gate both `pub mod` declarations behind `#[cfg(feature = "ballista")]` and make `datafusion-ffi` an optional dependency activated by the `ballista` feature. The default `cargo build` now pulls zero offload deps (no datafusion-ffi / ballista / tonic).
- Fix the busy-spin in `NativeFragmentStream::poll_next`: only re-poll the root after actually feeding new input into a leaf. When no leaf needed input (or a childless `NativeScan` fragment has no leaves) and the root returned `Pending`, return `Poll::Pending` and rely on the root's registered waker instead of hot-looping a worker thread on every async-I/O `Pending`.
- Tighten the R2 two-block offload guard: only take the hash-shuffle path when block1 is a partial `HashAggregate`, block2 the matching final `HashAggregate`, and the grouping-key width matches the exchange. Other single-hash-exchange shapes (e.g. a window `PARTITION BY`) previously hashed the wrong columns and silently produced wrong results; they now fall through to a clear rejection.
- Reject Iceberg native scans in `injectScanFiles`: `CometIcebergNativeScanExec` leaves carry their splits differently and would be shipped with no files to read (silently zero rows), so raise a clear error.
- Minor: replace library `eprintln!` debug output with `log::debug!`, fix the `build_test_proto` doc sentence, stage exported Arrow columns into a Vec before writing them into the JVM structs (avoid a partial-write leak on mid-loop failure), and document that the offload flag requires AQE off.
Relates to apache#4796.
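The busy-spin fix comes down to one decision per poll, which the following sketch isolates as a pure function. It uses `std::task::Poll` but no real stream or waker; `Action`, `next_action`, and the `Batch` alias are hypothetical names, and the actual `NativeFragmentStream::poll_next` is not reproduced here.

```rust
use std::task::Poll;

/// Stand-in for an Arrow record batch.
type Batch = Vec<i64>;

/// What the stream should do after polling the root once.
#[derive(Debug, PartialEq)]
enum Action {
    /// Hand the root's item (or end-of-stream) to the caller.
    Yield(Option<Batch>),
    /// New input was fed into a leaf, so the root may now make progress.
    RepollRoot,
    /// Nothing changed: return `Poll::Pending` and wait for the root's waker.
    ReturnPending,
}

/// Decides the next step from the root's poll result and whether this pass
/// actually fed new input into any leaf.
fn next_action(root: Poll<Option<Batch>>, fed_new_input: bool) -> Action {
    match root {
        Poll::Ready(item) => Action::Yield(item),
        Poll::Pending if fed_new_input => Action::RepollRoot,
        Poll::Pending => Action::ReturnPending,
    }
}
```

The third arm is the fix: without it, a `Pending` root with no new leaf input would be polled again immediately, keeping a worker thread spinning until the async I/O completed.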
PR apache/datafusion-ballista#1924 (execute_physical_plan + the PhysicalPlan submission variant the distributed offload needs) has merged, so pin to the apache/datafusion-ballista main rev 6472c7f2 instead of the personal fork branch.
Drop the build_test_proto/buildTestProto spike helper (a hand-built NativeScan proto used only to probe symbol availability and by the spike suite) and the CometBallistaFfiSpikeSuite. Replace the availability probe with a dedicated no-op JNI entry (probeAvailable). The offload path already runs real Comet-serialized plans; the remaining suites cover it against real Spark SQL.
executeSingleBlockViaBallista and executeTwoBlockViaBallista in CometExec are fully superseded by BallistaOffloadPlanner.buildOffloadPlan + NativeBallista.executeOffloadPlan but were left unused, which fails the build under -Xfatal-warnings (unused-private-method warnings).
… multi-join Comet fuses adjacent native operators into one serialized block, so the old directExchanges-only walk could flatten exchanges from two different joins in a fused multi-join block into one list. A block that happened to surface exactly two exchanges from different joins passed the size<=2 guard and was silently mis-paired as one join's left/right, producing wrong results instead of a clean rejection. Resolve a block's DAG inputs by validating that its exchanges are explained by exactly one binary Comet join (CometHashJoinExec or CometSortMergeJoinExec) whose left and right sides each contribute exactly one of the block's exchanges. Any other shape -- a fused multi-join block, a join with a broadcast (non-hash-exchange) side, or exchanges not cleanly split by a single join -- now throws UnsupportedOperationException instead of guessing. Also update the stale executeCollectViaBallista docstring in operators.scala to describe the current single-join support.
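The validation rule in this commit can be modeled on a toy plan tree. This is a sketch in Rust of logic that lives in Scala in the PR; the `Plan` enum, the exchange ids, and all three functions are hypothetical, broadcast joins are not modeled, and treating a block with zero or one exchange as directly resolvable is an assumption of the sketch.

```rust
/// Toy model of one fused native block. Walks stop at exchanges.
enum Plan {
    /// A hash exchange, identified by a hypothetical id.
    Exchange(u32),
    /// A binary join (hash or sort-merge) with left and right inputs.
    Join(Box<Plan>, Box<Plan>),
    /// Any other fused operator with its children.
    Other(Vec<Plan>),
}

/// Collects the exchanges reachable from `plan` without crossing an exchange.
fn direct_exchanges(plan: &Plan, out: &mut Vec<u32>) {
    match plan {
        Plan::Exchange(id) => out.push(*id),
        Plan::Join(left, right) => {
            direct_exchanges(left, out);
            direct_exchanges(right, out);
        }
        Plan::Other(children) => {
            for child in children {
                direct_exchanges(child, out);
            }
        }
    }
}

/// Collects the (left, right) sides of every join inside the block.
fn collect_joins<'a>(plan: &'a Plan, out: &mut Vec<(&'a Plan, &'a Plan)>) {
    match plan {
        Plan::Exchange(_) => {}
        Plan::Join(left, right) => {
            out.push((&**left, &**right));
            collect_joins(left, out);
            collect_joins(right, out);
        }
        Plan::Other(children) => {
            for child in children {
                collect_joins(child, out);
            }
        }
    }
}

/// Resolves a block's inputs, rejecting any shape not explained by exactly
/// one join whose sides each contribute exactly one exchange.
fn resolve_inputs(block: &Plan) -> Result<Vec<u32>, String> {
    let mut exchanges = Vec::new();
    direct_exchanges(block, &mut exchanges);
    if exchanges.len() <= 1 {
        return Ok(exchanges); // leaf block or single-input block (sketch assumption)
    }
    let mut joins = Vec::new();
    collect_joins(block, &mut joins);
    if let [(left, right)] = joins.as_slice() {
        let (mut l, mut r) = (Vec::new(), Vec::new());
        direct_exchanges(left, &mut l);
        direct_exchanges(right, &mut r);
        if l.len() == 1 && r.len() == 1 && exchanges.len() == 2 {
            return Ok(vec![l[0], r[0]]);
        }
    }
    Err("unsupported block shape: exchanges not explained by a single join".to_string())
}
```

A fused block such as `Join(Join(Exchange, scan), Exchange)` surfaces exactly two exchanges, so a size-only check would accept it; counting joins and checking each side separately is what turns that case into a rejection.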
…ape in E2E The walker previously classified a CometBroadcastHashJoinExec as a leaf block because directExchanges did not stop at CometBroadcastExchangeExec and directJoins only matched CometHashJoinExec/CometSortMergeJoinExec, so a broadcast join only failed by luck downstream instead of being explicitly rejected. Reject it in resolveInputs and stop/reject on CometBroadcastExchangeExec/CometBroadcastHashJoinExec during the DAG walk. Also add a pre-flight plan-shape assertion to the join E2E test, mirroring the existing aggregate test's assertion, so a future plan regression is caught before comparing offloaded vs baseline output.
Which issue does this PR close?
Relates to #4796 (native, Ballista-backed Comet execution mode). Does not close it.
Rationale for this change
#4796 proposes an optional deployment mode where a Spark query executes on an Apache DataFusion
Ballista data plane instead of inside Spark executors — the driver plans the query and hands the whole
Comet plan to Ballista, which distributes and runs it natively. The central question was whether Comet and
Ballista (which track DataFusion independently) can interoperate without co-linking, and whether a real
analytical query can actually run this way. This PR answers both with working code:
A Spark app's TPC-H Q1 aggregate now executes distributed on Ballista — driver-side, with Comet's native
operators on both sides of a hash shuffle, zero Spark-executor tasks — and matches Spark's own Q1
row-for-row (including decimal scale and `avg` values).
What changes are included in this PR?
- Reuse boundary (`datafusion-ffi`). Comet exposes a native plan (built by its own `PhysicalPlanner`) as an `FFI_ExecutionPlan`; the Ballista side consumes it as a `ForeignExecutionPlan`. No crate co-linking; only a stable C ABI (same DataFusion major). Comet's `planner.rs`, operators, and expressions are reused as-is.
- `datafusion-comet-ballista` crate (`native/ballista/`): `CometScanExec` / `CometFragmentExec` (Comet plan fragments as DataFusion operators; a fragment's `Scan` leaf can be fed by a DataFusion child stream), `CometPhysicalCodec` / `CometLogicalCodec` (compose with Ballista's codecs), `CometTableProvider`.
- Native input path (`native/core` `ScanExec`): a `Scan` leaf can now be driven by a native `RecordBatchStream` (not only a JVM input), so a Comet fragment can read a Ballista shuffle-reader stream.
- Driver-side offload (`spark/`): a new config `spark.comet.exec.ballista.enabled`; the driver's `executeCollect` path serializes the Comet plan, submits it to Ballista, and returns rows on the driver, with no Spark-executor tasks. For a shuffle (`GROUP BY`), the driver walks the multi-block plan and assembles `CometFragment(final-agg) ← Ballista hash-shuffle ← CometFragment(partial-agg)`, which Ballista splits into stages and runs.
Depends on apache/datafusion-ballista#1924 (a `physical_plan` submission variant), currently pinned to a fork branch.
How are these changes tested?
- `native/core` / `native/ballista` Rust tests: FFI plan export executes; codec round-trip; `CometScanExec` and `CometFragmentExec` (child-fed) execute; a standalone-Ballista distributed run.
- … (`-Pspark-4.0`): the offloaded `collect()` returns identical rows with the flag on vs off and a `SparkListener` asserts 0 executor tasks; a single-stage guard negative test; a distributed 2-block `GROUP BY`; and full TPC-H Q1's aggregate distributed across the shuffle, matching Spark's Q1.
Known limitations (why this is a draft)
- `comet-ballista` cdylib statically links a second copy of Comet core, so a Comet-on-executor query and an in-process Ballista offload cannot yet share one JVM. Follow-up: unify it.
- `libjvm` still required (Comet core links the JNI bridge); a JVM-free executor needs the JNI bridge feature-gated.
- … (`ORDER BY`/join is a 3rd stage: sort on the driver for now); DPP/correlated-subquery inputs may still launch executor tasks.