diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/SparkColumnarArrowReader.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/SparkColumnarArrowReader.scala
index 0af940fab3..157aca7423 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/SparkColumnarArrowReader.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/SparkColumnarArrowReader.scala
@@ -91,6 +91,11 @@ private[comet] class SparkColumnarArrowReader(
     rowsConsumedInCurrent += rowsToProduce
 
     writer.finish()
+    // ArrowWriter derives the root row count from its per-column writes, so a zero-column
+    // input batch (Spark's count-from-metadata scan: numRows > 0, numCols == 0) would otherwise
+    // produce a root with rowCount == 0 and silently drop the rows. Set rowCount explicitly so
+    // downstream aggregations (e.g. df.count()) see the correct value.
+    getVectorSchemaRoot.setRowCount(rowsToProduce)
     onConversionNs(System.nanoTime() - startNs)
     true
   }
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index dce303b8c1..67a1920a6b 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -3711,6 +3711,45 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("SparkToColumnar preserves row count for zero-column input batches (df.count())") {
+    // Regression test: when spark.comet.scan is disabled but convert.parquet is enabled,
+    // Spark's count-from-metadata optimization emits ColumnarBatches with numRows > 0 and
+    // numCols == 0. SparkColumnarArrowReader used to derive the Arrow row count from
+    // per-column writes only, silently producing zero-row Arrow batches in this case and
+    // making df.count() return 0.
+    withSQLConf(
+      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
+      CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
+      CometConf.COMET_SPARK_TO_ARROW_ENABLED.key -> "true",
+      CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val expected = 10000L
+        spark
+          .range(expected)
+          .selectExpr("id as key", "id % 8 as value")
+          .toDF("key", "value")
+          .write
+          .parquet(dir.toString)
+
+        val df = spark.read.parquet(dir.toString)
+        // Materialize the count query explicitly so we can inspect the plan
+        // that runs — Dataset.count() executes groupBy().count() internally
+        // but doesn't expose that plan
+        val countDf = df.groupBy().count()
+        assert(
+          countDf.collect().head.getLong(0) == expected,
+          "df.count() should match number of written rows")
+
+        // Ensure this test actually exercises the SparkColumnarArrowReader code path
+        // guarded by the fix, so future regressions are caught.
+        val sparkToColumnar = collect(countDf.queryExecution.executedPlan) {
+          case s: CometSparkToColumnarExec => s
+        }
+        assert(sparkToColumnar.nonEmpty, "Expected CometSparkToColumnarExec in the executed plan")
+      }
+    }
+  }
+
   test("read CSV file") {
     Seq("", "csv").foreach { v1List =>
       withSQLConf(