Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions docs/source/user-guide/latest/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci
| `convert_timezone` | ✅ | Routes through the JVM codegen dispatcher by default (handles all timezone forms); the native path is opt-in via allowIncompatible ([details](compatibility/expressions/datetime.md)) |
| `curdate` | ✅ | Constant-folded to a literal (alias of `current_date`) |
| `current_date` | ✅ | Constant-folded to a literal before Comet sees the plan |
| `current_time` | 🔜 | Blocked on Spark 4.1 TIME type support ([#4288](https://github.com/apache/datafusion-comet/issues/4288)) |
| `current_time` | | Constant-folded to a literal before Comet sees the plan (Spark 4.1+) |
| `current_timestamp` | ✅ | Constant-folded to a literal before Comet sees the plan |
| `current_timezone` | ✅ | |
| `date_add` | ✅ | |
Expand All @@ -264,7 +264,7 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci
| `make_date` | ✅ | |
| `make_dt_interval` | ✅ | |
| `make_interval` | 🔜 | Produces legacy CalendarInterval; tracked by [#4540](https://github.com/apache/datafusion-comet/issues/4540) |
| `make_time` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) |
| `make_time` | | Spark 4.1+ |
| `make_timestamp` | ✅ | |
| `make_timestamp_ltz` | ✅ | 2-arg TIME form falls back |
| `make_timestamp_ntz` | ✅ | 2-arg TIME form falls back |
Expand All @@ -278,13 +278,13 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci
| `quarter` | ✅ | |
| `second` | ✅ | |
| `session_window` | 🔜 | Batch session-window grouping falls back (`UpdatingSessionsExec` is not yet native); tracked by [#4785](https://github.com/apache/datafusion-comet/issues/4785) |
| `time_diff` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) |
| `time_trunc` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) |
| `time_diff` | | Spark 4.1+; routes through the JVM codegen dispatcher |
| `time_trunc` | | Spark 4.1+; routes through the JVM codegen dispatcher |
| `timestamp_micros` | ✅ | |
| `timestamp_millis` | ✅ | |
| `timestamp_seconds` | ✅ | |
| `to_date` | ✅ | Rewrites to `Cast` (or `Cast(GetTimestamp)` with a format) before Comet sees the plan |
| `to_time` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) |
| `to_time` | | Spark 4.1+ |
| `to_timestamp` | ✅ | Rewrites to `Cast` (or `GetTimestamp` with a format) before Comet sees the plan |
| `to_timestamp_ltz` | ✅ | Rewrites to `to_timestamp` (`TimestampType`) |
| `to_timestamp_ntz` | ✅ | Rewrites to `to_timestamp` (`TimestampNTZType`) |
Expand All @@ -294,7 +294,7 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci
| `try_make_interval` | 🔜 | Produces legacy CalendarInterval; tracked by [#4540](https://github.com/apache/datafusion-comet/issues/4540) |
| `try_make_timestamp` | ✅ | |
| `try_to_date` | 🔜 | Rewrites to `Cast`/`GetTimestamp` but currently falls back; tracked by [#4556](https://github.com/apache/datafusion-comet/issues/4556) |
| `try_to_time` | 🔜 | Spark 4.1 TIME type; tracked by [#4288](https://github.com/apache/datafusion-comet/issues/4288) |
| `try_to_time` | | Spark 4.1+ |
| `try_to_timestamp` | 🔜 | Rewrites to `Cast`/`GetTimestamp` but currently falls back; tracked by [#4556](https://github.com/apache/datafusion-comet/issues/4556) |
| `unix_date` | ✅ | |
| `unix_micros` | ✅ | |
Expand Down
3 changes: 3 additions & 0 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,9 @@ impl PhysicalPlanner {
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
ScalarValue::TimestampMicrosecond(Some(*value), Some(tz))
}
DataType::Time64(TimeUnit::Nanosecond) => {
ScalarValue::Time64Nanosecond(Some(*value))
}
dt => {
return Err(GeneralError(format!(
"Expected either 'Int64' or 'Timestamp' for LongVal, but found {dt:?}"
Expand Down
5 changes: 4 additions & 1 deletion spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.types._

import org.apache.comet.DataTypeSupport.{ARRAY_ELEMENT, MAP_KEY, MAP_VALUE}
import org.apache.comet.shims.CometTypeShim

trait DataTypeSupport {
trait DataTypeSupport extends CometTypeShim {

/**
* Checks if this schema is supported by checking if each field in the schema is supported.
Expand Down Expand Up @@ -53,6 +54,8 @@ trait DataTypeSupport {
BinaryType | StringType | _: DecimalType | DateType | TimestampType |
TimestampNTZType =>
true
case dt if isTimeType(dt) =>
true
case StructType(fields) =>
fields.nonEmpty && fields.forall(f =>
isTypeSupported(f.dataType, f.name, fallbackReasons))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.comet.shims.CometExprTraitShim
import org.apache.comet.shims.{CometExprTraitShim, CometTypeShim}

/**
* Compiles a bound [[Expression]] plus an Arrow input schema into a [[CometBatchKernel]] that
Expand All @@ -49,7 +49,7 @@ import org.apache.comet.shims.CometExprTraitShim
* The generated kernel is the `InternalRow` that Spark's `BoundReference.genCode` reads from. See
* [[generateSource]] for how the wiring is set up.
*/
object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
object CometBatchKernelCodegen extends Logging with CometExprTraitShim with CometTypeShim {

/**
* Resolve an Arrow vector class by simple name through the codegen object's own classloader.
Expand All @@ -69,6 +69,7 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
case "DateDayVector" => classOf[DateDayVector]
case "TimeStampMicroVector" => classOf[TimeStampMicroVector]
case "TimeStampMicroTZVector" => classOf[TimeStampMicroTZVector]
case "TimeNanoVector" => classOf[TimeNanoVector]
case "VarCharVector" => classOf[VarCharVector]
case "VarBinaryVector" => classOf[VarBinaryVector]
case "IntervalYearVector" => classOf[IntervalYearVector]
Expand All @@ -87,6 +88,7 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
case _: StringType | _: BinaryType => true
case DateType | TimestampType | TimestampNTZType => true
case _: YearMonthIntervalType | _: DayTimeIntervalType => true
case dt if isTimeType(dt) => true
case ArrayType(inner, _) => isSupportedDataType(inner)
case st: StructType => st.fields.forall(f => isSupportedDataType(f.dataType))
case mt: MapType => isSupportedDataType(mt.keyType) && isSupportedDataType(mt.valueType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ private[codegen] object CometBatchKernelCodegenInput {
classOf[Float8Vector],
classOf[DateDayVector],
classOf[TimeStampMicroVector],
classOf[TimeStampMicroTZVector])
classOf[TimeStampMicroTZVector],
classOf[TimeNanoVector])
private val cometPlainVectorName: String = classOf[CometPlainVector].getName

/** Emit kernel typed-vector field declarations for every level of every input column. */
Expand Down Expand Up @@ -134,7 +135,8 @@ private[codegen] object CometBatchKernelCodegenInput {
case (ArrowColumnSpec(cls, _), ord)
if cls == classOf[BigIntVector] ||
cls == classOf[TimeStampMicroVector] ||
cls == classOf[TimeStampMicroTZVector] =>
cls == classOf[TimeStampMicroTZVector] ||
cls == classOf[TimeNanoVector] =>
s" case $ord: return this.col$ord.getLong(this.rowIdx);"
}
val floatCases = withOrd.collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.types._

import org.apache.comet.CometArrowAllocator
import org.apache.comet.shims.CometTypeShim

/**
* Output-side emitters for the codegen kernel: [[allocateOutput]], [[emitOutputWriter]]
* (top-level write entry), [[emitWrite]] (recursive per-type write), the output vector-class
* lookup. Paired with [[CometBatchKernelCodegenInput]] on the read side.
*/
private[codegen] object CometBatchKernelCodegenOutput {
private[codegen] object CometBatchKernelCodegenOutput extends CometTypeShim {

/**
* Spark `DataType` to an Arrow `Field` with names Comet expects on FFI export. Spark's
Expand Down Expand Up @@ -173,6 +174,7 @@ private[codegen] object CometBatchKernelCodegenOutput {
case TimestampNTZType => classOf[TimeStampMicroVector].getName
case _: YearMonthIntervalType => classOf[IntervalYearVector].getName
case _: DayTimeIntervalType => classOf[DurationVector].getName
case dt if isTimeType(dt) => classOf[TimeNanoVector].getName
case _: ArrayType => classOf[ListVector].getName
case _: StructType => classOf[StructVector].getName
case _: MapType => classOf[MapVector].getName
Expand Down Expand Up @@ -216,6 +218,10 @@ private[codegen] object CometBatchKernelCodegenOutput {
// DayTimeIntervalType -> DurationVector.set(int, long micros).
val set = if (nested) "setSafe" else "set"
OutputEmit("", s"$targetVec.$set($idx, $source);")
case dt if isTimeType(dt) =>
// TIME is stored as nanoseconds-of-day in a Long, matching TimeNanoVector.set(int, long).
val set = if (nested) "setSafe" else "set"
OutputEmit("", s"$targetVec.$set($idx, $source);")
case dt: DecimalType =>
// DecimalOutputShortFastPath: precision <= 18 fits in a signed long, so pass the unscaled
// value to `setSafe(int, long)` and skip the BigDecimal allocation.
Expand Down Expand Up @@ -398,6 +404,7 @@ private[codegen] object CometBatchKernelCodegenOutput {
case ShortType => s"$target.getShort($idx)"
case IntegerType | DateType => s"$target.getInt($idx)"
case LongType | TimestampType | TimestampNTZType => s"$target.getLong($idx)"
case dt if isTimeType(dt) => s"$target.getLong($idx)"
case FloatType => s"$target.getFloat($idx)"
case DoubleType => s"$target.getDouble($idx)"
case dt: DecimalType => s"$target.getDecimal($idx, ${dt.precision}, ${dt.scale})"
Expand Down
17 changes: 15 additions & 2 deletions spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType,

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, withFallbackReason}
import org.apache.comet.serde.{CometExpressionSerde, Compatible, ExprOuterClass, Incompatible, SupportLevel, Unsupported}
import org.apache.comet.serde.{CometExpressionSerde, CometScalaUDF, Compatible, ExprOuterClass, Incompatible, SupportLevel, Unsupported}
import org.apache.comet.serde.ExprOuterClass.Expr
import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, serializeDataType}
import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, isTimeType, serializeDataType}
import org.apache.comet.shims.CometExprShim

object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
Expand Down Expand Up @@ -77,11 +77,19 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
// (see `convert`), so the cast never executes natively and the result matches Spark by
// definition. `CometLiteral` then validates the resulting literal's data type.
Compatible()
} else if (involvesTimeType(cast)) {
// Casts to/from TIME have no native lowering yet. `convert` routes them through the JVM
// codegen dispatcher instead of falling the projection back to Spark; report `Compatible`
// here so the dispatch path is taken.
Compatible()
} else {
isSupported(cast.child.dataType, cast.dataType, cast.timeZoneId, evalMode(cast))
}
}

private def involvesTimeType(cast: Cast): Boolean =
isTimeType(cast.child.dataType) || isTimeType(cast.dataType)

override def convert(
cast: Cast,
inputs: Seq[Attribute],
Expand All @@ -90,6 +98,11 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
cast.child match {
case _: Literal =>
exprToProtoInternal(Literal.create(cast.eval(), cast.dataType), inputs, binding)
case _ if involvesTimeType(cast) =>
// Casts to/from TIME (`TimeType(_)` <-> string/integral/decimal, precision changes) all
// have full Spark codegen but no native lowering. Route through the JVM codegen
// dispatcher so the enclosing projection stays native.
CometScalaUDF.emitJvmCodegenDispatch(cast, inputs, binding)
case _ =>
if (isAlwaysCastToNull(cast.child.dataType, cast.dataType, cometEvalMode)) {
exprToProtoInternal(Literal.create(null, cast.dataType), inputs, binding)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,12 @@ case class CometScanTypeChecker() extends DataTypeSupport with CometTypeShim {
"native execution if your data does not contain unsigned small integers. " +
CometConf.COMPAT_GUIDE
false
case dt if isTimeType(dt) =>
// The native Parquet reader has not been taught to decode the TIME logical type into
// an Arrow Time64(NANOSECOND) vector, so fall back to Spark for scans that expose one.
fallbackReasons += s"Unsupported $name of type $dt (native Parquet scan does not " +
"support TIME)"
false
case dt if isStringCollationType(dt) =>
// we don't need specific support for collation in scans, but this
// is a convenient place to force the whole query to fall back to Spark for now
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ class CometScalaUDFCodegen extends CometUDF with Logging {
case _: BitVector | _: TinyIntVector | _: SmallIntVector | _: IntVector | _: BigIntVector |
_: Float4Vector | _: Float8Vector | _: DecimalVector | _: VarCharVector |
_: VarBinaryVector | _: DateDayVector | _: TimeStampMicroVector |
_: TimeStampMicroTZVector =>
_: TimeStampMicroTZVector | _: TimeNanoVector =>
ScalarColumnSpec(v.getClass.asInstanceOf[Class[_ <: ValueVector]], nullable = true)
case other =>
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ object CometLocalTableScanExec extends CometSink[LocalTableScanExec] with DataTy
CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED)

// ArrowWriter (used by RowArrowReader) handles NullType via Utils.toArrowType + NullWriter;
// other types off DataTypeSupport's allow list (TimeType, intervals, ...) have no ArrowWriter
// coverage and must fall back to Spark.
// TimeType routes through the new TimeNanoWriter. Other types off DataTypeSupport's allow list
// (intervals, ...) have no ArrowWriter coverage and must fall back to Spark.
override def isTypeSupported(
dt: DataType,
name: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ private[arrow] object ArrowWriter {
case (DateType, vector: DateDayVector) => new DateWriter(vector)
case (TimestampType, vector: TimeStampMicroTZVector) => new TimestampWriter(vector)
case (TimestampNTZType, vector: TimeStampMicroVector) => new TimestampNTZWriter(vector)
case (dt, vector: TimeNanoVector) if Utils.isTimeType(dt) => new TimeNanoWriter(vector)
case (ArrayType(_, _), vector: ListVector) =>
val elementVector = createFieldWriter(vector.getDataVector())
new ArrayWriter(vector, elementVector)
Expand Down Expand Up @@ -365,6 +366,17 @@ private[arrow] class TimestampNTZWriter(val valueVector: TimeStampMicroVector)
}
}

private[arrow] class TimeNanoWriter(val valueVector: TimeNanoVector) extends ArrowFieldWriter {

override def setNull(): Unit = {
valueVector.setNull(count)
}

override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
valueVector.setSafe(count, input.getLong(ordinal))
}
}

private[arrow] class ArrayWriter(val valueVector: ListVector, val elementWriter: ArrowFieldWriter)
extends ArrowFieldWriter {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.TimeType

import org.apache.comet.expressions.CometEvalMode
import org.apache.comet.serde.CometScalaUDF
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}
import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithFallbackReason, scalarFunctionExprToProtoWithReturnType}

Expand All @@ -46,6 +47,22 @@ trait CometExprShim extends Spark4xCometExprShim {
}
}

// Function names on `DateTimeUtils` reached via `StaticInvoke` from the Spark 4.1 `TIME` family of
// `RuntimeReplaceable` expressions (`HoursOfTime`, `MinutesOfTime`, `SecondsOfTime`,
// `SecondsOfTimeWithFraction`, `TimeAddInterval`, `SubtractTimes`, `TimeDiff`, `TimeTrunc`).
// Each of these is codegen-safe (the replacement itself is a `StaticInvoke` with a proper
// `doGenCode`) but has no native lowering yet, so we route them through the JVM codegen
// dispatcher to keep the enclosing projection native.
private val timeCodegenDispatchFunctions: Set[String] = Set(
"getHoursOfTime",
"getMinutesOfTime",
"getSecondsOfTime",
"getSecondsOfTimeWithFraction",
"timeAddInterval",
"subtractTimes",
"timeDiff",
"timeTrunc")

// Spark 4.1 introduced TimeType and the make_time / to_time / try_to_time functions.
// Their planner forms differ from the shared 4.x patterns (DateTimeUtils.makeTime
// StaticInvoke and ToTimeParser Invoke / TryEval(Invoke)), so they live here rather
Expand All @@ -66,6 +83,14 @@ trait CometExprShim extends Spark4xCometExprShim {
scalarFunctionExprToProtoWithReturnType("make_time", s.dataType, true, childExprs: _*)
optExprWithFallbackReason(optExpr, expr, s.arguments: _*)

// Route the other Spark 4.1 TIME `StaticInvoke` forms through the JVM codegen dispatcher.
// `emitJvmCodegenDispatch` runs Spark's own `doGenCode` inside the Comet pipeline, so the
// projection stays native while behavior matches Spark exactly.
case s: StaticInvoke
if s.staticObject == classOf[DateTimeUtils.type] &&
timeCodegenDispatchFunctions.contains(s.functionName) =>
CometScalaUDF.emitJvmCodegenDispatch(s, inputs, binding)

case i: Invoke =>
(i.targetObject, i.functionName, i.arguments) match {
case (Literal(parser: ToTimeParser, _), "parse", args)
Expand Down
Loading
Loading