From e773b8f277004bd01e9d0389e3676a681ee98075 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 18 Jun 2026 16:15:57 +0800 Subject: [PATCH 1/4] Fix old pipe root user compatibility --- .../confignode/manager/ConfigManager.java | 4 +- .../executor/ConfigPlanExecutor.java | 1 + .../confignode/persistence/pipe/PipeInfo.java | 8 +- .../persistence/pipe/PipeTaskInfo.java | 52 +++++- .../pipe/PipeTaskInfoAutoRestartTest.java | 169 +++++++++++++++++- .../pipe/agent/task/meta/PipeStaticMeta.java | 60 +++++++ 6 files changed, 283 insertions(+), 11 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 37dd66b152729..da3104e82bca5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -375,7 +375,8 @@ public ConfigManager() throws IOException { TriggerInfo triggerInfo = new TriggerInfo(); CQInfo cqInfo = new CQInfo(); ExternalServiceInfo externalServiceInfo = new ExternalServiceInfo(); - PipeInfo pipeInfo = new PipeInfo(); + this.permissionManager = createPermissionManager(authorInfo); + PipeInfo pipeInfo = new PipeInfo(this.permissionManager::login4Pipe); QuotaInfo quotaInfo = new QuotaInfo(); TTLInfo ttlInfo = new TTLInfo(); SubscriptionInfo subscriptionInfo = new SubscriptionInfo(); @@ -409,7 +410,6 @@ public ConfigManager() throws IOException { new ClusterSchemaQuotaStatistics( COMMON_CONF.getSeriesLimitThreshold(), COMMON_CONF.getDeviceLimitThreshold())); this.partitionManager = new PartitionManager(this, partitionInfo); - this.permissionManager = createPermissionManager(authorInfo); this.procedureManager = createProcedureManager(procedureInfo); this.externalServiceManager = new ExternalServiceManager(this); this.udfManager = new UDFManager(this, udfInfo); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index ed23fda8a6f8f..4a70ace8ca19e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -801,6 +801,7 @@ public boolean loadSnapshot(final File latestSnapshotRootDir) { } }); if (result.get()) { + pipeInfo.getPipeTaskInfo().enrichPipeMetasWithRootUserForCompatibility(); LOGGER.info( ConfigNodeMessages.CONFIGNODESNAPSHOT_LOAD_SNAPSHOT_SUCCESS_LATESTSNAPSHOTROOTDIR, latestSnapshotRootDir); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java index e353398cd9de1..0364f9f7af9ea 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.function.BiFunction; public class PipeInfo implements SnapshotProcessor { @@ -58,8 +59,13 @@ public class PipeInfo implements SnapshotProcessor { private final PipeTaskInfo pipeTaskInfo; public PipeInfo() throws IOException { + this(null); + } + + public PipeInfo(final BiFunction pipeUserPasswordProvider) + throws IOException { pipePluginInfo = new PipePluginInfo(); - pipeTaskInfo = new PipeTaskInfo(); + pipeTaskInfo = new PipeTaskInfo(pipeUserPasswordProvider); } public PipePluginInfo getPipePluginInfo() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 3c2e331b0a164..61beb9823ba4d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; @@ -56,7 +57,6 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure; import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; -import org.apache.iotdb.confignode.service.ConfigNode; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -76,9 +76,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -95,10 +97,16 @@ public class PipeTaskInfo implements SnapshotProcessor { // Pure in-memory object, not involved in snapshot serialization and deserialization. private final PipeTaskInfoVersion pipeTaskInfoVersion; + private final BiFunction pipeUserPasswordProvider; public PipeTaskInfo() { + this(null); + } + + public PipeTaskInfo(final BiFunction pipeUserPasswordProvider) { this.pipeMetaKeeper = new PipeMetaKeeper(); this.pipeTaskInfoVersion = new PipeTaskInfoVersion(); + this.pipeUserPasswordProvider = pipeUserPasswordProvider; } /////////////////////////////// Lock /////////////////////////////// @@ -445,6 +453,7 @@ private void validatePipePluginUsageByPipeInternal(String pluginName) { public TSStatus createPipe(final CreatePipePlanV2 plan) { acquireWriteLock(); try { + enrichPipeMetaWithRootUserForCompatibility(plan.getPipeStaticMeta()); pipeMetaKeeper.addPipeMeta(new PipeMeta(plan.getPipeStaticMeta(), plan.getPipeRuntimeMeta())); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } finally { @@ -502,6 +511,7 @@ public TSStatus operateMultiplePipes(final OperateMultiplePipesPlanV2 plan) { public TSStatus alterPipe(final AlterPipePlanV2 plan) { acquireWriteLock(); try { + enrichPipeMetaWithRootUserForCompatibility(plan.getPipeStaticMeta()); final PipeTemporaryMeta temporaryMeta = pipeMetaKeeper.getPipeMeta(plan.getPipeStaticMeta().getPipeName()).getTemporaryMeta(); pipeMetaKeeper.removePipeMeta(plan.getPipeStaticMeta().getPipeName()); @@ -719,6 +729,7 @@ private TSStatus handleMetaChangesInternal(final PipeHandleMetaChangePlan plan) plan.getPipeMetaList() .forEach( pipeMeta -> { + enrichPipeMetaWithRootUserForCompatibility(pipeMeta.getStaticMeta()); pipeMetaKeeper.addPipeMeta(pipeMeta); logger.ifPresent(l -> l.debug(ConfigNodeMessages.RECORDING_PIPE_META, pipeMeta)); }); @@ -998,6 +1009,45 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException { } } + public void enrichPipeMetasWithRootUserForCompatibility() { + acquireWriteLock(); + try { + pipeMetaKeeper + .getPipeMetaList() + .forEach( + pipeMeta -> enrichPipeMetaWithRootUserForCompatibility(pipeMeta.getStaticMeta())); + } finally { + releaseWriteLock(); + } + } + + private void enrichPipeMetaWithRootUserForCompatibility(final PipeStaticMeta pipeStaticMeta) { + if (pipeUserPasswordProvider == null) { + return; + } + final boolean shouldEnrichSource = pipeStaticMeta.mayNeedCompatibleRootUserForIoTDBSource(); + final boolean shouldEnrichSink = pipeStaticMeta.mayNeedCompatibleRootUserForWriteBackSink(); + if (!shouldEnrichSource && !shouldEnrichSink) { + return; + } + + final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); + final String password = pipeUserPasswordProvider.apply(rootUserName, null); + if (Objects.isNull(password)) { + throw new PipeException( + String.format( + "Failed to enrich pipe %s with root user for compatibility because root user %s does not exist.", + pipeStaticMeta.getPipeName(), rootUserName)); + } + + if (shouldEnrichSource) { + pipeStaticMeta.enrichSourceWithRootUserForCompatibility(rootUserName, password); + } + if (shouldEnrichSink) { + pipeStaticMeta.enrichWriteBackSinkWithRootUserForCompatibility(rootUserName, password); + } + } + private void normalizeRecoveredConsensusPipeStatus() { final List restartedConsensusPipes = new ArrayList<>(); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java index 762c3bf045c5b..d61c1771aa6d8 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java @@ -20,11 +20,15 @@ package org.apache.iotdb.confignode.persistence.pipe; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; +import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; @@ -88,6 +92,145 @@ public void testRecordDataNodePushPipeMetaExceptionsKeepsUserStoppedPipeOutOfAut Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get()); } + @Test + public void testEnrichOldUserPipeWithRootUserForCompatibility() { + final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); + final String rootPassword = "root-current-password"; + pipeTaskInfo = new PipeTaskInfo((username, password) -> rootPassword); + + createPipe("oldPipe", PipeStatus.STOPPED); + + final Map sourceAttributes = + pipeTaskInfo + .getPipeMetaByPipeName("oldPipe") + .getStaticMeta() + .getSourceParameters() + .getAttribute(); + Assert.assertEquals( + String.valueOf(IoTDBConstant.SUPER_USER_ID), + sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USER_ID)); + Assert.assertEquals( + rootUserName, sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)); + Assert.assertEquals( + rootPassword, sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + Assert.assertFalse( + pipeTaskInfo + .getPipeMetaByPipeName("oldPipe") + .getStaticMeta() + .getSinkParameters() + .getAttribute() + .containsKey(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY)); + } + + @Test + public void testDoNotOverwritePipeWithUserForCompatibility() { + pipeTaskInfo = new PipeTaskInfo((username, password) -> "root-current-password"); + + createPipeWithSourceAttributes( + "newPipe", + new HashMap() { + { + put("extractor", "iotdb-source"); + put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "user"); + put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, "user-password"); + } + }); + + final Map sourceAttributes = + pipeTaskInfo + .getPipeMetaByPipeName("newPipe") + .getStaticMeta() + .getSourceParameters() + .getAttribute(); + Assert.assertEquals("user", sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)); + Assert.assertEquals( + "user-password", sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + } + + @Test + public void testDoNotEnrichSystemPipeForCompatibility() { + pipeTaskInfo = new PipeTaskInfo((username, password) -> "root-current-password"); + + createPipeWithSourceAttributes( + PipeStaticMeta.generateSubscriptionPipeName("topic", "group"), + new HashMap() { + { + put("extractor", "iotdb-source"); + } + }); + + final Map sourceAttributes = + pipeTaskInfo + .getPipeMetaByPipeName(PipeStaticMeta.generateSubscriptionPipeName("topic", "group")) + .getStaticMeta() + .getSourceParameters() + .getAttribute(); + Assert.assertFalse(sourceAttributes.containsKey(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)); + Assert.assertFalse(sourceAttributes.containsKey(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + } + + @Test + public void testEnrichOldWriteBackSinkWithRootUserForCompatibility() { + final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); + final String rootPassword = "root-current-password"; + pipeTaskInfo = new PipeTaskInfo((username, password) -> rootPassword); + + createPipeWithAttributes( + "oldWriteBackPipe", + new HashMap() { + { + put("extractor", "iotdb-source"); + put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "source-user"); + put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, "source-password"); + } + }, + new HashMap() { + { + put("connector", "write-back-sink"); + } + }); + + final Map sinkAttributes = + pipeTaskInfo + .getPipeMetaByPipeName("oldWriteBackPipe") + .getStaticMeta() + .getSinkParameters() + .getAttribute(); + Assert.assertEquals( + String.valueOf(IoTDBConstant.SUPER_USER_ID), + sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_USER_ID)); + Assert.assertEquals(rootUserName, sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY)); + Assert.assertEquals(rootPassword, sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY)); + } + + @Test + public void testEnrichLoadedPipeMetasWithRootUserForCompatibility() { + final String rootPassword = "root-current-password"; + pipeTaskInfo = new PipeTaskInfo((username, password) -> rootPassword); + + createPipeWithSourceAttributes( + "loadedPipe", + new HashMap() { + { + put("extractor", "iotdb-source"); + } + }); + final Map sourceAttributes = + pipeTaskInfo + .getPipeMetaByPipeName("loadedPipe") + .getStaticMeta() + .getSourceParameters() + .getAttribute(); + sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_USER_ID); + sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY); + sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY); + + pipeTaskInfo.enrichPipeMetasWithRootUserForCompatibility(); + + Assert.assertEquals( + rootPassword, sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + } + private Map createErrorRespMap(final String pipeName) { final TPushPipeMetaRespExceptionMessage exceptionMessage = new TPushPipeMetaRespExceptionMessage( @@ -101,11 +244,27 @@ private Map createErrorRespMap(final String pipeName private void createPipe(final String pipeName, final PipeStatus initialStatus) { final Map extractorAttributes = new HashMap<>(); - final Map processorAttributes = new HashMap<>(); - final Map connectorAttributes = new HashMap<>(); extractorAttributes.put("extractor", "iotdb-source"); - processorAttributes.put("processor", "do-nothing-processor"); + createPipeWithSourceAttributes(pipeName, extractorAttributes); + + if (PipeStatus.RUNNING.equals(initialStatus)) { + pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING)); + } + } + + private void createPipeWithSourceAttributes( + final String pipeName, final Map extractorAttributes) { + final Map connectorAttributes = new HashMap<>(); connectorAttributes.put("connector", "iotdb-thrift-sink"); + createPipeWithAttributes(pipeName, extractorAttributes, connectorAttributes); + } + + private void createPipeWithAttributes( + final String pipeName, + final Map extractorAttributes, + final Map connectorAttributes) { + final Map processorAttributes = new HashMap<>(); + processorAttributes.put("processor", "do-nothing-processor"); final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID); final ConcurrentMap pipeTasks = new ConcurrentHashMap<>(); @@ -120,9 +279,5 @@ private void createPipe(final String pipeName, final PipeStatus initialStatus) { connectorAttributes); final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks); pipeTaskInfo.createPipe(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta)); - - if (PipeStatus.RUNNING.equals(initialStatus)) { - pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING)); - } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java index 4b57cd64e43b2..9855552113c67 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java @@ -19,7 +19,9 @@ package org.apache.iotdb.commons.pipe.agent.task.meta; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility; import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils; @@ -97,6 +99,64 @@ public boolean isSourceExternal() { .toLowerCase()); } + public boolean mayNeedCompatibleRootUserForIoTDBSource() { + final String pluginName = + sourceParameters + .getStringOrDefault( + Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, PipeSourceConstant.SOURCE_KEY), + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + .toLowerCase(); + + return PipeType.USER.equals(getPipeType()) + && (pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + || pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())) + && !sourceParameters.hasAnyAttributes( + PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY, + PipeSourceConstant.SOURCE_IOTDB_USER_KEY, + PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY, + PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, + PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY, + PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY); + } + + public boolean mayNeedCompatibleRootUserForWriteBackSink() { + final String pluginName = + sinkParameters + .getStringOrDefault( + Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY), + BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName()) + .toLowerCase(); + + return PipeType.USER.equals(getPipeType()) + && (pluginName.equals(BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName()) + || pluginName.equals(BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName())) + && !sinkParameters.hasAnyAttributes( + PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY, + PipeSinkConstant.SINK_IOTDB_USER_KEY, + PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY, + PipeSinkConstant.SINK_IOTDB_USERNAME_KEY, + PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY, + PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY); + } + + public void enrichSourceWithRootUserForCompatibility( + final String rootUserName, final String password) { + sourceParameters + .getAttribute() + .put(PipeSourceConstant.SOURCE_IOTDB_USER_ID, String.valueOf(IoTDBConstant.SUPER_USER_ID)); + sourceParameters.getAttribute().put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, rootUserName); + sourceParameters.getAttribute().put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, password); + } + + public void enrichWriteBackSinkWithRootUserForCompatibility( + final String rootUserName, final String password) { + sinkParameters + .getAttribute() + .put(PipeSinkConstant.SINK_IOTDB_USER_ID, String.valueOf(IoTDBConstant.SUPER_USER_ID)); + sinkParameters.getAttribute().put(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY, rootUserName); + sinkParameters.getAttribute().put(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY, password); + } + public ByteBuffer serialize() throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); From 55de435ce4d0f05f6e8c8e7d39cc91442187d06d Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 18 Jun 2026 16:40:27 +0800 Subject: [PATCH 2/4] Fix ConfigurationFileUtilsTest import --- .../org/apache/iotdb/db/utils/ConfigurationFileUtilsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/ConfigurationFileUtilsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/ConfigurationFileUtilsTest.java index a21b8fc58c21b..7223ad7dcd905 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/ConfigurationFileUtilsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/ConfigurationFileUtilsTest.java @@ -36,6 +36,7 @@ import java.nio.file.Files; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; From 0130fcec8be2cc035e821df173c1ead6b34166c6 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 18 Jun 2026 16:49:40 +0800 Subject: [PATCH 3/4] Update AbstractEnv.java --- .../java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 82513929b6971..df0b31090f54b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -557,7 +557,7 @@ private String buildClusterStatusFailureMessage( .append(processStatusPassed) .append(", expectedNodeSize=") .append( - configNodeWrapperList.size() + dataNodeWrapperList.size() + aiNodeWrapperList.size()) + configNodeWrapperList.size() + dataNodeWrapperList.size() + extraNodeWrappers.size()) .append(", actualNodeSize=") .append(actualNodeSize); if (showClusterStatus != null) { @@ -592,7 +592,7 @@ private List getClusterLogDirs() { final List allNodeWrappers = new ArrayList<>(); allNodeWrappers.addAll(configNodeWrapperList); allNodeWrappers.addAll(dataNodeWrapperList); - allNodeWrappers.addAll(aiNodeWrapperList); + allNodeWrappers.addAll(extraNodeWrappers); return allNodeWrappers.stream() .map(AbstractNodeWrapper::getLogDirPath) .distinct() From 3801d49e817273a168e1330303797cc90e6c55b4 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 18 Jun 2026 17:11:39 +0800 Subject: [PATCH 4/4] Address pipe compatibility review comments --- .../confignode/i18n/ConfigNodeMessages.java | 3 +++ .../confignode/i18n/ConfigNodeMessages.java | 3 +++ .../confignode/manager/ConfigManager.java | 2 +- .../confignode/persistence/pipe/PipeInfo.java | 6 +++--- .../persistence/pipe/PipeTaskInfo.java | 19 +++++++++++-------- .../pipe/PipeTaskInfoAutoRestartTest.java | 10 +++++----- 6 files changed, 26 insertions(+), 17 deletions(-) diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index f662fa5871dfb..69a69c76db71a 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -216,6 +216,9 @@ public final class ConfigNodeMessages { "Failed to drop trigger [%s], this trigger has not been created"; public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED = "Failed to drop UDF [%s], this UDF has not been created"; + public static final String + FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST = + "Failed to enrich pipe %s with root user for compatibility because root user %s does not exist."; public static final String FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE = "Failed to fetch schemaengine black list on DataNode {}, {}"; public static final String FAILED_TO_GET_FIELD = "Failed to get field {}"; diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index 8bffa1b08311a..6bf3da0e68b19 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -212,6 +212,9 @@ public final class ConfigNodeMessages { "Failed to drop trigger [%s], this trigger has not been created"; public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED = "Failed to drop UDF [%s], this UDF has not been created"; + public static final String + FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST = + "Failed to enrich pipe %s with root user for compatibility because root user %s does not exist."; public static final String FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE = "Failed to fetch schemaengine black list on DataNode {}, {}"; public static final String FAILED_TO_GET_FIELD = "Failed to get field {}"; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index da3104e82bca5..f13b3f7b005f5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -376,7 +376,7 @@ public ConfigManager() throws IOException { CQInfo cqInfo = new CQInfo(); ExternalServiceInfo externalServiceInfo = new ExternalServiceInfo(); this.permissionManager = createPermissionManager(authorInfo); - PipeInfo pipeInfo = new PipeInfo(this.permissionManager::login4Pipe); + PipeInfo pipeInfo = new PipeInfo(userName -> this.permissionManager.login4Pipe(userName, null)); QuotaInfo quotaInfo = new QuotaInfo(); TTLInfo ttlInfo = new TTLInfo(); SubscriptionInfo subscriptionInfo = new SubscriptionInfo(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java index 0364f9f7af9ea..bef26386afd7c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java @@ -49,7 +49,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.function.BiFunction; +import java.util.function.Function; public class PipeInfo implements SnapshotProcessor { @@ -62,10 +62,10 @@ public PipeInfo() throws IOException { this(null); } - public PipeInfo(final BiFunction pipeUserPasswordProvider) + public PipeInfo(final Function pipeUserCurrentPasswordProvider) throws IOException { pipePluginInfo = new PipePluginInfo(); - pipeTaskInfo = new PipeTaskInfo(pipeUserPasswordProvider); + pipeTaskInfo = new PipeTaskInfo(pipeUserCurrentPasswordProvider); } public PipePluginInfo getPipePluginInfo() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 61beb9823ba4d..247f803152bd9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -80,7 +80,7 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -97,16 +97,17 @@ public class PipeTaskInfo implements SnapshotProcessor { // Pure in-memory object, not involved in snapshot serialization and deserialization. private final PipeTaskInfoVersion pipeTaskInfoVersion; - private final BiFunction pipeUserPasswordProvider; + // Accepts a username and returns its current stored password for pipe authentication. + private final Function pipeUserCurrentPasswordProvider; public PipeTaskInfo() { this(null); } - public PipeTaskInfo(final BiFunction pipeUserPasswordProvider) { + public PipeTaskInfo(final Function pipeUserCurrentPasswordProvider) { this.pipeMetaKeeper = new PipeMetaKeeper(); this.pipeTaskInfoVersion = new PipeTaskInfoVersion(); - this.pipeUserPasswordProvider = pipeUserPasswordProvider; + this.pipeUserCurrentPasswordProvider = pipeUserCurrentPasswordProvider; } /////////////////////////////// Lock /////////////////////////////// @@ -1022,7 +1023,7 @@ public void enrichPipeMetasWithRootUserForCompatibility() { } private void enrichPipeMetaWithRootUserForCompatibility(final PipeStaticMeta pipeStaticMeta) { - if (pipeUserPasswordProvider == null) { + if (pipeUserCurrentPasswordProvider == null) { return; } final boolean shouldEnrichSource = pipeStaticMeta.mayNeedCompatibleRootUserForIoTDBSource(); @@ -1032,12 +1033,14 @@ private void enrichPipeMetaWithRootUserForCompatibility(final PipeStaticMeta pip } final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); - final String password = pipeUserPasswordProvider.apply(rootUserName, null); + final String password = pipeUserCurrentPasswordProvider.apply(rootUserName); if (Objects.isNull(password)) { throw new PipeException( String.format( - "Failed to enrich pipe %s with root user for compatibility because root user %s does not exist.", - pipeStaticMeta.getPipeName(), rootUserName)); + ConfigNodeMessages + .FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST, + pipeStaticMeta.getPipeName(), + rootUserName)); } if (shouldEnrichSource) { diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java index d61c1771aa6d8..7b78f59253dc8 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java @@ -96,7 +96,7 @@ public void testRecordDataNodePushPipeMetaExceptionsKeepsUserStoppedPipeOutOfAut public void testEnrichOldUserPipeWithRootUserForCompatibility() { final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); final String rootPassword = "root-current-password"; - pipeTaskInfo = new PipeTaskInfo((username, password) -> rootPassword); + pipeTaskInfo = new PipeTaskInfo(username -> rootPassword); createPipe("oldPipe", PipeStatus.STOPPED); @@ -124,7 +124,7 @@ public void testEnrichOldUserPipeWithRootUserForCompatibility() { @Test public void testDoNotOverwritePipeWithUserForCompatibility() { - pipeTaskInfo = new PipeTaskInfo((username, password) -> "root-current-password"); + pipeTaskInfo = new PipeTaskInfo(username -> "root-current-password"); createPipeWithSourceAttributes( "newPipe", @@ -149,7 +149,7 @@ public void testDoNotOverwritePipeWithUserForCompatibility() { @Test public void testDoNotEnrichSystemPipeForCompatibility() { - pipeTaskInfo = new PipeTaskInfo((username, password) -> "root-current-password"); + pipeTaskInfo = new PipeTaskInfo(username -> "root-current-password"); createPipeWithSourceAttributes( PipeStaticMeta.generateSubscriptionPipeName("topic", "group"), @@ -173,7 +173,7 @@ public void testDoNotEnrichSystemPipeForCompatibility() { public void testEnrichOldWriteBackSinkWithRootUserForCompatibility() { final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); final String rootPassword = "root-current-password"; - pipeTaskInfo = new PipeTaskInfo((username, password) -> rootPassword); + pipeTaskInfo = new PipeTaskInfo(username -> rootPassword); createPipeWithAttributes( "oldWriteBackPipe", @@ -206,7 +206,7 @@ public void testEnrichOldWriteBackSinkWithRootUserForCompatibility() { @Test public void testEnrichLoadedPipeMetasWithRootUserForCompatibility() { final String rootPassword = "root-current-password"; - pipeTaskInfo = new PipeTaskInfo((username, password) -> rootPassword); + pipeTaskInfo = new PipeTaskInfo(username -> rootPassword); createPipeWithSourceAttributes( "loadedPipe",