Skip to content

fix: count returning zero when scan is disabled and going through CometSparkColumnarToColumnar#4795

Open
Dummk0pf wants to merge 6 commits into
apache:mainfrom
Dummk0pf:fix/arrow-reader-count
Open

fix: count returning zero when scan is disabled and going through CometSparkColumnarToColumnar#4795
Dummk0pf wants to merge 6 commits into
apache:mainfrom
Dummk0pf:fix/arrow-reader-count

Conversation

@Dummk0pf

@Dummk0pf Dummk0pf commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #4793.

Rationale for this change

This change fixes the bug where df.count() returns zero, when spark.comet.scan.enabled is disabled and spark.comet.convert.parquet.enabled is enabled.

Spark has a well-known "count-from-metadata" optimization: df.count() rewrites the
Parquet scan to require zero data columns. FileSourceScanExec therefore emits
ColumnarBatch with numRows == <row-group row count> and numCols == 0.

With spark.comet.scan.enabled=false and spark.comet.convert.parquet.enabled=true,
Comet inserts a CometSparkColumnarToColumnar node that funnels every Spark batch
through SparkColumnarArrowReader.loadNextBatch(). That method only sets the Arrow
row count as a side-effect of copying columns:

val writer = ArrowWriter.create(getVectorSchemaRoot)
var col = 0
while (col < current.numCols()) { // never executes when numCols == 0
  val column = current.column(col)
  val columnArray = new ColumnarArray(column, rowsConsumedInCurrent, rowsToProduce)
  if (column.hasNull) writer.writeCol(columnArray, col)
  else writer.writeColNoNull(columnArray, col)
  col += 1
}
writer.finish()  // ArrowWriter.count is still 0

ArrowWriter.finish() calls root.setRowCount(count), where count was set to
input.numElements() inside writeCol / writeColNoNull. With zero columns those
methods are never called, count remains 0, and the emitted Arrow batch has
numRows == 0 even though the input Spark batch had thousands of rows. Downstream
aggregations then see zero rows per partition and df.count() returns 0.

df.show() was unaffected because it requires actual columns; the loop runs and
count is set correctly.

What changes are included in this PR?

Set the row count on the Arrow VectorSchemaRoot explicitly from the pre-computed
rowsToProduce, so the value is correct regardless of column count. For batches with
columns this is a no-op (the same value ArrowWriter.finish() would have written);
for zero-column batches it restores correctness.

How are these changes tested?

Query plan for df.count() with comet 0.16.0 jar file

Screenshot 2026-07-02 at 9 46 24 PM

Query plan for df.count() with the jar file built from source using these changes

Screenshot 2026-07-02 at 9 46 34 PM

1. Set the row count on the Arrow VectorSchemaRoot explicitly from the pre-computed rowsToProduce.

@mbutrovich mbutrovich left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick fix @Dummk0pf! Are you able to add a test that reproduces on main without the fix that would serve as a regression test for us? I think you could write a Parquet file locally, disable Comet's Parquet scan so it goes through CometSparkColumnarToColumnar?

@mbutrovich mbutrovich changed the title Fix for count returning zero when scan is disabled fix: count returning zero when scan is disabled and going through CometSparkColumnarToColumnar Jul 2, 2026
1. Writes a local parquet file with dummy data, disables native scan and enables spark to arrow conversion and convert from parquet to check if df.count() matches with the expected count
@Dummk0pf

Dummk0pf commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

Thanks again for the review @mbutrovich, I have added the regression test for the above changes.

@mbutrovich

mbutrovich commented Jul 3, 2026

Copy link
Copy Markdown
Contributor

Thanks for the quick fix and revision @Dummk0pf! Are you able to assert in the test collecting case s: CometSparkToColumnarExec => s from the executed plan and asserting it's non-empty (there are other tests that do something similar)? I want to make sure we're exercising the expected codepath and prevent future regressions. Otherwise this is ready to go!

@mbutrovich

Copy link
Copy Markdown
Contributor

I also confirmed that the new test fails on main currently.

@Dummk0pf

Dummk0pf commented Jul 3, 2026

Copy link
Copy Markdown
Contributor Author

Thanks @mbutrovich , I have added the assertion to the test

@mbutrovich mbutrovich left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved pending CI. Thanks again @Dummk0pf!

@Dummk0pf

Dummk0pf commented Jul 3, 2026

Copy link
Copy Markdown
Contributor Author

Thanks @mbutrovich , but it seems some builds are failing, checking those failures now

@Dummk0pf

Dummk0pf commented Jul 3, 2026

Copy link
Copy Markdown
Contributor Author

Hi @mbutrovich I fixed the bug in the test, I took the query plan of the scanned df instead of creating another df with count in its query plan, i have corrected it now, sorry for the bug and thank you for your patience

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

bug: df.count() returns 0 when Comet native scan is disabled and when Comet parquet conversion is enabled

2 participants