fix: count returning zero when scan is disabled and going through CometSparkColumnarToColumnar #4795
Dummk0pf wants to merge 6 commits
Conversation
mbutrovich
left a comment
Thanks for the quick fix @Dummk0pf! Are you able to add a test that reproduces on main without the fix that would serve as a regression test for us? I think you could write a Parquet file locally, disable Comet's Parquet scan so it goes through CometSparkColumnarToColumnar?
1. Writes a local Parquet file with dummy data, disables the native scan, enables Spark-to-Arrow conversion for Parquet, and checks that df.count() matches the expected count.
Thanks again for the review @mbutrovich, I have added the regression test for the above changes.
Thanks for the quick fix and revision @Dummk0pf! Are you able to assert in the test collecting
I also confirmed that the new test fails on main currently.
…Reader to catch future regressions
Thanks @mbutrovich, I have added the assertion to the test.
mbutrovich
left a comment
Approved pending CI. Thanks again @Dummk0pf!
Thanks @mbutrovich, but it seems some builds are failing. I am checking those failures now.
…s query plan, instead of just having scan
Hi @mbutrovich, I fixed the bug in the test. I had taken the query plan of the scanned df instead of creating another df with count in its query plan. I have corrected it now. Sorry for the bug, and thank you for your patience.
Which issue does this PR close?
Closes #4793.
Rationale for this change
This change fixes the bug where df.count() returns zero, when spark.comet.scan.enabled is disabled and spark.comet.convert.parquet.enabled is enabled.
Spark has a well-known "count-from-metadata" optimization: df.count() rewrites the Parquet scan to require zero data columns. FileSourceScanExec therefore emits ColumnarBatch with numRows == <row-group row count> and numCols == 0.
With spark.comet.scan.enabled=false and spark.comet.convert.parquet.enabled=true, Comet inserts a CometSparkColumnarToColumnar node that funnels every Spark batch through SparkColumnarArrowReader.loadNextBatch(). That method only sets the Arrow row count as a side-effect of copying columns: ArrowWriter.finish() calls root.setRowCount(count), where count was set to input.numElements() inside writeCol/writeColNoNull. With zero columns those methods are never called, count remains 0, and the emitted Arrow batch has numRows == 0 even though the input Spark batch had thousands of rows. Downstream aggregations then see zero rows per partition and df.count() returns 0.
df.show() was unaffected because it requires actual columns; the loop runs and count is set correctly.
What changes are included in this PR?
Set the row count on the Arrow VectorSchemaRoot explicitly from the pre-computed rowsToProduce, so the value is correct regardless of column count. For batches with columns this is a no-op (the same value ArrowWriter.finish() would have written); for zero-column batches it restores correctness.
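The failure mode and the fix can be illustrated with a small, self-contained toy model. This is only a sketch in Python, not Comet's code: ToyBatch, ToyArrowWriter, load_next_batch and the set_row_count_explicitly flag are hypothetical names invented for illustration, and the model assumes only what is described above (the row count is updated as a side effect of copying columns, and the fix sets it from the batch's own row count).

```python
from dataclasses import dataclass, field


@dataclass
class ToyBatch:
    """Hypothetical stand-in for a Spark ColumnarBatch: a row count plus columns."""
    num_rows: int
    columns: list = field(default_factory=list)


class ToyArrowWriter:
    """Hypothetical writer whose count is only updated while copying a column."""

    def __init__(self):
        self.count = 0
        self.root_row_count = 0

    def write_col(self, column):
        # The count is set as a side effect of copying a column.
        self.count = len(column)

    def finish(self):
        # Publishes whatever count the column copies left behind.
        self.root_row_count = self.count


def load_next_batch(batch, set_row_count_explicitly):
    """Convert one toy batch and return the row count of the emitted batch."""
    writer = ToyArrowWriter()
    for column in batch.columns:  # never runs for a zero-column batch
        writer.write_col(column)
    writer.finish()
    if set_row_count_explicitly:
        # Modelled fix: take the row count from the input batch itself,
        # independent of how many columns were copied.
        writer.root_row_count = batch.num_rows
    return writer.root_row_count


count_only = ToyBatch(num_rows=4096)                   # zero columns, as in df.count()
with_data = ToyBatch(num_rows=3, columns=[[1, 2, 3]])  # a batch with one column

print(load_next_batch(count_only, set_row_count_explicitly=False))  # 0
print(load_next_batch(count_only, set_row_count_explicitly=True))   # 4096
print(load_next_batch(with_data, set_row_count_explicitly=False))   # 3
print(load_next_batch(with_data, set_row_count_explicitly=True))    # 3
```

In this model the explicit assignment changes nothing for batches that carry columns and only corrects the zero-column case, which mirrors the no-op argument made above.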
How are these changes tested?
Query plan for df.count() with comet 0.16.0 jar file
Query plan for df.count() with the jar file built from source using these changes