From 725903f0f2115f7aeefbb233394a30fe1d400f91 Mon Sep 17 00:00:00 2001
From: Dave Marion
Date: Thu, 18 Jun 2026 16:22:49 +0000
Subject: [PATCH 1/2] Added upgrade step to remove old compaction temp files

Closes #6285
---
 .../manager/upgrade/Upgrader11to12.java       | 117 ++++++++++++++
 .../upgrade/TmpFileUpgrade11to12TestIT.java   | 150 ++++++++++++++++++
 2 files changed, 267 insertions(+)
 create mode 100644 test/src/main/java/org/apache/accumulo/test/upgrade/TmpFileUpgrade11to12TestIT.java

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
index d13ca3c016d..d924f4b5bfe 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
@@ -32,11 +32,18 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchDeleter;
@@ -60,6 +67,7 @@
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.core.file.FilePrefix;
 import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.SystemTables;
@@ -81,8 +89,10 @@
 import org.apache.accumulo.core.util.Encoding;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
 import org.apache.accumulo.core.util.tables.TableNameUtil;
+import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.conf.codec.VersionedProperties;
 import org.apache.accumulo.server.conf.store.NamespacePropKey;
@@ -91,7 +101,9 @@
 import org.apache.accumulo.server.conf.store.TablePropKey;
 import org.apache.accumulo.server.init.FileSystemInitializer;
 import org.apache.accumulo.server.init.InitialConfiguration;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats;
 import org.apache.accumulo.server.util.PropUtil;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
@@ -300,6 +312,8 @@ public void upgradeMetadata(ServerContext context) {
     removeCompactColumnsFromTable(context, SystemTables.METADATA.tableName());
     LOG.info("Removing bulk file columns from metadata table");
     removeBulkFileColumnsFromTable(context, SystemTables.METADATA.tableName());
+    LOG.info("Removing major compaction temp files from prior versions");
+    deleteCompactionTempFiles(context, new DeleteStats(), new HashSet<Path>());
   }
 
   private static void addAssistantManager(ServerContext context) {
@@ -921,6 +935,7 @@ private void upgradeTabletsMetadata(@NonNull ServerContext context, String metaN
     }
   }
 
