-
Notifications
You must be signed in to change notification settings - Fork 4.8k
HIVE-29578: Iceberg: add support for logical views #6449
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
db655f2
f3ddf97
1484278
b90b769
3b8d48b
136c3d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -161,6 +161,7 @@ public static void updateHmsTableForIcebergView( | |
| HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE.toUpperCase(Locale.ENGLISH), | ||
| metadata.schema(), | ||
| maxHiveTablePropertySize); | ||
| parameters.put(hive_metastoreConstants.META_TABLE_STORAGE, HIVE_ICEBERG_STORAGE_HANDLER); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if the view is the previous version of view and we store it in Hive?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure how this question is related to the pointed location in code, but same as previous comment - when querying Iceberg logical view it's properties are retrieved from Iceberg and not from HMS. |
||
| tbl.setParameters(parameters); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,146 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg.hive; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
| import java.io.UncheckedIOException; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import org.apache.commons.lang3.StringUtils; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.hive.metastore.api.FieldSchema; | ||
| import org.apache.iceberg.CatalogUtil; | ||
| import org.apache.iceberg.catalog.Catalog; | ||
| import org.apache.iceberg.catalog.Namespace; | ||
| import org.apache.iceberg.catalog.TableIdentifier; | ||
| import org.apache.iceberg.catalog.ViewCatalog; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.view.ViewBuilder; | ||
|
|
||
| /** | ||
| * Commits a native Iceberg view through the configured default Iceberg catalog (HiveCatalog or REST | ||
| * catalog, etc.) when {@code Catalog} also implements {@link ViewCatalog}. | ||
| */ | ||
| public final class IcebergLogicalViewSupport { | ||
|
|
||
| private IcebergLogicalViewSupport() { | ||
| } | ||
|
|
||
| /** | ||
| * Loads the native Iceberg logical view definition and applies SQL, schema, and Iceberg params to {@code hmsTable} | ||
| */ | ||
| public static void enrichHmsTableFromIcebergView( | ||
| org.apache.hadoop.hive.metastore.api.Table hmsTable, Configuration conf) { | ||
| TableIdentifier identifier = TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()); | ||
| String catalogName = IcebergCatalogProperties.getCatalogName(conf); | ||
| Map<String, String> catalogProps = IcebergCatalogProperties.getCatalogProperties(conf, catalogName); | ||
| Catalog catalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, conf); | ||
|
|
||
| try { | ||
| if (catalog instanceof Closeable closeable) { | ||
| try (Closeable ignored = closeable) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm pretty sure I'm missing something: what is the reason of checking if the catalog is Closeable and ignoring it?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We are not ignoring it. The |
||
| loadAndApplyView(hmsTable, conf, catalog, catalogName, identifier); | ||
| } | ||
| } else { | ||
| loadAndApplyView(hmsTable, conf, catalog, catalogName, identifier); | ||
| } | ||
| } catch (IOException e) { | ||
| throw new UncheckedIOException("Failed to close Iceberg catalog", e); | ||
| } | ||
| } | ||
|
|
||
| private static void loadAndApplyView( | ||
| org.apache.hadoop.hive.metastore.api.Table hmsTable, | ||
| Configuration conf, | ||
| Catalog catalog, | ||
| String catalogName, | ||
| TableIdentifier identifier) { | ||
| ViewCatalog viewCatalog = asViewCatalog(catalog, catalogName); | ||
| MetastoreUtil.applyIcebergViewToHmsTable(hmsTable, viewCatalog.loadView(identifier), conf); | ||
| } | ||
|
|
||
| /** Creates or replaces a view in the Iceberg catalog. */ | ||
| public static void createOrReplaceView( | ||
| Configuration conf, | ||
| String databaseName, | ||
| String viewName, | ||
| List<FieldSchema> fieldSchemas, | ||
| String viewSql, | ||
| Map<String, String> tblProperties, | ||
| String comment) { | ||
|
|
||
| TableIdentifier identifier = TableIdentifier.of(databaseName, viewName); | ||
| String catalogName = IcebergCatalogProperties.getCatalogName(conf); | ||
| Map<String, String> catalogProps = IcebergCatalogProperties.getCatalogProperties(conf, catalogName); | ||
| Catalog catalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, conf); | ||
|
|
||
| if (catalog instanceof Closeable closeable) { | ||
| try (Closeable ignored = closeable) { | ||
| commitView(catalog, catalogName, identifier, fieldSchemas, viewSql, tblProperties, comment); | ||
| } catch (IOException e) { | ||
| throw new UncheckedIOException("Failed to close Iceberg catalog", e); | ||
| } | ||
| } else { | ||
| commitView(catalog, catalogName, identifier, fieldSchemas, viewSql, tblProperties, comment); | ||
| } | ||
| } | ||
|
|
||
| private static void commitView( | ||
| Catalog catalog, | ||
| String catalogName, | ||
| TableIdentifier identifier, | ||
| List<FieldSchema> fieldSchemas, | ||
| String viewSql, | ||
| Map<String, String> tblProperties, | ||
| String comment) { | ||
| ViewCatalog viewCatalog = asViewCatalog(catalog, catalogName); | ||
|
|
||
| ViewBuilder builder = | ||
| viewCatalog | ||
| .buildView(identifier) | ||
| .withSchema(HiveSchemaUtil.convert(fieldSchemas, Collections.emptyMap(), true)) | ||
| .withDefaultNamespace(Namespace.of(identifier.namespace().level(0))) | ||
| .withQuery("hive", viewSql); | ||
|
|
||
| if (StringUtils.isNotBlank(comment)) { | ||
| builder = builder.withProperty("comment", comment); | ||
| } | ||
|
|
||
| Map<String, String> tblProps = | ||
| tblProperties == null ? Maps.newHashMap() : Maps.newHashMap(tblProperties); | ||
|
|
||
| builder.withProperties(tblProps); | ||
|
|
||
| builder.createOrReplace(); | ||
| } | ||
|
|
||
| private static ViewCatalog asViewCatalog(Catalog catalog, String catalogName) { | ||
| if (catalog instanceof ViewCatalog viewCatalog) { | ||
| return viewCatalog; | ||
| } | ||
| throw new UnsupportedOperationException( | ||
| String.format( | ||
| "Iceberg catalog '%s' does not implement ViewCatalog.", | ||
| catalogName) + | ||
| " Iceberg views require a catalog that implements ViewCatalog (e.g. HiveCatalog or REST)."); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,8 +36,10 @@ | |
| import org.apache.hadoop.hive.metastore.api.SkewedInfo; | ||
| import org.apache.hadoop.hive.metastore.api.StorageDescriptor; | ||
| import org.apache.hadoop.hive.metastore.api.Table; | ||
| import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; | ||
| import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; | ||
| import org.apache.hadoop.hive.serde.serdeConstants; | ||
| import org.apache.iceberg.BaseMetastoreTableOperations; | ||
| import org.apache.iceberg.BaseTable; | ||
| import org.apache.iceberg.CatalogUtil; | ||
| import org.apache.iceberg.Schema; | ||
|
|
@@ -46,6 +48,11 @@ | |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.util.PropertyUtil; | ||
| import org.apache.iceberg.view.BaseView; | ||
| import org.apache.iceberg.view.SQLViewRepresentation; | ||
| import org.apache.iceberg.view.View; | ||
| import org.apache.iceberg.view.ViewMetadata; | ||
| import org.apache.thrift.TException; | ||
|
|
||
| public class MetastoreUtil { | ||
|
|
@@ -134,20 +141,101 @@ public static Table toHiveTable(org.apache.iceberg.Table table, Configuration co | |
| result.setDbName(tableName.getDb()); | ||
| result.setTableName(tableName.getTable()); | ||
| result.setTableType(TableType.EXTERNAL_TABLE.toString()); | ||
| result.setPartitionKeys(getPartitionKeys(table, table.spec().specId())); | ||
|
|
||
| // TODO: Revert after HIVE-29633 is fixed | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this comment be a leftover? As I see there a toHiveView method created below. And HIVE-29633 is about views.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, this is intentional and agreed with Denys and Krisztian. |
||
| // result.setPartitionKeys(getPartitionKeys(table, table.spec().specId())); | ||
| result.setPartitionKeys(Lists.newArrayList()); | ||
|
|
||
| TableMetadata metadata = ((BaseTable) table).operations().current(); | ||
| long maxHiveTablePropertySize = conf.getLong(HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE, | ||
| HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT); | ||
| HMSTablePropertyHelper.updateHmsTableForIcebergTable(metadata.metadataFileLocation(), result, metadata, | ||
| null, true, maxHiveTablePropertySize, null); | ||
| String catalogType = IcebergCatalogProperties.getCatalogType(conf); | ||
| if (!StringUtils.isEmpty(catalogType) && !IcebergCatalogProperties.NO_CATALOG_TYPE.equals(catalogType)) { | ||
| result.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, IcebergCatalogProperties.getCatalogType(conf)); | ||
| result.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, catalogType); | ||
| } | ||
| result.setSd(getHiveStorageDescriptor(table)); | ||
| return result; | ||
| } | ||
|
|
||
| /** | ||
| * Builds a minimal HMS {@link Table} shell for a native Iceberg logical view (identity, view type, | ||
| * and Iceberg storage-handler markers only). The storage handler {@code postGetTable} hook enriches | ||
| * this object via {@link IcebergLogicalViewSupport#enrichHmsTableFromIcebergView} (view SQL, | ||
| * schema, and Iceberg parameters). | ||
| */ | ||
| public static Table buildMinimalHMSView(String catName, String dbName, String tableName) { | ||
| Table result = new Table(); | ||
| result.setCatName(catName); | ||
| result.setDbName(dbName); | ||
| result.setTableName(tableName); | ||
| result.setTableType(TableType.VIRTUAL_VIEW.toString()); | ||
|
|
||
| Map<String, String> parameters = Maps.newHashMap(); | ||
| parameters.put( | ||
| BaseMetastoreTableOperations.TABLE_TYPE_PROP, HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE); | ||
| parameters.put( | ||
| hive_metastoreConstants.META_TABLE_STORAGE, HMSTablePropertyHelper.HIVE_ICEBERG_STORAGE_HANDLER); | ||
| result.setParameters(parameters); | ||
| return result; | ||
| } | ||
|
|
||
| /** | ||
| * Applies Iceberg view metadata (SQL, schema, params) onto an existing HMS {@link Table}. | ||
| */ | ||
| public static void applyIcebergViewToHmsTable(Table hmsTable, View view, Configuration conf) { | ||
| ViewMetadata metadata = ((BaseView) view).operations().current(); | ||
| String sqlText = viewSqlText(view, metadata); | ||
|
|
||
| boolean hiveEngineEnabled = false; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Why it’s false in This path materializes an HMS So we keep a minimal SD consistent with normal virtual views and avoid implying this HMS object is an Iceberg-backed table for the Hive engine. For tables, |
||
| hmsTable.setSd(HiveOperationsBase.storageDescriptor(metadata.schema(), metadata.location(), hiveEngineEnabled)); | ||
| StorageDescriptor sd = hmsTable.getSd(); | ||
|
|
||
| if (sd.getBucketCols() == null) { | ||
| sd.setBucketCols(Lists.newArrayList()); | ||
| } | ||
|
|
||
| if (sd.getSortCols() == null) { | ||
| sd.setSortCols(Lists.newArrayList()); | ||
| } | ||
|
|
||
| long maxHiveTablePropertySize = | ||
| conf.getLong( | ||
| HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE, | ||
| HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT); | ||
| HMSTablePropertyHelper.updateHmsTableForIcebergView( | ||
| metadata.metadataFileLocation(), | ||
| hmsTable, | ||
| metadata, | ||
| Collections.emptySet(), | ||
| maxHiveTablePropertySize, | ||
| null); | ||
|
|
||
| hmsTable.setCreateTime((int) (metadata.version(1).timestampMillis() / 1000)); | ||
| hmsTable.setLastAccessTime((int) (metadata.currentVersion().timestampMillis() / 1000)); | ||
| hmsTable.setOwner( | ||
| PropertyUtil.propertyAsString( | ||
| metadata.properties(), HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser())); | ||
|
|
||
| // In-memory overlay for compile/describe: authoritative SQL comes from Iceberg metadata. | ||
| hmsTable.setViewOriginalText(sqlText); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can it cause any trouble on Hive side of the original text is actually the expanded text?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've implemented a lot of test cases both on HMS and REST paths and no problems are observed. It is taken from Iceberg metadata: |
||
| hmsTable.setViewExpandedText(sqlText); | ||
|
|
||
| String catalogType = IcebergCatalogProperties.getCatalogType(conf); | ||
| if (!StringUtils.isEmpty(catalogType) && !IcebergCatalogProperties.NO_CATALOG_TYPE.equals(catalogType)) { | ||
| hmsTable.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, IcebergCatalogProperties.getCatalogType(conf)); | ||
| } | ||
| } | ||
|
|
||
| private static String viewSqlText(View view, ViewMetadata metadata) { | ||
| SQLViewRepresentation hiveRepr = view.sqlFor("hive"); | ||
| if (hiveRepr != null) { | ||
| return hiveRepr.sql(); | ||
| } | ||
| return HiveViewOperations.sqlFor(metadata); | ||
| } | ||
|
|
||
| private static StorageDescriptor getHiveStorageDescriptor(org.apache.iceberg.Table table) { | ||
| var result = new StorageDescriptor(); | ||
| result.setCols(HiveSchemaUtil.convert(table.schema())); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logical view is maybe a too verbose. I checked the Hive documentation and error messages and I only saw this naming: a view is a view or materialized view.
Technically it is true, it is a logical view. In the naming convention that Hive uses, it is just a view.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me omitting the word 'logical' sounds ambiguous.
Also, we are adding support for more view types and we will have regular hive view, hive matrialized view, Iceberg (logical) view, Iceberg materialized view (your PR), I just thought it will help understanding what view type we are referring to having 'logical' in the name.
@kasakrisz what do you think?