feat: Native execution via Ballista [experimental] #4800

Draft: andygrove wants to merge 42 commits into apache:main from andygrove:poc/ffi-execution-plan

andygrove (Member) commented Jul 2, 2026

⚠️ EXPERIMENTAL / RESEARCH — not for merge. A proof-of-concept, opened as a draft to make the
approach in #4796 concrete and reviewable. It takes deliberate shortcuts (see Known limitations) and
depends on an experimental Ballista branch. Feedback on the direction is very welcome; please don't
review it as production-ready code.

Which issue does this PR close?

Relates to #4796 (native, Ballista-backed Comet execution mode). Does not close it.

Rationale for this change

#4796 proposes an optional deployment mode where a Spark query executes on an Apache DataFusion
Ballista data plane instead of inside Spark executors — the driver plans the query and hands the whole
Comet plan to Ballista, which distributes and runs it natively. The central question was whether Comet and
Ballista (which track DataFusion independently) can interoperate without co-linking, and whether a real
analytical query can actually run this way. This PR answers both with working code:

A Spark app's TPC-H Q1 aggregate now executes distributed on Ballista — driver-side, with Comet's native
operators on both sides of a hash shuffle, zero Spark-executor tasks — and matches Spark's own Q1
row-for-row (including decimal scale and avg values).

What changes are included in this PR?

Reuse boundary (datafusion-ffi). Comet exposes a native plan (built by its own PhysicalPlanner) as an
FFI_ExecutionPlan; the Ballista side consumes it as a ForeignExecutionPlan. No crate co-linking; only a
stable C ABI (same DataFusion major). Comet's planner.rs, operators, and expressions are reused as-is.
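The reuse boundary described above works because the two sides share only a C-ABI struct of function pointers plus opaque state, not Rust types. The following is a minimal std-only sketch of that general pattern. `FfiRowSource`, `export_rows`, and `drain` are hypothetical names for illustration; this is not the layout of datafusion-ffi's `FFI_ExecutionPlan`, which carries far more (schema, properties, streams).

```rust
use std::os::raw::c_void;

/// Hypothetical C-ABI handle: function pointers plus opaque provider state.
/// NOT the actual layout of datafusion-ffi's FFI_ExecutionPlan.
#[repr(C)]
pub struct FfiRowSource {
    /// Writes the next value to `out` and returns 1, or returns 0 when exhausted.
    pub next: unsafe extern "C" fn(this: *mut FfiRowSource, out: *mut i64) -> i32,
    /// Frees the provider-owned state.
    pub release: unsafe extern "C" fn(this: *mut FfiRowSource),
    /// Opaque to the consumer; only the provider knows the concrete type.
    pub private_data: *mut c_void,
}

/// Provider-side state, never visible to the consumer.
struct Rows {
    values: Vec<i64>,
    pos: usize,
}

unsafe extern "C" fn rows_next(this: *mut FfiRowSource, out: *mut i64) -> i32 {
    let rows = &mut *((*this).private_data as *mut Rows);
    match rows.values.get(rows.pos) {
        Some(v) => {
            *out = *v;
            rows.pos += 1;
            1
        }
        None => 0,
    }
}

unsafe extern "C" fn rows_release(this: *mut FfiRowSource) {
    let ptr = (*this).private_data as *mut Rows;
    if !ptr.is_null() {
        // Reclaim the Box created in `export_rows` and null the pointer
        // so a second release is a no-op.
        drop(Box::from_raw(ptr));
        (*this).private_data = std::ptr::null_mut();
    }
}

/// Provider side: wrap native state behind the C-ABI handle.
pub fn export_rows(values: Vec<i64>) -> FfiRowSource {
    let state = Box::new(Rows { values, pos: 0 });
    FfiRowSource {
        next: rows_next,
        release: rows_release,
        private_data: Box::into_raw(state) as *mut c_void,
    }
}

/// Consumer side: uses only the function pointers, never the `Rows` type.
pub fn drain(mut src: FfiRowSource) -> Vec<i64> {
    let mut out = Vec::new();
    let mut value = 0i64;
    unsafe {
        while (src.next)(&mut src, &mut value) == 1 {
            out.push(value);
        }
        (src.release)(&mut src);
    }
    out
}
```

Calling `drain(export_rows(vec![1, 2, 3]))` yields the same three values back; the consumer could be compiled separately as long as both sides agree on the struct layout.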

datafusion-comet-ballista crate (native/ballista/): CometScanExec / CometFragmentExec (Comet plan
fragments as DataFusion operators — a fragment's Scan leaf can be fed by a DataFusion child stream),
CometPhysicalCodec/CometLogicalCodec (compose with Ballista's codecs), CometTableProvider.

Native input path (native/core ScanExec): a Scan leaf can now be driven by a native
RecordBatchStream (not only a JVM input), so a Comet fragment can read a Ballista shuffle-reader stream.

Driver-side offload (spark/): a new config spark.comet.exec.ballista.enabled; the driver's
executeCollect path serializes the Comet plan, submits it to Ballista, and returns rows on the driver — no
Spark-executor tasks. For a shuffle (GROUP BY), the driver walks the multi-block plan and assembles
CometFragment(final-agg) ← Ballista hash-shuffle ← CometFragment(partial-agg), which Ballista splits into
stages and runs.

Depends on apache/datafusion-ballista#1924 (a physical_plan submission variant), currently pinned to a
fork branch.

How are these changes tested?

  • native/core / native/ballista Rust tests: FFI plan export executes; codec round-trip; CometScanExec
    and CometFragmentExec (child-fed) execute; a standalone-Ballista distributed run.
  • Scala suites (run under -Pspark-4.0): the offloaded collect() returns identical rows with the flag on
    vs off and a SparkListener asserts 0 executor tasks; a single-stage guard negative test; a distributed
    2-block GROUP BY; and full TPC-H Q1's aggregate distributed across the shuffle, matching Spark's Q1.

Known limitations (why this is a draft)

  • Depends on an experimental Ballista branch (git-pinned); source releases disallow git deps.
  • Coexistence: the comet-ballista cdylib statically links a second copy of Comet core, so a
    Comet-on-executor query and an in-process Ballista offload cannot yet share one JVM. Follow-up: unify it.
  • libjvm still required (Comet core links the JNI bridge) — a JVM-free executor needs the JNI bridge
    feature-gated.
  • Single-partition scans; only 2-block plans (a trailing ORDER BY/join is a 3rd stage — sort on the
    driver for now); DPP/correlated-subquery inputs may still launch executor tasks.
  • Hand-built/driver-derived plan shapes; not yet driven from a Spark Connect front-end.

andygrove added 11 commits July 2, 2026 13:56
Add an FFI export that decodes a Comet `Operator` protobuf, builds the
native plan with the existing `PhysicalPlanner`, and wraps the root as an
`FFI_ExecutionPlan`. A process compiled against a different DataFusion
version (for example a Ballista executor) can then execute Comet's native
operators without linking Comet's Rust crates. The plan must use
`NativeScan` leaves so no JVM-fed input sources are required.

Relates to apache#4796

Add a new native workspace member that ports the comet-ffi-consumer
PoC's CometScanExec, CometPhysicalCodec/CometLogicalCodec, and
CometTableProvider so Comet native scans can run as leaves inside
Ballista distributed plans.

Wire the Ballista dependency as a git dep pinned to DF54 rev
a8b3c79c, and depend on Comet's core crate via a workspace path dep
instead of the PoC's path deps into sibling checkouts.

Adapt the c2a PoC binary into an integration test: build a
CoalesceBatchesExec(CometScanExec) plan, encode it with
CometPhysicalCodec exactly as Ballista's scheduler would, decode it
in a fresh SessionContext to simulate an executor, and execute it.
Asserts the Comet leaf survives Ballista's physical-plan
serialization and still produces the expected 5 rows.

Relates to apache#4796
…ow round-trip

Add a driver-side "offload to Ballista" submission entry that lets the JVM run
a serialized Comet Operator proto on an in-process standalone Ballista engine
(no Spark executors) and receive the result back over the Arrow C Data
Interface.

- Build datafusion-comet-ballista as a cdylib (plus rlib) carrying a JNI entry
  Java_org_apache_comet_ballista_NativeBallista_executeQuery. It reuses the
  existing proto -> standalone Ballista -> RecordBatches recipe and exports each
  result column into JVM-allocated FFI_ArrowArray/FFI_ArrowSchema structs,
  mirroring jni_api::prepare_output. A buildTestProto entry returns the fixed
  test proto so the JVM side needs no generated proto classes.
- Add a Scala NativeBallista binding and a ScalaTest that loads the new lib,
  runs the proto, imports the result via Arrow Java, and asserts 5 rows come
  back with no Spark cluster.
- Add a Rust integration test that exercises the same C Data export/import
  boundary end to end.

Relates to apache#4796.

Overview, architecture, components, config, and a roadmap with task tracking
for the driver-side Comet->Ballista offload work.

Relates to apache#4796
…llista

Add a driver-side seam so a `collect()` on a single-stage Comet query runs on
the Spark driver via an in-process Ballista engine, launching no Spark executor
tasks. Gated by `spark.comet.exec.ballista.enabled` (default false).

- CometConf: add COMET_EXEC_BALLISTA_ENABLED.
- CometExec.executeCollect and the Comet columnar-to-row nodes (the real collect
  roots) dispatch to CometExec.executeCollectViaBallista when the flag is on.
  It locates the single CometNativeExec boundary carrying the serialized plan,
  injects each NativeScan's file partitions into the template proto (merging all
  partitions into one native scan leaf), submits it via NativeBallista, imports
  the exported Arrow batch on the driver, and materializes the rows. Plans with
  more than one native block (an exchange) throw UnsupportedOperationException.
- NativeBallista: promote from the test tree to main and load
  libdatafusion_comet_ballista after libcomet so core JNI symbols bind to
  libcomet (the ballista cdylib re-exports them) rather than a divergent copy.
- ffi_jni: derive the result schema from the built plan's schema() instead of
  the NativeScan proto, so plans with operators above the scan report their
  true output schema.
- Tests: add CometBallistaOffloadSuite proving identical rows flag-on vs -off
  and zero executor task starts (SparkListener) for the offloaded collect.

Relates to apache#4796.
… caveat

Relates to apache#4796.

- Add a negative test that runs a multi-partition GROUP BY (forces an
  exchange, disabling AQE for determinism) with the offload flag on and
  asserts the single-stage guard in executeCollectViaBallista throws
  UnsupportedOperationException, after confirming the plan actually
  contains an exchange.
- Add a positive control to the existing zero-task test: run the
  flag-off baseline collect() through the same
  SparkListener/waitUntilEmpty apparatus and assert task starts > 0,
  proving the listener genuinely observes executor tasks so the == 0
  assertion for the offloaded collect is meaningful.
- Document in COMET_EXEC_BALLISTA_ENABLED that R1 targets single-stage
  queries without dynamic partition pruning or correlated scalar
  subqueries, since resolving those via waitForSubqueries()/
  updateResult() can still launch Spark executor tasks under offload.
andygrove changed the title from "[Experimental] Native execution mode: run Comet plans on Ballista via datafusion-ffi (research spike for #4796)" to "feat: Support running Spark jobs natively (no JVM) via Ballista [experimental]" on Jul 2, 2026
andygrove added 8 commits July 2, 2026 16:29
… Ballista

Add CometBallistaQ1Suite, the milestone demonstration for the R1 driver-side
Ballista offload using TPC-H Q1 lineitem data and per-row semantics. A synthetic
single-file lineitem (correct decimal(12,2)/date/string types, rows straddling
the Q1 date cutoff) is queried with spark.comet.exec.ballista.enabled=true and
its collected rows are asserted identical to the flag-off Comet baseline, with
zero Spark executor tasks (SparkListener), reusing CometBallistaOffloadSuite's
apparatus and positive control.

Full Q1 cannot be offloaded under R1's single-serialized-block machinery: the
GROUP BY forces either a partial->exchange->final shuffle (two blocks) or, with
coalesce(1), a CometCoalesce sink boundary (still two blocks); the file scan's
UnknownPartitioning never satisfies the aggregate's ClusteredDistribution, so
the exchange is unavoidable. The multi-block/aggregate case is R2. The suite
therefore offloads the largest single-block subset of Q1 -- scan + WHERE date
filter + Q1's decimal arithmetic projections (disc_price, charge) -- as one
exchange-free CometNativeExec, exercising the native Parquet scan, date
filtering, and decimal multiplication that matter for offload correctness. The
plan is asserted single-block before offloading.

Relates to apache#4796.

Introduce InputBatchStream, a small dyn-compatible trait (next_batch)
that abstracts ScanExec's input source. AlignedArrowStreamReader (the
JVM/FFI path via planner.rs) implements it unchanged; a new
NativeBatchStream wraps a DataFusion SendableRecordBatchStream and
implements it by blocking each pull on futures::executor::block_on,
which is safe outside a Tokio runtime and composes with Comet's
executor threads.

ScanExec.input_source now holds Arc<Mutex<dyn InputBatchStream>>
instead of being hard-wired to AlignedArrowStreamReader, and
ScanExec::new_native gives a constructor for the native case. The JVM
path in planner.rs's OpStruct::Scan handling is otherwise unchanged; it
just wraps its reader in the trait object.

This is the enabling change for a later Comet fragment Scan/ShuffleScan
leaf to be fed by a Ballista shuffle-reader stream with no JVM in the
data path. Relates to apache#4796.
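The trait-object indirection in the commit above can be sketched with std-only Rust. This is an illustration under assumptions, not Comet's code: `Batch`, `QueueSource`, `IterSource`, and `Scan` are hypothetical stand-ins, the `next_batch` signature is assumed, and a plain blocking iterator replaces the `SendableRecordBatchStream` that the real `NativeBatchStream` drives with `futures::executor::block_on`.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Stand-in for an Arrow RecordBatch (hypothetical).
#[derive(Debug, Clone, PartialEq)]
pub struct Batch(pub Vec<i64>);

/// Pull interface in the spirit of `InputBatchStream`; signature assumed.
pub trait InputBatchStream: Send {
    fn next_batch(&mut self) -> Option<Batch>;
}

/// Hypothetical stand-in for the JVM/FFI-backed reader.
pub struct QueueSource(pub VecDeque<Batch>);

impl InputBatchStream for QueueSource {
    fn next_batch(&mut self) -> Option<Batch> {
        self.0.pop_front()
    }
}

/// Hypothetical stand-in for the native path: wraps any blocking iterator.
pub struct IterSource<I>(pub I);

impl<I: Iterator<Item = Batch> + Send> InputBatchStream for IterSource<I> {
    fn next_batch(&mut self) -> Option<Batch> {
        self.0.next()
    }
}

/// The scan holds a trait object, so it does not care which source feeds it.
pub struct Scan {
    input_source: Arc<Mutex<dyn InputBatchStream>>,
}

impl Scan {
    pub fn new<S: InputBatchStream + 'static>(source: S) -> Self {
        Scan { input_source: Arc::new(Mutex::new(source)) }
    }

    /// Pulls batches until the source is exhausted.
    pub fn drain(&self) -> Vec<Batch> {
        let mut guard = self.input_source.lock().unwrap();
        let mut out = Vec::new();
        while let Some(batch) = guard.next_batch() {
            out.push(batch);
        }
        out
    }
}
```

The design point is that adding a new input source only requires a new `InputBatchStream` implementation; the scan itself is untouched.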
Add a CometFragmentExec that runs a Comet plan fragment (serialized Operator
proto) whose input-leaf Scan operators are fed by the node's DataFusion
children, reusing T2's native ScanExec input path. A childless fragment behaves
like CometScanExec.

Core gains a native (non-FFI) fragment builder in execution::fragment: it builds
the plan via the planner, injects each child stream into the returned Scan
handle through a new ScanExec::set_native_input setter (the handle shares its
batch slot with the executable leaf), and drives get_next_batch while streaming
the root, mirroring jni_api's busy-poll. CometPhysicalCodec (de)serializes the
fragment via a distinct COMET_FRAGMENT_MAGIC tag, with children round-tripping
through datafusion-proto.

Relates to apache#4796.
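A tag-prefixed encoding like the `COMET_FRAGMENT_MAGIC` scheme mentioned above might look roughly like the sketch below. The tag bytes, the `Node` enum, and the wire layout are all assumptions made for illustration; the PR description names the constant but does not show its value or the real format.

```rust
/// Hypothetical tag values; the real magic bytes are not shown in the PR.
const SCAN_MAGIC: &[u8] = b"CSCN";
const FRAGMENT_MAGIC: &[u8] = b"CFRG";

/// Simplified stand-ins for the two Comet node kinds.
#[derive(Debug, PartialEq)]
pub enum Node {
    Scan { plan: Vec<u8> },
    Fragment { plan: Vec<u8>, num_children: u32 },
}

/// Prefix the serialized plan with a tag that identifies the node kind.
pub fn encode(node: &Node) -> Vec<u8> {
    let mut buf = Vec::new();
    match node {
        Node::Scan { plan } => {
            buf.extend_from_slice(SCAN_MAGIC);
            buf.extend_from_slice(plan);
        }
        Node::Fragment { plan, num_children } => {
            buf.extend_from_slice(FRAGMENT_MAGIC);
            buf.extend_from_slice(&num_children.to_le_bytes());
            buf.extend_from_slice(plan);
        }
    }
    buf
}

/// Returns None for bytes without a known tag, so a composed codec
/// could fall through to another decoder.
pub fn decode(bytes: &[u8]) -> Option<Node> {
    if bytes.len() < 4 {
        return None;
    }
    let (tag, rest) = bytes.split_at(4);
    if tag == SCAN_MAGIC {
        Some(Node::Scan { plan: rest.to_vec() })
    } else if tag == FRAGMENT_MAGIC && rest.len() >= 4 {
        let (count, plan) = rest.split_at(4);
        Some(Node::Fragment {
            plan: plan.to_vec(),
            num_children: u32::from_le_bytes([count[0], count[1], count[2], count[3]]),
        })
    } else {
        None
    }
}
```

Using distinct tags means a decoder never has to guess whether a byte string is a leaf scan or a child-fed fragment.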
…shuffle

Offload a two-stage Comet GROUP BY from the Spark driver to an in-process
Ballista engine, running it distributed across a hash shuffle with Comet
native fragments on both sides: partial aggregate -> Ballista IPC shuffle
-> final aggregate, results returned at the driver with zero Spark-executor
tasks.

Native side (executeQueryDistributed): assemble
CometFragmentExec(block2, [Hash-Repartition(CometFragmentExec(block1))]),
serialize with CometPhysicalCodec, and submit via execute_physical_plan to
a standalone cluster started from a SessionState carrying the Comet codecs
(so both scheduler and executor can rebuild the fragments). Ballista splits
at the RepartitionExec(Hash) into two stages and the push-mode fetch returns
all final partitions, concatenated and exported over Arrow-FFI.

Fix CometFragmentExec partitioning: a fragment with children is a
per-partition transform, so it must report its children's partition count
(not the block's internal single partition). Without this only output
partition 0 was executed, silently dropping the other hash buckets' groups.
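To see why a wrong partition count drops groups rather than failing, consider this small std-only model. It is a sketch, not Comet or Ballista code: `shuffle` and `final_sums` are hypothetical helpers, and `key % buckets` stands in for the real hash partitioning.

```rust
use std::collections::BTreeMap;

/// Route each (group_key, value) row to a bucket.
/// `key % buckets` is a stand-in for the real hash function.
pub fn shuffle(rows: &[(u64, i64)], buckets: usize) -> Vec<Vec<(u64, i64)>> {
    let mut out = vec![Vec::new(); buckets];
    for &(key, value) in rows {
        out[(key as usize) % buckets].push((key, value));
    }
    out
}

/// Run a "final aggregate" over only the partitions the operator reports.
pub fn final_sums(shuffled: &[Vec<(u64, i64)>], reported_partitions: usize) -> BTreeMap<u64, i64> {
    let mut sums = BTreeMap::new();
    for partition in shuffled.iter().take(reported_partitions) {
        for &(key, value) in partition {
            *sums.entry(key).or_insert(0) += value;
        }
    }
    sums
}
```

With four keys shuffled into two buckets, reporting one partition returns only the two groups that hashed to bucket 0, while reporting the children's count of two returns all four.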

JVM driver: relax the offload guard to dispatch on plan shape -- one native
block + no exchange stays the R1 single-stage path; two native blocks + one
CometShuffleExchangeExec become the R2 distributed path, reading the group-key
count and shuffle-partition count from the exchange's HashPartitioning. R2
requires shuffle directRead disabled so the final block's leaf serializes as
a plain Scan the shuffle-fed fragment can consume.
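The shape-based dispatch described above is implemented on the JVM side; as a language-neutral illustration, the decision table can be sketched as follows. `OffloadPath` and `choose_path` are hypothetical names, and the sketch covers only the block and exchange counts stated in this commit message.

```rust
/// Which offload path a plan shape maps to (hypothetical names).
#[derive(Debug, PartialEq)]
pub enum OffloadPath {
    /// R1: one native block, no exchange.
    SingleStage,
    /// R2: two native blocks joined by one hash-shuffle exchange.
    TwoStageHashShuffle,
}

/// Dispatch on plan shape; anything else is rejected rather than guessed at.
pub fn choose_path(native_blocks: usize, exchanges: usize) -> Result<OffloadPath, String> {
    match (native_blocks, exchanges) {
        (1, 0) => Ok(OffloadPath::SingleStage),
        (2, 1) => Ok(OffloadPath::TwoStageHashShuffle),
        (b, e) => Err(format!(
            "unsupported plan shape for offload: {} native block(s), {} exchange(s)",
            b, e
        )),
    }
}
```

Rejecting unknown shapes with an explicit error keeps an unsupported plan from silently taking the wrong path.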

The partial-aggregate count state composes across the IPC shuffle: block2's
Scan leaf schema is derived from the exchange output (block1's schema), so
the shuffle write and final-aggregate read schemas match by construction.

Point the Ballista dependency at the local worktree (physical-plan submission
branch) and add ballista-scheduler/ballista-executor for the standalone start.

Relates to apache#4796.
…shuffle

Add the R2 milestone test: TPC-H Q1's full aggregate (sum x4, avg x3,
count over decimals, grouped by the two keys l_returnflag/l_linestatus,
no ORDER BY) offloaded from the driver and distributed across the
Ballista hash shuffle as the 2-block shape (Comet partial-agg ->
CometShuffleExchangeExec -> Comet final-agg), with 0 Spark-executor
tasks. Reuses the T4 two-block driver path and R1's synthetic lineitem
generator.

This verifies the composition risks the count(*) test left unchecked:
avg's partial (sum + count) state and decimal partial sums round-trip
through Ballista's Arrow IPC shuffle and compose in the Comet final
aggregate. Results match Spark's own Q1 row-for-row, including decimal
scale (sum scale 4/6, avg scale 6) and avg values.
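The composition property being tested, that avg's partial (sum + count) state merges correctly after a shuffle, can be illustrated with unscaled decimal integers. This is a sketch under assumptions: `AvgState` is a hypothetical type, and `finish` truncates where Spark's decimal division rounds, so it is not a reference for Spark's exact semantics.

```rust
/// Partial state for avg over a decimal column, kept as unscaled integers.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct AvgState {
    pub sum_unscaled: i128,
    pub count: u64,
}

impl AvgState {
    /// Partial aggregate over one partition's values.
    pub fn partial(values_unscaled: &[i128]) -> Self {
        AvgState {
            sum_unscaled: values_unscaled.iter().sum(),
            count: values_unscaled.len() as u64,
        }
    }

    /// Final-side merge of two partial states received from the shuffle.
    pub fn merge(self, other: AvgState) -> AvgState {
        AvgState {
            sum_unscaled: self.sum_unscaled + other.sum_unscaled,
            count: self.count + other.count,
        }
    }

    /// Average as an unscaled integer at `out_scale`, for inputs at `in_scale`.
    /// Truncates toward zero (a simplification).
    pub fn finish(self, in_scale: u32, out_scale: u32) -> Option<i128> {
        if self.count == 0 || out_scale < in_scale {
            return None;
        }
        let factor = 10i128.pow(out_scale - in_scale);
        Some(self.sum_unscaled * factor / self.count as i128)
    }
}
```

For example, 1.00 and 2.00 in one partition and 4.00 in another (scale 2) merge to the same state as a single pass over all three values, and finishing at scale 6 gives an unscaled 2333333, that is 2.333333.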

The reference oracle runs with Comet fully disabled (pure Spark): a
Comet-native baseline uses the native tokio engine which, once an
in-process Ballista offload has run in the JVM, resolves with_env to a
second, uninitialized JAVA_VM OnceCell in libdatafusion_comet_ballista
and panics. Pure Spark is immune and is the correct oracle. Documented
as an infrastructure limitation of the offload spike.

Relates to apache#4796.
…anch

Replace the local path deps with a git-rev pin on the experimental Ballista
branch (apache/datafusion-ballista#1924) so the branch builds without a local
checkout.

Relates to apache#4796
andygrove changed the title from "feat: Support running Spark jobs natively (no JVM) via Ballista [experimental]" to "[Experimental] Native execution mode: run a Spark query (incl. distributed TPC-H Q1) on Ballista (research spike for #4796)" on Jul 3, 2026
andygrove added 5 commits July 2, 2026 19:00
Move the `datafusion-comet-ballista` crate into `datafusion-comet` core as a
gated `execution::ballista` module and delete the separate crate, so the offload
is compiled into the single `libcomet` cdylib only when built with the new
default-off `ballista` Cargo feature. This removes the second, statically-linked
copy of Comet core (and its distinct `JAVA_VM` static) that made a
Comet-on-executor query and an in-process Ballista offload panic with
"JAVA_VM not initialized" when run in the same JVM.

- Make the ballista deps optional and add the `ballista` feature (default off);
  fix module paths to `crate::execution::…` / `super::…` and reuse core's
  existing `execution::ffi` and `execution::fragment` helpers.
- Re-gate the moved integration tests with `#![cfg(feature = "ballista")]`.
- Load the offload JNI entries from `libcomet` (built with the feature) instead
  of a separate `libdatafusion_comet_ballista`; `NativeBallista.isAvailable`
  probes symbol presence so offload suites skip on a feature-less build.
- Add a `make core-ballista` convenience target; the default build stays lean.
- Flip the Q1 suite's second-test oracle to Comet-on-executor (enabled) to
  assert coexistence with a prior offload in one JVM.

Relates to apache#4796.
…ster

Add Comet-flavored scheduler/executor binaries and a remote submission path so
a distributed Comet plan can run on a genuinely external Ballista cluster
(separate scheduler + executor processes), not just the in-process standalone
engine.

- Add `comet-ballista-scheduler` / `comet-ballista-executor` binaries
  (`required-features = ["ballista"]`) that construct the Ballista config in
  Rust and inject Comet's `CometLogicalCodec` / `CometPhysicalCodec` via the
  config `override_*_codec` fields — which the stock Ballista CLIs hardcode to
  None — so shipped Comet plan nodes decode on the scheduler and executor.
- Generalize `execute_two_stage` with a `scheduler_url`: empty keeps the
  in-process standalone path; non-empty submits the built 2-stage plan to that
  external scheduler. Thread the URL through `submit_and_export_distributed`
  and the `executeQueryDistributed` JNI entry.
- Wire the JVM side: `NativeBallista.executeQueryDistributed` gains a
  `schedulerUrl`; new `spark.comet.exec.ballista.scheduler.url` config; read and
  passed through in `operators.scala`.
- Add an ignored integration test that spawns the two binaries as child
  processes and submits `CometFragment(NativeScan) -> hash-shuffle ->
  CometFragment(Filter)` to the external scheduler, asserting correct results.

Verifies the Comet fragments execute in the separate, JVM-less executor process
(only libjvm present, JAVA_VM uninitialized) with no "JAVA_VM not initialized"
panic. Relates to apache#4796.
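The `scheduler_url` convention above (empty means in-process, non-empty means external) can be made explicit with a small enum. This is a sketch, not the PR's code: `SubmitTarget` and `resolve_target` are hypothetical names, and trimming whitespace is an added assumption beyond the "empty" rule stated in the commit.

```rust
/// Where a built two-stage plan gets submitted (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum SubmitTarget {
    /// Empty URL: keep the in-process standalone engine.
    InProcessStandalone,
    /// Non-empty URL: submit to an external scheduler.
    External { scheduler_url: String },
}

/// Map the raw config string to a target.
/// Assumption: whitespace-only values are treated as empty.
pub fn resolve_target(scheduler_url: &str) -> SubmitTarget {
    let url = scheduler_url.trim();
    if url.is_empty() {
        SubmitTarget::InProcessStandalone
    } else {
        SubmitTarget::External { scheduler_url: url.to_string() }
    }
}
```

Resolving the string once at the boundary means downstream code matches on an enum instead of re-checking for an empty string.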
…river

Add CometBallistaExternalClusterQ1Suite (-Pspark-4.0), which spawns the
feature-built comet-ballista-scheduler and comet-ballista-executor as child processes on
non-default ports (libjvm on the loader path, port readiness + registration
grace, teardown that kills them even on failure), points
spark.comet.exec.ballista.scheduler.url at the live scheduler, and offloads
full TPC-H Q1's aggregate to it. Asserts the collected rows match a Spark
oracle row-for-row (including decimal scale) and that 0 Spark-executor tasks
run, closing the R1b gap where the scheduler.url path was validated only by
compilation.

This is the first time the full Q1 aggregate fragment (partial-agg NativeScan
leaf -> hash shuffle -> final-agg over a Scan leaf) runs on a separate,
JVM-less executor process; it runs cleanly with no JAVA_VM panic. Relates to
apache#4796. No Rust changed; the default (no-feature) build is unaffected.
andygrove changed the title from "[Experimental] Native execution mode: run a Spark query (incl. distributed TPC-H Q1) on Ballista (research spike for #4796)" to "feat: Native execution via Ballista [experimental]" on Jul 3, 2026
andygrove added 3 commits July 2, 2026 22:23
Fixes the RAT license check on the offload test files. Relates to apache#4796

They use Spark 4.0+ APIs (withSQLConf returning a value), so compiling them
under Spark 3.4/3.5 fails. Gate them to the 4.x-only test source dir. Relates to apache#4796
…guard

Apply whole-branch landing-review fixes to the experimental Comet -> Ballista
offload feature so the default build stays Ballista-free and the offload path is
correct.

- Keep the default `libcomet` build Ballista-free: `execution::ffi` and
  `execution::fragment` are used only by the feature-gated `ballista` module, so
  gate both `pub mod` declarations behind `#[cfg(feature = "ballista")]` and make
  `datafusion-ffi` an optional dependency activated by the `ballista` feature.
  The default `cargo build` now pulls zero offload deps (no datafusion-ffi /
  ballista / tonic).

- Fix the busy-spin in `NativeFragmentStream::poll_next`: only re-poll the root
  after actually feeding new input into a leaf. When no leaf needed input (or a
  childless `NativeScan` fragment has no leaves) and the root returned `Pending`,
  return `Poll::Pending` and rely on the root's registered waker instead of
  hot-looping a worker thread on every async-I/O `Pending`.

- Tighten the R2 two-block offload guard: only take the hash-shuffle path when
  block1 is a partial `HashAggregate`, block2 the matching final `HashAggregate`,
  and the grouping-key width matches the exchange. Other single-hash-exchange
  shapes (e.g. a window `PARTITION BY`) previously hashed the wrong columns and
  silently produced wrong results; they now fall through to a clear rejection.

- Reject Iceberg native scans in `injectScanFiles`: `CometIcebergNativeScanExec`
  leaves carry their splits differently and would be shipped with no files to
  read (silently zero rows), so raise a clear error.

- Minor: replace library `eprintln!` debug output with `log::debug!`, fix the
  `build_test_proto` doc sentence, stage exported Arrow columns into a Vec before
  writing them into the JVM structs (avoid a partial-write leak on mid-loop
  failure), and document that the offload flag requires AQE off.

Relates to apache#4796.
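The busy-spin fix in the list above comes down to one rule: re-poll the root only if new input was actually fed, otherwise return `Pending`. Below is a minimal std-only sketch of that control flow. `poll_step` and its closure parameters are hypothetical; the real `NativeFragmentStream::poll_next` takes a `Context` and works on DataFusion streams, which this sketch omits.

```rust
use std::task::Poll;

/// One `poll_next`-style step.
/// `feed_inputs` returns true if it pushed new input into any leaf;
/// `poll_root` polls the root operator once.
pub fn poll_step<T>(
    mut feed_inputs: impl FnMut() -> bool,
    mut poll_root: impl FnMut() -> Poll<Option<T>>,
) -> Poll<Option<T>> {
    loop {
        let fed = feed_inputs();
        match poll_root() {
            Poll::Ready(item) => return Poll::Ready(item),
            // New input arrived, so polling again can make progress.
            Poll::Pending if fed => continue,
            // Nothing changed: yield and rely on the root's registered waker
            // instead of hot-looping the worker thread.
            Poll::Pending => return Poll::Pending,
        }
    }
}
```

When no leaf accepts input and the root is pending, the root is polled exactly once before the function yields.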
andygrove added 15 commits July 2, 2026 22:47
PR apache/datafusion-ballista#1924 (execute_physical_plan + the PhysicalPlan
submission variant the distributed offload needs) has merged, so pin to the
apache/datafusion-ballista main rev 6472c7f2 instead of the personal fork branch.

Drop the build_test_proto/buildTestProto spike helper (a hand-built NativeScan
proto used only to probe symbol availability and by the spike suite) and the
CometBallistaFfiSpikeSuite. Replace the availability probe with a dedicated no-op
JNI entry (probeAvailable). The offload path already runs real Comet-serialized
plans; the remaining suites cover it against real Spark SQL.

executeSingleBlockViaBallista and executeTwoBlockViaBallista in
CometExec are fully superseded by BallistaOffloadPlanner.buildOffloadPlan
+ NativeBallista.executeOffloadPlan but were left unused, which fails
the build under -Xfatal-warnings (unused-private-method warnings).
… multi-join

Comet fuses adjacent native operators into one serialized block, so the
old directExchanges-only walk could flatten exchanges from two different
joins in a fused multi-join block into one list. A block that happened
to surface exactly two exchanges from different joins passed the
size<=2 guard and was silently mis-paired as one join's left/right,
producing wrong results instead of a clean rejection.

Resolve a block's DAG inputs by validating that its exchanges are
explained by exactly one binary Comet join (CometHashJoinExec or
CometSortMergeJoinExec) whose left and right sides each contribute
exactly one of the block's exchanges. Any other shape -- a fused
multi-join block, a join with a broadcast (non-hash-exchange) side, or
exchanges not cleanly split by a single join -- now throws
UnsupportedOperationException instead of guessing.

Also update the stale executeCollectViaBallista docstring in
operators.scala to describe the current single-join support.
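The validation rule from this commit (exactly one binary join, each side contributing exactly one exchange, anything else rejected) can be modelled on a toy plan tree. This is a sketch, not the Scala walker: `Plan`, `resolve_join_inputs`, and the helper functions are hypothetical, and exchanges are modelled as leaves identified by an integer.

```rust
/// Toy plan tree for one fused native block (hypothetical model).
#[derive(Debug)]
pub enum Plan {
    /// Hash exchange feeding the block, identified by an id.
    Exchange(u32),
    /// Broadcast exchange: not supported as a join side.
    BroadcastExchange,
    /// Binary hash or sort-merge join.
    Join(Box<Plan>, Box<Plan>),
    /// Any other fused operator.
    Other(Vec<Plan>),
}

/// Collect the ids of all hash exchanges under `p`; reject broadcast sides.
fn direct_exchanges(p: &Plan, out: &mut Vec<u32>) -> Result<(), String> {
    match p {
        Plan::Exchange(id) => {
            out.push(*id);
            Ok(())
        }
        Plan::BroadcastExchange => Err("broadcast side is not supported".to_string()),
        Plan::Join(l, r) => {
            direct_exchanges(l, out)?;
            direct_exchanges(r, out)
        }
        Plan::Other(children) => {
            for c in children {
                direct_exchanges(c, out)?;
            }
            Ok(())
        }
    }
}

/// Collect every binary join in the block as (left, right) pairs.
fn find_joins<'a>(p: &'a Plan, out: &mut Vec<(&'a Plan, &'a Plan)>) {
    match p {
        Plan::Join(l, r) => {
            out.push((&**l, &**r));
            find_joins(l, out);
            find_joins(r, out);
        }
        Plan::Other(children) => {
            for c in children {
                find_joins(c, out);
            }
        }
        Plan::Exchange(_) | Plan::BroadcastExchange => {}
    }
}

/// Returns (left_exchange, right_exchange) only for the one supported shape.
pub fn resolve_join_inputs(block: &Plan) -> Result<(u32, u32), String> {
    let mut joins = Vec::new();
    find_joins(block, &mut joins);
    if joins.len() != 1 {
        return Err(format!("expected exactly one binary join, found {}", joins.len()));
    }
    let (left, right) = joins[0];
    let (mut all, mut ls, mut rs) = (Vec::new(), Vec::new(), Vec::new());
    direct_exchanges(block, &mut all)?;
    direct_exchanges(left, &mut ls)?;
    direct_exchanges(right, &mut rs)?;
    match (ls.as_slice(), rs.as_slice()) {
        ([l], [r]) if all.len() == 2 => Ok((*l, *r)),
        _ => Err("exchanges are not cleanly split by a single join".to_string()),
    }
}
```

A fused multi-join block such as `Join(Join(E1, E2), E3)` fails the single-join check, so it is rejected instead of having two of its exchanges mis-paired.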
…ape in E2E

The walker previously classified a CometBroadcastHashJoinExec as a leaf
block because directExchanges did not stop at CometBroadcastExchangeExec
and directJoins only matched CometHashJoinExec/CometSortMergeJoinExec,
so a broadcast join only failed by luck downstream instead of being
explicitly rejected. Reject it in resolveInputs and stop/reject on
CometBroadcastExchangeExec/CometBroadcastHashJoinExec during the DAG
walk.

Also add a pre-flight plan-shape assertion to the join E2E test,
mirroring the existing aggregate test's assertion, so a future plan
regression is caught before comparing offloaded vs baseline output.