diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index fdf213a3..680fcb39 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -1379,6 +1379,88 @@ async fn test_read_schema_evolution_drop_column() { ); } +/// Test reading a table after ALTER TABLE RENAME COLUMN across mixed file formats. +/// Old files have the old physical field name; reader should map by field id. +#[tokio::test] +async fn test_read_schema_evolution_rename_column() { + let (plan, batches) = + scan_and_read_with_fs_catalog("schema_evolution_rename_column", None).await; + + let formats: HashSet<&str> = plan + .splits() + .iter() + .flat_map(|split| split.data_files()) + .filter_map(|file| file.file_name.rsplit_once('.').map(|(_, ext)| ext)) + .collect(); + assert_eq!( + formats, + HashSet::from(["avro", "orc", "parquet"]), + "schema_evolution_rename_column should scan all provisioned file formats" + ); + + let mut rows: Vec<(i32, String)> = Vec::new(); + for batch in &batches { + assert!( + batch.column_by_name("payload").is_none(), + "Old column name 'payload' should not appear in output" + ); + + let id = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("id"); + let renamed_payload = batch + .column_by_name("renamed_payload") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("renamed_payload"); + for i in 0..batch.num_rows() { + rows.push((id.value(i), renamed_payload.value(i).to_string())); + } + } + rows.sort_by_key(|(id, _)| *id); + + assert_eq!( + rows, + vec![ + (1, "parquet-old".into()), + (2, "parquet-old-2".into()), + (3, "orc-new".into()), + (4, "avro-new".into()), + ], + "Renamed column should read old and new files by field id under the new column name" + ); + + let (_, projected_batches) = + scan_and_read_with_fs_catalog("schema_evolution_rename_column", Some(&["renamed_payload"])) + .await; + let mut projected_values = Vec::new(); + for batch in &projected_batches { + assert_eq!( + batch.num_columns(), + 1, + "Projection should return only the renamed column" + ); + let renamed_payload = batch + .column_by_name("renamed_payload") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("projected renamed_payload"); + for i in 0..batch.num_rows() { + projected_values.push(renamed_payload.value(i).to_string()); + } + } + projected_values.sort(); + assert_eq!( + projected_values, + vec![ + "avro-new".to_string(), + "orc-new".to_string(), + "parquet-old".to_string(), + "parquet-old-2".to_string(), + ], + "Projection on renamed column should still use field-id mapping" + ); +} + // --------------------------------------------------------------------------- // Complex type integration tests // --------------------------------------------------------------------------- diff --git a/dev/spark/provision.py b/dev/spark/provision.py index c7c408d3..92d27280 100644 --- a/dev/spark/provision.py +++ b/dev/spark/provision.py @@ -553,6 +553,45 @@ def main(): """ ) + # ===== Schema Evolution: Rename Column across mixed file formats ===== + # Old files have physical column name 'payload'; after RENAME COLUMN, new files + # have 'renamed_payload'. Reader should map by field id and expose the new name. + spark.sql( + """ + CREATE TABLE IF NOT EXISTS schema_evolution_rename_column ( + id INT, + payload STRING + ) USING paimon + TBLPROPERTIES ( + 'file.format' = 'parquet' + ) + """ + ) + spark.sql( + """ + INSERT INTO schema_evolution_rename_column VALUES + (1, 'parquet-old'), + (2, 'parquet-old-2') + """ + ) + spark.sql( + "ALTER TABLE schema_evolution_rename_column RENAME COLUMN payload TO renamed_payload" + ) + spark.sql("ALTER TABLE schema_evolution_rename_column SET TBLPROPERTIES ('file.format' = 'orc')") + spark.sql( + """ + INSERT INTO schema_evolution_rename_column VALUES + (3, 'orc-new') + """ + ) + spark.sql("ALTER TABLE schema_evolution_rename_column SET TBLPROPERTIES ('file.format' = 'avro')") + spark.sql( + """ + INSERT INTO schema_evolution_rename_column VALUES + (4, 'avro-new') + """ + ) + # ===== Complex Types table: ARRAY, MAP, STRUCT ===== spark.sql( """