Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ private[comet] class SparkColumnarArrowReader(
rowsConsumedInCurrent += rowsToProduce

writer.finish()
// ArrowWriter derives the root row count from its per-column writes, so a zero-column
// input batch (Spark's count-from-metadata scan: numRows > 0, numCols == 0) would otherwise
// produce a root with rowCount == 0 and silently drop the rows. Set rowCount explicitly so
// downstream aggregations (e.g. df.count()) see the correct value.
getVectorSchemaRoot.setRowCount(rowsToProduce)
onConversionNs(System.nanoTime() - startNs)
true
}
Expand Down
39 changes: 39 additions & 0 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3711,6 +3711,45 @@ class CometExecSuite extends CometTestBase {
}
}

test("SparkToColumnar preserves row count for zero-column input batches (df.count())") {
// Regression test: when spark.comet.scan is disabled but convert.parquet is enabled,
// Spark's count-from-metadata optimization emits ColumnarBatches with numRows > 0 and
// numCols == 0. SparkColumnarArrowReader used to derive the Arrow row count from
// per-column writes only, silently producing zero-row Arrow batches in this case and
// making df.count() return 0.
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
CometConf.COMET_SPARK_TO_ARROW_ENABLED.key -> "true",
CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") {
withTempPath { dir =>
val expected = 10000L
spark
.range(expected)
.selectExpr("id as key", "id % 8 as value")
.toDF("key", "value")
.write
.parquet(dir.toString)

val df = spark.read.parquet(dir.toString)
// Materialize the count query explicitly so we can inspect the plan
// that runs — Dataset.count() executes groupBy().count() internally
// but doesn't expose that plan
val countDf = df.groupBy().count()
assert(
countDf.collect().head.getLong(0) == expected,
"df.count() should match number of written rows")

// Ensure this test actually exercises the SparkColumnarArrowReader code path
// guarded by the fix, so future regressions are caught.
val sparkToColumnar = collect(countDf.queryExecution.executedPlan) {
case s: CometSparkToColumnarExec => s
}
assert(sparkToColumnar.nonEmpty, "Expected CometSparkToColumnarExec in the executed plan")
}
}
}

test("read CSV file") {
Seq("", "csv").foreach { v1List =>
withSQLConf(
Expand Down
Loading