+  // visible for IT
   public void removeScanServerRanges(ServerContext context) {
     try (BatchDeleter batchDeleter =
         context.createBatchDeleter(Ample.DataLevel.USER.metaTable(), Authorizations.EMPTY, 4)) {
@@ -988,4 +1003,106 @@ void moveTableProperties(ServerContext context) {
     LOG.info(
         "Moving table properties from system configuration to namespace configurations complete.");
   }
+
+  // visible for IT
+  public void deleteCompactionTempFiles(final ServerContext ctx, final DeleteStats stats,
+      final Collection<Path> deletedFiles) {
+
+    final String pattern = "/tables/*/*/*";
+    final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    final Set<Path> oldCompactionTmpFiles = new HashSet<>();
+
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for old compaction tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> (p.getName().startsWith("" + FilePrefix.FULL_COMPACTION.getPrefix())
+                  || p.getName().startsWith("" + FilePrefix.COMPACTION.getPrefix()))
+                  && p.getName().endsWith(".rf_tmp"));
+          Arrays.stream(files).forEach(fs -> oldCompactionTmpFiles.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for old compaction tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    LOG.info("Waiting for tasks to complete finding files");
+    while (futures.size() > 0) {
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of old compaction tmp files", e);
+          }
+        }
+      }
+      int remaining = futures.size();
+      if (remaining > 0) {
+        LOG.debug("Waiting for {} tasks to complete", remaining);
+        UtilWaitThread.sleep(3_000);
+      }
+    }
+    LOG.info("Found {} old compaction tmp files:", oldCompactionTmpFiles.size());
+    oldCompactionTmpFiles.forEach(p -> LOG.debug("{}", p));
+
+    LOG.info("Deleting old compaction tmp files...");
+    final ExecutorService delSvc = Executors.newFixedThreadPool(vols.size());
+    // use a linked list to make removal from the middle of the list quick
+    final List<Future<Boolean>> delFutures = new LinkedList<>();
+
+    oldCompactionTmpFiles.forEach(p -> {
+      delFutures.add(delSvc.submit(() -> {
+        if (ctx.getVolumeManager().exists(p)) {
+          boolean result = ctx.getVolumeManager().delete(p);
+          if (result) {
+            LOG.debug("Removed old temp file {}", p);
+            deletedFiles.add(p);
+          } else {
+            LOG.error(
+                "Unable to remove old temp file {}, operation returned false with no exception", p);
+          }
+          return result;
+        }
+        return true;
+      }));
+    });
+    delSvc.shutdown();
+
+    while (delFutures.size() > 0) {
+      Iterator<Future<Boolean>> iter = delFutures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (InterruptedException | ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      int remaining = oldCompactionTmpFiles.size();
+      if (remaining > 0) {
+        LOG.debug("Waiting on {} background delete operations", remaining);
+        UtilWaitThread.sleep(3_000);
+      }
+    }
+    LOG.info("Deletion of compaction tmp files completed. Success:{}, Failure:{}, Error:{}",
+        stats.success, stats.failure, stats.error);
+  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/upgrade/TmpFileUpgrade11to12TestIT.java b/test/src/main/java/org/apache/accumulo/test/upgrade/TmpFileUpgrade11to12TestIT.java
new file mode 100644
index 00000000000..29351c2ea18
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/upgrade/TmpFileUpgrade11to12TestIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.upgrade;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.metadata.SystemTables;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.manager.upgrade.Upgrader11to12;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TmpFileUpgrade11to12TestIT extends SharedMiniClusterBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TmpFileUpgrade11to12TestIT.class);
+
+  @BeforeAll
+  public static void start() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+  }
+
+  @AfterAll
+  public static void stop() throws Exception {
+    stopMiniCluster();
+  }
+
+  @Test
+  public void testDeleteOldCompactionTmpFiles() throws Exception {
+
+    final Set<String> fileNames =
+        Set.of("A00001.rf", "F000002.rf", "F000003.rf_tmp", "C000004.rf", "C000005.rf_tmp",
+            "C000006.rf_tmp_" + ExternalCompactionId.generate(UUID.randomUUID()), "A000007.rf_tmp");
+
+    final Set<String> validFileNames = new HashSet<>(fileNames);
+    validFileNames.removeAll(Set.of("C000005.rf_tmp", "A000007.rf_tmp"));
+
+    final ServerContext ctx = getCluster().getServerContext();
+    String[] tableNames = getUniqueNames(10);
+
+    final Set<String> volumePaths = new HashSet<>();
+    ctx.getVolumeManager().getVolumes().forEach(v -> volumePaths.add(v.getBasePath()));
+
+    SortedSet<Text> splits = new TreeSet<>();
+    IntStream.range(0, 9).forEach(i -> splits.add(new Text("" + i)));
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      for (String tableName : tableNames) {
+        client.tableOperations().create(tableName);
+        client.tableOperations().addSplits(tableName, splits);
+      }
+
+      final Map<String,String> tableNameIdMap = ctx.tableOperations().tableIdMap();
+      final Map<TableId,Set<String>> tableNameDirsMap = new HashMap<>();
+      final AtomicInteger numTabletDirs = new AtomicInteger(0);
+      for (Entry<String,String> e : tableNameIdMap.entrySet()) {
+        final TableId tid = TableId.of(e.getValue());
+        if (SystemTables.containsTableId(tid)) {
+          continue;
+        }
+        try (TabletsMetadata tm =
+            ctx.getAmple().readTablets().forTable(tid).fetch(ColumnType.DIR).build()) {
+          Set<String> tableDirs = new HashSet<>();
+          tm.forEach(t -> {
+            numTabletDirs.incrementAndGet();
+            tableDirs.add(t.getDirName());
+          });
+          tableNameDirsMap.put(tid, tableDirs);
+        }
+      }
+
+      final Set<Path> createdPaths = new HashSet<>();
+      for (Entry<TableId,Set<String>> e : tableNameDirsMap.entrySet()) {
+        if (SystemTables.containsTableId(e.getKey())) {
+          continue;
+        }
+        for (String tabletDirName : e.getValue()) {
+          String tabletDirPath = Constants.HDFS_TABLES_DIR + Path.SEPARATOR + e.getKey()
+              + Path.SEPARATOR + tabletDirName;
+          for (String fileName : fileNames) {
+            for (String volume : volumePaths) {
+              Path p =
+                  new Path(volume + Path.SEPARATOR + tabletDirPath + Path.SEPARATOR + fileName);
+              assertTrue(ctx.getVolumeManager().createNewFile(p));
+              createdPaths.add(p);
+            }
+          }
+        }
+      }
+      LOG.info("Created {} files", createdPaths.size());
+      LOG.info("{}", createdPaths);
+
+      Upgrader11to12 upgrader = new Upgrader11to12();
+      DeleteStats stats = new DeleteStats();
+      Set<Path> deleted = new HashSet<Path>();
+      upgrader.deleteCompactionTempFiles(ctx, stats, deleted);
+      assertEquals(numTabletDirs.get() * 2, deleted.size());
+      assertEquals(0, stats.error);
+      assertEquals(0, stats.failure);
+      assertEquals(deleted.size(), stats.success);
+
+      for (Path delete : deleted) {
+        assertFalse(validFileNames.contains(delete.getName()));
+      }
+    }
+  }
+
+}

