diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index 2ca3a13c62..c681510ece 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -237,7 +237,7 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci | `convert_timezone` | ✅ | Routes through the JVM codegen dispatcher by default (handles all timezone forms); the native path is opt-in via allowIncompatible ([details](compatibility/expressions/datetime.md)) | | `curdate` | ✅ | Constant-folded to a literal (alias of `current_date`) | | `current_date` | ✅ | Constant-folded to a literal before Comet sees the plan | -| `current_time` | 🔜 | Blocked on Spark 4.1 TIME type support ([#4288](https://github.com/apache/datafusion-comet/issues/4288)) | +| `current_time` | ✅ | Constant-folded to a literal before Comet sees the plan (Spark 4.1+) | | `current_timestamp` | ✅ | Constant-folded to a literal before Comet sees the plan | | `current_timezone` | ✅ | | | `date_add` | ✅ | | @@ -264,7 +264,7 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci | `make_date` | ✅ | | | `make_dt_interval` | ✅ | | | `make_interval` | 🔜 | Produces legacy CalendarInterval; tracked by [#4540](https://github.com/apache/datafusion-comet/issues/4540) | -| `make_time` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) | +| `make_time` | ✅ | Spark 4.1+ | | `make_timestamp` | ✅ | | | `make_timestamp_ltz` | ✅ | 2-arg TIME form falls back | | `make_timestamp_ntz` | ✅ | 2-arg TIME form falls back | @@ -278,13 +278,13 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci | `quarter` | ✅ | | | `second` | ✅ | | | `session_window` | 🔜 | Batch session-window grouping falls back (`UpdatingSessionsExec` is not yet native); tracked by [#4785](https://github.com/apache/datafusion-comet/issues/4785) | -| `time_diff` 
| 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) | -| `time_trunc` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) | +| `time_diff` | ✅ | Spark 4.1+; routes through the JVM codegen dispatcher | +| `time_trunc` | ✅ | Spark 4.1+; routes through the JVM codegen dispatcher | | `timestamp_micros` | ✅ | | | `timestamp_millis` | ✅ | | | `timestamp_seconds` | ✅ | | | `to_date` | ✅ | Rewrites to `Cast` (or `Cast(GetTimestamp)` with a format) before Comet sees the plan | -| `to_time` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) | +| `to_time` | ✅ | Spark 4.1+ | | `to_timestamp` | ✅ | Rewrites to `Cast` (or `GetTimestamp` with a format) before Comet sees the plan | | `to_timestamp_ltz` | ✅ | Rewrites to `to_timestamp` (`TimestampType`) | | `to_timestamp_ntz` | ✅ | Rewrites to `to_timestamp` (`TimestampNTZType`) | @@ -294,7 +294,7 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci | `try_make_interval` | 🔜 | Produces legacy CalendarInterval; tracked by [#4540](https://github.com/apache/datafusion-comet/issues/4540) | | `try_make_timestamp` | ✅ | | | `try_to_date` | 🔜 | Rewrites to `Cast`/`GetTimestamp` but currently falls back; tracked by [#4556](https://github.com/apache/datafusion-comet/issues/4556) | -| `try_to_time` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) | +| `try_to_time` | ✅ | Spark 4.1+ | | `try_to_timestamp` | 🔜 | Rewrites to `Cast`/`GetTimestamp` but currently falls back; tracked by [#4556](https://github.com/apache/datafusion-comet/issues/4556) | | `unix_date` | ✅ | | | `unix_micros` | ✅ | | diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 25162332fd..7d595e8e41 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -389,6 
+389,9 @@ impl PhysicalPlanner { DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => { ScalarValue::TimestampMicrosecond(Some(*value), Some(tz)) } + DataType::Time64(TimeUnit::Nanosecond) => { + ScalarValue::Time64Nanosecond(Some(*value)) + } dt => { return Err(GeneralError(format!( "Expected either 'Int64' or 'Timestamp' for LongVal, but found {dt:?}" diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala index 9f8fc77eba..115d3f78c8 100644 --- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala +++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala @@ -24,8 +24,9 @@ import scala.collection.mutable.ListBuffer import org.apache.spark.sql.types._ import org.apache.comet.DataTypeSupport.{ARRAY_ELEMENT, MAP_KEY, MAP_VALUE} +import org.apache.comet.shims.CometTypeShim -trait DataTypeSupport { +trait DataTypeSupport extends CometTypeShim { /** * Checks if this schema is supported by checking if each field in the schema is supported. 
@@ -53,6 +54,8 @@ trait DataTypeSupport { BinaryType | StringType | _: DecimalType | DateType | TimestampType | TimestampNTZType => true + case dt if isTimeType(dt) => + true case StructType(fields) => fields.nonEmpty && fields.forall(f => isTypeSupported(f.dataType, f.name, fallbackReasons)) diff --git a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala index 107fb5e7f8..d90d342c09 100644 --- a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala +++ b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.comet.shims.CometExprTraitShim +import org.apache.comet.shims.{CometExprTraitShim, CometTypeShim} /** * Compiles a bound [[Expression]] plus an Arrow input schema into a [[CometBatchKernel]] that @@ -49,7 +49,7 @@ import org.apache.comet.shims.CometExprTraitShim * The generated kernel is the `InternalRow` that Spark's `BoundReference.genCode` reads from. See * [[generateSource]] for how the wiring is set up. */ -object CometBatchKernelCodegen extends Logging with CometExprTraitShim { +object CometBatchKernelCodegen extends Logging with CometExprTraitShim with CometTypeShim { /** * Resolve an Arrow vector class by simple name through the codegen object's own classloader. 
@@ -69,6 +69,7 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { case "DateDayVector" => classOf[DateDayVector] case "TimeStampMicroVector" => classOf[TimeStampMicroVector] case "TimeStampMicroTZVector" => classOf[TimeStampMicroTZVector] + case "TimeNanoVector" => classOf[TimeNanoVector] case "VarCharVector" => classOf[VarCharVector] case "VarBinaryVector" => classOf[VarBinaryVector] case "IntervalYearVector" => classOf[IntervalYearVector] @@ -87,6 +88,7 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { case _: StringType | _: BinaryType => true case DateType | TimestampType | TimestampNTZType => true case _: YearMonthIntervalType | _: DayTimeIntervalType => true + case dt if isTimeType(dt) => true case ArrayType(inner, _) => isSupportedDataType(inner) case st: StructType => st.fields.forall(f => isSupportedDataType(f.dataType)) case mt: MapType => isSupportedDataType(mt.keyType) && isSupportedDataType(mt.valueType) diff --git a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala index 09bfc52bd4..6872d8681e 100644 --- a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala +++ b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala @@ -62,7 +62,8 @@ private[codegen] object CometBatchKernelCodegenInput { classOf[Float8Vector], classOf[DateDayVector], classOf[TimeStampMicroVector], - classOf[TimeStampMicroTZVector]) + classOf[TimeStampMicroTZVector], + classOf[TimeNanoVector]) private val cometPlainVectorName: String = classOf[CometPlainVector].getName /** Emit kernel typed-vector field declarations for every level of every input column. 
*/ @@ -134,7 +135,8 @@ private[codegen] object CometBatchKernelCodegenInput { case (ArrowColumnSpec(cls, _), ord) if cls == classOf[BigIntVector] || cls == classOf[TimeStampMicroVector] || - cls == classOf[TimeStampMicroTZVector] => + cls == classOf[TimeStampMicroTZVector] || + cls == classOf[TimeNanoVector] => s" case $ord: return this.col$ord.getLong(this.rowIdx);" } val floatCases = withOrd.collect { diff --git a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenOutput.scala b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenOutput.scala index fe282cdc65..6bdf109489 100644 --- a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenOutput.scala +++ b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenOutput.scala @@ -31,13 +31,14 @@ import org.apache.spark.sql.comet.util.Utils import org.apache.spark.sql.types._ import org.apache.comet.CometArrowAllocator +import org.apache.comet.shims.CometTypeShim /** * Output-side emitters for the codegen kernel: [[allocateOutput]], [[emitOutputWriter]] * (top-level write entry), [[emitWrite]] (recursive per-type write), the output vector-class * lookup. Paired with [[CometBatchKernelCodegenInput]] on the read side. */ -private[codegen] object CometBatchKernelCodegenOutput { +private[codegen] object CometBatchKernelCodegenOutput extends CometTypeShim { /** * Spark `DataType` to an Arrow `Field` with names Comet expects on FFI export. 
Spark's @@ -173,6 +174,7 @@ private[codegen] object CometBatchKernelCodegenOutput { case TimestampNTZType => classOf[TimeStampMicroVector].getName case _: YearMonthIntervalType => classOf[IntervalYearVector].getName case _: DayTimeIntervalType => classOf[DurationVector].getName + case dt if isTimeType(dt) => classOf[TimeNanoVector].getName case _: ArrayType => classOf[ListVector].getName case _: StructType => classOf[StructVector].getName case _: MapType => classOf[MapVector].getName @@ -216,6 +218,10 @@ private[codegen] object CometBatchKernelCodegenOutput { // DayTimeIntervalType -> DurationVector.set(int, long micros). val set = if (nested) "setSafe" else "set" OutputEmit("", s"$targetVec.$set($idx, $source);") + case dt if isTimeType(dt) => + // TIME is stored as nanoseconds-of-day in a Long, matching TimeNanoVector.set(int, long). + val set = if (nested) "setSafe" else "set" + OutputEmit("", s"$targetVec.$set($idx, $source);") case dt: DecimalType => // DecimalOutputShortFastPath: precision <= 18 fits in a signed long, so pass the unscaled // value to `setSafe(int, long)` and skip the BigDecimal allocation. 
@@ -398,6 +404,7 @@ private[codegen] object CometBatchKernelCodegenOutput { case ShortType => s"$target.getShort($idx)" case IntegerType | DateType => s"$target.getInt($idx)" case LongType | TimestampType | TimestampNTZType => s"$target.getLong($idx)" + case dt if isTimeType(dt) => s"$target.getLong($idx)" case FloatType => s"$target.getFloat($idx)" case DoubleType => s"$target.getDouble($idx)" case dt: DecimalType => s"$target.getDecimal($idx, ${dt.precision}, ${dt.scale})" diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 619b69912f..576695e498 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -25,9 +25,9 @@ import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, withFallbackReason} -import org.apache.comet.serde.{CometExpressionSerde, Compatible, ExprOuterClass, Incompatible, SupportLevel, Unsupported} +import org.apache.comet.serde.{CometExpressionSerde, CometScalaUDF, Compatible, ExprOuterClass, Incompatible, SupportLevel, Unsupported} import org.apache.comet.serde.ExprOuterClass.Expr -import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, serializeDataType} +import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, isTimeType, serializeDataType} import org.apache.comet.shims.CometExprShim object CometCast extends CometExpressionSerde[Cast] with CometExprShim { @@ -77,11 +77,19 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { // (see `convert`), so the cast never executes natively and the result matches Spark by // definition. `CometLiteral` then validates the resulting literal's data type. 
Compatible() + } else if (involvesTimeType(cast)) { + // Casts to/from TIME have no native lowering yet. `convert` routes them through the JVM + // codegen dispatcher instead of falling the projection back to Spark; report `Compatible` + // here so the dispatch path is taken. + Compatible() } else { isSupported(cast.child.dataType, cast.dataType, cast.timeZoneId, evalMode(cast)) } } + private def involvesTimeType(cast: Cast): Boolean = + isTimeType(cast.child.dataType) || isTimeType(cast.dataType) + override def convert( cast: Cast, inputs: Seq[Attribute], @@ -90,6 +98,11 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { cast.child match { case _: Literal => exprToProtoInternal(Literal.create(cast.eval(), cast.dataType), inputs, binding) + case _ if involvesTimeType(cast) => + // Casts to/from TIME (`TimeType(_)` <-> string/integral/decimal, precision changes) all + // have full Spark codegen but no native lowering. Route through the JVM codegen + // dispatcher so the enclosing projection stays native. + CometScalaUDF.emitJvmCodegenDispatch(cast, inputs, binding) case _ => if (isAlwaysCastToNull(cast.child.dataType, cast.dataType, cometEvalMode)) { exprToProtoInternal(Literal.create(null, cast.dataType), inputs, binding) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index bc02e22da9..e0e11f1ec2 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -751,6 +751,12 @@ case class CometScanTypeChecker() extends DataTypeSupport with CometTypeShim { "native execution if your data does not contain unsigned small integers. 
" + CometConf.COMPAT_GUIDE false + case dt if isTimeType(dt) => + // The native Parquet reader has not been taught to decode the TIME logical type into + // an Arrow Time64(NANOSECOND) vector, so fall back to Spark for scans that expose one. + fallbackReasons += s"Unsupported $name of type $dt (native Parquet scan does not " + + "support TIME)" + false case dt if isStringCollationType(dt) => // we don't need specific support for collation in scans, but this // is a convenient place to force the whole query to fall back to Spark for now diff --git a/spark/src/main/scala/org/apache/comet/udf/codegen/CometScalaUDFCodegen.scala b/spark/src/main/scala/org/apache/comet/udf/codegen/CometScalaUDFCodegen.scala index f575dd5b53..bd300e1b5a 100644 --- a/spark/src/main/scala/org/apache/comet/udf/codegen/CometScalaUDFCodegen.scala +++ b/spark/src/main/scala/org/apache/comet/udf/codegen/CometScalaUDFCodegen.scala @@ -220,7 +220,7 @@ class CometScalaUDFCodegen extends CometUDF with Logging { case _: BitVector | _: TinyIntVector | _: SmallIntVector | _: IntVector | _: BigIntVector | _: Float4Vector | _: Float8Vector | _: DecimalVector | _: VarCharVector | _: VarBinaryVector | _: DateDayVector | _: TimeStampMicroVector | - _: TimeStampMicroTZVector => + _: TimeStampMicroTZVector | _: TimeNanoVector => ScalarColumnSpec(v.getClass.asInstanceOf[Class[_ <: ValueVector]], nullable = true) case other => throw new UnsupportedOperationException( diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala index 161a4e0553..5898114167 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala @@ -119,8 +119,8 @@ object CometLocalTableScanExec extends CometSink[LocalTableScanExec] with DataTy CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED) // ArrowWriter (used by 
RowArrowReader) handles NullType via Utils.toArrowType + NullWriter; - // other types off DataTypeSupport's allow list (TimeType, intervals, ...) have no ArrowWriter - // coverage and must fall back to Spark. + // TimeType is written by TimeNanoWriter. Types off DataTypeSupport's allow list + // (intervals, ...) have no ArrowWriter coverage and must fall back to Spark. override def isTypeSupported( dt: DataType, name: String, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/ArrowWriters.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/ArrowWriters.scala index 342441ce28..acd2fb30f1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/ArrowWriters.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/ArrowWriters.scala @@ -66,6 +66,7 @@ private[arrow] object ArrowWriter { case (DateType, vector: DateDayVector) => new DateWriter(vector) case (TimestampType, vector: TimeStampMicroTZVector) => new TimestampWriter(vector) case (TimestampNTZType, vector: TimeStampMicroVector) => new TimestampNTZWriter(vector) + case (dt, vector: TimeNanoVector) if Utils.isTimeType(dt) => new TimeNanoWriter(vector) case (ArrayType(_, _), vector: ListVector) => val elementVector = createFieldWriter(vector.getDataVector()) new ArrayWriter(vector, elementVector) @@ -365,6 +366,17 @@ private[arrow] class TimestampNTZWriter(val valueVector: TimeStampMicroVector) } } +private[arrow] class TimeNanoWriter(val valueVector: TimeNanoVector) extends ArrowFieldWriter { + + override def setNull(): Unit = { + valueVector.setNull(count) + } + + override def setValue(input: SpecializedGetters, ordinal: Int): Unit = { + valueVector.setSafe(count, input.getLong(ordinal)) + } +} + private[arrow] class ArrayWriter(val valueVector: ListVector, val elementWriter: ArrowFieldWriter) extends ArrowFieldWriter { diff --git a/spark/src/main/spark-4.1+/org/apache/comet/shims/CometExprShim.scala
b/spark/src/main/spark-4.1+/org/apache/comet/shims/CometExprShim.scala index 64474291e7..04fc6e26c2 100644 --- a/spark/src/main/spark-4.1+/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.1+/org/apache/comet/shims/CometExprShim.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.TimeType import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.CometScalaUDF import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithFallbackReason, scalarFunctionExprToProtoWithReturnType} @@ -46,6 +47,22 @@ trait CometExprShim extends Spark4xCometExprShim { } } + // Function names on `DateTimeUtils` reached via `StaticInvoke` from the Spark 4.1 `TIME` family of + // `RuntimeReplaceable` expressions (`HoursOfTime`, `MinutesOfTime`, `SecondsOfTime`, + // `SecondsOfTimeWithFraction`, `TimeAddInterval`, `SubtractTimes`, `TimeDiff`, `TimeTrunc`). + // Each of these is codegen-safe (the replacement itself is a `StaticInvoke` with a proper + // `doGenCode`) but has no native lowering yet, so we route them through the JVM codegen + // dispatcher to keep the enclosing projection native. + private val timeCodegenDispatchFunctions: Set[String] = Set( + "getHoursOfTime", + "getMinutesOfTime", + "getSecondsOfTime", + "getSecondsOfTimeWithFraction", + "timeAddInterval", + "subtractTimes", + "timeDiff", + "timeTrunc") + // Spark 4.1 introduced TimeType and the make_time / to_time / try_to_time functions. 
// Their planner forms differ from the shared 4.x patterns (DateTimeUtils.makeTime // StaticInvoke and ToTimeParser Invoke / TryEval(Invoke)), so they live here rather @@ -66,6 +83,14 @@ trait CometExprShim extends Spark4xCometExprShim { scalarFunctionExprToProtoWithReturnType("make_time", s.dataType, true, childExprs: _*) optExprWithFallbackReason(optExpr, expr, s.arguments: _*) + // Route the other Spark 4.1 TIME `StaticInvoke` forms through the JVM codegen dispatcher. + // `emitJvmCodegenDispatch` runs Spark's own `doGenCode` inside the Comet pipeline, so the + // projection stays native while behavior matches Spark exactly. + case s: StaticInvoke + if s.staticObject == classOf[DateTimeUtils.type] && + timeCodegenDispatchFunctions.contains(s.functionName) => + CometScalaUDF.emitJvmCodegenDispatch(s, inputs, binding) + case i: Invoke => (i.targetObject, i.functionName, i.arguments) match { case (Literal(parser: ToTimeParser, _), "parse", args) diff --git a/spark/src/test/resources/sql-tests/expressions/cast/cast_time.sql b/spark/src/test/resources/sql-tests/expressions/cast/cast_time.sql new file mode 100644 index 0000000000..663bafd585 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/cast/cast_time.sql @@ -0,0 +1,91 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. 
See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- Casts to/from TIME have full Spark codegen but no native lowering in Comet. CometCast +-- routes them through the JVM codegen dispatcher so the enclosing projection stays +-- native and results match Spark exactly. + +statement +CREATE TABLE test_cast_time(s string, h int, m int, sec decimal(16,6)) USING parquet + +statement +INSERT INTO test_cast_time VALUES + ('00:00:00', 0, 0, 0.000000), + ('12:34:56', 12, 34, 56.000000), + ('23:59:59.999999',23, 59, 59.999999), + ('01:00:00', 1, 0, 0.000000), + (NULL, NULL, NULL, NULL) + +-- string -> TIME (column) +query +SELECT CAST(s AS TIME) FROM test_cast_time + +-- string -> TIME (literals) +query +SELECT CAST('00:00:00' AS TIME) + +query +SELECT CAST('23:59:59.999999' AS TIME) + +-- TIME -> string (column) +query +SELECT CAST(make_time(h, m, sec) AS STRING) FROM test_cast_time + +-- TIME -> string (literals) +query +SELECT CAST(TIME '00:00:00' AS STRING) + +query +SELECT CAST(TIME '13:45:07.123456' AS STRING) + +-- TIME -> BIGINT (whole seconds since midnight, via floor(nanos / 1e9)) +query +SELECT CAST(make_time(h, m, sec) AS BIGINT) FROM test_cast_time + +query +SELECT CAST(TIME '00:00:00' AS BIGINT) + +query +SELECT CAST(TIME '01:00:00' AS BIGINT) + +query +SELECT CAST(TIME '23:59:59.999999' AS BIGINT) + +-- TIME -> INT (same seconds-truncation semantics) +query +SELECT CAST(TIME '13:45:07.999999' AS INT) + +-- TIME -> DECIMAL (fractional-seconds-preserving, nanos-based) +query +SELECT CAST(TIME '13:45:07.123456' AS DECIMAL(18,6)) + +-- TIME -> TIME with different precision (truncates fractional digits) +query +SELECT CAST(TIME '13:45:07.123456' AS TIME(0)) + +query +SELECT CAST(TIME '13:45:07.123456' AS TIME(3)) + +-- NULL round-trips +query +SELECT CAST(CAST(NULL AS STRING) AS TIME) + +query +SELECT CAST(CAST(NULL AS TIME) AS STRING) 
diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/current_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/current_time.sql new file mode 100644 index 0000000000..328160b654 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/current_time.sql @@ -0,0 +1,36 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- current_time() is CodegenFallback but foldable; Spark's optimizer folds it to a Time +-- literal before Comet sees the plan, so it never actually hits the codegen dispatcher. +-- These tests pin behavior rather than a specific timestamp value. 
+ +query +SELECT current_time() IS NOT NULL + +query +SELECT current_time(0) IS NOT NULL + +query +SELECT current_time(6) IS NOT NULL + +-- Two calls in the same query return the same value +query +SELECT current_time() = current_time() diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/hour_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/hour_time.sql new file mode 100644 index 0000000000..685f31ceb5 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/hour_time.sql @@ -0,0 +1,51 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- hour(TIME) rewrites to HoursOfTime -> StaticInvoke(DateTimeUtils.getHoursOfTime). +-- The shim routes that StaticInvoke through the JVM codegen dispatcher. 
+ +statement +CREATE TABLE test_hour_time(h int, m int, s decimal(16,6)) USING parquet + +statement +INSERT INTO test_hour_time VALUES + (0, 0, 0.000000), + (1, 2, 3.500000), + (12, 30, 45.123456), + (23, 59, 59.999999), + (NULL, 0, 0.000000) + +-- column argument built via make_time +query +SELECT hour(make_time(h, m, s)) FROM test_hour_time + +-- literal TIME arguments +query +SELECT hour(TIME '00:00:00') + +query +SELECT hour(TIME '13:45:00') + +query +SELECT hour(TIME '23:59:59.999999') + +-- null TIME +query +SELECT hour(CAST(NULL AS TIME)) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/minute_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/minute_time.sql new file mode 100644 index 0000000000..ba84ced853 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/minute_time.sql @@ -0,0 +1,48 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- minute(TIME) rewrites to MinutesOfTime -> StaticInvoke(DateTimeUtils.getMinutesOfTime). +-- The shim routes that StaticInvoke through the JVM codegen dispatcher. 
+ +statement +CREATE TABLE test_minute_time(h int, m int, s decimal(16,6)) USING parquet + +statement +INSERT INTO test_minute_time VALUES + (0, 0, 0.000000), + (1, 2, 3.500000), + (12, 30, 45.123456), + (23, 59, 59.999999), + (0, NULL, 0.000000) + +query +SELECT minute(make_time(h, m, s)) FROM test_minute_time + +query +SELECT minute(TIME '00:00:00') + +query +SELECT minute(TIME '13:45:00') + +query +SELECT minute(TIME '23:59:59.999999') + +query +SELECT minute(CAST(NULL AS TIME)) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/second_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/second_time.sql new file mode 100644 index 0000000000..6ce4f0801b --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/second_time.sql @@ -0,0 +1,63 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- second(TIME) rewrites to SecondsOfTime -> StaticInvoke(DateTimeUtils.getSecondsOfTime). +-- EXTRACT(SECOND FROM TIME) rewrites to SecondsOfTimeWithFraction -> +-- StaticInvoke(DateTimeUtils.getSecondsOfTimeWithFraction) which returns Decimal(8,6). 
+-- Both StaticInvoke forms route through the JVM codegen dispatcher. + +statement +CREATE TABLE test_second_time(h int, m int, s decimal(16,6)) USING parquet + +statement +INSERT INTO test_second_time VALUES + (0, 0, 0.000000), + (1, 2, 3.500000), + (12, 30, 45.123456), + (23, 59, 59.999999), + (0, 0, NULL) + +query +SELECT second(make_time(h, m, s)) FROM test_second_time + +query +SELECT second(TIME '00:00:00') + +query +SELECT second(TIME '13:45:07') + +query +SELECT second(TIME '23:59:59.999999') + +query +SELECT second(CAST(NULL AS TIME)) + +-- EXTRACT(SECOND FROM TIME) preserves fractional part as Decimal(8,6) +query +SELECT EXTRACT(SECOND FROM make_time(h, m, s)) FROM test_second_time + +query +SELECT EXTRACT(SECOND FROM TIME '00:00:00') + +query +SELECT EXTRACT(SECOND FROM TIME '13:45:07.123456') + +query +SELECT EXTRACT(SECOND FROM TIME '23:59:59.999999') diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/subtract_times.sql b/spark/src/test/resources/sql-tests/expressions/datetime/subtract_times.sql new file mode 100644 index 0000000000..187192a046 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/subtract_times.sql @@ -0,0 +1,44 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. 
See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- time - time rewrites to SubtractTimes -> StaticInvoke(DateTimeUtils.subtractTimes). +-- The shim routes it through the JVM codegen dispatcher; the result is +-- DayTimeIntervalType(HOUR, SECOND), which Comet's projection layer does not accept as an +-- output type yet, so these queries currently fall back to Spark. spark_answer_only pins +-- the semantics until DayTimeIntervalType is broadly supported. + +statement +CREATE TABLE test_time_sub(h1 int, m1 int, s1 decimal(16,6), h2 int, m2 int, s2 decimal(16,6)) USING parquet + +statement +INSERT INTO test_time_sub VALUES + (1, 0, 0.000000, 2, 15, 30.000000), + (10, 0, 0.000000, 9, 59, 59.500000), + (0, 0, 0.000000, 23, 59, 59.999999), + (NULL, 0, 0.000000, 0, 0, 0.000000) + +query spark_answer_only +SELECT make_time(h2, m2, s2) - make_time(h1, m1, s1) FROM test_time_sub + +query spark_answer_only +SELECT TIME '02:15:30' - TIME '01:00:00' + +query spark_answer_only +SELECT TIME '09:59:59.5' - TIME '10:00:00' diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/time_add_interval.sql b/spark/src/test/resources/sql-tests/expressions/datetime/time_add_interval.sql new file mode 100644 index 0000000000..82044fb08f --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/time_add_interval.sql @@ -0,0 +1,59 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- time + day-time interval rewrites to TimeAddInterval -> +-- StaticInvoke(DateTimeUtils.timeAddInterval). Routes through the JVM codegen dispatcher. + +statement +CREATE TABLE test_time_add(h int, m int, s decimal(16,6)) USING parquet + +statement +INSERT INTO test_time_add VALUES + (0, 0, 0.000000), + (10, 15, 30.000000), + (22, 30, 0.000000), + (12, 34, 56.789012), + (NULL, 0, 0.000000) + +-- add an hour to a TIME column +query +SELECT make_time(h, m, s) + INTERVAL '1' HOUR FROM test_time_add + +-- add fractional-second interval +query +SELECT make_time(h, m, s) + INTERVAL '0.500' SECOND FROM test_time_add + +-- literal TIME + literal interval +query +SELECT TIME '00:00:00' + INTERVAL '1' HOUR + +query +SELECT TIME '22:30:00' + INTERVAL '30' MINUTE + +query +SELECT TIME '10:00:00' + INTERVAL '0.5' SECOND + +-- NULL TIME +query +SELECT CAST(NULL AS TIME) + INTERVAL '1' HOUR + +-- NULL interval +query +SELECT TIME '10:00:00' + CAST(NULL AS INTERVAL HOUR) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/time_diff.sql b/spark/src/test/resources/sql-tests/expressions/datetime/time_diff.sql new file mode 100644 index 0000000000..d190bc5461 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/time_diff.sql @@ -0,0 +1,65 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. 
See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- time_diff(unit, start, end) rewrites to TimeDiff -> +-- StaticInvoke(DateTimeUtils.timeDiff). Result is LongType. Routes through the JVM codegen +-- dispatcher. + +statement +CREATE TABLE test_time_diff(h1 int, m1 int, s1 decimal(16,6), h2 int, m2 int, s2 decimal(16,6)) USING parquet + +statement +INSERT INTO test_time_diff VALUES + (20, 30, 29.000000, 21, 30, 29.000000), + (0, 0, 0.000000, 0, 0, 1.000000), + (20, 30, 29.000000, 12, 0, 0.000000), + (0, 0, 0.000000, 23, 59, 59.999999), + (NULL, 0, 0.000000, 0, 0, 0.000000) + +-- column arguments across all supported units +query +SELECT time_diff('HOUR', make_time(h1, m1, s1), make_time(h2, m2, s2)) FROM test_time_diff + +query +SELECT time_diff('MINUTE', make_time(h1, m1, s1), make_time(h2, m2, s2)) FROM test_time_diff + +query +SELECT time_diff('SECOND', make_time(h1, m1, s1), make_time(h2, m2, s2)) FROM test_time_diff + +query +SELECT time_diff('MILLISECOND', make_time(h1, m1, s1), make_time(h2, m2, s2)) FROM test_time_diff + +query +SELECT time_diff('MICROSECOND', make_time(h1, m1, s1), make_time(h2, m2, s2)) FROM test_time_diff + +-- literal TIME arguments +query +SELECT time_diff('HOUR', TIME '20:30:29', TIME '21:30:29') + +query +SELECT 
time_diff('SECOND', TIME '00:00:00', TIME '23:59:59.999999') + +-- negative difference (start > end) +query +SELECT time_diff('HOUR', TIME '20:30:29', TIME '12:00:00') + +-- lowercase unit +query +SELECT time_diff('second', TIME '00:00:00', TIME '00:00:01') diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/time_trunc.sql b/spark/src/test/resources/sql-tests/expressions/datetime/time_trunc.sql new file mode 100644 index 0000000000..5f9b25c50b --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/time_trunc.sql @@ -0,0 +1,62 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true + +-- time_trunc(unit, time) rewrites to TimeTrunc -> StaticInvoke(DateTimeUtils.timeTrunc). +-- Routes through the JVM codegen dispatcher. 
+ +statement +CREATE TABLE test_time_trunc(h int, m int, s decimal(16,6)) USING parquet + +statement +INSERT INTO test_time_trunc VALUES + (9, 32, 5.359000), + (23, 59, 59.999999), + (0, 0, 0.000000), + (12, 34, 56.123456), + (NULL, 0, 0.000000) + +query +SELECT time_trunc('HOUR', make_time(h, m, s)) FROM test_time_trunc + +query +SELECT time_trunc('MINUTE', make_time(h, m, s)) FROM test_time_trunc + +query +SELECT time_trunc('SECOND', make_time(h, m, s)) FROM test_time_trunc + +query +SELECT time_trunc('MILLISECOND', make_time(h, m, s)) FROM test_time_trunc + +query +SELECT time_trunc('MICROSECOND', make_time(h, m, s)) FROM test_time_trunc + +-- literal TIME arguments +query +SELECT time_trunc('HOUR', TIME '09:32:05.359') + +query +SELECT time_trunc('MILLISECOND', TIME '09:32:05.123456') + +query +SELECT time_trunc('SECOND', TIME '23:59:59.999999') + +-- lowercase unit +query +SELECT time_trunc('hour', TIME '13:45:07.999999') diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index dce303b8c1..a01f6ef8bd 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3940,7 +3940,7 @@ class CometExecSuite extends CometTestBase { } } - test("CometLocalTableScanExec falls back when schema contains TimeType") { + test("CometLocalTableScanExec handles TimeType column") { assume( org.apache.comet.CometSparkSessionExtensions.isSpark41Plus, "TimeType requires Spark 4.1+") @@ -3949,10 +3949,10 @@ class CometExecSuite extends CometTestBase { withSQLConf( "spark.sql.timeType.enabled" -> "true", CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { - // VALUES folds to a LocalRelation, exercising the CometLocalTableScanExec convert - // path; the TimeType column should drive the schema-level fallback. + // VALUES folds to a LocalRelation, exercising the CometLocalTableScanExec convert path. 
+ // TimeType routes through TimeNanoWriter, so the native scan handles it end-to-end. val df = spark.sql("SELECT * FROM VALUES (TIME '12:34:56'), (TIME '01:02:03') AS t(c)") - checkSparkAnswer(df) + checkSparkAnswerAndOperator(df) } }