From 26da35fc2475acae82929a6256a6545416231469 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 3 Jul 2026 11:37:59 -0600 Subject: [PATCH 1/2] feat: support Spark 4.1 TIME type and expressions via codegen dispatch Wires the Spark 4.1 TIME data type end-to-end and adds JVM codegen dispatch for the new TIME expressions. No native implementations are added: everything routes through the existing emitJvmCodegenDispatch path so the enclosing projection stays native while Spark's own doGenCode supplies the semantics. Adds TimeType to DataTypeSupport, teaches the batch-kernel input and output emitters, CometScalaUDFCodegen, and ArrowWriters to handle TimeNanoVector as a Long-backed vector, and updates the shim so HoursOfTime, MinutesOfTime, SecondsOfTime, SecondsOfTimeWithFraction, TimeAddInterval, SubtractTimes, TimeDiff, and TimeTrunc reach the dispatcher via their StaticInvoke(DateTimeUtils, ...) replacement form. Casts touching TimeType follow the same route via CometCast. Parquet scans still opt out of TimeType (no native decoder yet); the fallback reason is now explicit. --- docs/source/user-guide/latest/expressions.md | 12 +- .../org/apache/comet/DataTypeSupport.scala | 5 +- .../codegen/CometBatchKernelCodegen.scala | 6 +- .../CometBatchKernelCodegenInput.scala | 6 +- .../CometBatchKernelCodegenOutput.scala | 9 +- .../apache/comet/expressions/CometCast.scala | 17 ++- .../apache/comet/rules/CometScanRule.scala | 6 + .../udf/codegen/CometScalaUDFCodegen.scala | 2 +- .../sql/comet/CometLocalTableScanExec.scala | 4 +- .../comet/execution/arrow/ArrowWriters.scala | 12 ++ .../apache/comet/shims/CometExprShim.scala | 25 ++++ .../apache/comet/exec/CometExecSuite.scala | 8 +- .../apache/spark/sql/CometTimeTypeSuite.scala | 139 ++++++++++++++++++ 13 files changed, 230 insertions(+), 21 deletions(-) create mode 100644 spark/src/test/spark-4.1/org/apache/spark/sql/CometTimeTypeSuite.scala diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index 2ca3a13c62..c681510ece 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -237,7 +237,7 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci | `convert_timezone` | ✅ | Routes through the JVM codegen dispatcher by default (handles all timezone forms); the native path is opt-in via allowIncompatible ([details](compatibility/expressions/datetime.md)) | | `curdate` | ✅ | Constant-folded to a literal (alias of `current_date`) | | `current_date` | ✅ | Constant-folded to a literal before Comet sees the plan | -| `current_time` | 🔜 | Blocked on Spark 4.1 TIME type support ([#4288](https://github.com/apache/datafusion-comet/issues/4288)) | +| `current_time` | ✅ | Constant-folded to a literal before Comet sees the plan (Spark 4.1+) | | `current_timestamp` | ✅ | Constant-folded to a literal before Comet sees the plan | | `current_timezone` | ✅ | | | `date_add` | ✅ | | @@ -264,7 +264,7 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci | `make_date` | ✅ | | | `make_dt_interval` | ✅ | | | `make_interval` | 🔜 | Produces legacy CalendarInterval; tracked by [#4540](https://github.com/apache/datafusion-comet/issues/4540) | -| `make_time` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) | +| `make_time` | ✅ | Spark 4.1+ | | `make_timestamp` | ✅ | | | `make_timestamp_ltz` | ✅ | 2-arg TIME form falls back | | `make_timestamp_ntz` | ✅ | 2-arg TIME form falls back | @@ -278,13 +278,13 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci | `quarter` | ✅ | | | `second` | ✅ | | | `session_window` | 🔜 | Batch session-window grouping falls back (`UpdatingSessionsExec` is not yet native); tracked by [#4785](https://github.com/apache/datafusion-comet/issues/4785) | -| `time_diff` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) | -| `time_trunc` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) | +| `time_diff` | ✅ | Spark 4.1+; routes through the JVM codegen dispatcher | +| `time_trunc` | ✅ | Spark 4.1+; routes through the JVM codegen dispatcher | | `timestamp_micros` | ✅ | | | `timestamp_millis` | ✅ | | | `timestamp_seconds` | ✅ | | | `to_date` | ✅ | Rewrites to `Cast` (or `Cast(GetTimestamp)` with a format) before Comet sees the plan | -| `to_time` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) | +| `to_time` | ✅ | Spark 4.1+ | | `to_timestamp` | ✅ | Rewrites to `Cast` (or `GetTimestamp` with a format) before Comet sees the plan | | `to_timestamp_ltz` | ✅ | Rewrites to `to_timestamp` (`TimestampType`) | | `to_timestamp_ntz` | ✅ | Rewrites to `to_timestamp` (`TimestampNTZType`) | @@ -294,7 +294,7 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci | `try_make_interval` | 🔜 | Produces legacy CalendarInterval; tracked by [#4540](https://github.com/apache/datafusion-comet/issues/4540) | | `try_make_timestamp` | ✅ | | | `try_to_date` | 🔜 | Rewrites to `Cast`/`GetTimestamp` but currently falls back; tracked by [#4556](https://github.com/apache/datafusion-comet/issues/4556) | -| `try_to_time` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) | +| `try_to_time` | ✅ | Spark 4.1+ | | `try_to_timestamp` | 🔜 | Rewrites to `Cast`/`GetTimestamp` but currently falls back; tracked by [#4556](https://github.com/apache/datafusion-comet/issues/4556) | | `unix_date` | ✅ | | | `unix_micros` | ✅ | | diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala index 9f8fc77eba..115d3f78c8 100644 --- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala +++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala @@ -24,8 +24,9 @@ import scala.collection.mutable.ListBuffer import org.apache.spark.sql.types._ import org.apache.comet.DataTypeSupport.{ARRAY_ELEMENT, MAP_KEY, MAP_VALUE} +import org.apache.comet.shims.CometTypeShim -trait DataTypeSupport { +trait DataTypeSupport extends CometTypeShim { /** * Checks if this schema is supported by checking if each field in the schema is supported. @@ -53,6 +54,8 @@ trait DataTypeSupport { BinaryType | StringType | _: DecimalType | DateType | TimestampType | TimestampNTZType => true + case dt if isTimeType(dt) => + true case StructType(fields) => fields.nonEmpty && fields.forall(f => isTypeSupported(f.dataType, f.name, fallbackReasons)) diff --git a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala index 107fb5e7f8..d90d342c09 100644 --- a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala +++ b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.comet.shims.CometExprTraitShim +import org.apache.comet.shims.{CometExprTraitShim, CometTypeShim} /** * Compiles a bound [[Expression]] plus an Arrow input schema into a [[CometBatchKernel]] that @@ -49,7 +49,7 @@ import org.apache.comet.shims.CometExprTraitShim * The generated kernel is the `InternalRow` that Spark's `BoundReference.genCode` reads from. See * [[generateSource]] for how the wiring is set up. */ -object CometBatchKernelCodegen extends Logging with CometExprTraitShim { +object CometBatchKernelCodegen extends Logging with CometExprTraitShim with CometTypeShim { /** * Resolve an Arrow vector class by simple name through the codegen object's own classloader. @@ -69,6 +69,7 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { case "DateDayVector" => classOf[DateDayVector] case "TimeStampMicroVector" => classOf[TimeStampMicroVector] case "TimeStampMicroTZVector" => classOf[TimeStampMicroTZVector] + case "TimeNanoVector" => classOf[TimeNanoVector] case "VarCharVector" => classOf[VarCharVector] case "VarBinaryVector" => classOf[VarBinaryVector] case "IntervalYearVector" => classOf[IntervalYearVector] @@ -87,6 +88,7 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { case _: StringType | _: BinaryType => true case DateType | TimestampType | TimestampNTZType => true case _: YearMonthIntervalType | _: DayTimeIntervalType => true + case dt if isTimeType(dt) => true case ArrayType(inner, _) => isSupportedDataType(inner) case st: StructType => st.fields.forall(f => isSupportedDataType(f.dataType)) case mt: MapType => isSupportedDataType(mt.keyType) && isSupportedDataType(mt.valueType) diff --git a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala index 09bfc52bd4..6872d8681e 100644 --- a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala +++ b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala @@ -62,7 +62,8 @@ private[codegen] object CometBatchKernelCodegenInput { classOf[Float8Vector], classOf[DateDayVector], classOf[TimeStampMicroVector], - classOf[TimeStampMicroTZVector]) + classOf[TimeStampMicroTZVector], + classOf[TimeNanoVector]) private val cometPlainVectorName: String = classOf[CometPlainVector].getName /** Emit kernel typed-vector field declarations for every level of every input column. */ @@ -134,7 +135,8 @@ private[codegen] object CometBatchKernelCodegenInput { case (ArrowColumnSpec(cls, _), ord) if cls == classOf[BigIntVector] || cls == classOf[TimeStampMicroVector] || - cls == classOf[TimeStampMicroTZVector] => + cls == classOf[TimeStampMicroTZVector] || + cls == classOf[TimeNanoVector] => s" case $ord: return this.col$ord.getLong(this.rowIdx);" } val floatCases = withOrd.collect { diff --git a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenOutput.scala b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenOutput.scala index fe282cdc65..6bdf109489 100644 --- a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenOutput.scala +++ b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenOutput.scala @@ -31,13 +31,14 @@ import org.apache.spark.sql.comet.util.Utils import org.apache.spark.sql.types._ import org.apache.comet.CometArrowAllocator +import org.apache.comet.shims.CometTypeShim /** * Output-side emitters for the codegen kernel: [[allocateOutput]], [[emitOutputWriter]] * (top-level write entry), [[emitWrite]] (recursive per-type write), the output vector-class * lookup. Paired with [[CometBatchKernelCodegenInput]] on the read side. */ -private[codegen] object CometBatchKernelCodegenOutput { +private[codegen] object CometBatchKernelCodegenOutput extends CometTypeShim { /** * Spark `DataType` to an Arrow `Field` with names Comet expects on FFI export. Spark's @@ -173,6 +174,7 @@ private[codegen] object CometBatchKernelCodegenOutput { case TimestampNTZType => classOf[TimeStampMicroVector].getName case _: YearMonthIntervalType => classOf[IntervalYearVector].getName case _: DayTimeIntervalType => classOf[DurationVector].getName + case dt if isTimeType(dt) => classOf[TimeNanoVector].getName case _: ArrayType => classOf[ListVector].getName case _: StructType => classOf[StructVector].getName case _: MapType => classOf[MapVector].getName @@ -216,6 +218,10 @@ private[codegen] object CometBatchKernelCodegenOutput { // DayTimeIntervalType -> DurationVector.set(int, long micros). val set = if (nested) "setSafe" else "set" OutputEmit("", s"$targetVec.$set($idx, $source);") + case dt if isTimeType(dt) => + // TIME is stored as nanoseconds-of-day in a Long, matching TimeNanoVector.set(int, long). + val set = if (nested) "setSafe" else "set" + OutputEmit("", s"$targetVec.$set($idx, $source);") case dt: DecimalType => // DecimalOutputShortFastPath: precision <= 18 fits in a signed long, so pass the unscaled // value to `setSafe(int, long)` and skip the BigDecimal allocation. @@ -398,6 +404,7 @@ private[codegen] object CometBatchKernelCodegenOutput { case ShortType => s"$target.getShort($idx)" case IntegerType | DateType => s"$target.getInt($idx)" case LongType | TimestampType | TimestampNTZType => s"$target.getLong($idx)" + case dt if isTimeType(dt) => s"$target.getLong($idx)" case FloatType => s"$target.getFloat($idx)" case DoubleType => s"$target.getDouble($idx)" case dt: DecimalType => s"$target.getDecimal($idx, ${dt.precision}, ${dt.scale})" diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 619b69912f..576695e498 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -25,9 +25,9 @@ import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, withFallbackReason} -import org.apache.comet.serde.{CometExpressionSerde, Compatible, ExprOuterClass, Incompatible, SupportLevel, Unsupported} +import org.apache.comet.serde.{CometExpressionSerde, CometScalaUDF, Compatible, ExprOuterClass, Incompatible, SupportLevel, Unsupported} import org.apache.comet.serde.ExprOuterClass.Expr -import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, serializeDataType} +import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, isTimeType, serializeDataType} import org.apache.comet.shims.CometExprShim object CometCast extends CometExpressionSerde[Cast] with CometExprShim { @@ -77,11 +77,19 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { // (see `convert`), so the cast never executes natively and the result matches Spark by // definition. `CometLiteral` then validates the resulting literal's data type. Compatible() + } else if (involvesTimeType(cast)) { + // Casts to/from TIME have no native lowering yet. `convert` routes them through the JVM + // codegen dispatcher instead of falling the projection back to Spark; report `Compatible` + // here so the dispatch path is taken. + Compatible() } else { isSupported(cast.child.dataType, cast.dataType, cast.timeZoneId, evalMode(cast)) } } + private def involvesTimeType(cast: Cast): Boolean = + isTimeType(cast.child.dataType) || isTimeType(cast.dataType) + override def convert( cast: Cast, inputs: Seq[Attribute], @@ -90,6 +98,11 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { cast.child match { case _: Literal => exprToProtoInternal(Literal.create(cast.eval(), cast.dataType), inputs, binding) + case _ if involvesTimeType(cast) => + // Casts to/from TIME (`TimeType(_)` <-> string/integral/decimal, precision changes) all + // have full Spark codegen but no native lowering. Route through the JVM codegen + // dispatcher so the enclosing projection stays native. + CometScalaUDF.emitJvmCodegenDispatch(cast, inputs, binding) case _ => if (isAlwaysCastToNull(cast.child.dataType, cast.dataType, cometEvalMode)) { exprToProtoInternal(Literal.create(null, cast.dataType), inputs, binding) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index bc02e22da9..e0e11f1ec2 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -751,6 +751,12 @@ case class CometScanTypeChecker() extends DataTypeSupport with CometTypeShim { "native execution if your data does not contain unsigned small integers. " + CometConf.COMPAT_GUIDE false + case dt if isTimeType(dt) => + // The native Parquet reader has not been taught to decode the TIME logical type into + // an Arrow Time64(NANOSECOND) vector, so fall back to Spark for scans that expose one. + fallbackReasons += s"Unsupported $name of type $dt (native Parquet scan does not " + + "support TIME)" + false case dt if isStringCollationType(dt) => // we don't need specific support for collation in scans, but this // is a convenient place to force the whole query to fall back to Spark for now diff --git a/spark/src/main/scala/org/apache/comet/udf/codegen/CometScalaUDFCodegen.scala b/spark/src/main/scala/org/apache/comet/udf/codegen/CometScalaUDFCodegen.scala index f575dd5b53..bd300e1b5a 100644 --- a/spark/src/main/scala/org/apache/comet/udf/codegen/CometScalaUDFCodegen.scala +++ b/spark/src/main/scala/org/apache/comet/udf/codegen/CometScalaUDFCodegen.scala @@ -220,7 +220,7 @@ class CometScalaUDFCodegen extends CometUDF with Logging { case _: BitVector | _: TinyIntVector | _: SmallIntVector | _: IntVector | _: BigIntVector | _: Float4Vector | _: Float8Vector | _: DecimalVector | _: VarCharVector | _: VarBinaryVector | _: DateDayVector | _: TimeStampMicroVector | - _: TimeStampMicroTZVector => + _: TimeStampMicroTZVector | _: TimeNanoVector => ScalarColumnSpec(v.getClass.asInstanceOf[Class[_ <: ValueVector]], nullable = true) case other => throw new UnsupportedOperationException( diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala index 161a4e0553..5898114167 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala @@ -119,8 +119,8 @@ object CometLocalTableScanExec extends CometSink[LocalTableScanExec] with DataTy CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED) // ArrowWriter (used by RowArrowReader) handles NullType via Utils.toArrowType + NullWriter; - // other types off DataTypeSupport's allow list (TimeType, intervals, ...) have no ArrowWriter - // coverage and must fall back to Spark. + // TimeType routes through the new TimeNanoWriter. Other types off DataTypeSupport's allow list + // (intervals, ...) have no ArrowWriter coverage and must fall back to Spark. override def isTypeSupported( dt: DataType, name: String, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/ArrowWriters.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/ArrowWriters.scala index 342441ce28..acd2fb30f1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/ArrowWriters.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/ArrowWriters.scala @@ -66,6 +66,7 @@ private[arrow] object ArrowWriter { case (DateType, vector: DateDayVector) => new DateWriter(vector) case (TimestampType, vector: TimeStampMicroTZVector) => new TimestampWriter(vector) case (TimestampNTZType, vector: TimeStampMicroVector) => new TimestampNTZWriter(vector) + case (dt, vector: TimeNanoVector) if Utils.isTimeType(dt) => new TimeNanoWriter(vector) case (ArrayType(_, _), vector: ListVector) => val elementVector = createFieldWriter(vector.getDataVector()) new ArrayWriter(vector, elementVector) @@ -365,6 +366,17 @@ private[arrow] class TimestampNTZWriter(val valueVector: TimeStampMicroVector) } } +private[arrow] class TimeNanoWriter(val valueVector: TimeNanoVector) extends ArrowFieldWriter { + + override def setNull(): Unit = { + valueVector.setNull(count) + } + + override def setValue(input: SpecializedGetters, ordinal: Int): Unit = { + valueVector.setSafe(count, input.getLong(ordinal)) + } +} + private[arrow] class ArrayWriter(val valueVector: ListVector, val elementWriter: ArrowFieldWriter) extends ArrowFieldWriter { diff --git a/spark/src/main/spark-4.1+/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.1+/org/apache/comet/shims/CometExprShim.scala index 64474291e7..04fc6e26c2 100644 --- a/spark/src/main/spark-4.1+/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.1+/org/apache/comet/shims/CometExprShim.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.TimeType import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.CometScalaUDF import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithFallbackReason, scalarFunctionExprToProtoWithReturnType} @@ -46,6 +47,22 @@ trait CometExprShim extends Spark4xCometExprShim { } } + // Function names on `DateTimeUtils` reached via `StaticInvoke` from the Spark 4.1 `TIME` family of + // `RuntimeReplaceable` expressions (`HoursOfTime`, `MinutesOfTime`, `SecondsOfTime`, + // `SecondsOfTimeWithFraction`, `TimeAddInterval`, `SubtractTimes`, `TimeDiff`, `TimeTrunc`). + // Each of these is codegen-safe (the replacement itself is a `StaticInvoke` with a proper + // `doGenCode`) but has no native lowering yet, so we route them through the JVM codegen + // dispatcher to keep the enclosing projection native. + private val timeCodegenDispatchFunctions: Set[String] = Set( + "getHoursOfTime", + "getMinutesOfTime", + "getSecondsOfTime", + "getSecondsOfTimeWithFraction", + "timeAddInterval", + "subtractTimes", + "timeDiff", + "timeTrunc") + // Spark 4.1 introduced TimeType and the make_time / to_time / try_to_time functions. // Their planner forms differ from the shared 4.x patterns (DateTimeUtils.makeTime // StaticInvoke and ToTimeParser Invoke / TryEval(Invoke)), so they live here rather @@ -66,6 +83,14 @@ trait CometExprShim extends Spark4xCometExprShim { scalarFunctionExprToProtoWithReturnType("make_time", s.dataType, true, childExprs: _*) optExprWithFallbackReason(optExpr, expr, s.arguments: _*) + // Route the other Spark 4.1 TIME `StaticInvoke` forms through the JVM codegen dispatcher. + // `emitJvmCodegenDispatch` runs Spark's own `doGenCode` inside the Comet pipeline, so the + // projection stays native while behavior matches Spark exactly. + case s: StaticInvoke + if s.staticObject == classOf[DateTimeUtils.type] && + timeCodegenDispatchFunctions.contains(s.functionName) => + CometScalaUDF.emitJvmCodegenDispatch(s, inputs, binding) + case i: Invoke => (i.targetObject, i.functionName, i.arguments) match { case (Literal(parser: ToTimeParser, _), "parse", args) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index dce303b8c1..a01f6ef8bd 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3940,7 +3940,7 @@ class CometExecSuite extends CometTestBase { } } - test("CometLocalTableScanExec falls back when schema contains TimeType") { + test("CometLocalTableScanExec handles TimeType column") { assume( org.apache.comet.CometSparkSessionExtensions.isSpark41Plus, "TimeType requires Spark 4.1+") @@ -3949,10 +3949,10 @@ class CometExecSuite extends CometTestBase { withSQLConf( "spark.sql.timeType.enabled" -> "true", CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { - // VALUES folds to a LocalRelation, exercising the CometLocalTableScanExec convert - // path; the TimeType column should drive the schema-level fallback. + // VALUES folds to a LocalRelation, exercising the CometLocalTableScanExec convert path. + // TimeType routes through TimeNanoWriter, so the native scan handles it end-to-end. val df = spark.sql("SELECT * FROM VALUES (TIME '12:34:56'), (TIME '01:02:03') AS t(c)") - checkSparkAnswer(df) + checkSparkAnswerAndOperator(df) } } diff --git a/spark/src/test/spark-4.1/org/apache/spark/sql/CometTimeTypeSuite.scala b/spark/src/test/spark-4.1/org/apache/spark/sql/CometTimeTypeSuite.scala new file mode 100644 index 0000000000..e6d49591a0 --- /dev/null +++ b/spark/src/test/spark-4.1/org/apache/spark/sql/CometTimeTypeSuite.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql + +import org.apache.comet.CometConf + +/** + * Coverage for the Spark 4.1 `TIME` type and the SQL functions that operate on it. All function + * calls that don't have a native lowering are routed through the JVM codegen dispatcher via the + * `CometExprShim`. Casts to/from TIME follow the same route via `CometCast`. + */ +class CometTimeTypeSuite extends CometTestBase { + + override protected def sparkConf = super.sparkConf + .set("spark.sql.timeType.enabled", "true") + .set(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key, "true") + + test("literal TIME values round-trip through LocalTableScan") { + val df = spark.sql( + "SELECT * FROM VALUES (TIME '00:00:00'), (TIME '12:34:56.789012'), " + + "(TIME '23:59:59.999999') AS t(c)") + checkSparkAnswerAndOperator(df) + } + + test("make_time(hour, minute, second) via native path") { + val df = spark.sql( + "SELECT make_time(h, m, s) FROM VALUES (0, 0, 0.0), (6, 30, 45.887), " + + "(23, 59, 59.999999) AS t(h, m, s)") + checkSparkAnswerAndOperator(df) + } + + test("to_time(str) via native path") { + val df = spark.sql( + "SELECT to_time(s) FROM VALUES ('00:12:00'), ('12:34:56.789'), " + + "('23:59:59.999999') AS t(s)") + checkSparkAnswerAndOperator(df) + } + + test("try_to_time(str) via native path returns NULL on malformed input") { + val df = spark.sql("SELECT try_to_time(s) FROM VALUES ('00:12:00'), ('bad'), (NULL) AS t(s)") + checkSparkAnswerAndOperator(df) + } + + test("hour(TIME) via codegen dispatch") { + val df = spark.sql( + "SELECT hour(t) FROM VALUES (TIME '00:00:00'), (TIME '13:45:00'), " + + "(TIME '23:59:59.999999') AS v(t)") + checkSparkAnswerAndOperator(df) + } + + test("minute(TIME) via codegen dispatch") { + val df = spark.sql( + "SELECT minute(t) FROM VALUES (TIME '00:00:00'), (TIME '13:45:00'), " + + "(TIME '23:59:59.999999') AS v(t)") + checkSparkAnswerAndOperator(df) + } + + test("second(TIME) via codegen dispatch") { + val df = spark.sql( + "SELECT second(t) FROM VALUES (TIME '00:00:00'), (TIME '13:45:07'), " + + "(TIME '23:59:59.999999') AS v(t)") + checkSparkAnswerAndOperator(df) + } + + test("EXTRACT SECOND FROM TIME returns decimal via codegen dispatch") { + val df = spark.sql( + "SELECT EXTRACT(SECOND FROM t) FROM VALUES (TIME '00:00:00'), " + + "(TIME '13:45:07.123456'), (TIME '23:59:59.999999') AS v(t)") + checkSparkAnswerAndOperator(df) + } + + test("time + day-time interval via codegen dispatch") { + val df = spark.sql( + "SELECT t + INTERVAL '1' HOUR FROM VALUES (TIME '00:00:00'), (TIME '22:30:00') AS v(t)") + checkSparkAnswerAndOperator(df) + } + + test("time - time returns day-time interval via codegen dispatch") { + // Comet does not yet support DayTimeIntervalType outputs at the projection level, so this + // exercises the shim wiring but currently falls back to Spark. Verify results only. + val df = spark.sql( + "SELECT b - a FROM VALUES (TIME '01:00:00', TIME '02:15:30'), " + + "(TIME '10:00:00', TIME '09:59:59.5') AS v(a, b)") + checkSparkAnswer(df) + } + + test("time_diff via codegen dispatch") { + val df = spark.sql( + "SELECT time_diff('HOUR', a, b), time_diff('MINUTE', a, b), time_diff('SECOND', a, b) " + + "FROM VALUES (TIME '20:30:29', TIME '21:30:29'), (TIME '00:00:00', TIME '00:00:01') " + + "AS v(a, b)") + checkSparkAnswerAndOperator(df) + } + + test("time_trunc via codegen dispatch") { + val df = spark.sql( + "SELECT time_trunc('HOUR', t), time_trunc('MINUTE', t), time_trunc('SECOND', t), " + + "time_trunc('MILLISECOND', t) " + + "FROM VALUES (TIME '09:32:05.359'), (TIME '23:59:59.999999') AS v(t)") + checkSparkAnswerAndOperator(df) + } + + test("cast string to TIME via codegen dispatch") { + val df = + spark.sql("SELECT CAST(s AS TIME) FROM VALUES ('00:12:00'), ('23:59:59.999999') AS v(s)") + checkSparkAnswerAndOperator(df) + } + + test("cast TIME to string via codegen dispatch") { + val df = spark.sql( + "SELECT CAST(t AS STRING) FROM VALUES (TIME '00:00:00'), " + + "(TIME '13:45:07.123456') AS v(t)") + checkSparkAnswerAndOperator(df) + } + + test("cast TIME to LONG returns seconds via codegen dispatch") { + val df = spark.sql( + "SELECT CAST(t AS BIGINT) FROM VALUES (TIME '00:00:00'), (TIME '01:00:00'), " + + "(TIME '23:59:59.999999') AS v(t)") + checkSparkAnswerAndOperator(df) + } +} From 963f81e7b8e93e0e42f6d2941ae283d5b26d441f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 3 Jul 2026 12:15:00 -0600 Subject: [PATCH 2/2] test: cover Spark 4.1 TIME expressions via SQL-file tests Replaces the initial CometTimeTypeSuite Scala tests with per-expression sql-tests fixtures under expressions/datetime and expressions/cast, so coverage runs through the same CometSqlFileTestSuite pipeline as other datetime tests (constant folding disabled, unified reporting). Also accept Time64(Nanosecond) in the native LongVal literal path so constant-folded TIME literals from CometCast (cast literal string -> TIME evaluated at planning time) round-trip through the native scan. --- native/core/src/execution/planner.rs | 3 + .../sql-tests/expressions/cast/cast_time.sql | 91 ++++++++++++ .../expressions/datetime/current_time.sql | 36 +++++ .../expressions/datetime/hour_time.sql | 51 +++++++ .../expressions/datetime/minute_time.sql | 48 ++++++ .../expressions/datetime/second_time.sql | 63 ++++++++ .../expressions/datetime/subtract_times.sql | 44 ++++++ .../datetime/time_add_interval.sql | 59 ++++++++ .../expressions/datetime/time_diff.sql | 65 ++++++++ .../expressions/datetime/time_trunc.sql | 62 ++++++++ .../apache/spark/sql/CometTimeTypeSuite.scala | 139 ------------------ 11 files changed, 522 insertions(+), 139 deletions(-) create mode 100644 spark/src/test/resources/sql-tests/expressions/cast/cast_time.sql create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/current_time.sql create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/hour_time.sql create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/minute_time.sql create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/second_time.sql create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/subtract_times.sql create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/time_add_interval.sql create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/time_diff.sql create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/time_trunc.sql delete mode 100644 spark/src/test/spark-4.1/org/apache/spark/sql/CometTimeTypeSuite.scala diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 25162332fd..7d595e8e41 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -389,6 +389,9 @@ impl PhysicalPlanner { DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => { ScalarValue::TimestampMicrosecond(Some(*value), Some(tz)) } + DataType::Time64(TimeUnit::Nanosecond) => { + ScalarValue::Time64Nanosecond(Some(*value)) + } dt => { return Err(GeneralError(format!( "Expected either 'Int64' or 'Timestamp' for LongVal, but found {dt:?}" diff --git a/spark/src/test/resources/sql-tests/expressions/cast/cast_time.sql b/spark/src/test/resources/sql-tests/expressions/cast/cast_time.sql new file mode 100644 index 0000000000..663bafd585 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/cast/cast_time.sql @@ -0,0 +1,91 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- Casts to/from TIME have full Spark codegen but no native lowering in Comet. CometCast +-- routes them through the JVM codegen dispatcher so the enclosing projection stays +-- native and results match Spark exactly. + +statement +CREATE TABLE test_cast_time(s string, h int, m int, sec decimal(16,6)) USING parquet + +statement +INSERT INTO test_cast_time VALUES + ('00:00:00', 0, 0, 0.000000), + ('12:34:56', 12, 34, 56.000000), + ('23:59:59.999999',23, 59, 59.999999), + ('01:00:00', 1, 0, 0.000000), + (NULL, NULL, NULL, NULL) + +-- string -> TIME (column) +query +SELECT CAST(s AS TIME) FROM test_cast_time + +-- string -> TIME (literals) +query +SELECT CAST('00:00:00' AS TIME) + +query +SELECT CAST('23:59:59.999999' AS TIME) + +-- TIME -> string (column) +query +SELECT CAST(make_time(h, m, sec) AS STRING) FROM test_cast_time + +-- TIME -> string (literals) +query +SELECT CAST(TIME '00:00:00' AS STRING) + +query +SELECT CAST(TIME '13:45:07.123456' AS STRING) + +-- TIME -> BIGINT (whole seconds since midnight, via floor(nanos / 1e9)) +query +SELECT CAST(make_time(h, m, sec) AS BIGINT) FROM test_cast_time + +query +SELECT CAST(TIME '00:00:00' AS BIGINT) + +query +SELECT CAST(TIME '01:00:00' AS BIGINT) + +query +SELECT CAST(TIME '23:59:59.999999' AS BIGINT) + +-- TIME -> INT (same seconds-truncation semantics) +query +SELECT CAST(TIME '13:45:07.999999' AS INT) + +-- TIME -> DECIMAL (fractional-seconds-preserving, nanos-based) +query +SELECT CAST(TIME '13:45:07.123456' AS DECIMAL(18,6)) + +-- TIME -> TIME with different precision (truncates fractional digits) +query +SELECT CAST(TIME '13:45:07.123456' AS TIME(0)) + +query +SELECT CAST(TIME '13:45:07.123456' AS TIME(3)) + +-- NULL round-trips +query +SELECT CAST(CAST(NULL AS STRING) AS TIME) + +query +SELECT CAST(CAST(NULL AS TIME) AS STRING) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/current_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/current_time.sql new file mode 100644 index 0000000000..328160b654 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/current_time.sql @@ -0,0 +1,36 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- current_time() is CodegenFallback but foldable; Spark's optimizer folds it to a Time +-- literal before Comet sees the plan, so it never actually hits the codegen dispatcher. +-- These tests pin behavior rather than a specific timestamp value. + +query +SELECT current_time() IS NOT NULL + +query +SELECT current_time(0) IS NOT NULL + +query +SELECT current_time(6) IS NOT NULL + +-- Two calls in the same query return the same value +query +SELECT current_time() = current_time() diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/hour_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/hour_time.sql new file mode 100644 index 0000000000..685f31ceb5 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/hour_time.sql @@ -0,0 +1,51 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- hour(TIME) rewrites to HoursOfTime -> StaticInvoke(DateTimeUtils.getHoursOfTime). +-- The shim routes that StaticInvoke through the JVM codegen dispatcher. + +statement +CREATE TABLE test_hour_time(h int, m int, s decimal(16,6)) USING parquet + +statement +INSERT INTO test_hour_time VALUES + (0, 0, 0.000000), + (1, 2, 3.500000), + (12, 30, 45.123456), + (23, 59, 59.999999), + (NULL, 0, 0.000000) + +-- column argument built via make_time +query +SELECT hour(make_time(h, m, s)) FROM test_hour_time + +-- literal TIME arguments +query +SELECT hour(TIME '00:00:00') + +query +SELECT hour(TIME '13:45:00') + +query +SELECT hour(TIME '23:59:59.999999') + +-- null TIME +query +SELECT hour(CAST(NULL AS TIME)) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/minute_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/minute_time.sql new file mode 100644 index 0000000000..ba84ced853 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/minute_time.sql @@ -0,0 +1,48 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- minute(TIME) rewrites to MinutesOfTime -> StaticInvoke(DateTimeUtils.getMinutesOfTime). +-- The shim routes that StaticInvoke through the JVM codegen dispatcher. + +statement +CREATE TABLE test_minute_time(h int, m int, s decimal(16,6)) USING parquet + +statement +INSERT INTO test_minute_time VALUES + (0, 0, 0.000000), + (1, 2, 3.500000), + (12, 30, 45.123456), + (23, 59, 59.999999), + (0, NULL, 0.000000) + +query +SELECT minute(make_time(h, m, s)) FROM test_minute_time + +query +SELECT minute(TIME '00:00:00') + +query +SELECT minute(TIME '13:45:00') + +query +SELECT minute(TIME '23:59:59.999999') + +query +SELECT minute(CAST(NULL AS TIME)) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/second_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/second_time.sql new file mode 100644 index 0000000000..6ce4f0801b --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/second_time.sql @@ -0,0 +1,63 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- second(TIME) rewrites to SecondsOfTime -> StaticInvoke(DateTimeUtils.getSecondsOfTime). +-- EXTRACT(SECOND FROM TIME) rewrites to SecondsOfTimeWithFraction -> +-- StaticInvoke(DateTimeUtils.getSecondsOfTimeWithFraction) which returns Decimal(8,6). +-- Both StaticInvoke forms route through the JVM codegen dispatcher. + +statement +CREATE TABLE test_second_time(h int, m int, s decimal(16,6)) USING parquet + +statement +INSERT INTO test_second_time VALUES + (0, 0, 0.000000), + (1, 2, 3.500000), + (12, 30, 45.123456), + (23, 59, 59.999999), + (0, 0, NULL) + +query +SELECT second(make_time(h, m, s)) FROM test_second_time + +query +SELECT second(TIME '00:00:00') + +query +SELECT second(TIME '13:45:07') + +query +SELECT second(TIME '23:59:59.999999') + +query +SELECT second(CAST(NULL AS TIME)) + +-- EXTRACT(SECOND FROM TIME) preserves fractional part as Decimal(8,6) +query +SELECT EXTRACT(SECOND FROM make_time(h, m, s)) FROM test_second_time + +query +SELECT EXTRACT(SECOND FROM TIME '00:00:00') + +query +SELECT EXTRACT(SECOND FROM TIME '13:45:07.123456') + +query +SELECT EXTRACT(SECOND FROM TIME '23:59:59.999999') diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/subtract_times.sql b/spark/src/test/resources/sql-tests/expressions/datetime/subtract_times.sql new file mode 100644 index 0000000000..187192a046 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/subtract_times.sql @@ -0,0 +1,44 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- time - time rewrites to SubtractTimes -> StaticInvoke(DateTimeUtils.subtractTimes). +-- The shim routes it through the JVM codegen dispatcher; the result is +-- DayTimeIntervalType(HOUR, SECOND), which Comet's projection layer does not accept as an +-- output type yet, so these queries currently fall back to Spark. spark_answer_only pins +-- the semantics until DayTimeIntervalType is broadly supported. + +statement +CREATE TABLE test_time_sub(h1 int, m1 int, s1 decimal(16,6), h2 int, m2 int, s2 decimal(16,6)) USING parquet + +statement +INSERT INTO test_time_sub VALUES + (1, 0, 0.000000, 2, 15, 30.000000), + (10, 0, 0.000000, 9, 59, 59.500000), + (0, 0, 0.000000, 23, 59, 59.999999), + (NULL, 0, 0.000000, 0, 0, 0.000000) + +query spark_answer_only +SELECT make_time(h2, m2, s2) - make_time(h1, m1, s1) FROM test_time_sub + +query spark_answer_only +SELECT TIME '02:15:30' - TIME '01:00:00' + +query spark_answer_only +SELECT TIME '09:59:59.5' - TIME '10:00:00' diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/time_add_interval.sql b/spark/src/test/resources/sql-tests/expressions/datetime/time_add_interval.sql new file mode 100644 index 0000000000..82044fb08f --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/time_add_interval.sql @@ -0,0 +1,59 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- time + day-time interval rewrites to TimeAddInterval -> +-- StaticInvoke(DateTimeUtils.timeAddInterval). Routes through the JVM codegen dispatcher. + +statement +CREATE TABLE test_time_add(h int, m int, s decimal(16,6)) USING parquet + +statement +INSERT INTO test_time_add VALUES + (0, 0, 0.000000), + (10, 15, 30.000000), + (22, 30, 0.000000), + (12, 34, 56.789012), + (NULL, 0, 0.000000) + +-- add an hour to a TIME column +query +SELECT make_time(h, m, s) + INTERVAL '1' HOUR FROM test_time_add + +-- add fractional-second interval +query +SELECT make_time(h, m, s) + INTERVAL '0.500' SECOND FROM test_time_add + +-- literal TIME + literal interval +query +SELECT TIME '00:00:00' + INTERVAL '1' HOUR + +query +SELECT TIME '22:30:00' + INTERVAL '30' MINUTE + +query +SELECT TIME '10:00:00' + INTERVAL '0.5' SECOND + +-- NULL TIME +query +SELECT CAST(NULL AS TIME) + INTERVAL '1' HOUR + +-- NULL interval +query +SELECT TIME '10:00:00' + CAST(NULL AS INTERVAL HOUR) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/time_diff.sql b/spark/src/test/resources/sql-tests/expressions/datetime/time_diff.sql new file mode 100644 index 0000000000..d190bc5461 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/time_diff.sql @@ -0,0 +1,65 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- time_diff(unit, start, end) rewrites to TimeDiff -> +-- StaticInvoke(DateTimeUtils.timeDiff). Result is LongType. Routes through the JVM codegen +-- dispatcher. + +statement +CREATE TABLE test_time_diff(h1 int, m1 int, s1 decimal(16,6), h2 int, m2 int, s2 decimal(16,6)) USING parquet + +statement +INSERT INTO test_time_diff VALUES + (20, 30, 29.000000, 21, 30, 29.000000), + (0, 0, 0.000000, 0, 0, 1.000000), + (20, 30, 29.000000, 12, 0, 0.000000), + (0, 0, 0.000000, 23, 59, 59.999999), + (NULL, 0, 0.000000, 0, 0, 0.000000) + +-- column arguments across all supported units +query +SELECT time_diff('HOUR', make_time(h1, m1, s1), make_time(h2, m2, s2)) FROM test_time_diff + +query +SELECT time_diff('MINUTE', make_time(h1, m1, s1), make_time(h2, m2, s2)) FROM test_time_diff + +query +SELECT time_diff('SECOND', make_time(h1, m1, s1), make_time(h2, m2, s2)) FROM test_time_diff + +query +SELECT time_diff('MILLISECOND', make_time(h1, m1, s1), make_time(h2, m2, s2)) FROM test_time_diff + +query +SELECT time_diff('MICROSECOND', make_time(h1, m1, s1), make_time(h2, m2, s2)) FROM test_time_diff + +-- literal TIME arguments +query +SELECT time_diff('HOUR', TIME '20:30:29', TIME '21:30:29') + +query +SELECT time_diff('SECOND', TIME '00:00:00', TIME '23:59:59.999999') + +-- negative difference (start > end) +query +SELECT time_diff('HOUR', TIME '20:30:29', TIME '12:00:00') + +-- lowercase unit +query +SELECT time_diff('second', TIME '00:00:00', TIME '00:00:01') diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/time_trunc.sql b/spark/src/test/resources/sql-tests/expressions/datetime/time_trunc.sql new file mode 100644 index 0000000000..5f9b25c50b --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/time_trunc.sql @@ -0,0 +1,62 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- time_trunc(unit, time) rewrites to TimeTrunc -> StaticInvoke(DateTimeUtils.timeTrunc). +-- Routes through the JVM codegen dispatcher. + +statement +CREATE TABLE test_time_trunc(h int, m int, s decimal(16,6)) USING parquet + +statement +INSERT INTO test_time_trunc VALUES + (9, 32, 5.359000), + (23, 59, 59.999999), + (0, 0, 0.000000), + (12, 34, 56.123456), + (NULL, 0, 0.000000) + +query +SELECT time_trunc('HOUR', make_time(h, m, s)) FROM test_time_trunc + +query +SELECT time_trunc('MINUTE', make_time(h, m, s)) FROM test_time_trunc + +query +SELECT time_trunc('SECOND', make_time(h, m, s)) FROM test_time_trunc + +query +SELECT time_trunc('MILLISECOND', make_time(h, m, s)) FROM test_time_trunc + +query +SELECT time_trunc('MICROSECOND', make_time(h, m, s)) FROM test_time_trunc + +-- literal TIME arguments +query +SELECT time_trunc('HOUR', TIME '09:32:05.359') + +query +SELECT time_trunc('MILLISECOND', TIME '09:32:05.123456') + +query +SELECT time_trunc('SECOND', TIME '23:59:59.999999') + +-- lowercase unit +query +SELECT time_trunc('hour', TIME '13:45:07.999999') diff --git a/spark/src/test/spark-4.1/org/apache/spark/sql/CometTimeTypeSuite.scala b/spark/src/test/spark-4.1/org/apache/spark/sql/CometTimeTypeSuite.scala deleted file mode 100644 index e6d49591a0..0000000000 --- a/spark/src/test/spark-4.1/org/apache/spark/sql/CometTimeTypeSuite.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.sql - -import org.apache.comet.CometConf - -/** - * Coverage for the Spark 4.1 `TIME` type and the SQL functions that operate on it. All function - * calls that don't have a native lowering are routed through the JVM codegen dispatcher via the - * `CometExprShim`. Casts to/from TIME follow the same route via `CometCast`. - */ -class CometTimeTypeSuite extends CometTestBase { - - override protected def sparkConf = super.sparkConf - .set("spark.sql.timeType.enabled", "true") - .set(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key, "true") - - test("literal TIME values round-trip through LocalTableScan") { - val df = spark.sql( - "SELECT * FROM VALUES (TIME '00:00:00'), (TIME '12:34:56.789012'), " + - "(TIME '23:59:59.999999') AS t(c)") - checkSparkAnswerAndOperator(df) - } - - test("make_time(hour, minute, second) via native path") { - val df = spark.sql( - "SELECT make_time(h, m, s) FROM VALUES (0, 0, 0.0), (6, 30, 45.887), " + - "(23, 59, 59.999999) AS t(h, m, s)") - checkSparkAnswerAndOperator(df) - } - - test("to_time(str) via native path") { - val df = spark.sql( - "SELECT to_time(s) FROM VALUES ('00:12:00'), ('12:34:56.789'), " + - "('23:59:59.999999') AS t(s)") - checkSparkAnswerAndOperator(df) - } - - test("try_to_time(str) via native path returns NULL on malformed input") { - val df = spark.sql("SELECT try_to_time(s) FROM VALUES ('00:12:00'), ('bad'), (NULL) AS t(s)") - checkSparkAnswerAndOperator(df) - } - - test("hour(TIME) via codegen dispatch") { - val df = spark.sql( - "SELECT hour(t) FROM VALUES (TIME '00:00:00'), (TIME '13:45:00'), " + - "(TIME '23:59:59.999999') AS v(t)") - checkSparkAnswerAndOperator(df) - } - - test("minute(TIME) via codegen dispatch") { - val df = spark.sql( - "SELECT minute(t) FROM VALUES (TIME '00:00:00'), (TIME '13:45:00'), " + - "(TIME '23:59:59.999999') AS v(t)") - checkSparkAnswerAndOperator(df) - } - - test("second(TIME) via codegen dispatch") { - val df = spark.sql( - "SELECT second(t) FROM VALUES (TIME '00:00:00'), (TIME '13:45:07'), " + - "(TIME '23:59:59.999999') AS v(t)") - checkSparkAnswerAndOperator(df) - } - - test("EXTRACT SECOND FROM TIME returns decimal via codegen dispatch") { - val df = spark.sql( - "SELECT EXTRACT(SECOND FROM t) FROM VALUES (TIME '00:00:00'), " + - "(TIME '13:45:07.123456'), (TIME '23:59:59.999999') AS v(t)") - checkSparkAnswerAndOperator(df) - } - - test("time + day-time interval via codegen dispatch") { - val df = spark.sql( - "SELECT t + INTERVAL '1' HOUR FROM VALUES (TIME '00:00:00'), (TIME '22:30:00') AS v(t)") - checkSparkAnswerAndOperator(df) - } - - test("time - time returns day-time interval via codegen dispatch") { - // Comet does not yet support DayTimeIntervalType outputs at the projection level, so this - // exercises the shim wiring but currently falls back to Spark. Verify results only. - val df = spark.sql( - "SELECT b - a FROM VALUES (TIME '01:00:00', TIME '02:15:30'), " + - "(TIME '10:00:00', TIME '09:59:59.5') AS v(a, b)") - checkSparkAnswer(df) - } - - test("time_diff via codegen dispatch") { - val df = spark.sql( - "SELECT time_diff('HOUR', a, b), time_diff('MINUTE', a, b), time_diff('SECOND', a, b) " + - "FROM VALUES (TIME '20:30:29', TIME '21:30:29'), (TIME '00:00:00', TIME '00:00:01') " + - "AS v(a, b)") - checkSparkAnswerAndOperator(df) - } - - test("time_trunc via codegen dispatch") { - val df = spark.sql( - "SELECT time_trunc('HOUR', t), time_trunc('MINUTE', t), time_trunc('SECOND', t), " + - "time_trunc('MILLISECOND', t) " + - "FROM VALUES (TIME '09:32:05.359'), (TIME '23:59:59.999999') AS v(t)") - checkSparkAnswerAndOperator(df) - } - - test("cast string to TIME via codegen dispatch") { - val df = - spark.sql("SELECT CAST(s AS TIME) FROM VALUES ('00:12:00'), ('23:59:59.999999') AS v(s)") - checkSparkAnswerAndOperator(df) - } - - test("cast TIME to string via codegen dispatch") { - val df = spark.sql( - "SELECT CAST(t AS STRING) FROM VALUES (TIME '00:00:00'), " + - "(TIME '13:45:07.123456') AS v(t)") - checkSparkAnswerAndOperator(df) - } - - test("cast TIME to LONG returns seconds via codegen dispatch") { - val df = spark.sql( - "SELECT CAST(t AS BIGINT) FROM VALUES (TIME '00:00:00'), (TIME '01:00:00'), " + - "(TIME '23:59:59.999999') AS v(t)") - checkSparkAnswerAndOperator(df) - } -}