From 81c35d372affb39a1703af2a605b8457e85d8a4f Mon Sep 17 00:00:00 2001
From: Dave Marion
Date: Tue, 23 Jun 2026 17:57:21 +0000
Subject: [PATCH 2/2] Implemented PR suggestions

---
 .../accumulo/manager/upgrade/Upgrader11to12.java    | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
index d924f4b5bfe..3dba571907c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
@@ -40,6 +40,7 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -935,7 +936,7 @@ private void upgradeTabletsMetadata(@NonNull ServerContext context, String metaN
     }
   }
 
-  // visible for IT
+  @VisibleForTesting
   public void removeScanServerRanges(ServerContext context) {
     try (BatchDeleter batchDeleter =
         context.createBatchDeleter(Ample.DataLevel.USER.metaTable(), Authorizations.EMPTY, 4)) {
@@ -1004,7 +1005,7 @@ void moveTableProperties(ServerContext context) {
         "Moving table properties from system configuration to namespace configurations complete.");
   }
 
-  // visible for IT
+  @VisibleForTesting
   public void deleteCompactionTempFiles(final ServerContext ctx, final DeleteStats stats,
       final Collection<Path> deletedFiles) {
 
@@ -1012,7 +1013,7 @@ public void deleteCompactionTempFiles(final ServerContext ctx, final DeleteStats
     final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
     final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
     final List<Future<Void>> futures = new ArrayList<>(vols.size());
-    final Set<Path> oldCompactionTmpFiles = new HashSet<>();
+    final Set<Path> oldCompactionTmpFiles = ConcurrentHashMap.newKeySet();
 
     for (Volume vol : vols) {
       final Path volPattern = new Path(vol.getBasePath() + pattern);
@@ -1060,13 +1061,14 @@ public void deleteCompactionTempFiles(final ServerContext ctx, final DeleteStats
     // use a linked list to make removal from the middle of the list quick
     final List<Future<Boolean>> delFutures = new LinkedList<>();
 
+    final Set<Path> filesDeleted = ConcurrentHashMap.newKeySet();
     oldCompactionTmpFiles.forEach(p -> {
       delFutures.add(delSvc.submit(() -> {
         if (ctx.getVolumeManager().exists(p)) {
           boolean result = ctx.getVolumeManager().delete(p);
           if (result) {
             LOG.debug("Removed old temp file {}", p);
-            deletedFiles.add(p);
+            filesDeleted.add(p);
           } else {
             LOG.error(
                 "Unable to remove old temp file {}, operation returned false with no exception", p);
@@ -1096,12 +1098,13 @@ public void deleteCompactionTempFiles(final ServerContext ctx, final DeleteStats
           }
         }
       }
-      int remaining = oldCompactionTmpFiles.size();
+      int remaining = delFutures.size();
       if (remaining > 0) {
         LOG.debug("Waiting on {} background delete operations", remaining);
         UtilWaitThread.sleep(3_000);
       }
     }
+    deletedFiles.addAll(filesDeleted);
     LOG.info("Deletion of compaction tmp files completed. Success:{}, Failure:{}, Error:{}",
         stats.success, stats.failure, stats.error);
   }