From fad15d412ff669b9a6cde8b6056b0fac08fab8e3 Mon Sep 17 00:00:00 2001 From: Dummk0pf Date: Thu, 2 Jul 2026 22:19:41 +0530 Subject: [PATCH 1/4] Fix for count returning zero when scan is disabled 1. Set the row count on the Arrow VectorSchemaRoot explicitly from the pre-computed rowsToProduce. --- .../sql/comet/execution/arrow/SparkColumnarArrowReader.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/SparkColumnarArrowReader.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/SparkColumnarArrowReader.scala index 0af940fab3..157aca7423 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/SparkColumnarArrowReader.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/SparkColumnarArrowReader.scala @@ -91,6 +91,11 @@ private[comet] class SparkColumnarArrowReader( rowsConsumedInCurrent += rowsToProduce writer.finish() + // ArrowWriter derives the root row count from its per-column writes, so a zero-column + // input batch (Spark's count-from-metadata scan: numRows > 0, numCols == 0) would otherwise + // produce a root with rowCount == 0 and silently drop the rows. Set rowCount explicitly so + // downstream aggregations (e.g. df.count()) see the correct value. + getVectorSchemaRoot.setRowCount(rowsToProduce) onConversionNs(System.nanoTime() - startNs) true } From 53b1c1290231be643a007187bfe74ff00bdf8823 Mon Sep 17 00:00:00 2001 From: Dummk0pf Date: Thu, 2 Jul 2026 23:16:31 +0530 Subject: [PATCH 2/4] Added a regression test for the above changes 1. 
Writes a local parquet file with dummy data, disables native scan and enables spark to arrow conversion and convert from parquet to check if df.count() matches with the expected count --- .../apache/comet/exec/CometExecSuite.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index dce303b8c1..9959a4f061 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3711,6 +3711,32 @@ class CometExecSuite extends CometTestBase { } } + test("SparkToColumnar preserves row count for zero-column input batches (df.count())") { + // Regression test: when spark.comet.scan is disabled but convert.parquet is enabled, + // Spark's count-from-metadata optimization emits ColumnarBatches with numRows > 0 and + // numCols == 0. SparkColumnarArrowReader used to derive the Arrow row count from + // per-column writes only, silently producing zero-row Arrow batches in this case and + // making df.count() return 0. 
+ withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet", + CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false", + CometConf.COMET_SPARK_TO_ARROW_ENABLED.key -> "true", + CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") { + withTempPath { dir => + val expected = 10000L + spark + .range(expected) + .selectExpr("id as key", "id % 8 as value") + .toDF("key", "value") + .write + .parquet(dir.toString) + + val df = spark.read.parquet(dir.toString) + assert(df.count() == expected, "df.count() should match number of written rows") + } + } + } + test("read CSV file") { Seq("", "csv").foreach { v1List => withSQLConf( From 282166eca6e2f81462b01b24d1fbd0a6c76e3546 Mon Sep 17 00:00:00 2001 From: Dummk0pf Date: Fri, 3 Jul 2026 20:45:51 +0530 Subject: [PATCH 3/4] Added a guard rail to ensure that the test follows SparkColumnarArrowReader to catch future regressions --- .../test/scala/org/apache/comet/exec/CometExecSuite.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 9959a4f061..4098cd0eeb 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3733,6 +3733,13 @@ class CometExecSuite extends CometTestBase { val df = spark.read.parquet(dir.toString) assert(df.count() == expected, "df.count() should match number of written rows") + + // Ensuring that this test actually follows the SparkColumnarArrowReader code path + // guarded by the fix, so we can catch future regressions. 
+ val sparkToColumnar = collect(df.queryExecution.executedPlan) { + case s: CometSparkToColumnarExec => s + } + assert(sparkToColumnar.nonEmpty, "Expected CometSparkToColumnarExec in the executed plan") } } } From 87e4c2c8396ca060490287422761a6cbd7919f50 Mon Sep 17 00:00:00 2001 From: Dummk0pf Date: Sat, 4 Jul 2026 00:17:25 +0530 Subject: [PATCH 4/4] fixed the test to use the dataframe with actual count operation in its query plan, instead of just having scan --- .../org/apache/comet/exec/CometExecSuite.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 4098cd0eeb..67a1920a6b 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3732,11 +3732,17 @@ class CometExecSuite extends CometTestBase { .parquet(dir.toString) val df = spark.read.parquet(dir.toString) - assert(df.count() == expected, "df.count() should match number of written rows") + // Materialize the count query explicitly so we can inspect the plan + // that runs — Dataset.count() executes groupBy().count() internally + // but doesn't expose that plan + val countDf = df.groupBy().count() + assert( + countDf.collect().head.getLong(0) == expected, + "df.count() should match number of written rows") - // Ensuring that this test actually follows the SparkColumnarArrowReader code path - // guarded by the fix, so we can catch future regressions. - val sparkToColumnar = collect(df.queryExecution.executedPlan) { + // Ensure this test actually exercises the SparkColumnarArrowReader code path + // guarded by the fix, so future regressions are caught. + val sparkToColumnar = collect(countDf.queryExecution.executedPlan) { case s: CometSparkToColumnarExec => s } assert(sparkToColumnar.nonEmpty, "Expected CometSparkToColumnarExec in the executed plan")