From c5ed6e917399debfd9c19463cfeaa6c351d32eb9 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 18 Jun 2026 16:26:50 +0800 Subject: [PATCH] Fix pipe tree database creation on receiver --- .../thrift/IoTDBDataNodeReceiver.java | 147 +++++++++++++++++- ...tementDataTypeConvertExecutionVisitor.java | 13 +- .../scheduler/load/LoadTsFileScheduler.java | 1 + .../thrift/IoTDBDataNodeReceiverTest.java | 55 +++++++ .../load/LoadTsFileSchedulerTest.java | 31 ++++ .../db/utils/ConfigurationFileUtilsTest.java | 1 + 6 files changed, 240 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 4f101e1796a21..978d924c05651 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -26,6 +26,8 @@ import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.config.PipeConfig; @@ -89,6 +91,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DatabaseSchemaTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.AlterLogicalViewNode; @@ -99,6 +102,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; @@ -133,6 +137,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -166,7 +171,8 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { this::executeStatementForTableModel); private final PipeTreeStatementDataTypeConvertExecutionVisitor treeStatementDataTypeConvertExecutionVisitor = - new PipeTreeStatementDataTypeConvertExecutionVisitor(this::executeStatementForTreeModel); + new PipeTreeStatementDataTypeConvertExecutionVisitor( + statement -> executeStatementForTreeModel(statement, getTreeDatabaseName(statement))); public final PipeTreeStatementToBatchVisitor batchVisitor = new PipeTreeStatementToBatchVisitor(); // Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> confignode (cluster @@ -186,6 +192,14 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance(); private PipeMemoryBlock allocatedMemoryBlock; + private final Set autoCreatedTreeDatabases = ConcurrentHashMap.newKeySet(); + private final Set conflictedTreeDatabases = ConcurrentHashMap.newKeySet(); + + private enum TreeDatabaseCreationResult { + SKIPPED, + CREATED_OR_EXISTED, + CONFLICTED + } static { try { @@ -988,6 +1002,9 @@ private TSStatus executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch( ((InsertBaseStatement) statement).getDatabaseName().isPresent() ? ((InsertBaseStatement) statement).getDatabaseName().get() : null; + } else if (statement instanceof InsertBaseStatement) { + isTableModelStatement = false; + databaseName = getTreeDatabaseName(statement); } else { isTableModelStatement = false; databaseName = null; @@ -1038,7 +1055,7 @@ private TSStatus executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch( final TSStatus status = isTableModelStatement ? executeStatementForTableModel(statement, databaseName) - : executeStatementForTreeModel(statement); + : executeStatementForTreeModel(statement, getTreeDatabaseName(statement)); // Try to convert data type if the status code is not success. Insert statements normally return // above after the first converted execution. The retry path is kept for load and fallback @@ -1181,7 +1198,84 @@ private void autoCreateDatabaseIfNecessary(final String database) { } } - private TSStatus executeStatementForTreeModel(final Statement statement) { + private TreeDatabaseCreationResult autoCreateTreeDatabaseIfNecessary(final String database) { + if (database == null + || LoadTsFileStatement.getDatabaseLevelByTreeDatabase(database) == null + || !IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) { + return TreeDatabaseCreationResult.SKIPPED; + } + if (autoCreatedTreeDatabases.contains(database)) { + return TreeDatabaseCreationResult.CREATED_OR_EXISTED; + } + if (conflictedTreeDatabases.contains(database)) { + return TreeDatabaseCreationResult.CONFLICTED; + } + + try { + final TSStatus status = + AuthorityChecker.getAccessControl() + .checkCanCreateDatabaseForTree(getUserEntity(), new PartialPath(database)); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(status.getMessage()); + } + + final DatabaseSchemaStatement statement = + new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE); + statement.setDatabasePath(new PartialPath(database)); + statement.setEnablePrintExceptionLog(false); + final DatabaseSchemaTask task = new DatabaseSchemaTask(statement); + final ListenableFuture future = + task.execute(ClusterConfigTaskExecutor.getInstance()); + final ConfigTaskResult result = future.get(); + final int statusCode = result.getStatusCode().getStatusCode(); + if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || statusCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { + autoCreatedTreeDatabases.add(database); + return TreeDatabaseCreationResult.CREATED_OR_EXISTED; + } + if (statusCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) { + conflictedTreeDatabases.add(database); + return TreeDatabaseCreationResult.CONFLICTED; + } + throw new PipeException( + String.format( + "Auto create tree database failed: %s, status code: %s", + database, result.getStatusCode())); + } catch (final IllegalPathException e) { + throw new PipeException(String.format("Illegal tree database %s.", database), e); + } catch (final ExecutionException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + final Throwable rootCause = getRootCause(e); + final int errorCode; + if (rootCause instanceof IoTDBException) { + errorCode = ((IoTDBException) rootCause).getErrorCode(); + } else if (rootCause instanceof IoTDBRuntimeException) { + errorCode = ((IoTDBRuntimeException) rootCause).getErrorCode(); + } else { + errorCode = TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(); + } + if (errorCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { + autoCreatedTreeDatabases.add(database); + return TreeDatabaseCreationResult.CREATED_OR_EXISTED; + } + if (errorCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) { + conflictedTreeDatabases.add(database); + return TreeDatabaseCreationResult.CONFLICTED; + } + throw new PipeException( + DataNodePipeMessages.AUTO_CREATE_DATABASE_FAILED_BECAUSE + e.getMessage()); + } + } + + private TSStatus executeStatementForTreeModel( + final Statement statement, final String databaseName) { + if (autoCreateTreeDatabaseIfNecessary(databaseName) == TreeDatabaseCreationResult.CONFLICTED) { + // Continue execution, but let partition analysis infer the receiver-side database. + clearTreeDatabaseName(statement); + } + return Coordinator.getInstance() .executeForTreeModel( shouldMarkAsPipeRequest.get() ? new PipeEnrichedStatement(statement) : statement, @@ -1196,6 +1290,53 @@ private TSStatus executeStatementForTreeModel(final Statement statement) { .status; } + private IAuditEntity getUserEntity() { + return userEntity != null + ? userEntity + : AuthorityChecker.createIAuditEntity(username, SESSION_MANAGER.getCurrSession()); + } + + private String getTreeDatabaseName(final Statement statement) { + if (statement instanceof LoadTsFileStatement) { + return ((LoadTsFileStatement) statement).getDatabase(); + } + if (statement instanceof InsertBaseStatement) { + return ((InsertBaseStatement) statement).getDatabaseName().orElse(null); + } + return null; + } + + static void clearTreeDatabaseName(final Statement statement) { + if (statement instanceof LoadTsFileStatement) { + final LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) statement; + loadTsFileStatement.setDatabase(null); + loadTsFileStatement.setDatabaseLevel( + IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel()); + } else if (statement instanceof InsertBaseStatement) { + clearTreeInsertDatabaseName((InsertBaseStatement) statement); + } + } + + private static void clearTreeInsertDatabaseName(final InsertBaseStatement statement) { + statement.setDatabaseName(null); + if (statement instanceof InsertRowsStatement) { + for (final InsertBaseStatement childStatement : + ((InsertRowsStatement) statement).getInsertRowStatementList()) { + childStatement.setDatabaseName(null); + } + } else if (statement instanceof InsertRowsOfOneDeviceStatement) { + for (final InsertBaseStatement childStatement : + ((InsertRowsOfOneDeviceStatement) statement).getInsertRowStatementList()) { + childStatement.setDatabaseName(null); + } + } else if (statement instanceof InsertMultiTabletsStatement) { + for (final InsertBaseStatement childStatement : + ((InsertMultiTabletsStatement) statement).getInsertTabletStatementList()) { + childStatement.setDatabaseName(null); + } + } + } + private TSStatus executeStatementForTableModelWithPermissionCheck( final org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement statement, final String databaseName) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java index e78e273fc0c0f..f8c6eadc5f50d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java @@ -105,12 +105,15 @@ public Optional visitLoadFile( new TsFileInsertionEventScanParser( file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, true)) { for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + final InsertTabletStatement insertTabletStatement = + PipeTransferTabletRawReq.toTPipeTransferRawReq( + tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()) + .constructStatement(); + if (loadTsFileStatement.getDatabase() != null) { + insertTabletStatement.setDatabaseName(loadTsFileStatement.getDatabase()); + } final PipeConvertedInsertTabletStatement statement = - new PipeConvertedInsertTabletStatement( - PipeTransferTabletRawReq.toTPipeTransferRawReq( - tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()) - .constructStatement(), - false); + new PipeConvertedInsertTabletStatement(insertTabletStatement, false); TSStatus result; try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 6ca3164a3e12d..e2db158475aca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -652,6 +652,7 @@ private LoadTsFileStatement buildRetryTreeLoadStatement( .setConvertOnTypeMismatch(true); if (database != null) { statement.setDatabase(database); + statement.updateDatabaseLevelByTreeDatabase(); } if (isGeneratedByPipe) { statement.markIsGeneratedByPipe(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java index 7f9197dfa04e7..8f2e86c62d057 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java @@ -20,6 +20,10 @@ package org.apache.iotdb.db.pipe.receiver.protocol.thrift; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator; @@ -29,6 +33,8 @@ import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; import java.util.Map; public class IoTDBDataNodeReceiverTest { @@ -150,4 +156,53 @@ public void testLoadTsFileSyncStatementCanSkipVerifySchemaWhenNotConvertingType( Files.deleteIfExists(tsFile); } } + + @Test + public void testClearTreeDatabaseNameForLoadTsFileStatement() throws Exception { + final Path tsFile = Files.createTempFile("pipe-load-clear-tree-database", ".tsfile"); + try { + final LoadTsFileStatement statement = + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( + "root.test.sg_0", tsFile.toString(), true, true); + + IoTDBDataNodeReceiver.clearTreeDatabaseName(statement); + + Assert.assertNull(statement.getDatabase()); + Assert.assertEquals( + IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(), + statement.getDatabaseLevel()); + } finally { + Files.deleteIfExists(tsFile); + } + } + + @Test + public void testClearTreeDatabaseNameForBatchInsertStatements() { + final InsertRowStatement rowStatement1 = new InsertRowStatement(); + rowStatement1.setDatabaseName("root.test.sg_0"); + final InsertRowStatement rowStatement2 = new InsertRowStatement(); + rowStatement2.setDatabaseName("root.test.sg_0"); + final InsertRowsStatement insertRowsStatement = new InsertRowsStatement(); + insertRowsStatement.setDatabaseName("root.test.sg_0"); + insertRowsStatement.setInsertRowStatementList(Arrays.asList(rowStatement1, rowStatement2)); + + IoTDBDataNodeReceiver.clearTreeDatabaseName(insertRowsStatement); + + Assert.assertFalse(insertRowsStatement.getDatabaseName().isPresent()); + Assert.assertFalse(rowStatement1.getDatabaseName().isPresent()); + Assert.assertFalse(rowStatement2.getDatabaseName().isPresent()); + + final InsertTabletStatement tabletStatement = new InsertTabletStatement(); + tabletStatement.setDatabaseName("root.test.sg_0"); + final InsertMultiTabletsStatement insertMultiTabletsStatement = + new InsertMultiTabletsStatement(); + insertMultiTabletsStatement.setDatabaseName("root.test.sg_0"); + insertMultiTabletsStatement.setInsertTabletStatementList( + Collections.singletonList(tabletStatement)); + + IoTDBDataNodeReceiver.clearTreeDatabaseName(insertMultiTabletsStatement); + + Assert.assertFalse(insertMultiTabletsStatement.getDatabaseName().isPresent()); + Assert.assertFalse(tabletStatement.getDatabaseName().isPresent()); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java index 2db41c2ccb0ff..19d97490c8162 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.junit.Assert; import org.junit.Before; @@ -35,6 +36,9 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import java.io.File; +import java.lang.reflect.Method; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -87,4 +91,31 @@ public void testGetPartitionQueryDatabaseForTableModelLoad() { Assert.assertEquals("test", LoadTsFileScheduler.getPartitionQueryDatabase(node, false)); } + + @Test + public void testBuildRetryTreeLoadStatementUpdatesDatabaseLevel() throws Exception { + final LoadTsFileScheduler scheduler = + new LoadTsFileScheduler( + distributedQueryPlan, + mock(MPPQueryContext.class), + mock(QueryStateMachine.class), + mock(IClientManager.class), + mock(IPartitionFetcher.class), + true); + final Method method = + LoadTsFileScheduler.class.getDeclaredMethod( + "buildRetryTreeLoadStatement", String.class, boolean.class, String.class); + method.setAccessible(true); + + final File tsFile = File.createTempFile("test", ".tsfile"); + tsFile.deleteOnExit(); + + final LoadTsFileStatement statement = + (LoadTsFileStatement) + method.invoke(scheduler, tsFile.getAbsolutePath(), true, "root.test.sg_0"); + + Assert.assertEquals("root.test.sg_0", statement.getDatabase()); + Assert.assertEquals(2, statement.getDatabaseLevel()); + Assert.assertTrue(statement.isGeneratedByPipe()); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/ConfigurationFileUtilsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/ConfigurationFileUtilsTest.java index a21b8fc58c21b..7223ad7dcd905 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/ConfigurationFileUtilsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/ConfigurationFileUtilsTest.java @@ -36,6 +36,7 @@ import java.nio.file.Files; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set;