Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,88 @@ async fn test_read_schema_evolution_drop_column() {
);
}

/// Reads `schema_evolution_rename_column`, a table whose `payload` column was renamed to
/// `renamed_payload` and whose data files span several file formats.
///
/// Files written before the rename keep the old physical field name, so the reader is
/// expected to resolve the column by field id and expose it only under the new name,
/// both for a full scan and for a projection on the renamed column.
#[tokio::test]
async fn test_read_schema_evolution_rename_column() {
    let table = "schema_evolution_rename_column";

    // ---- Full scan -------------------------------------------------------
    let (plan, batches) = scan_and_read_with_fs_catalog(table, None).await;

    // The fixture is only meaningful if every provisioned format takes part in the scan,
    // so derive the set of file extensions from the planned data files first.
    let expected_formats: HashSet<&str> = HashSet::from(["avro", "orc", "parquet"]);
    let scanned_formats: HashSet<&str> = plan
        .splits()
        .iter()
        .flat_map(|split| split.data_files())
        .filter_map(|file| file.file_name.rsplit_once('.'))
        .map(|(_, extension)| extension)
        .collect();
    assert_eq!(
        scanned_formats, expected_formats,
        "schema_evolution_rename_column should scan all provisioned file formats"
    );

    let mut rows: Vec<(i32, String)> = Vec::new();
    for batch in &batches {
        // The pre-rename name must never leak into the output schema.
        assert!(
            batch.column_by_name("payload").is_none(),
            "Old column name 'payload' should not appear in output"
        );

        let ids = batch
            .column_by_name("id")
            .and_then(|column| column.as_any().downcast_ref::<Int32Array>())
            .expect("id");
        let payloads = batch
            .column_by_name("renamed_payload")
            .and_then(|column| column.as_any().downcast_ref::<StringArray>())
            .expect("renamed_payload");

        rows.extend(
            (0..batch.num_rows()).map(|row| (ids.value(row), payloads.value(row).to_string())),
        );
    }
    // Batch order is not asserted on; compare in id order instead.
    rows.sort_by_key(|row| row.0);

    let expected_rows: Vec<(i32, String)> = vec![
        (1, "parquet-old".to_string()),
        (2, "parquet-old-2".to_string()),
        (3, "orc-new".to_string()),
        (4, "avro-new".to_string()),
    ];
    assert_eq!(
        rows, expected_rows,
        "Renamed column should read old and new files by field id under the new column name"
    );

    // ---- Projection on the renamed column only ---------------------------
    let (_, projected_batches) =
        scan_and_read_with_fs_catalog(table, Some(&["renamed_payload"])).await;

    let mut projected_values = Vec::new();
    for batch in &projected_batches {
        assert_eq!(
            batch.num_columns(),
            1,
            "Projection should return only the renamed column"
        );

        let payloads = batch
            .column_by_name("renamed_payload")
            .and_then(|column| column.as_any().downcast_ref::<StringArray>())
            .expect("projected renamed_payload");

        projected_values
            .extend((0..batch.num_rows()).map(|row| payloads.value(row).to_string()));
    }
    // Compare lexicographically sorted values so batch order does not matter.
    projected_values.sort();

    let expected_projection: Vec<String> =
        ["avro-new", "orc-new", "parquet-old", "parquet-old-2"]
            .iter()
            .map(|value| value.to_string())
            .collect();
    assert_eq!(
        projected_values, expected_projection,
        "Projection on renamed column should still use field-id mapping"
    );
}

// ---------------------------------------------------------------------------
// Complex type integration tests
// ---------------------------------------------------------------------------
Expand Down
39 changes: 39 additions & 0 deletions dev/spark/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,45 @@ def main():
"""
)

# ===== Schema Evolution: Rename Column across mixed file formats =====
# Old files have physical column name 'payload'; after RENAME COLUMN, new files
# have 'renamed_payload'. Reader should map by field id and expose the new name.
spark.sql(
"""
CREATE TABLE IF NOT EXISTS schema_evolution_rename_column (
id INT,
payload STRING
) USING paimon
TBLPROPERTIES (
'file.format' = 'parquet'
)
"""
)
spark.sql(
"""
INSERT INTO schema_evolution_rename_column VALUES
(1, 'parquet-old'),
(2, 'parquet-old-2')
"""
)
spark.sql(
"ALTER TABLE schema_evolution_rename_column RENAME COLUMN payload TO renamed_payload"
)
spark.sql("ALTER TABLE schema_evolution_rename_column SET TBLPROPERTIES ('file.format' = 'orc')")
spark.sql(
"""
INSERT INTO schema_evolution_rename_column VALUES
(3, 'orc-new')
"""
)
spark.sql("ALTER TABLE schema_evolution_rename_column SET TBLPROPERTIES ('file.format' = 'avro')")
spark.sql(
"""
INSERT INTO schema_evolution_rename_column VALUES
(4, 'avro-new')
"""
)

# ===== Complex Types table: ARRAY, MAP, STRUCT =====
spark.sql(
"""
Expand Down
Loading