Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
768b3e9
impl map_from_entries
Dec 14, 2025
c68c342
Revert "impl map_from_entries"
Dec 16, 2025
d887555
Merge branch 'apache:main' into main
kazantsev-maksim Dec 16, 2025
231aa90
Merge branch 'apache:main' into main
kazantsev-maksim Dec 17, 2025
9500bbb
Merge branch 'apache:main' into main
kazantsev-maksim Dec 24, 2025
9577481
Merge branch 'apache:main' into main
kazantsev-maksim Dec 28, 2025
3791557
Merge branch 'apache:main' into main
kazantsev-maksim Jan 2, 2026
7c2f082
Merge branch 'apache:main' into main
kazantsev-maksim Jan 3, 2026
609a605
Merge branch 'apache:main' into main
kazantsev-maksim Jan 6, 2026
a151b2c
Merge branch 'apache:main' into main
kazantsev-maksim Jan 7, 2026
ad3e7f5
Merge branch 'apache:main' into main
kazantsev-maksim Jan 10, 2026
ea92e4b
Merge branch 'apache:main' into main
kazantsev-maksim Jan 14, 2026
8dfeca3
Merge branch 'apache:main' into main
kazantsev-maksim Jan 17, 2026
559741e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 20, 2026
ebda14e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 21, 2026
408152e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 23, 2026
d7857b2
Merge branch 'apache:main' into main
kazantsev-maksim Jan 24, 2026
aef41be
Merge branch 'apache:main' into main
kazantsev-maksim Jan 29, 2026
5ac1c58
Merge branch 'apache:main' into main
kazantsev-maksim Jan 30, 2026
9ae8e23
Merge branch 'apache:main' into main
kazantsev-maksim Feb 1, 2026
5ca3888
Merge branch 'apache:main' into main
kazantsev-maksim Feb 4, 2026
160a817
Merge branch 'apache:main' into main
kazantsev-maksim Feb 5, 2026
88fc313
Merge branch 'apache:main' into main
kazantsev-maksim Feb 7, 2026
e14c180
Merge branch 'apache:main' into main
kazantsev-maksim Feb 13, 2026
610a885
Merge branch 'apache:main' into main
kazantsev-maksim Feb 20, 2026
f8acb2c
Merge branch 'apache:main' into main
kazantsev-maksim Feb 21, 2026
ec94897
Merge branch 'apache:main' into main
kazantsev-maksim Feb 26, 2026
43405e4
Merge branch 'apache:main' into main
kazantsev-maksim Feb 27, 2026
47b4915
Merge branch 'apache:main' into main
kazantsev-maksim Mar 1, 2026
26e2682
Merge branch 'apache:main' into main
kazantsev-maksim Mar 3, 2026
6cb5f07
Merge branch 'apache:main' into main
kazantsev-maksim Mar 4, 2026
ec194fb
Merge branch 'apache:main' into main
kazantsev-maksim Mar 31, 2026
256fccb
Merge branch 'apache:main' into main
kazantsev-maksim Apr 3, 2026
912c8f9
Merge branch 'apache:main' into main
kazantsev-maksim Apr 3, 2026
561a664
Merge branch 'apache:main' into main
kazantsev-maksim Apr 8, 2026
d926ef4
Merge branch 'apache:main' into main
kazantsev-maksim Apr 11, 2026
671412c
Merge branch 'apache:main' into main
kazantsev-maksim Apr 17, 2026
c9f52d1
Merge branch 'apache:main' into main
kazantsev-maksim Apr 22, 2026
67f72d9
Merge branch 'apache:main' into main
kazantsev-maksim Apr 23, 2026
314e594
Merge branch 'apache:main' into main
kazantsev-maksim Apr 24, 2026
ac8292f
Merge branch 'apache:main' into main
kazantsev-maksim May 1, 2026
c9c140e
Merge branch 'apache:main' into main
kazantsev-maksim May 7, 2026
decca58
Merge branch 'apache:main' into main
kazantsev-maksim May 13, 2026
0919b33
Merge branch 'apache:main' into main
kazantsev-maksim May 16, 2026
7495e21
Merge branch 'apache:main' into main
kazantsev-maksim May 19, 2026
0a37a60
Merge branch 'apache:main' into main
kazantsev-maksim May 21, 2026
abbba84
Merge branch 'apache:main' into main
kazantsev-maksim May 25, 2026
6020560
Merge branch 'apache:main' into main
kazantsev-maksim May 28, 2026
e2bdfb1
Merge branch 'apache:main' into main
kazantsev-maksim May 31, 2026
3edfc33
Merge branch 'apache:main' into main
kazantsev-maksim Jun 3, 2026
a39e860
Merge branch 'apache:main' into main
kazantsev-maksim Jun 4, 2026
e88dd7b
Merge branch 'apache:main' into main
kazantsev-maksim Jun 5, 2026
3e29d37
Merge branch 'apache:main' into main
kazantsev-maksim Jun 7, 2026
4068359
Merge branch 'apache:main' into main
kazantsev-maksim Jun 12, 2026
a3cb8de
Merge branch 'apache:main' into main
kazantsev-maksim Jun 13, 2026
b33726f
Merge branch 'apache:main' into main
kazantsev-maksim Jun 21, 2026
698f7a1
Merge branch 'apache:main' into main
kazantsev-maksim Jun 22, 2026
18162a6
Merge branch 'apache:main' into main
kazantsev-maksim Jun 23, 2026
6b0d500
Support native DataFusion lambda functions
Jun 28, 2026
4281483
more tests
Jun 29, 2026
d157db2
Merge branch 'main' into array_filter
kazantsev-maksim Jun 29, 2026
d056f64
fix
Jun 29, 2026
6037e7a
Merge remote-tracking branch 'origin/array_filter' into array_filter
Jun 29, 2026
30f06f4
Fix PR issues
Jul 1, 2026
47f2726
Fix PR issues
Jul 1, 2026
435365a
Fix PR issues
Jul 1, 2026
6f6eb6f
Merge branch 'apache:main' into main
kazantsev-maksim Jul 1, 2026
c21a42e
Merge branch 'apache:main' into main
kazantsev-maksim Jul 2, 2026
528d392
Merge remote-tracking branch 'origin/main' into array_filter
Jul 2, 2026
618ae48
Merge branch 'apache:main' into main
kazantsev-maksim Jul 3, 2026
b6f795d
Merge remote-tracking branch 'refs/remotes/origin/main' into array_fi…
Jul 3, 2026
fc13227
fix PR issues
Jul 3, 2026
9102c3d
Fix PR issues
Jul 3, 2026
4d068e3
Merge branch 'apache:main' into main
kazantsev-maksim Jul 3, 2026
6ef4112
Merge remote-tracking branch 'origin/main' into array_filter
Jul 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 102 additions & 6 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ use datafusion::{
prelude::SessionContext,
};
use datafusion_comet_spark_expr::{
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode, SparkArraysZipFunc,
SparkBloomFilterVersion, SumInteger, ToCsv,
create_comet_hof_func, create_comet_physical_fun, create_comet_physical_fun_with_eval_mode,
BinaryOutputStyle, BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode,
SparkArraysZipFunc, SparkBloomFilterVersion, SumInteger, ToCsv,
};
use datafusion_spark::function::aggregate::collect::SparkCollectSet;
use iceberg::expr::Bind;
Expand All @@ -95,9 +95,9 @@ use datafusion::logical_expr::{
AggregateUDF, ReturnFieldArgs, ScalarUDF, TypeSignature, WindowFrame, WindowFrameBound,
WindowFrameUnits, WindowFunctionDefinition,
};
use datafusion::physical_expr::expressions::{Literal, StatsType};
use datafusion::physical_expr::expressions::{LambdaExpr, LambdaVariable, Literal, StatsType};
use datafusion::physical_expr::window::WindowExpr;
use datafusion::physical_expr::LexOrdering;
use datafusion::physical_expr::{HigherOrderFunctionExpr, LexOrdering};

