From d7c72e83f5d91c3de274019cf8215d37ed00ea7a Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 2 Jun 2026 23:19:29 -0700 Subject: [PATCH 1/4] feat(datafusion): support PARTITIONED BY for identity-partitioned external tables `CREATE EXTERNAL TABLE ... STORED AS ICEBERG` previously rejected any `PARTITIONED BY` clause. Since DataFusion's grammar only accepts plain column names (it cannot express transforms such as `bucket[N]` or `day`), allow the clause for identity-partitioned tables and validate that the declared columns match the table's default partition spec, in order. Tables partitioned with non-identity transforms can still be registered by omitting the clause; specifying it returns a clear error pointing at the offending transform. Closes #2050 --- .../src/table/table_provider_factory.rs | 248 +++++++++++++++++- .../TableMetadataV2WithBucketPartition.json | 78 ++++++ ...eMetadataV2WithMultiIdentityPartition.json | 84 ++++++ 3 files changed, 404 insertions(+), 6 deletions(-) create mode 100644 crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithBucketPartition.json create mode 100644 crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithMultiIdentityPartition.json diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs b/crates/integrations/datafusion/src/table/table_provider_factory.rs index 9692340cac..08f865ca36 100644 --- a/crates/integrations/datafusion/src/table/table_provider_factory.rs +++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs @@ -25,7 +25,8 @@ use datafusion::error::Result as DFResult; use datafusion::logical_expr::CreateExternalTable; use datafusion::sql::TableReference; use iceberg::io::{FileIOBuilder, LocalFsStorageFactory, StorageFactory}; -use iceberg::table::StaticTable; +use iceberg::spec::Transform; +use iceberg::table::{StaticTable, Table}; use iceberg::{Error, ErrorKind, Result, TableIdent}; use super::IcebergStaticTableProvider; @@ -87,6 +88,20 @@ use crate::to_datafusion_error; /// } /// ``` /// +/// A `PARTITIONED BY` clause is supported for identity-partitioned tables. Because the +/// table's partitioning is defined entirely by the Iceberg metadata, and DataFusion only +/// accepts plain column names here (not transforms like `bucket[N]` or `day`), the clause +/// must list the identity partition columns of the table's default partition spec, in order: +/// +/// ```sql +/// CREATE EXTERNAL TABLE my_iceberg_table +/// STORED AS ICEBERG LOCATION '/path/to/metadata.json' +/// PARTITIONED BY (event_date) +/// ``` +/// +/// Tables partitioned with non-identity transforms can still be registered by omitting the +/// `PARTITIONED BY` clause. +/// /// # Note /// This factory is designed to work with the DataFusion query engine, /// specifically for handling Iceberg tables in external table commands. @@ -94,8 +109,10 @@ use crate::to_datafusion_error; /// the creation of new tables not yet available. /// /// # Errors -/// An error will be returned if any unsupported feature, such as partition columns, -/// order expressions, constraints, or column defaults, is detected in the table creation command. +/// An error will be returned if any unsupported feature, such as order expressions, +/// constraints, or column defaults, is detected in the table creation command, if a +/// `PARTITIONED BY` clause is used on a table with a non-identity transform, or if the +/// `PARTITIONED BY` columns do not match the table's identity partition columns. #[derive(Debug, Default)] pub struct IcebergTableProviderFactory { storage_factory: Option>, @@ -146,6 +163,11 @@ impl TableProviderFactory for IcebergTableProviderFactory { .map_err(to_datafusion_error)? .into_table(); + // The Iceberg metadata is the source of truth for partitioning, so a + // `PARTITIONED BY` clause is only accepted when it agrees with the + // table's default partition spec. + validate_partition_columns(&table, &cmd.table_partition_cols).map_err(to_datafusion_error)?; + let provider = IcebergStaticTableProvider::try_new_from_table(table) .await .map_err(to_datafusion_error)?; @@ -157,7 +179,6 @@ impl TableProviderFactory for IcebergTableProviderFactory { fn check_cmd(cmd: &CreateExternalTable) -> Result<()> { let CreateExternalTable { schema, - table_partition_cols, order_exprs, constraints, column_defaults, @@ -166,7 +187,6 @@ fn check_cmd(cmd: &CreateExternalTable) -> Result<()> { // Check if any of the fields violate the constraints in a single condition let is_invalid = !schema.fields().is_empty() - || !table_partition_cols.is_empty() || !order_exprs.is_empty() || !constraints.is_empty() || !column_defaults.is_empty(); @@ -181,6 +201,64 @@ fn check_cmd(cmd: &CreateExternalTable) -> Result<()> { Ok(()) } +/// Validates the `PARTITIONED BY` columns from a `CREATE EXTERNAL TABLE` command +/// against the table's default partition spec. +/// +/// `CREATE EXTERNAL TABLE ... STORED AS ICEBERG` loads an existing table, so the +/// partitioning is fully determined by the Iceberg metadata. DataFusion's grammar only +/// accepts plain column names in `PARTITIONED BY` (it cannot express transforms such as +/// `bucket[N]` or `day`), so the clause is only supported for identity-partitioned tables. +/// For an identity transform the partition field name equals its source column name, so the +/// clause must list those column names in order. +/// +/// An empty clause (no `PARTITIONED BY`) skips this check: any table, including one with +/// non-identity transforms, can still be registered for read-only access without declaring +/// its partitioning. +fn validate_partition_columns(table: &Table, declared_partition_cols: &[String]) -> Result<()> { + if declared_partition_cols.is_empty() { + return Ok(()); + } + + let spec = table.metadata().default_partition_spec(); + + // DataFusion cannot express non-identity transforms in `PARTITIONED BY`, so a table + // partitioned by any other transform cannot be described by this clause. + // + // TODO: support non-identity transforms (bucket/truncate/year/month/day/hour) once + // DataFusion's `PARTITIONED BY` grammar can express transform functions such as + // `bucket(16, id)` or `days(ts)`. See https://github.com/apache/iceberg-rust/issues/2050. + if let Some(field) = spec + .fields() + .iter() + .find(|f| f.transform != Transform::Identity) + { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "PARTITIONED BY only supports identity-partitioned tables, but partition field \ + '{}' uses the '{}' transform. Omit the PARTITIONED BY clause to register this table.", + field.name, field.transform + ), + )); + } + + let actual: Vec<&str> = spec.fields().iter().map(|f| f.name.as_str()).collect(); + let declared: Vec<&str> = declared_partition_cols.iter().map(String::as_str).collect(); + + if declared != actual { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "PARTITIONED BY columns {declared:?} do not match the table's identity partition \ + columns {actual:?}. The PARTITIONED BY clause must list those columns in order, \ + or be omitted." + ), + )); + } + + Ok(()) +} + /// Complements the namespace of a table name if necessary. /// /// # Note @@ -252,7 +330,29 @@ mod tests { ) } + fn bucket_partitioned_table_metadata_location() -> String { + format!( + "{}/testdata/table_metadata/{}", + env!("CARGO_MANIFEST_DIR"), + "TableMetadataV2WithBucketPartition.json" + ) + } + + fn multi_identity_partitioned_table_metadata_location() -> String { + format!( + "{}/testdata/table_metadata/{}", + env!("CARGO_MANIFEST_DIR"), + "TableMetadataV2WithMultiIdentityPartition.json" + ) + } + fn create_external_table_cmd() -> CreateExternalTable { + create_external_table_cmd_with_partition_cols(Default::default()) + } + + fn create_external_table_cmd_with_partition_cols( + table_partition_cols: Vec, + ) -> CreateExternalTable { let metadata_file_path = table_metadata_location(); CreateExternalTable { @@ -261,7 +361,7 @@ mod tests { schema: Arc::new(DFSchema::empty()), file_type: "iceberg".to_string(), options: Default::default(), - table_partition_cols: Default::default(), + table_partition_cols, order_exprs: Default::default(), constraints: Constraints::default(), column_defaults: Default::default(), @@ -324,4 +424,140 @@ mod tests { assert_eq!(actual_schema.as_ref(), &expected_schema); } + + // `TableMetadataV2.json` has a default partition spec with a single identity + // partition field named "x". + + #[tokio::test] + async fn test_partitioned_by_matching_partition_spec() { + let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + let cmd = create_external_table_cmd_with_partition_cols(vec!["x".to_string()]); + + let table_provider = factory + .create(&state, &cmd) + .await + .expect("create table with matching PARTITIONED BY should succeed"); + + assert_eq!(table_provider.schema().as_ref(), &table_metadata_v2_schema()); + } + + #[tokio::test] + async fn test_partitioned_by_mismatching_partition_spec() { + let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + let cmd = create_external_table_cmd_with_partition_cols(vec!["y".to_string()]); + + let err = factory + .create(&state, &cmd) + .await + .expect_err("create table with mismatching PARTITIONED BY should fail"); + + assert!( + err.to_string() + .contains("do not match the table's identity partition columns"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_partitioned_by_multiple_identity_columns() { + let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + + // The table is partitioned by identity(x), identity(y). + let mut cmd = + create_external_table_cmd_with_partition_cols(vec!["x".to_string(), "y".to_string()]); + cmd.location = multi_identity_partitioned_table_metadata_location(); + + factory + .create(&state, &cmd) + .await + .expect("PARTITIONED BY (x, y) should match the multi-column partition spec"); + } + + #[tokio::test] + async fn test_partitioned_by_multiple_identity_columns_wrong_order() { + let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + + // Declaring the partition columns in the wrong order must fail, since partition + // field order is significant in Iceberg. + let mut cmd = + create_external_table_cmd_with_partition_cols(vec!["y".to_string(), "x".to_string()]); + cmd.location = multi_identity_partitioned_table_metadata_location(); + + let err = factory + .create(&state, &cmd) + .await + .expect_err("PARTITIONED BY (y, x) should fail when the spec order is (x, y)"); + + assert!( + err.to_string() + .contains("do not match the table's identity partition columns"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_partitioned_by_partial_columns_fails() { + let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + + // Declaring only a subset of the partition columns must fail: the table is + // partitioned by identity(x), identity(y) but only `x` is declared. + let mut cmd = create_external_table_cmd_with_partition_cols(vec!["x".to_string()]); + cmd.location = multi_identity_partitioned_table_metadata_location(); + + let err = factory + .create(&state, &cmd) + .await + .expect_err("PARTITIONED BY (x) should fail when the spec is (x, y)"); + + assert!( + err.to_string() + .contains("do not match the table's identity partition columns"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_partitioned_by_rejects_non_identity_transform() { + let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + + // The table is partitioned by `bucket[4](x)`, which cannot be expressed as a + // plain column name in `PARTITIONED BY`. + let mut cmd = create_external_table_cmd_with_partition_cols(vec!["x_bucket".to_string()]); + cmd.location = bucket_partitioned_table_metadata_location(); + + let err = factory + .create(&state, &cmd) + .await + .expect_err("PARTITIONED BY on a bucket-partitioned table should fail"); + + let msg = err.to_string(); + assert!( + msg.contains("only supports identity-partitioned tables") + && msg.contains("bucket[4]"), + "unexpected error: {msg}" + ); + } + + #[tokio::test] + async fn test_non_identity_partition_table_registers_without_partitioned_by() { + // Omitting PARTITIONED BY must still allow registering a non-identity partitioned + // table for read-only access. + let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + + let mut cmd = create_external_table_cmd(); + cmd.location = bucket_partitioned_table_metadata_location(); + + factory + .create(&state, &cmd) + .await + .expect("registering a bucket-partitioned table without PARTITIONED BY should succeed"); + } + } diff --git a/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithBucketPartition.json b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithBucketPartition.json new file mode 100644 index 0000000000..ff4d3690e0 --- /dev/null +++ b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithBucketPartition.json @@ -0,0 +1,78 @@ +{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c2", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 1, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + } + ] + }, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [ + 1, + 2 + ], + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + }, + { + "id": 2, + "name": "y", + "required": true, + "type": "long" + }, + { + "id": 3, + "name": "z", + "required": true, + "type": "long" + } + ] + } + ], + "default-spec-id": 0, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "name": "x_bucket", + "transform": "bucket[4]", + "source-id": 1, + "field-id": 1000 + } + ] + } + ], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [] +} diff --git a/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithMultiIdentityPartition.json b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithMultiIdentityPartition.json new file mode 100644 index 0000000000..90ebfb9c38 --- /dev/null +++ b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithMultiIdentityPartition.json @@ -0,0 +1,84 @@ +{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c3", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 1, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + } + ] + }, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [ + 1, + 2 + ], + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + }, + { + "id": 2, + "name": "y", + "required": true, + "type": "long" + }, + { + "id": 3, + "name": "z", + "required": true, + "type": "long" + } + ] + } + ], + "default-spec-id": 0, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "name": "x", + "transform": "identity", + "source-id": 1, + "field-id": 1000 + }, + { + "name": "y", + "transform": "identity", + "source-id": 2, + "field-id": 1001 + } + ] + } + ], + "last-partition-id": 1001, + "default-sort-order-id": 0, + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [] +} From 5cc07513db0ba8eedfb78aaff8e4e10732c4f275 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 2 Jun 2026 23:32:08 -0700 Subject: [PATCH 2/4] test(datafusion): dedupe metadata-location helpers and consolidate partition mismatch cases --- .../src/table/table_provider_factory.rs | 103 +++++++----------- 1 file changed, 42 insertions(+), 61 deletions(-) diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs b/crates/integrations/datafusion/src/table/table_provider_factory.rs index 08f865ca36..1fca3876ad 100644 --- a/crates/integrations/datafusion/src/table/table_provider_factory.rs +++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs @@ -166,7 +166,8 @@ impl TableProviderFactory for IcebergTableProviderFactory { // The Iceberg metadata is the source of truth for partitioning, so a // `PARTITIONED BY` clause is only accepted when it agrees with the // table's default partition spec. - validate_partition_columns(&table, &cmd.table_partition_cols).map_err(to_datafusion_error)?; + validate_partition_columns(&table, &cmd.table_partition_cols) + .map_err(to_datafusion_error)?; let provider = IcebergStaticTableProvider::try_new_from_table(table) .await @@ -322,28 +323,23 @@ mod tests { ]) } - fn table_metadata_location() -> String { + fn metadata_location(file_name: &str) -> String { format!( - "{}/testdata/table_metadata/{}", - env!("CARGO_MANIFEST_DIR"), - "TableMetadataV2.json" + "{}/testdata/table_metadata/{file_name}", + env!("CARGO_MANIFEST_DIR") ) } + fn table_metadata_location() -> String { + metadata_location("TableMetadataV2.json") + } + fn bucket_partitioned_table_metadata_location() -> String { - format!( - "{}/testdata/table_metadata/{}", - env!("CARGO_MANIFEST_DIR"), - "TableMetadataV2WithBucketPartition.json" - ) + metadata_location("TableMetadataV2WithBucketPartition.json") } fn multi_identity_partitioned_table_metadata_location() -> String { - format!( - "{}/testdata/table_metadata/{}", - env!("CARGO_MANIFEST_DIR"), - "TableMetadataV2WithMultiIdentityPartition.json" - ) + metadata_location("TableMetadataV2WithMultiIdentityPartition.json") } fn create_external_table_cmd() -> CreateExternalTable { @@ -439,7 +435,10 @@ mod tests { .await .expect("create table with matching PARTITIONED BY should succeed"); - assert_eq!(table_provider.schema().as_ref(), &table_metadata_v2_schema()); + assert_eq!( + table_provider.schema().as_ref(), + &table_metadata_v2_schema() + ); } #[tokio::test] @@ -477,48 +476,32 @@ mod tests { } #[tokio::test] - async fn test_partitioned_by_multiple_identity_columns_wrong_order() { - let factory = IcebergTableProviderFactory::new(); - let state = SessionStateBuilder::new().build(); - - // Declaring the partition columns in the wrong order must fail, since partition - // field order is significant in Iceberg. - let mut cmd = - create_external_table_cmd_with_partition_cols(vec!["y".to_string(), "x".to_string()]); - cmd.location = multi_identity_partitioned_table_metadata_location(); - - let err = factory - .create(&state, &cmd) - .await - .expect_err("PARTITIONED BY (y, x) should fail when the spec order is (x, y)"); - - assert!( - err.to_string() - .contains("do not match the table's identity partition columns"), - "unexpected error: {err}" - ); - } - - #[tokio::test] - async fn test_partitioned_by_partial_columns_fails() { - let factory = IcebergTableProviderFactory::new(); - let state = SessionStateBuilder::new().build(); - - // Declaring only a subset of the partition columns must fail: the table is - // partitioned by identity(x), identity(y) but only `x` is declared. - let mut cmd = create_external_table_cmd_with_partition_cols(vec!["x".to_string()]); - cmd.location = multi_identity_partitioned_table_metadata_location(); - - let err = factory - .create(&state, &cmd) - .await - .expect_err("PARTITIONED BY (x) should fail when the spec is (x, y)"); - - assert!( - err.to_string() - .contains("do not match the table's identity partition columns"), - "unexpected error: {err}" - ); + async fn test_partitioned_by_identity_columns_mismatch_is_rejected() { + // All cases run against the table partitioned by identity(x), identity(y). + // Each declares partition columns that differ from the spec and must be rejected. + let cases: [(&[&str], &str); 3] = [ + (&["y", "x"], "wrong order"), + (&["x"], "subset / missing column"), + (&["x", "y", "z"], "extra column"), + ]; + + for (declared_cols, description) in cases { + let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + + let mut cmd = create_external_table_cmd_with_partition_cols( + declared_cols.iter().map(|s| s.to_string()).collect(), + ); + cmd.location = multi_identity_partitioned_table_metadata_location(); + + let err = factory.create(&state, &cmd).await.unwrap_err(); + + assert!( + err.to_string() + .contains("do not match the table's identity partition columns"), + "case '{description}': unexpected error: {err}" + ); + } } #[tokio::test] @@ -538,8 +521,7 @@ mod tests { let msg = err.to_string(); assert!( - msg.contains("only supports identity-partitioned tables") - && msg.contains("bucket[4]"), + msg.contains("only supports identity-partitioned tables") && msg.contains("bucket[4]"), "unexpected error: {msg}" ); } @@ -559,5 +541,4 @@ mod tests { .await .expect("registering a bucket-partitioned table without PARTITIONED BY should succeed"); } - } From 729c86d35213407439c93881581f0b307f0c7aac Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 2 Jun 2026 23:33:11 -0700 Subject: [PATCH 3/4] chore(datafusion): drop self-referential issue link from TODO comment --- .../integrations/datafusion/src/table/table_provider_factory.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs b/crates/integrations/datafusion/src/table/table_provider_factory.rs index 1fca3876ad..3e8baf6879 100644 --- a/crates/integrations/datafusion/src/table/table_provider_factory.rs +++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs @@ -227,7 +227,7 @@ fn validate_partition_columns(table: &Table, declared_partition_cols: &[String]) // // TODO: support non-identity transforms (bucket/truncate/year/month/day/hour) once // DataFusion's `PARTITIONED BY` grammar can express transform functions such as - // `bucket(16, id)` or `days(ts)`. See https://github.com/apache/iceberg-rust/issues/2050. + // `bucket(16, id)` or `days(ts)`. if let Some(field) = spec .fields() .iter() From 3a7fc986e2baeadffc3744b4b6786cd6227ce8b1 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 2 Jun 2026 23:48:21 -0700 Subject: [PATCH 4/4] test(datafusion): add end-to-end SQL tests for PARTITIONED BY external tables --- .../src/table/table_provider_factory.rs | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs b/crates/integrations/datafusion/src/table/table_provider_factory.rs index 3e8baf6879..e18e7f6f6e 100644 --- a/crates/integrations/datafusion/src/table/table_provider_factory.rs +++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs @@ -541,4 +541,61 @@ mod tests { .await .expect("registering a bucket-partitioned table without PARTITIONED BY should succeed"); } + + /// Registers the `IcebergTableProviderFactory` and runs the statement through the full + /// SQL pipeline (parser + planner + factory). + async fn iceberg_external_table_ctx() -> SessionContext { + let mut state = SessionStateBuilder::new().with_default_features().build(); + state.table_factories_mut().insert( + "ICEBERG".to_string(), + Arc::new(IcebergTableProviderFactory::new()), + ); + SessionContext::new_with_state(state) + } + + #[tokio::test] + async fn test_partitioned_by_via_sql() { + let ctx = iceberg_external_table_ctx().await; + + // The table is partitioned by identity(x). + let sql = format!( + "CREATE EXTERNAL TABLE static_table STORED AS ICEBERG LOCATION '{}' PARTITIONED BY (x)", + table_metadata_location() + ); + ctx.sql(&sql) + .await + .expect("CREATE EXTERNAL TABLE ... PARTITIONED BY (x) should succeed"); + + let table_provider = ctx + .table_provider(TableReference::bare("static_table")) + .await + .expect("table not found"); + + assert_eq!( + table_provider.schema().as_ref(), + &table_metadata_v2_schema() + ); + } + + #[tokio::test] + async fn test_partitioned_by_via_sql_rejects_non_identity_transform() { + let ctx = iceberg_external_table_ctx().await; + + // The table is partitioned by bucket[4](x), which `PARTITIONED BY` cannot express. + let sql = format!( + "CREATE EXTERNAL TABLE static_table STORED AS ICEBERG LOCATION '{}' PARTITIONED BY (x_bucket)", + bucket_partitioned_table_metadata_location() + ); + + let err = ctx + .sql(&sql) + .await + .expect_err("PARTITIONED BY on a bucket-partitioned table should fail"); + + assert!( + err.to_string() + .contains("only supports identity-partitioned tables"), + "unexpected error: {err}" + ); + } }