12 changes: 9 additions & 3 deletions .ai/skills/audit-skill-md/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ exposed at the package root), include it.
| `SessionContext` | `python/datafusion/context.py` | "Data Loading" |
| `DataFrame` | `python/datafusion/dataframe.py` | "DataFrame Operations Quick Reference", "Executing and Collecting Results", "Idiomatic Patterns" |
| `Expr` | `python/datafusion/expr.py` | "Expression Building", "Common Pitfalls" |
| `functions` | `python/datafusion/functions.py` | "Available Functions (Categorized)", scattered uses throughout |
| `functions` | `python/datafusion/functions/__init__.py` | "Available Functions (Categorized)", scattered uses throughout |
| `functions.spark` | `python/datafusion/functions/spark.py` | "Available Functions (Categorized)" → "Spark-Compatible Functions" subsection |
| Top-level helpers (`col`, `lit`, `WindowFrame`, ...) | `python/datafusion/__init__.py` | "Import Conventions", "Core Abstractions" |

## Scope argument
Expand All @@ -61,7 +62,8 @@ is given or `all` is specified, audit every area.
| `session-context` | `SessionContext` methods and the "Data Loading" section |
| `dataframe` | `DataFrame` methods and the operations / executing / patterns sections |
| `expr` | `Expr` methods/operators and the "Expression Building" section |
| `functions` | `functions.py` `__all__` and the "Available Functions (Categorized)" section |
| `functions` | `functions/__init__.py` `__all__` and the "Available Functions (Categorized)" section |
| `spark-functions` | `functions/spark.py` `__all__`, the "Spark-Compatible Functions" subsection, and the divergent-semantics table |
| `patterns` | "Idiomatic Patterns" section — confirm patterns still match recommended style |
| `pitfalls` | "Common Pitfalls" — confirm each pitfall still reproduces, drop ones fixed upstream |
| `version-notes` | Cross-check version annotations (see below) |
Expand Down Expand Up @@ -123,7 +125,11 @@ For each function name, method name, or import shown in `SKILL.md`, verify it
still exists in the current API:

- Function names mentioned in prose or in the categorized list should appear
in `python/datafusion/functions.py`'s `__all__`.
in `python/datafusion/functions/__init__.py`'s `__all__`.
- Spark function names mentioned in the "Spark-Compatible Functions"
subsection should appear in `python/datafusion/functions/spark.py`'s
`__all__`. Also confirm the divergent-semantics table still matches the
current spark vs. main signatures.
- Method calls in code blocks should resolve against the current class.
- Imports (`from datafusion import ...`) should succeed against the current
`__init__.py`.
Expand Down
50 changes: 45 additions & 5 deletions .ai/skills/check-upstream/SKILL.md
Expand Up @@ -209,18 +209,58 @@ These upstream FFI types have been reviewed and do not need to be independently
- FFI example in `examples/datafusion-ffi-example/`
- Type appears in union type hints where accepted

### 8. `__all__` Hygiene (functions.py)
### 8. Spark-Compatible Functions (`datafusion-spark` crate)

**Upstream source of truth:**
- Crate source: https://github.com/apache/datafusion/tree/main/datafusion/spark/src
- Rust docs: https://docs.rs/datafusion-spark/latest/datafusion_spark/

**Where they are exposed in this project:**
- Python API: `python/datafusion/functions/spark.py` — each function wraps
a call to `datafusion._internal.functions.spark`; the public surface is
the module's `__all__` list.
- Rust bindings: `crates/core/src/spark_functions.rs` — `#[pyfunction]`
definitions registered via `init_module()` and re-exported under
`datafusion._internal.functions.spark`.

**Coverage policy:** The spark namespace mirrors
`pyspark.sql.functions` parameter names and shapes exactly so pyspark
callers can paste code unchanged. Extras over pyspark are permitted as
long as positional pyspark calls still work — for example, the spark
`avg` / `try_sum` / `collect_list` / `collect_set` retain the
`distinct`/`filter`/`order_by`/`null_treatment` kwargs from the main
namespace while pyspark's single-positional form continues to work.

**How to check:**
1. Fetch the upstream `datafusion-spark` function list from the crate
source under `datafusion/spark/src/function/` (each subdirectory is a
category: `string/`, `math/`, `datetime/`, etc.). The crate's
`function.rs` collects all `ScalarUDF` factories.
2. Cross-reference against `pyspark.sql.functions` for the public-facing
shape — pyspark is the contract this namespace is matching.
3. Compare against the functions listed in
`python/datafusion/functions/spark.py`'s `__all__`. A function is
covered if it exists in the Python `spark` namespace, even if it
aliases another function's Rust binding.
4. Report functions that are missing from the Python spark namespace.
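The comparison in steps 3 and 4 comes down to a name-level set difference. A minimal sketch, assuming the upstream names and the `__all__` entries have already been collected; the helper name `missing_spark_functions` and the sample inputs are hypothetical, not part of this project:

```python
def missing_spark_functions(upstream: set[str], exported: set[str]) -> list[str]:
    """Upstream names with no same-named entry in the Python spark namespace."""
    # Coverage is decided by name only, so an alias that reuses another
    # function's Rust binding still counts as covered.
    return sorted(upstream - exported)


# Hypothetical inputs: in practice `upstream` would come from the crate source
# and `exported` from `python/datafusion/functions/spark.py`'s `__all__`.
upstream = {"ascii", "expm1", "sha2"}
exported = {"ascii", "sha2"}

print(missing_spark_functions(upstream, exported))
```

Sorting the result keeps the report stable between runs, which makes successive audits easier to diff.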

**Cross-cutting reference:** The longer-form roadmap for spark coverage
lives in `PYSPARK_ALIGNMENT_PLAN.md` (root of repo). Use it as the source
of truth for which gaps are intentionally deferred vs. ready to land.

### 9. `__all__` Hygiene (functions/__init__.py and functions/spark.py)

Independent of upstream parity, also flag public `def` symbols in
`python/datafusion/functions.py` that are missing from the module's
`__all__`. These are functions a user can call but that do not show up in
`python/datafusion/functions/__init__.py` **and** `python/datafusion/functions/spark.py`
that are missing from that file's `__all__`. These are functions a user
can call but that do not show up in
`from datafusion.functions import *`, in tab-completion against the
namespace, or in generated API docs — typically an oversight rather than
an intentional omission.

**How to check:**
1. Grep for `^def ([a-z_][a-z0-9_]*)\(` in `python/datafusion/functions.py`
to enumerate every public function definition.
1. Grep for `^def ([a-z_][a-z0-9_]*)\(` in each file to enumerate every
public function definition.
2. Read the `__all__` list at the top of the same file.
3. Report any function in (1) that is not in (2). Skip private helpers
(names starting with `_`).
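The three steps above could be automated roughly as follows. This is a sketch under stated assumptions, not the project's tooling: the helper names `exported_names` and `missing_from_all` are hypothetical, `__all__` is assumed to be a plain list literal assigned at module level (an annotated assignment would not be picked up), and `sample` stands in for the real file contents.

```python
import ast
import re

# Same pattern as step 1: top-level `def` lines with lowercase names.
DEF_PATTERN = re.compile(r"^def ([a-z_][a-z0-9_]*)\(", re.MULTILINE)


def exported_names(source: str) -> set[str]:
    """Return the names in a module-level ``__all__ = [...]`` assignment."""
    for node in ast.parse(source).body:
        if isinstance(node, ast.Assign):
            for target in node.targets:
                if isinstance(target, ast.Name) and target.id == "__all__":
                    return set(ast.literal_eval(node.value))
    return set()


def missing_from_all(source: str) -> list[str]:
    """List public top-level functions that ``__all__`` does not mention."""
    defined = {name for name in DEF_PATTERN.findall(source) if not name.startswith("_")}
    return sorted(defined - exported_names(source))


# Hypothetical module text standing in for the real file contents.
sample = '''
__all__ = ["abs", "upper"]

def abs(arg):
    return arg

def upper(arg):
    return arg

def lower(arg):
    return arg

def _helper(arg):
    return arg
'''

print(missing_from_all(sample))
```

To run it against a real file, read the text with `pathlib.Path(...).read_text()` and pass it to `missing_from_all`, once per file being audited.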
Expand Down
27 changes: 24 additions & 3 deletions .ai/skills/make-pythonic/SKILL.md
Expand Up @@ -29,9 +29,30 @@ You are improving the datafusion-python API to feel more natural to Python users

**Core principle:** A Python user should be able to write `split_part(col("a"), ",", 2)` instead of `split_part(col("a"), lit(","), lit(2))` when the arguments are contextually obvious literals.

## Scope: `functions` vs `functions.spark`

This skill targets the **default `datafusion.functions` namespace** (file:
`python/datafusion/functions/__init__.py`). Do **not** apply pythonic
coercion to `python/datafusion/functions/spark.py` — that namespace is a
deliberate mirror of `pyspark.sql.functions`, so its parameter names,
order, and types must match pyspark exactly. Adding `Expr | int` style
unions there would diverge from the pyspark contract callers rely on.

Two exceptions where pythonic-style additions in `functions.spark` are
still on-brand:
- **Pyspark itself accepts a native type.** Pyspark's `format_string`
takes `format: str | Column`; the spark wrapper already auto-promotes a
plain `str` to a literal — keep parity.
- **Strictly additive optional kwargs** that pyspark also has (e.g.
`like(escapeChar=...)`). These belong in the [PYSPARK_ALIGNMENT_PLAN.md]
follow-up PRs, not in a make-pythonic pass.

If the user explicitly scopes to "spark", validate parity with pyspark
rather than applying generic coercion.

## How to Identify Candidates

The user may specify a scope via `$ARGUMENTS`. If no scope is given or "all" is specified, audit all functions in `python/datafusion/functions.py`.
The user may specify a scope via `$ARGUMENTS`. If no scope is given or "all" is specified, audit all functions in `python/datafusion/functions/__init__.py`.

For each function, determine if any parameter can accept native Python types by evaluating **two complementary signals**:

Expand Down Expand Up @@ -309,7 +330,7 @@ For each function being updated:

### Step 1: Analyze the Function

1. Read the current Python function signature in `python/datafusion/functions.py`
1. Read the current Python function signature in `python/datafusion/functions/__init__.py`
2. Read the Rust binding in `crates/core/src/functions.rs`
3. Optionally check the upstream DataFusion docs for the function
4. Determine which category (A, B, or C) applies to each parameter
Expand Down Expand Up @@ -346,7 +367,7 @@ dfn.functions.left(dfn.col("a"), 3)

After making changes, run the doctests to verify:
```bash
python -m pytest --doctest-modules python/datafusion/functions.py -v
python -m pytest --doctest-modules python/datafusion/functions/__init__.py -v
```

## Coercion Helper Pattern
Expand Down
43 changes: 43 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Expand Up @@ -48,6 +48,7 @@ datafusion-catalog = { version = "54", default-features = false }
datafusion-common = { version = "54", default-features = false }
datafusion-functions-aggregate = { version = "54" }
datafusion-functions-window = { version = "54" }
datafusion-spark = { version = "54" }
datafusion-expr = { version = "54" }
prost = "0.14.3"
serde_json = "1"
Expand Down Expand Up @@ -79,4 +80,5 @@ datafusion-catalog = { git = "https://github.com/apache/datafusion", rev = "1321
datafusion-common = { git = "https://github.com/apache/datafusion", rev = "1321d60cc37ee487d1e7ce7f501357c3236b2542" }
datafusion-functions-aggregate = { git = "https://github.com/apache/datafusion", rev = "1321d60cc37ee487d1e7ce7f501357c3236b2542" }
datafusion-functions-window = { git = "https://github.com/apache/datafusion", rev = "1321d60cc37ee487d1e7ce7f501357c3236b2542" }
datafusion-spark = { git = "https://github.com/apache/datafusion", rev = "1321d60cc37ee487d1e7ce7f501357c3236b2542" }
datafusion-expr = { git = "https://github.com/apache/datafusion", rev = "1321d60cc37ee487d1e7ce7f501357c3236b2542" }
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Expand Up @@ -53,6 +53,7 @@ datafusion = { workspace = true, features = ["avro", "unicode_expressions"] }
datafusion-substrait = { workspace = true, optional = true }
datafusion-proto = { workspace = true }
datafusion-ffi = { workspace = true }
datafusion-spark = { workspace = true }
prost = { workspace = true } # keep in line with `datafusion-substrait`
serde_json = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
Expand Down
22 changes: 18 additions & 4 deletions crates/core/src/context.rs
Expand Up @@ -1060,6 +1060,21 @@ impl PySessionContext {
Ok(())
}

/// Register all `datafusion-spark` UDFs/UDAFs/UDWFs, overriding any built-in
/// DataFusion functions of the same name with their Spark-semantics version.
pub fn enable_spark_functions(&self) -> PyResult<()> {
for udf in datafusion_spark::all_default_scalar_functions() {
self.ctx.register_udf((*udf).clone());
}
for udaf in datafusion_spark::all_default_aggregate_functions() {
self.ctx.register_udaf((*udaf).clone());
}
for udwf in datafusion_spark::all_default_window_functions() {
self.ctx.register_udwf((*udwf).clone());
}
Ok(())
}

pub fn deregister_udaf(&self, name: &str) {
self.ctx.deregister_udaf(name);
}
Expand Down Expand Up @@ -1562,10 +1577,9 @@ impl PySessionContext {
pub fn parse_file_compression_type(
file_compression_type: Option<String>,
) -> Result<FileCompressionType, PyErr> {
FileCompressionType::from_str(&*file_compression_type.unwrap_or("".to_string()).as_str())
.map_err(|_| {
PyValueError::new_err("file_compression_type must one of: gzip, bz2, xz, zstd")
})
FileCompressionType::from_str(&file_compression_type.unwrap_or_default()).map_err(|_| {
PyValueError::new_err("file_compression_type must be one of: gzip, bz2, xz, zstd")
})
Comment on lines +1580 to +1582
Member Author


Drive-by update to fix a clippy warning that was previously suppressed at the module level.

}

impl From<PySessionContext> for SessionContext {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/functions.rs
Expand Up @@ -31,7 +31,7 @@ use crate::expr::conditional_expr::PyCaseBuilder;
use crate::expr::sort_expr::{PySortExpr, to_sort_expressions};
use crate::expr::window::PyWindowFrame;

fn add_builder_fns_to_aggregate(
pub(crate) fn add_builder_fns_to_aggregate(
agg_fn: Expr,
distinct: Option<bool>,
filter: Option<PyExpr>,
Expand Down
12 changes: 5 additions & 7 deletions crates/core/src/lib.rs
Expand Up @@ -26,29 +26,25 @@ pub use datafusion_substrait;
use mimalloc::MiMalloc;
use pyo3::prelude::*;

#[allow(clippy::borrow_deref_ref)]
pub mod analyzer;
pub mod catalog;
pub mod codec;
pub mod common;

#[allow(clippy::borrow_deref_ref)]
pub mod context;
#[allow(clippy::borrow_deref_ref)]
pub mod dataframe;
mod dataset;
mod dataset_exec;
pub mod errors;
#[allow(clippy::borrow_deref_ref)]
pub mod expr;
#[allow(clippy::borrow_deref_ref)]
mod functions;
pub mod metrics;
mod options;
pub mod physical_plan;
mod pyarrow_filter_expression;
pub mod pyarrow_util;
mod record_batch;
mod spark_functions;
pub mod sql;
pub mod store;
pub mod table;
Expand All @@ -57,9 +53,7 @@ pub mod unparser;
mod array;
#[cfg(feature = "substrait")]
pub mod substrait;
#[allow(clippy::borrow_deref_ref)]
mod udaf;
#[allow(clippy::borrow_deref_ref)]
mod udf;
pub mod udtf;
mod udwf;
Expand Down Expand Up @@ -124,6 +118,10 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
// Register the functions as a submodule
let funcs = PyModule::new(py, "functions")?;
functions::init_module(&funcs)?;
// Spark-compatible functions live under `functions.spark`.
let spark_funcs = PyModule::new(py, "spark")?;
spark_functions::init_module(&spark_funcs)?;
funcs.add_submodule(&spark_funcs)?;
m.add_submodule(&funcs)?;

let store = PyModule::new(py, "object_store")?;
Expand Down