Distinct-aggregate rewrite can split an incompatible-buffer aggregate across Comet and Spark, causing crash or wrong results #4813

Description

@andygrove

Describe the bug

An aggregate whose intermediate buffer format is incompatible between Spark and Comet (CometAggregateExpressionSerde.supportsMixedPartialFinal == false) can be split across Comet and Spark within a distinct-aggregate rewrite, so a Comet-encoded partial buffer is handed to a Spark aggregate (or vice versa). Depending on the aggregate this produces a hard crash or silently wrong results.

This is a pre-existing gap in the mixed-execution guards added for #1389. Those guards only handle a direct single-stage Partial → Final pair:

  • CometExecRule.tagUnsafePartialAggregates / findPartialAggInPlan only looked for a Partial aggregate feeding directly into a non-convertible Final, and stopped at intermediate aggregate stages.
  • CometBaseAggregate.doConvert only guarded the Final mode (sparkFinalMode), not intermediate PartialMerge stages.
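The gap in the first guard can be illustrated with a toy model. The sketch below is not Comet's code: `Agg`, `find_partial_direct` and `find_partial_through_merges` are hypothetical names, and the plan is reduced to a linear chain of aggregate modes. It only shows that a search which stops at the first intermediate aggregate stage misses a Partial that sits below PartialMerge stages, while a search that walks through them finds it.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Agg:
    """Toy aggregate stage (hypothetical): a mode plus an optional child stage."""
    mode: str                      # "Partial", "PartialMerge" or "Final"
    child: Optional["Agg"] = None


def find_partial_direct(final: Agg) -> Optional[Agg]:
    """Only accepts a Partial that feeds the Final directly."""
    child = final.child
    if child is not None and child.mode == "Partial":
        return child
    return None  # gives up at any intermediate aggregate stage


def find_partial_through_merges(final: Agg) -> Optional[Agg]:
    """Walks down through PartialMerge stages until a Partial is found."""
    node = final.child
    while node is not None and node.mode == "PartialMerge":
        node = node.child
    if node is not None and node.mode == "Partial":
        return node
    return None


# Simple two-stage plan: the Partial feeds the Final directly.
simple = Agg("Final", Agg("Partial"))
# Distinct-rewrite shape: the Partial sits below two PartialMerge stages.
distinct = Agg("Final", Agg("PartialMerge", Agg("PartialMerge", Agg("Partial"))))

print(find_partial_direct(simple) is not None)            # True
print(find_partial_direct(distinct) is not None)          # False
print(find_partial_through_merges(distinct) is not None)  # True
```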

Spark's single-distinct rewrite (AggUtils.planAggregateWithOneDistinct) produces a four-stage plan where the non-distinct aggregate's Partial is separated from its Final by intermediate PartialMerge stages:

Final(pa, count)            <- consumes buffers
  PartialMerge(pa) + Partial(count distinct)
    PartialMerge(pa)        <- consumes buffers, keyed by the distinct column
      Partial(pa)           <- produces the incompatible buffer

If any stage of this chain falls back to Spark while another converts to Comet (e.g. because the distinct aggregate or its grouping key is unsupported by Comet, or via the comet.exec.*HashAggregate.enabled test configs), the incompatible buffer crosses the engine boundary.
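To make the failure condition concrete, here is a minimal sketch that models the four stages of the plan above as a bottom-up list and reports every point where the buffer for `pa` would change engine. The function name `boundary_crossings` and the `"comet"` / `"spark"` labels are illustrative assumptions, not part of Comet.

```python
from typing import List, Tuple

# Stages listed bottom-up, matching the plan above.
STAGES = [
    "Partial(pa)",
    "PartialMerge(pa)",
    "PartialMerge(pa) + Partial(count distinct)",
    "Final(pa, count)",
]


def boundary_crossings(engines: List[str]) -> List[Tuple[str, str]]:
    """Return (producer, consumer) stage pairs that run in different engines."""
    if len(engines) != len(STAGES):
        raise ValueError("expected one engine per stage")
    crossings = []
    for i in range(len(STAGES) - 1):
        if engines[i] != engines[i + 1]:
            crossings.append((STAGES[i], STAGES[i + 1]))
    return crossings


# Everything in Comet: the buffer never leaves the engine.
all_comet = boundary_crossings(["comet"] * 4)
# Final stage falls back to Spark: Spark consumes a Comet-encoded buffer.
final_in_spark = boundary_crossings(["comet", "comet", "comet", "spark"])

print(all_comet)       # []
print(final_in_spark)  # [('PartialMerge(pa) + Partial(count distinct)', 'Final(pa, count)')]
```

For an aggregate with supportsMixedPartialFinal == false, any non-empty result corresponds to the unsafe situation described in this issue.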

Steps to reproduce

Any incompatible-buffer aggregate combined with a distinct aggregate, forced to split. For example with a TypedImperativeAggregate:

SELECT collect_set(v), count(DISTINCT k) FROM t GROUP BY g

Forcing the split with spark.comet.exec.finalHashAggregate.enabled=false (Comet does the partial/merge, Spark does the final) crashes, because Spark's aggregate tries to deserialize a Comet-encoded buffer.

Expected behavior

When an incompatible-buffer aggregate cannot run entirely in one engine across the whole distinct-rewrite chain, the entire chain for that aggregate must fall back to Spark, as already happens for the simple Partial → Final case.
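The expected rule can be written down as a small all-or-nothing decision. This is only a sketch of the intended behavior, not the actual fix: `resolve_engines` is a hypothetical helper, `convertible` is an assumed per-stage flag saying whether Comet could run that stage, and per-stage conversion for compatible buffers is assumed to be safe.

```python
from typing import List


def resolve_engines(convertible: List[bool], has_incompatible_buffer: bool) -> List[str]:
    """Pick an engine per stage of one aggregate's chain (bottom-up order).

    If the aggregate's buffer format is incompatible between engines and any
    stage cannot run in Comet, the whole chain falls back to Spark.
    """
    if has_incompatible_buffer and not all(convertible):
        return ["spark"] * len(convertible)
    # Compatible buffers (or a fully convertible chain): decide stage by stage.
    return ["comet" if ok else "spark" for ok in convertible]


# Final stage cannot convert and the buffer is incompatible: everything in Spark.
print(resolve_engines([True, True, True, False], has_incompatible_buffer=True))
# Same plan with a compatible buffer: a mixed chain is acceptable.
print(resolve_engines([True, True, True, False], has_incompatible_buffer=False))
```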

Additional context

Surfaced by the approx_percentile / percentile_approx work (#4801): Spark's ObjectHashAggregateSuite "randomized aggregation test - [typed, with distinct]" crashed with a native panic

quantile_summaries.rs: range end index 100 out of range for slice of length 96

because Comet's Greenwald-Khanna digest is wildly different from Spark's PercentileDigest, so crossing the engine boundary fails loudly. Other affected aggregates (collect_set, collect_list, exact percentile, and the declarative avg / decimal sum / variance family) have buffers that are structurally similar between the two engines and instead risk silently wrong results, which is why this went unnoticed.
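The difference between the loud and the silent failure mode can be shown with two made-up buffer layouts. Nothing below reflects the real Comet or Spark encodings: the layouts, field orders and sizes are invented purely for illustration (the 96 and 100 byte sizes just echo the panic message above).

```python
import struct

# Loud failure: the consumer expects a fixed 100-byte layout (25 floats),
# but the producer wrote only 96 bytes (24 floats). Decoding raises.
digest_from_producer = struct.pack("<24f", *([1.0] * 24))
try:
    struct.unpack("<25f", digest_from_producer)
    loud_failure = False
except struct.error:
    loud_failure = True

# Silent failure: both sides use 16 bytes, but disagree on field order.
# Producer writes (sum: double, count: int64); consumer reads (count, sum).
avg_from_producer = struct.pack("<dq", 10.0, 4)
count, total = struct.unpack("<qd", avg_from_producer)
silently_wrong = (total, count) != (10.0, 4)

print(loud_failure)    # True
print(silently_wrong)  # True
```

The second case decodes without any error, which is why a same-shaped but differently interpreted buffer can go unnoticed.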

Affected: any aggregate with supportsMixedPartialFinal == false used together with a distinct aggregate.