use crate::parquet::parquet_exec::init_datasource_exec;
use arrow::array::{
Expand All @@ -113,7 +113,7 @@ use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::NestedLoopJoinExec;
use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
use datafusion_comet_proto::spark_expression::ListLiteral;
use datafusion_comet_proto::spark_expression::{HigherOrderFunc, LambdaFunction, ListLiteral};
use datafusion_comet_proto::spark_operator::SparkFilePartition;
use datafusion_comet_proto::{
spark_expression::{
Expand Down Expand Up @@ -532,6 +532,19 @@ impl PhysicalPlanner {
_ => func,
}
}
ExprStruct::HighOrderFunc(hof) => {
self.create_high_order_function_expr(hof, input_schema)
}
ExprStruct::NamedLambdaVariable(nlv) => {
let idx = input_schema.index_of(&nlv.name).map_err(|_| {
GeneralError(format!(
"NamedLambdaVariable '{}' not found in enclosing lambda schema",
nlv.name
))
})?;
let field = Arc::clone(&input_schema.fields()[idx]);
Ok(Arc::new(LambdaVariable::new(idx, field)))
}
ExprStruct::CaseWhen(case_when) => {
let when_then_pairs = case_when
.when
Expand Down Expand Up @@ -3113,6 +3126,89 @@ impl PhysicalPlanner {
}
}

fn create_high_order_function_expr(
&self,
expr: &HigherOrderFunc,
input_schema: SchemaRef,
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
let comet_hof_func =
create_comet_hof_func(expr.func_name.as_str(), &self.session_ctx.state())?;

let value_args = expr
.value_args
.iter()
.map(|e| self.create_expr(e, Arc::clone(&input_schema)))
.collect::<Result<Vec<_>, _>>()?;

let lambdas = expr
.lambdas
.iter()
.map(|l| self.create_lambda_expr(l, &input_schema))
.collect::<Result<Vec<_>, _>>()?;

// NOTE: assumes all value arguments precede all lambda arguments.
// Holds for array_filter and the current single-lambda HOFs, but would
// NOT generalize to a future HOF with interleaved value/lambda args
// (e.g. f(value, lambda, value, lambda)). Revisit this split if such a
// function is added.
let mut args: Vec<Arc<dyn PhysicalExpr>> =
Vec::with_capacity(value_args.len() + lambdas.len());
args.extend(value_args);
args.extend(lambdas);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A short comment that this assumes all value args precede all lambdas would help. That holds for array_filter and the current single-lambda functions, but would not generalize to a future HOF with interleaved value and lambda args.


let higher_order_function_expr = HigherOrderFunctionExpr::try_new_with_schema(
comet_hof_func,
args,
&input_schema,
Arc::new(ConfigOptions::default()),
)?;

Ok(Arc::new(higher_order_function_expr))
}

fn create_lambda_expr(
&self,
lambda: &LambdaFunction,
input_schema: &SchemaRef,
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
let mut body_fields: Vec<Arc<Field>> =
input_schema.fields().iter().map(Arc::clone).collect();

for arg in &lambda.args {
let data_type = arg.data_type.as_ref().ok_or_else(|| {
DataFusionError::Internal("lambda variable without data type".to_string())
})?;
let arrow_data_type = to_arrow_datatype(data_type);
body_fields.push(Arc::new(Field::new(
&arg.name,
arrow_data_type,
arg.nullable,
)));
}

let body_schema = Arc::new(Schema::new(
body_fields
.iter()
.map(|f| f.as_ref().clone())
.collect::<Vec<Field>>(),
));

let lambda_body = lambda
.body
.as_ref()
.ok_or_else(|| DataFusionError::Internal("lambda has no body".to_string()))?;
let body_expr = self.create_expr(lambda_body, body_schema)?;

Ok(Arc::new(LambdaExpr::try_new(
lambda
.args
.iter()
.map(|a| a.name.clone())
.collect::<Vec<_>>(),
body_expr,
)?))
}

fn create_scalar_function_expr(
&self,
expr: &ScalarFunc,
Expand Down
19 changes: 19 additions & 0 deletions native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ message Expr {
JvmScalarUdf jvm_scalar_udf = 70;
PreciseTimestampConversion precise_timestamp_conversion = 71;
Shuffle shuffle = 72;
HigherOrderFunc high_order_func = 73;
NamedLambdaVariable named_lambda_variable = 74;
}

// Optional QueryContext for error reporting (contains SQL text and position)
Expand Down Expand Up @@ -557,3 +559,20 @@ message JvmScalarUdf {
// Whether the result column may contain nulls.
bool return_nullable = 4;
}

message HigherOrderFunc {
string func_name = 1;
repeated Expr value_args = 2;
repeated LambdaFunction lambdas = 3;
}

message NamedLambdaVariable {
string name = 1;
DataType data_type = 2;
bool nullable = 3;
}

message LambdaFunction {
Expr body = 1;
repeated NamedLambdaVariable args = 2;
}
30 changes: 30 additions & 0 deletions native/spark-expr/src/comet_high_order_funcs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::common::DataFusionError;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::HigherOrderUDF;
use std::sync::Arc;

pub fn create_comet_hof_func(
func_name: &str,
registry: &dyn FunctionRegistry,
) -> Result<Arc<HigherOrderUDF>, DataFusionError> {
registry.higher_order_function(func_name).map_err(|e| {
DataFusionError::Execution(format!("HOF {func_name} not found in the registry: {e}"))
})
}
2 changes: 2 additions & 0 deletions native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub use predicate_funcs::{spark_isnan, RLike};

mod agg_funcs;
mod array_funcs;
mod comet_high_order_funcs;
mod comet_scalar_funcs;
pub mod hash_funcs;

Expand Down Expand Up @@ -70,6 +71,7 @@ pub use conditional_funcs::*;
pub use conversion_funcs::*;
pub use nondetermenistic_funcs::*;

pub use comet_high_order_funcs::create_comet_hof_func;
pub use comet_scalar_funcs::{
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode,
register_all_comet_functions,
Expand Down
10 changes: 10 additions & 0 deletions spark/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,16 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

val COMET_EXEC_HIGHER_ORDER_FUNCTION_NATIVE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.higherOrderFunction.native.enabled")
.category(CATEGORY_EXEC)
.doc(
"When enabled, supported higher-order functions (e.g. filter) are executed by the " +
"native DataFusion engine. Shapes the native path cannot handle fall back to the " +
"codegen dispatcher, and finally to Spark.")
.booleanConf
.createWithDefault(true)

val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.native.shuffle.partitioning.hash.enabled")
.category(CATEGORY_SHUFFLE)
Expand Down
Loading