feat: enable pickling of most Expr except udaf and udwf #1544
Adds Python-aware encoding to PythonLogicalCodec/PythonPhysicalCodec so a ScalarUDF defined in Python travels inside the serialized expression (cloudpickled into fun_definition) instead of needing a matching registration on the receiver. With that in place, Expr gains __reduce__ + classmethod from_bytes(buf, ctx=None) so pickle.dumps / pickle.loads work end-to-end on expressions built from col, lit, built-in functions, and Python scalar UDFs.

Wire format is framed as <DFPYUDF magic, version byte, cloudpickle tuple>; the version byte lets a too-new/too-old payload surface a clean Execution error instead of an opaque cloudpickle unpack failure. Schema serde is via arrow-rs's native IPC (no pyarrow round-trip). Cloudpickle module handle is cached per-interpreter through PyOnceLock.

Worker-side context resolution lives in a new datafusion.ipc module: set_worker_ctx / get_worker_ctx / clear_worker_ctx plus a private _resolve_ctx helper consulted by Expr.from_bytes. Priority is explicit ctx > worker ctx > global SessionContext.

FFI UDFs still travel by name and require the matching registration on the receiver's context.

Aggregate and window UDF inline encoding, the per-session with_python_udf_inlining toggle, sender-side context, and the user-guide docs land in follow-on PRs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
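The context-resolution priority described in this commit message (explicit ctx > worker ctx > global SessionContext) can be illustrated with a small toy model. This is only a sketch, not the `datafusion.ipc` source: the function names mirror the ones named in the message, the thread-local storage follows the PR description, and `GLOBAL_CTX` and `resolve_ctx` are hypothetical stand-ins.

```python
import threading

# Hypothetical stand-in for the process-wide global SessionContext.
GLOBAL_CTX = object()

# Per-thread storage for the worker context (assumed thread-local).
_local = threading.local()


def set_worker_ctx(ctx):
    """Install a context for the current thread."""
    _local.ctx = ctx


def get_worker_ctx():
    """Return the current thread's worker context, or None if unset."""
    return getattr(_local, "ctx", None)


def clear_worker_ctx():
    """Remove the current thread's worker context, if any."""
    if hasattr(_local, "ctx"):
        del _local.ctx


def resolve_ctx(ctx=None):
    """Pick a context: explicit argument, then worker ctx, then global."""
    if ctx is not None:
        return ctx
    worker = get_worker_ctx()
    if worker is not None:
        return worker
    return GLOBAL_CTX
```

In this model a worker initializer would call `set_worker_ctx(...)` once, after which a `from_bytes`-style decoder that calls `resolve_ctx()` picks up the worker context without it being passed explicitly.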
…edge-case tests
Inline `.. warning::` blocks on `Expr.to_bytes`, `Expr.from_bytes`, and
`Expr.__reduce__` so the cloudpickle / arbitrary-code-execution caveat is
visible at the public API surface in advance of the user-guide page that
lands in PR 4.
Add doctest-style `Examples:` blocks to `datafusion.ipc` functions
(`set_worker_ctx`, `clear_worker_ctx`, `get_worker_ctx`, `_resolve_ctx`),
`ScalarUDF.name`, and the new `Expr` pickle methods, per CLAUDE.md.
Tighten `Expr.__reduce__` return annotation to
`tuple[Callable[[bytes], Expr], tuple[bytes]]`.
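To make the `__reduce__` contract concrete, here is a minimal sketch of a class whose `__reduce__` has the same shape as the annotation above. `ToyExpr` is a hypothetical stand-in, not the real `Expr`; its byte payload is opaque and nothing here touches DataFusion.

```python
from __future__ import annotations

from typing import Callable


class ToyExpr:
    """Hypothetical stand-in for Expr that wraps an opaque byte encoding."""

    def __init__(self, payload: bytes) -> None:
        self._payload = payload

    def to_bytes(self) -> bytes:
        return self._payload

    @classmethod
    def from_bytes(cls, buf: bytes, ctx: object | None = None) -> ToyExpr:
        # ctx is accepted only to mirror the signature discussed in the PR.
        return cls(bytes(buf))

    def __reduce__(self) -> tuple[Callable[[bytes], ToyExpr], tuple[bytes]]:
        # pickle stores this (callable, args) pair and calls callable(*args)
        # when the object is loaded.
        return (type(self).from_bytes, (self.to_bytes(),))


# Replay by hand what pickle does at load time.
original = ToyExpr(b"encoded-expression")
rebuild, args = original.__reduce__()
restored = rebuild(*args)
```

Because the second tuple element holds exactly one `bytes` value, the reconstructor is called as `from_bytes(buf)` with `ctx` left at its default, which is why a worker-side or global context has to supply it.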
Tests: multi-arg UDF round-trip (covers synthetic `arg_{i}` schema-field
loop in the codec) plus malformed-bytes paths through `Expr.from_bytes`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
ntjohnson1
left a comment
The general shape of the changes here seems reasonable. I guess my only lingering thought/question is mostly around cloudpickle's capabilities and how it is used.
    })
}

/// Build the cloudpickle payload for a `PythonFunctionScalarUDF`.
Maybe it is captured more clearly somewhere else, but it feels like there is some nuance in the dependency on cloudpickle that's not fully communicated here. I didn't do too much of a deep dive on it.
- cloudpickle only works on the same version of Python (I'm not sure if it detects the mismatch with a nice error). So potentially your header might want to capture the source Python version to give a nicer error and advertise that there is a limitation of only sending to the same version of Python for remote workers.
- cloudpickle seems to have serialize by reference (more like dill) and by value (super cool). The former needs the function installed in the environment so it can be referenced when deserialized, whereas the latter tries to capture all the necessary bits (here is where I didn't deep dive a ton). Those are fairly different mental models for support.
Good points! I updated docstring and also added in version checking.
    def test_udf_self_contained_blob(self):
        e = _double_udf()(col("a"))
        blob = pickle.dumps(e)
        # The codec inlines the callable, so the blob is much bigger than a
I think this is testing the thing I was asking about, but I haven't thought deeply enough about whether it actually does. I know cloudpickle says it can serialize lambdas, but if I instead had

    from foo import double

    def _double_udf():
        return udf(
            double,
            [pa.int64()],
            pa.int64(),
            volatility="immutable",
            name="double",
        )

would I still be able to deserialize this on a remote worker in a Python environment without foo?
Updated documentation to explain when you need foo available in the worker
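The by-reference behaviour behind this question can be reproduced with the standard library alone. The sketch below uses stdlib `pickle`, which always pickles functions by reference; cloudpickle takes the same route for functions it can import from a module, so the failure mode is analogous. The module name `foo_sender_only` is a hypothetical stand-in for the reviewer's `foo`, built in memory so the example is self-contained.

```python
import pickle
import sys
import types

# Hypothetical module that exists only on the "sender" side.
MODULE_NAME = "foo_sender_only"
foo = types.ModuleType(MODULE_NAME)
exec("def double(x):\n    return x * 2\n", foo.__dict__)
sys.modules[MODULE_NAME] = foo

# Pickling by reference records only the module name and function name.
blob = pickle.dumps(foo.double)
by_reference = MODULE_NAME.encode() in blob and b"double" in blob

# Sender-like environment: the module is importable, so loading works.
same_env = pickle.loads(blob)

# Receiver-like environment: the module is gone, so loading fails on import.
del sys.modules[MODULE_NAME]
try:
    pickle.loads(blob)
    load_error = None
except ImportError as exc:
    load_error = exc
```

The payload carries no bytecode for `double`, only the names needed to look it up again, which is why the worker must be able to import the defining module.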
cloudpickle bytecode is not portable across Python minor versions —
a payload produced on 3.11 fails to load on 3.12 with an opaque
marshal/unpickle error. Embed the sender's (major, minor) in the
DFPYUDF wire header and reject mismatches at decode time with an
actionable error that names both versions, instead of letting the
failure surface from inside cloudpickle.loads.
Header layout becomes:
DFPYUDF (7) | version (1) | py_major (1) | py_minor (1) | cloudpickle
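As an illustration of this layout, the following sketch frames and unframes a payload and rejects a Python-version mismatch with an error that names both versions. It is not the Rust codec: `encode_frame`, `decode_frame`, the use of `ValueError`, and the version value `1` are assumptions made for the example; only the field order and widths come from the layout above.

```python
import sys

MAGIC = b"DFPYUDF"            # 7-byte family prefix from the layout above
WIRE_VERSION = 1              # assumed value; the layout only fixes the width
HEADER_LEN = len(MAGIC) + 3   # magic + version + py_major + py_minor
THIS_PY = (sys.version_info.major, sys.version_info.minor)


def encode_frame(payload: bytes, py=THIS_PY) -> bytes:
    """Prefix a payload with magic, wire version, and sender Python version."""
    return MAGIC + bytes([WIRE_VERSION, py[0], py[1]]) + payload


def decode_frame(buf: bytes, py=THIS_PY) -> bytes:
    """Validate the header and return the payload that follows it."""
    if not buf.startswith(MAGIC):
        raise ValueError("not a DFPYUDF payload")
    if len(buf) < HEADER_LEN:
        raise ValueError("truncated DFPYUDF header")
    version, major, minor = buf[7], buf[8], buf[9]
    if version != WIRE_VERSION:
        raise ValueError(f"unsupported wire version {version}")
    if (major, minor) != tuple(py):
        raise ValueError(
            f"payload was produced on Python {major}.{minor} "
            f"but this interpreter is Python {py[0]}.{py[1]}"
        )
    return buf[HEADER_LEN:]
```

Checking the header before handing the remaining bytes to the unpickler is what turns an opaque load failure into an actionable message.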
Extend the Security warnings on Expr.to_bytes / from_bytes /
__reduce__ with a Portability section covering the cross-version
constraint and cloudpickle's by-value/by-reference behavior (the
callable inlines bytecode and closure cells, but imported names
travel by reference and must be importable on the receiver). Add
a matching Serialization model note to the datafusion.ipc module
docstring.
New tests:
- codec::wire_header_tests: py-major/minor mismatch, truncated
py-version bytes, round-trip with py-version
- test_pickle_expr::test_cross_version_error_message: patches the
py_minor byte inside an emitted payload and asserts the error
message identifies the version mismatch
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ne UDFs

Reviewer feedback on the Expr-pickle PRs (apache#1544) asked that the cloudpickle portability caveats be discoverable on the user-facing page, not only in docstrings. The distributing_work.rst page is the designated canonical home for the distribution story, so add them here:

* New 'Portability requirements for inline Python UDFs' subsection covering the matching-Python-minor-version requirement and the by-value vs by-reference import-capture rule (imported modules must be importable on the worker).
* Qualify the 'fully portable' Python-UDF bullet to point at the new requirements.
* Cross-reference the new subsection from the closure-capture note.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
)

* docs: user guide page + runnable examples for distributing expressions

Wraps up the Expr-pickle work with the user-facing material:

  * docs/source/user-guide/io/distributing_work.rst: new user guide page covering the multiprocessing, Ray, and datafusion-distributed patterns. Includes the Security section that is the canonical home for the cloudpickle / pickle.loads threat model.
  * docs/source/user-guide/io/index.rst: toctree entry.
  * examples/multiprocessing_pickle_expr.py: runnable example, a Pool.map of a closure-capturing UDF across processes, with worker context registration in the initializer.
  * examples/ray_pickle_expr.py: Ray actor analogue.
  * examples/datafusion-ffi-example/python/tests/_test_pickle_strict_ffi.py: exercises the strict-mode refusal end to end against an FFI capsule scalar UDF (kept under the FFI example crate because the test needs that crate's compiled artifacts).
  * examples/README.md: index entries for the new files.

Also tightens three docstrings that previously duplicated the security warning so they point at the canonical Security section instead:

  * PythonLogicalCodec::with_python_udf_inlining (rustdoc): one-line summary plus a relative pointer to distributing_work.rst and the upstream Python pickle module security warning.
  * SessionContext.with_python_udf_inlining: one-sentence summary plus :doc: link to the user guide.
  * datafusion.ipc module docstring: cross-reference to the user guide for the full pattern.

The crate-level codec.rs module rustdoc also updates "pure-Python scalar UDFs" to "scalar / aggregate / window UDFs" now that all three are covered.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: document Python-version and import portability caveats for inline UDFs

Reviewer feedback on the Expr-pickle PRs (#1544) asked that the cloudpickle portability caveats be discoverable on the user-facing page, not only in docstrings. The distributing_work.rst page is the designated canonical home for the distribution story, so add them here:

  * New 'Portability requirements for inline Python UDFs' subsection covering the matching-Python-minor-version requirement and the by-value vs by-reference import-capture rule (imported modules must be importable on the worker).
  * Qualify the 'fully portable' Python-UDF bullet to point at the new requirements.
  * Cross-reference the new subsection from the closure-capture note.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: restore version-byte and cloudpickle-cache rustdoc wording

Two codec.rs docstrings were reworded in PR4 in ways that dropped information:

  * try_encode_python_scalar_udf: restore the `DFPYUDF` family prefix + version byte description of the payload framing (PR4 had collapsed it to `DFPYUDF1` prefix, dropping the version-byte mention).
  * cloudpickle cached-handle comment: restore "The encode/decode helpers above" wording.

* docs: fix reversed tuple order in multiprocessing example docstring

The 'Worker layout' docstring described tasks as `(expr, label)` but the code builds and unpacks them as `(label, expr)`. Correct the doc to match.

* Respond to first batch of reviewer comments

* docs: relocate and restructure distributing-work guide

Move the page from user-guide/io/ to the top level of user-guide/; distributing work is a runtime/operational concern, not a file-format topic, and the shorter "Distributing work" title fits the sidebar cleanly.

Restructure the body to lead with the practical worker-setup pattern instead of the four-slot SessionContext taxonomy. The taxonomy survives at the bottom as a reference subsection; the worker-init example and portability rules now reach the reader before they need it.

Also addresses reviewer NIT: wrap the `if __name__ == "__main__":` guidance in a `.. note::` admonition and link to the Python multiprocessing docs.

Add a header paragraph to each runnable example pointing to the user-guide page so a reader who jumps straight to the example gets the surrounding context.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Which issue does this PR close?
Addresses part of #1517
This is PR 1 of 4. The remaining PRs stack sequentially on top of this one; subsequent PRs target this branch's tip until it merges.
Follow up PRs:
Rationale for this change
Today a `LogicalPlan` or `Expr` referencing a Python-defined `ScalarUDF` cannot survive a serialization round-trip without the receiver pre-registering a matching UDF, because the upstream protobuf codecs only carry the UDF name. That blocks shipping expressions to worker processes via `pickle.dumps` / `multiprocessing.Pool` / Ray actors / `datafusion-distributed`. This PR closes the scalar-UDF case end-to-end so the natural `pickle.dumps(expr)` pattern works for built-ins and Python scalar UDFs with no receiver-side setup.
What changes are included in this PR?
Adds Python-aware encoding to `PythonLogicalCodec` and `PythonPhysicalCodec`.
On the Python side, `Expr` gains `__reduce__` plus a `classmethod from_bytes(buf, ctx=None)`. A new `datafusion.ipc` module exposes `set_worker_ctx` / `get_worker_ctx` / `clear_worker_ctx` thread-locals; `_resolve_ctx` consults explicit-ctx > worker-ctx > global `SessionContext`.
`cloudpickle>=2.0` is added as a runtime dependency (lazy-imported on the encode/decode hot path). This is a tiny dependency, in the kilobyte range.
Aggregate and window inline encoding, the per-session `with_python_udf_inlining` toggle, sender-side context wiring, and the user-guide docs land in PRs 2-4 of this series.
Are there any user-facing changes?
Yes, but these are only additions.
- `Expr` is now picklable. Built-ins and Python scalar UDFs round-trip with no worker-side setup.
- `Expr.to_bytes(ctx=None)` / `Expr.from_bytes(buf, ctx=None)` shape. `from_bytes` is now a `classmethod` with `ctx` as a keyword-only `None`-default. Breaking for any direct `Expr.from_bytes(ctx, blob)` callers; the in-tree call sites are updated.
- `datafusion.ipc` with `set_worker_ctx` / `get_worker_ctx` / `clear_worker_ctx`.
- `ScalarUDF.name` property.
- `cloudpickle>=2.0`.
`Expr.from_bytes` has a signature flip, but that is unreleased (only merged yesterday) and so not a change any user will experience.