What is the problem the feature request solves?
Today Comet accelerates Spark by running native operators inside Spark executors via JNI. This is great for accelerating existing Spark clusters, but every task still pays JVM/executor overhead, and the deployment is inseparable from a full Spark cluster.
There is growing interest in running Spark workloads with a fully native execution engine (no per-partition JVM), reachable through the standard Spark Connect API so that existing PySpark / Spark SQL clients work unchanged. Comet already has a large, Spark-validated body of native expressions and operators — we should be able to offer a fully native deployment that reuses them, rather than rebuilding that work from scratch.
Describe the potential solution
Add a new, optional deployment mode for Comet — it does not replace the existing in-Spark accelerator. In this mode, Comet provides native scheduler + executor processes (built on Apache DataFusion Ballista), and users interact through the Spark Connect API to get fully native, distributed execution using Comet's existing expressions and operators.
The key idea is to keep a lightweight Spark driver as the control plane and make the data plane fully native, so that almost none of the hard work is new:
PySpark / Spark Connect client (unchanged, sc://…)
│ gRPC (Spark Connect protobuf)
▼
┌─────────────────────────────┐ CONTROL PLANE (1 JVM, lightweight)
│ Spark driver │ • Spark's own Spark Connect server
│ + Catalyst (analyze/opt) │ • Comet plans the query into native ops
│ + Comet QueryPlanSerde │ • acts as a Ballista client
└──────────────┬──────────────┘
│ Comet plan protobuf → submit job
▼
┌─────────────────────────────┐ DATA PLANE (fully native, no Spark JVM)
│ Ballista scheduler │ • splits plan into stages at shuffle bnds
│ Ballista executors ×N │ • rebuilds plan via Comet PhysicalPlanner
│ (Comet-flavored) │ • runs Comet operators + spark-expr
└─────────────────────────────┘ • Ballista shuffle between stages
Why this is mostly integration rather than new invention — each hard piece already exists:
- Spark Connect protocol handling and plan resolution are done by Spark itself. The Spark driver runs Spark's own Connect server and Catalyst, so protocol coverage, analysis, type coercion, and optimization come for free, and there is no native plan resolver to build.
- Spark → native plan translation reuses Comet's existing QueryPlanSerde / operator serde, which already operate on resolved Spark plans.
- Native, Spark-compatible execution reuses Comet's existing operators and the spark-expr crate.
- Distributed scheduling and shuffle are provided by Ballista, which already exposes the needed extension points (PhysicalExtensionCodec / LogicalExtensionCodec wired through both scheduler and executor, ShuffleWriterExec / ShuffleReaderExec stage boundaries, and a spark-compat feature that already integrates datafusion-spark functions).
Plan-handoff approach: the Spark driver emits Comet's existing plan protobuf and submits it to Ballista as a client. A Comet-provided PhysicalExtensionCodec, registered on the scheduler and executors, deserializes each Comet node into its DataFusion ExecutionPlan (reusing Comet's PhysicalPlanner). Ballista's DistributedPlanner then splits the plan into stages at shuffle boundaries and distributes tasks — unchanged. Comet operators ride through Ballista as physical-extension nodes, so Ballista core needs no knowledge of Comet.
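As a rough illustration of the handoff, the following Python toy models the codec boundary. Everything in it is an assumption for illustration: JSON stands in for Comet's plan protobuf, and `Scan`/`Filter`/`Project`, `CometCodec`, and `run_task` are hypothetical names, not Comet or Ballista APIs. In the real design the codec would be a Rust `PhysicalExtensionCodec` implementation delegating to Comet's `PhysicalPlanner`. The property the toy is meant to show is that the generic side only forwards opaque bytes to whatever codec is registered and never inspects Comet node kinds.

```python
import json
from typing import Dict, Iterator, List

Row = Dict[str, object]


class Operator:
    """Stand-in for a DataFusion ExecutionPlan: executing it yields a stream of rows."""

    def execute(self) -> Iterator[Row]:
        raise NotImplementedError


class Scan(Operator):
    def __init__(self, rows: List[Row]):
        self.rows = rows

    def execute(self) -> Iterator[Row]:
        return iter(self.rows)


class Filter(Operator):
    def __init__(self, child: Operator, column: str, min_value: int):
        self.child, self.column, self.min_value = child, column, min_value

    def execute(self) -> Iterator[Row]:
        # Keep rows whose column is at least min_value (a fixed predicate, for brevity).
        return (r for r in self.child.execute() if r[self.column] >= self.min_value)


class Project(Operator):
    def __init__(self, child: Operator, columns: List[str]):
        self.child, self.columns = child, columns

    def execute(self) -> Iterator[Row]:
        return ({c: r[c] for c in self.columns} for r in self.child.execute())


class CometCodec:
    """Hypothetical codec: rebuilds an operator tree from a serialized Comet plan.
    JSON stands in for the real protobuf encoding."""

    def decode(self, payload: bytes) -> Operator:
        return self._build(json.loads(payload))

    def _build(self, node: dict) -> Operator:
        kind = node["kind"]
        if kind == "scan":
            return Scan(node["rows"])
        if kind == "filter":
            return Filter(self._build(node["child"]), node["column"], node["min"])
        if kind == "project":
            return Project(self._build(node["child"]), node["columns"])
        raise ValueError(f"unknown Comet node kind: {kind!r}")


def run_task(payload: bytes, codec) -> List[Row]:
    """Engine-agnostic side: forwards opaque bytes to the registered codec and runs the result."""
    return list(codec.decode(payload).execute())


# Driver side: serialize filter + project over a scan, then hand the bytes to the data plane.
plan = {
    "kind": "project", "columns": ["id"],
    "child": {"kind": "filter", "column": "v", "min": 10,
              "child": {"kind": "scan", "rows": [{"id": 1, "v": 5}, {"id": 2, "v": 20}]}},
}
print(run_task(json.dumps(plan).encode(), CometCodec()))  # [{'id': 2}]
```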
Shuffle: where Catalyst/Comet inserts an exchange, we emit a Ballista shuffle boundary (which is how Ballista defines stages and balances work across tasks), reusing Comet's native columnar shuffle as the on-executor implementation behind that boundary.
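The stage-cutting rule can be sketched as a tree rewrite. This simplified Python model is not Ballista's `DistributedPlanner`; the `Node` type, the `exchange` node kind, and the `shuffle_writer`/`shuffle_reader` labels (chosen to mirror `ShuffleWriterExec`/`ShuffleReaderExec`) are assumptions. Each exchange closes the stage beneath it, which ends in a shuffle write, and is replaced in the consuming stage by a reader of that stage's output. Stages come out in dependency order.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    kind: str
    children: List["Node"] = field(default_factory=list)
    stage_ref: int = -1  # only meaningful on shuffle_reader nodes


def split_into_stages(root: Node) -> List[Node]:
    """Cut the plan at every 'exchange' node; return stages in execution order."""
    stages: List[Node] = []

    def rewrite(node: Node) -> Node:
        new_children = []
        for child in node.children:
            if child.kind == "exchange":
                # The subtree under the exchange becomes its own stage ending in a shuffle write.
                below = rewrite(child.children[0])
                stages.append(Node("shuffle_writer", [below]))
                # The consuming stage reads that stage's shuffle output instead.
                new_children.append(Node("shuffle_reader", stage_ref=len(stages) - 1))
            else:
                new_children.append(rewrite(child))
        return Node(node.kind, new_children, node.stage_ref)

    stages.append(rewrite(root))  # the final stage, which produces the query result
    return stages


def describe(node: Node) -> str:
    if node.kind == "shuffle_reader":
        return f"shuffle_reader(stage {node.stage_ref})"
    if not node.children:
        return node.kind
    return f"{node.kind}(" + ", ".join(describe(c) for c in node.children) + ")"


# Final aggregate over an exchange over a partial aggregate: two stages.
plan = Node("final_agg", [Node("exchange", [Node("partial_agg", [Node("scan")])])])
for i, stage in enumerate(split_into_stages(plan)):
    print(i, describe(stage))
# 0 shuffle_writer(partial_agg(scan))
# 1 final_agg(shuffle_reader(stage 0))
```

A join with an exchange on each input yields three stages: one shuffle-writing stage per input, then the join stage reading both.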
Suggested phasing:
- Walking skeleton: spark.sql("SELECT … FROM parquet") with filter + project, single stage, one executor, results streamed back to the Connect client. Proves the end-to-end pipe.
- Shuffle: add an aggregate/join so that a real Ballista stage boundary and shuffle are exercised.
- Coverage + benchmarks: widen operator/expression coverage enough to run TPC-H, then benchmark it.
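For the shuffle phase, a grouped sum is about the smallest query that needs a real stage boundary. The plain-Python sketch below models only the data movement; it is not Comet or Ballista code, and the function names, the partial/final split, and the use of `zlib.crc32` as the partitioning hash are illustrative assumptions. Each map task pre-aggregates its own input and routes every key to exactly one output partition; each reduce task then merges the blocks that all map tasks wrote for its partition.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Pair = Tuple[str, int]


def partition_of(key: str, num_partitions: int) -> int:
    # Deterministic hash so every map task sends a given key to the same reducer.
    return zlib.crc32(key.encode()) % num_partitions


def map_stage(task_input: List[Pair], num_partitions: int) -> List[Dict[str, int]]:
    """Stage 1 task: partial sum per key, then split the partials into shuffle partitions."""
    partial: Dict[str, int] = defaultdict(int)
    for key, value in task_input:
        partial[key] += value
    buckets: List[Dict[str, int]] = [{} for _ in range(num_partitions)]
    for key, subtotal in partial.items():
        buckets[partition_of(key, num_partitions)][key] = subtotal
    return buckets


def reduce_stage(shuffle_blocks: List[Dict[str, int]]) -> Dict[str, int]:
    """Stage 2 task: merge the blocks every map task wrote for this partition."""
    final: Dict[str, int] = defaultdict(int)
    for block in shuffle_blocks:
        for key, subtotal in block.items():
            final[key] += subtotal
    return dict(final)


def run_two_stage_sum(inputs: List[List[Pair]], num_partitions: int) -> Dict[str, int]:
    map_outputs = [map_stage(task_input, num_partitions) for task_input in inputs]
    result: Dict[str, int] = {}
    for p in range(num_partitions):
        # Reduce task p reads partition p from every map task's output.
        result.update(reduce_stage([out[p] for out in map_outputs]))
    return result


inputs = [[("a", 1), ("b", 2), ("a", 3)], [("b", 4), ("c", 5)]]
print(sorted(run_two_stage_sum(inputs, num_partitions=2).items()))  # [('a', 4), ('b', 6), ('c', 5)]
```

Because every key lands in exactly one partition, the reducers' outputs are disjoint and can simply be concatenated.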
Additional context
Coverage / fallback: the existing in-Spark mode falls back to Spark for unsupported operators, but this deployment has no Spark executors to fall back to. The proposed initial stance is strict-native: use Comet's existing support-level tagging to detect whether the whole plan can execute natively, and if not, fail with a clear error identifying the unsupported node rather than degrading silently. Coverage then grows over time. A future hybrid could fall back to a co-located Spark executor pool, but that is out of scope initially.
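A minimal Python sketch of the strict-native check follows. `PlanNode`, its `supported` flag (a stand-in for Comet's existing support-level tagging), and `UnsupportedPlanError` are hypothetical. The whole tree is inspected before anything is submitted, and the error names each unsupported node by its path from the root.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    name: str
    supported: bool  # stand-in for Comet's support-level tagging
    children: List["PlanNode"] = field(default_factory=list)


class UnsupportedPlanError(Exception):
    """Raised before submission; there is no Spark executor to fall back to."""


def collect_unsupported(node: PlanNode, path: str = "") -> List[str]:
    """Depth-first walk returning the path of every node that cannot run natively."""
    here = f"{path}/{node.name}"
    found = [] if node.supported else [here]
    for child in node.children:
        found.extend(collect_unsupported(child, here))
    return found


def check_strict_native(root: PlanNode) -> None:
    unsupported = collect_unsupported(root)
    if unsupported:
        raise UnsupportedPlanError(
            "query cannot run fully natively; unsupported: " + ", ".join(unsupported))


# Support flags here are made up for illustration.
plan = PlanNode("Project", True, [PlanNode("UnsupportedOp", False, [PlanNode("Scan", True)])])
try:
    check_strict_native(plan)
except UnsupportedPlanError as err:
    print(err)  # query cannot run fully natively; unsupported: /Project/UnsupportedOp
```

Reporting every unsupported node, rather than stopping at the first, is a choice made in this sketch; the proposal only requires that the error identify the unsupported node.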
Relationship to existing Comet: this is purely additive — a new deployment option. The in-Spark accelerator is unchanged, and both modes share the same native operators and expressions.
Relationship to Ballista: the intent is to depend on upstream Ballista (not a fork), so that Ballista's ongoing work on distributed scheduling — broadcast joins, skew mitigation, partition coalescing, and adaptive/AQE execution — benefits Comet's native mode directly.
Not limited to Spark Connect — and execution is all-or-nothing. The native side only consumes a Comet plan protobuf; it has no knowledge of how that plan was produced. Comet's QueryPlanSerde emits that protobuf from any Spark physical plan, so the same native mode also enables whole-query offload from a regular Spark application, not just Spark Connect: the application builds a plan and, instead of running it on its own executors, submits it to the Ballista data plane and collects the results (or has Ballista write the output). Execution is all-or-nothing — a query offloaded to Ballista runs entirely on the Ballista data plane, terminating at the driver or in written output. Interleaving Ballista execution with Spark's own distributed execution within a single job (two schedulers and two shuffle systems handing data back and forth) is explicitly out of scope. Spark Connect is the natural first target precisely because its clients already fit this "front-end only, terminal results" contract.
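The "front-end only, terminal results" contract can be expressed as a narrow interface. This Python sketch is an assumption, not a proposed API: `run_on_data_plane`, `Collect`, and `WriteTo` are hypothetical, JSON stands in for the Comet plan protobuf, and the "execution" is a trivial filter. The only input is the serialized plan, so the data plane cannot tell which front-end produced it, and the only outputs are terminal: rows returned to the driver, or the location of output the data plane wrote itself. There is deliberately no way to hand intermediate data back to Spark mid-query.

```python
import json
import os
import tempfile
from dataclasses import dataclass
from typing import Dict, List, Union

Row = Dict[str, int]


@dataclass
class Collect:
    """Terminal sink: stream the final rows back to the submitting driver."""


@dataclass
class WriteTo:
    """Terminal sink: the data plane writes the output itself and returns only its path."""
    path: str


def run_on_data_plane(plan_bytes: bytes, sink: Union[Collect, WriteTo]) -> Union[List[Row], str]:
    # The serialized plan is the only input; whether a Spark Connect server or a
    # regular Spark application produced it is invisible here.
    plan = json.loads(plan_bytes)
    # Toy execution of the whole query: keep rows whose column meets a threshold.
    rows = [r for r in plan["rows"] if r[plan["column"]] >= plan["min"]]
    if isinstance(sink, Collect):
        return rows
    with open(sink.path, "w") as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")
    return sink.path


plan_bytes = json.dumps(
    {"rows": [{"id": 1, "v": 5}, {"id": 2, "v": 20}], "column": "v", "min": 10}).encode()
print(run_on_data_plane(plan_bytes, Collect()))  # [{'id': 2, 'v': 20}]
out_path = os.path.join(tempfile.mkdtemp(), "result.jsonl")
print(run_on_data_plane(plan_bytes, WriteTo(out_path)) == out_path)  # True
```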
This issue is intended to open discussion on the approach before any implementation work begins. Feedback on the plan-handoff design and the strict-native coverage stance is especially welcome.