Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
Expand Down Expand Up @@ -89,6 +91,7 @@
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DatabaseSchemaTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.AlterLogicalViewNode;
Expand All @@ -99,6 +102,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
Expand Down Expand Up @@ -133,6 +137,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -166,7 +171,8 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
this::executeStatementForTableModel);
private final PipeTreeStatementDataTypeConvertExecutionVisitor
treeStatementDataTypeConvertExecutionVisitor =
new PipeTreeStatementDataTypeConvertExecutionVisitor(this::executeStatementForTreeModel);
new PipeTreeStatementDataTypeConvertExecutionVisitor(
statement -> executeStatementForTreeModel(statement, getTreeDatabaseName(statement)));
public final PipeTreeStatementToBatchVisitor batchVisitor = new PipeTreeStatementToBatchVisitor();

// Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> confignode (cluster
Expand All @@ -186,6 +192,14 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();

private PipeMemoryBlock allocatedMemoryBlock;
private final Set<String> autoCreatedTreeDatabases = ConcurrentHashMap.newKeySet();
private final Set<String> conflictedTreeDatabases = ConcurrentHashMap.newKeySet();

private enum TreeDatabaseCreationResult {
SKIPPED,
CREATED_OR_EXISTED,
CONFLICTED
}

static {
try {
Expand Down Expand Up @@ -988,6 +1002,9 @@ private TSStatus executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch(
((InsertBaseStatement) statement).getDatabaseName().isPresent()
? ((InsertBaseStatement) statement).getDatabaseName().get()
: null;
} else if (statement instanceof InsertBaseStatement) {
isTableModelStatement = false;
databaseName = getTreeDatabaseName(statement);
} else {
isTableModelStatement = false;
databaseName = null;
Expand Down Expand Up @@ -1038,7 +1055,7 @@ private TSStatus executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch(
final TSStatus status =
isTableModelStatement
? executeStatementForTableModel(statement, databaseName)
: executeStatementForTreeModel(statement);
: executeStatementForTreeModel(statement, getTreeDatabaseName(statement));

// Try to convert data type if the status code is not success. Insert statements normally return
// above after the first converted execution. The retry path is kept for load and fallback
Expand Down Expand Up @@ -1181,7 +1198,84 @@ private void autoCreateDatabaseIfNecessary(final String database) {
}
}

private TSStatus executeStatementForTreeModel(final Statement statement) {
private TreeDatabaseCreationResult autoCreateTreeDatabaseIfNecessary(final String database) {
if (database == null
|| LoadTsFileStatement.getDatabaseLevelByTreeDatabase(database) == null
|| !IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
return TreeDatabaseCreationResult.SKIPPED;
}
if (autoCreatedTreeDatabases.contains(database)) {
return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
}
if (conflictedTreeDatabases.contains(database)) {
return TreeDatabaseCreationResult.CONFLICTED;
}

try {
final TSStatus status =
AuthorityChecker.getAccessControl()
.checkCanCreateDatabaseForTree(getUserEntity(), new PartialPath(database));
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(status.getMessage());
}

final DatabaseSchemaStatement statement =
new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
statement.setDatabasePath(new PartialPath(database));
statement.setEnablePrintExceptionLog(false);
final DatabaseSchemaTask task = new DatabaseSchemaTask(statement);
final ListenableFuture<ConfigTaskResult> future =
task.execute(ClusterConfigTaskExecutor.getInstance());
final ConfigTaskResult result = future.get();
final int statusCode = result.getStatusCode().getStatusCode();
if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| statusCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
autoCreatedTreeDatabases.add(database);
return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
}
if (statusCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
conflictedTreeDatabases.add(database);
return TreeDatabaseCreationResult.CONFLICTED;
}
throw new PipeException(
String.format(
"Auto create tree database failed: %s, status code: %s",
database, result.getStatusCode()));
} catch (final IllegalPathException e) {
throw new PipeException(String.format("Illegal tree database %s.", database), e);
} catch (final ExecutionException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
final Throwable rootCause = getRootCause(e);
final int errorCode;
if (rootCause instanceof IoTDBException) {
errorCode = ((IoTDBException) rootCause).getErrorCode();
} else if (rootCause instanceof IoTDBRuntimeException) {
errorCode = ((IoTDBRuntimeException) rootCause).getErrorCode();
} else {
errorCode = TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode();
}
if (errorCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
autoCreatedTreeDatabases.add(database);
return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
}
if (errorCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
conflictedTreeDatabases.add(database);
return TreeDatabaseCreationResult.CONFLICTED;
}
throw new PipeException(
DataNodePipeMessages.AUTO_CREATE_DATABASE_FAILED_BECAUSE + e.getMessage());
}
}

private TSStatus executeStatementForTreeModel(
final Statement statement, final String databaseName) {
if (autoCreateTreeDatabaseIfNecessary(databaseName) == TreeDatabaseCreationResult.CONFLICTED) {
// Continue execution, but let partition analysis infer the receiver-side database.
clearTreeDatabaseName(statement);
}

return Coordinator.getInstance()
.executeForTreeModel(
shouldMarkAsPipeRequest.get() ? new PipeEnrichedStatement(statement) : statement,
Expand All @@ -1196,6 +1290,53 @@ private TSStatus executeStatementForTreeModel(final Statement statement) {
.status;
}

private IAuditEntity getUserEntity() {
return userEntity != null
? userEntity
: AuthorityChecker.createIAuditEntity(username, SESSION_MANAGER.getCurrSession());
}

private String getTreeDatabaseName(final Statement statement) {
if (statement instanceof LoadTsFileStatement) {
return ((LoadTsFileStatement) statement).getDatabase();
}
if (statement instanceof InsertBaseStatement) {
return ((InsertBaseStatement) statement).getDatabaseName().orElse(null);
}
return null;
}

static void clearTreeDatabaseName(final Statement statement) {
if (statement instanceof LoadTsFileStatement) {
final LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) statement;
loadTsFileStatement.setDatabase(null);
loadTsFileStatement.setDatabaseLevel(
IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel());
} else if (statement instanceof InsertBaseStatement) {
clearTreeInsertDatabaseName((InsertBaseStatement) statement);
}
}

private static void clearTreeInsertDatabaseName(final InsertBaseStatement statement) {
statement.setDatabaseName(null);
if (statement instanceof InsertRowsStatement) {
for (final InsertBaseStatement childStatement :
((InsertRowsStatement) statement).getInsertRowStatementList()) {
childStatement.setDatabaseName(null);
}
} else if (statement instanceof InsertRowsOfOneDeviceStatement) {
for (final InsertBaseStatement childStatement :
((InsertRowsOfOneDeviceStatement) statement).getInsertRowStatementList()) {
childStatement.setDatabaseName(null);
}
} else if (statement instanceof InsertMultiTabletsStatement) {
for (final InsertBaseStatement childStatement :
((InsertMultiTabletsStatement) statement).getInsertTabletStatementList()) {
childStatement.setDatabaseName(null);
}
}
}

private TSStatus executeStatementForTableModelWithPermissionCheck(
final org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement statement,
final String databaseName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,15 @@ public Optional<TSStatus> visitLoadFile(
new TsFileInsertionEventScanParser(
file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, true)) {
for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) {
final InsertTabletStatement insertTabletStatement =
PipeTransferTabletRawReq.toTPipeTransferRawReq(
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
.constructStatement();
if (loadTsFileStatement.getDatabase() != null) {
insertTabletStatement.setDatabaseName(loadTsFileStatement.getDatabase());
}
final PipeConvertedInsertTabletStatement statement =
new PipeConvertedInsertTabletStatement(
PipeTransferTabletRawReq.toTPipeTransferRawReq(
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
.constructStatement(),
false);
new PipeConvertedInsertTabletStatement(insertTabletStatement, false);

TSStatus result;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ private LoadTsFileStatement buildRetryTreeLoadStatement(
.setConvertOnTypeMismatch(true);
if (database != null) {
statement.setDatabase(database);
statement.updateDatabaseLevelByTreeDatabase();
}
if (isGeneratedByPipe) {
statement.markIsGeneratedByPipe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
package org.apache.iotdb.db.pipe.receiver.protocol.thrift;

import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
Expand All @@ -29,6 +33,8 @@

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

public class IoTDBDataNodeReceiverTest {
Expand Down Expand Up @@ -150,4 +156,53 @@ public void testLoadTsFileSyncStatementCanSkipVerifySchemaWhenNotConvertingType(
Files.deleteIfExists(tsFile);
}
}

@Test
public void testClearTreeDatabaseNameForLoadTsFileStatement() throws Exception {
final Path tsFile = Files.createTempFile("pipe-load-clear-tree-database", ".tsfile");
try {
final LoadTsFileStatement statement =
IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
"root.test.sg_0", tsFile.toString(), true, true);

IoTDBDataNodeReceiver.clearTreeDatabaseName(statement);

Assert.assertNull(statement.getDatabase());
Assert.assertEquals(
IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(),
statement.getDatabaseLevel());
} finally {
Files.deleteIfExists(tsFile);
}
}

@Test
public void testClearTreeDatabaseNameForBatchInsertStatements() {
final InsertRowStatement rowStatement1 = new InsertRowStatement();
rowStatement1.setDatabaseName("root.test.sg_0");
final InsertRowStatement rowStatement2 = new InsertRowStatement();
rowStatement2.setDatabaseName("root.test.sg_0");
final InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
insertRowsStatement.setDatabaseName("root.test.sg_0");
insertRowsStatement.setInsertRowStatementList(Arrays.asList(rowStatement1, rowStatement2));

IoTDBDataNodeReceiver.clearTreeDatabaseName(insertRowsStatement);

Assert.assertFalse(insertRowsStatement.getDatabaseName().isPresent());
Assert.assertFalse(rowStatement1.getDatabaseName().isPresent());
Assert.assertFalse(rowStatement2.getDatabaseName().isPresent());

final InsertTabletStatement tabletStatement = new InsertTabletStatement();
tabletStatement.setDatabaseName("root.test.sg_0");
final InsertMultiTabletsStatement insertMultiTabletsStatement =
new InsertMultiTabletsStatement();
insertMultiTabletsStatement.setDatabaseName("root.test.sg_0");
insertMultiTabletsStatement.setInsertTabletStatementList(
Collections.singletonList(tabletStatement));

IoTDBDataNodeReceiver.clearTreeDatabaseName(insertMultiTabletsStatement);

Assert.assertFalse(insertMultiTabletsStatement.getDatabaseName().isPresent());
Assert.assertFalse(tabletStatement.getDatabaseName().isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.io.File;
import java.lang.reflect.Method;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -87,4 +91,31 @@ public void testGetPartitionQueryDatabaseForTableModelLoad() {

Assert.assertEquals("test", LoadTsFileScheduler.getPartitionQueryDatabase(node, false));
}

@Test
public void testBuildRetryTreeLoadStatementUpdatesDatabaseLevel() throws Exception {
final LoadTsFileScheduler scheduler =
new LoadTsFileScheduler(
distributedQueryPlan,
mock(MPPQueryContext.class),
mock(QueryStateMachine.class),
mock(IClientManager.class),
mock(IPartitionFetcher.class),
true);
final Method method =
LoadTsFileScheduler.class.getDeclaredMethod(
"buildRetryTreeLoadStatement", String.class, boolean.class, String.class);
method.setAccessible(true);

final File tsFile = File.createTempFile("test", ".tsfile");
tsFile.deleteOnExit();

final LoadTsFileStatement statement =
(LoadTsFileStatement)
method.invoke(scheduler, tsFile.getAbsolutePath(), true, "root.test.sg_0");

Assert.assertEquals("root.test.sg_0", statement.getDatabase());
Assert.assertEquals(2, statement.getDatabaseLevel());
Assert.assertTrue(statement.isGeneratedByPipe());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;

Expand Down
Loading