Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 39 additions & 19 deletions core/src/main/java/org/apache/accumulo/core/util/Merge.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.trace.TraceUtil;
Expand All @@ -63,7 +64,7 @@ public static class MergeException extends Exception {
private static final Logger log = LoggerFactory.getLogger(Merge.class);

protected void message(String format, Object... args) {
log.info(String.format(format, args));
log.info("{}", String.format(format, args));
}

public static class MemoryConverter implements IStringConverter<Long> {
Expand Down Expand Up @@ -95,6 +96,9 @@ static class Opts extends ClientOpts {
Text begin = null;
@Parameter(names = {"-e", "--end"}, description = "end tablet", converter = TextConverter.class)
Text end = null;
@Parameter(names = {"--dry-run"},
description = "Will only list which tablets ranges it plans on merging. No merge will be performed")
boolean dryRun = false;
}

public void start(String[] args) throws MergeException {
Expand All @@ -112,11 +116,18 @@ public void start(String[] args) throws MergeException {
if (opts.goalSize == null || opts.goalSize < 1) {
AccumuloConfiguration tableConfig =
new ConfigurationCopy(client.tableOperations().getConfiguration(opts.tableName));
opts.goalSize = tableConfig.getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
long newGoalSize = tableConfig.getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
message("Invalid goal size: " + opts.goalSize + " Using the "
+ Property.TABLE_SPLIT_THRESHOLD.getKey() + " value of : " + newGoalSize);
opts.goalSize = newGoalSize;
Comment on lines +119 to +122

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gives more information for the current behavior and doesn't change the current behavior. However, I keep thinking that a goal size of 0 is entirely valid, and makes much more sense than a goal size of 1 when trying to merge away consecutive empty tablets.

Having a goal size of zero auto-magically readjust to the tablet split threshold seems nonsensical to me when an empty size is valid.

I'm glad that this message was added, but I think it might be nice to change the behavior so that you don't have to do -s 1 to get an effective goal size of 0.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A behavior change could be done later, 4.0 rather than change 2.1

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree that a size of 0 should be a valid value, but I thought that would be a large behavior change in a patch release.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2.1 could support a behavior change by adding a new MergeTablets command in the shell.
The old MergeCommand behavior is preserved but we can support a new command with different behavior.

}

message("Merging tablets in table %s to %d bytes", opts.tableName, opts.goalSize);
mergomatic(client, opts.tableName, opts.begin, opts.end, opts.goalSize, opts.force);
mergomatic(client, opts.tableName, opts.begin, opts.end, opts.goalSize, opts.force,
opts.dryRun);
} catch (MergeException e) {
TraceUtil.setException(span, e, true);
throw e;
} catch (Exception ex) {
TraceUtil.setException(span, ex, true);
throw new MergeException(ex);
Expand All @@ -142,10 +153,10 @@ public Size(KeyExtent extent, long size) {
}

public void mergomatic(AccumuloClient client, String table, Text start, Text end, long goalSize,
boolean force) throws MergeException {
boolean force, boolean dryRun) throws MergeException {
try {
if (table.equals(MetadataTable.NAME)) {
throw new IllegalArgumentException("cannot merge tablets on the metadata table");
if (table.equals(MetadataTable.NAME) || table.equals(RootTable.NAME)) {
throw new IllegalArgumentException("cannot merge tablets on the " + table + " table");
}
List<Size> sizes = new ArrayList<>();
long totalSize = 0;
Expand All @@ -156,19 +167,19 @@ public void mergomatic(AccumuloClient client, String table, Text start, Text end
totalSize += next.size;
sizes.add(next);
if (totalSize > goalSize) {
totalSize = mergeMany(client, table, sizes, goalSize, force, false);
totalSize = mergeMany(client, table, sizes, goalSize, force, false, dryRun);
}
}
if (sizes.size() > 1) {
mergeMany(client, table, sizes, goalSize, force, true);
mergeMany(client, table, sizes, goalSize, force, true, dryRun);
}
} catch (Exception ex) {
throw new MergeException(ex);
}
}

protected long mergeMany(AccumuloClient client, String table, List<Size> sizes, long goalSize,
boolean force, boolean last) throws MergeException {
boolean force, boolean last, boolean dryRun) throws MergeException {
// skip the big tablets, which will be the typical case
while (!sizes.isEmpty()) {
if (sizes.get(0).size < goalSize) {
Expand All @@ -192,21 +203,21 @@ protected long mergeMany(AccumuloClient client, String table, List<Size> sizes,
}

if (numToMerge > 1) {
mergeSome(client, table, sizes, numToMerge);
mergeSome(client, table, sizes, numToMerge, dryRun);
} else {
if (numToMerge == 1 && sizes.size() > 1) {
// here we have the case of a merge candidate that is surrounded by candidates that would
// split
if (force) {
mergeSome(client, table, sizes, 2);
mergeSome(client, table, sizes, 2, dryRun);
} else {
sizes.remove(0);
}
}
}
if (numToMerge == 0 && sizes.size() > 1 && last) {
// That's the last tablet, and we have a bunch to merge
mergeSome(client, table, sizes, sizes.size());
mergeSome(client, table, sizes, sizes.size(), dryRun);
}
long result = 0;
for (Size s : sizes) {
Expand All @@ -215,16 +226,16 @@ protected long mergeMany(AccumuloClient client, String table, List<Size> sizes,
return result;
}

protected void mergeSome(AccumuloClient client, String table, List<Size> sizes, int numToMerge)
throws MergeException {
merge(client, table, sizes, numToMerge);
protected void mergeSome(AccumuloClient client, String table, List<Size> sizes, int numToMerge,
boolean dryRun) throws MergeException {
merge(client, table, sizes, numToMerge, dryRun);
for (int i = 0; i < numToMerge; i++) {
sizes.remove(0);
}
}

protected void merge(AccumuloClient client, String table, List<Size> sizes, int numToMerge)
throws MergeException {
protected void merge(AccumuloClient client, String table, List<Size> sizes, int numToMerge,
boolean dryRun) throws MergeException {
try {
Text start = sizes.get(0).extent.prevEndRow();
Text end = sizes.get(numToMerge - 1).extent.endRow();
Expand All @@ -233,21 +244,30 @@ protected void merge(AccumuloClient client, String table, List<Size> sizes, int
: Key.toPrintableString(start.getBytes(), 0, start.getLength(), start.getLength()),
end == null ? "+inf"
: Key.toPrintableString(end.getBytes(), 0, end.getLength(), end.getLength()));
if (dryRun) {
message("dry-run would have started a Fate Merge for table %s tablet range (%s to %s]",
table,
start == null ? "-inf"
: Key.toPrintableString(start.getBytes(), 0, start.getLength(), start.getLength()),
end == null ? "+inf"
: Key.toPrintableString(end.getBytes(), 0, end.getLength(), end.getLength()));
return;
}
client.tableOperations().merge(table, start, end);
} catch (Exception ex) {
throw new MergeException(ex);
}
}

protected Iterator<Size> getSizeIterator(AccumuloClient client, String tablename, Text start,
protected Iterator<Size> getSizeIterator(AccumuloClient client, String tableName, Text start,
Text end) throws MergeException {
// open up metadata, walk through the tablets.

TableId tableId;
TabletsMetadata tablets;
try {
ClientContext context = (ClientContext) client;
tableId = context.getTableId(tablename);
tableId = context.getTableId(tableName);
tablets = TabletsMetadata.builder(context).scanMetadataTable()
.overRange(new KeyExtent(tableId, end, start).toMetaRange()).fetch(FILES, PREV_ROW)
.build();
Expand Down
20 changes: 10 additions & 10 deletions core/src/test/java/org/apache/accumulo/core/util/MergeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static class MergeTester extends Merge {
protected void message(String format, Object... args) {}

@Override
protected Iterator<Size> getSizeIterator(AccumuloClient client, String tablename,
protected Iterator<Size> getSizeIterator(AccumuloClient client, String tableName,
final Text start, final Text end) throws MergeException {
final Iterator<Size> impl = tablets.iterator();
return new Iterator<>() {
Expand Down Expand Up @@ -103,8 +103,8 @@ public void remove() {
}

@Override
protected void merge(AccumuloClient client, String table, List<Size> sizes, int numToMerge)
throws MergeException {
protected void merge(AccumuloClient client, String table, List<Size> sizes, int numToMerge,
boolean dryRun) throws MergeException {
List<Size> merge = new ArrayList<>();
for (int i = 0; i < numToMerge; i++) {
merge.add(sizes.get(i));
Expand All @@ -127,46 +127,46 @@ public void testMergomatic() throws Exception {
// Merge everything to the last tablet
int i;
MergeTester test = new MergeTester(10, 20, 30);
test.mergomatic(null, "table", null, null, 1000, false);
test.mergomatic(null, "table", null, null, 1000, false, false);
assertEquals(1, test.merges.size());
assertArrayEquals(new int[] {10, 20, 30}, sizes(test.merges.get(i = 0)));

// Merge ranges around tablets that are big enough
test = new MergeTester(1, 2, 100, 1000, 17, 1000, 4, 5, 6, 900);
test.mergomatic(null, "table", null, null, 1000, false);
test.mergomatic(null, "table", null, null, 1000, false, false);
assertEquals(2, test.merges.size());
assertArrayEquals(new int[] {1, 2, 100}, sizes(test.merges.get(i = 0)));
assertArrayEquals(new int[] {4, 5, 6, 900}, sizes(test.merges.get(++i)));

// Test the force option
test = new MergeTester(1, 2, 100, 1000, 17, 1000, 4, 5, 6, 900);
test.mergomatic(null, "table", null, null, 1000, true);
test.mergomatic(null, "table", null, null, 1000, true, false);
assertEquals(3, test.merges.size());
assertArrayEquals(new int[] {1, 2, 100}, sizes(test.merges.get(i = 0)));
assertArrayEquals(new int[] {17, 1000}, sizes(test.merges.get(++i)));
assertArrayEquals(new int[] {4, 5, 6, 900}, sizes(test.merges.get(++i)));

// Limit the low-end of the merges
test = new MergeTester(1, 2, 1000, 17, 1000, 4, 5, 6, 900);
test.mergomatic(null, "table", new Text("00004"), null, 1000, false);
test.mergomatic(null, "table", new Text("00004"), null, 1000, false, false);
assertEquals(1, test.merges.size());
assertArrayEquals(new int[] {4, 5, 6, 900}, sizes(test.merges.get(i = 0)));

// Limit the upper end of the merges
test = new MergeTester(1, 2, 1000, 17, 1000, 4, 5, 6, 900);
test.mergomatic(null, "table", null, new Text("00004"), 1000, false);
test.mergomatic(null, "table", null, new Text("00004"), 1000, false, false);
assertEquals(1, test.merges.size());
assertArrayEquals(new int[] {1, 2}, sizes(test.merges.get(i = 0)));

// Limit both ends
test = new MergeTester(1, 2, 1000, 17, 1000, 4, 5, 6, 900);
test.mergomatic(null, "table", new Text("00002"), new Text("00004"), 1000, true);
test.mergomatic(null, "table", new Text("00002"), new Text("00004"), 1000, true, false);
assertEquals(1, test.merges.size());
assertArrayEquals(new int[] {17, 1000}, sizes(test.merges.get(i = 0)));

// Clump up tablets into larger values
test = new MergeTester(100, 250, 500, 600, 100, 200, 500, 200);
test.mergomatic(null, "table", null, null, 1000, false);
test.mergomatic(null, "table", null, null, 1000, false, false);
assertEquals(3, test.merges.size());
assertArrayEquals(new int[] {100, 250, 500}, sizes(test.merges.get(i = 0)));
assertArrayEquals(new int[] {600, 100, 200}, sizes(test.merges.get(++i)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
*/
package org.apache.accumulo.shell.commands;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.util.Merge;
import org.apache.accumulo.shell.Shell;
import org.apache.accumulo.shell.Shell.Command;
Expand All @@ -28,14 +32,15 @@
import org.apache.hadoop.io.Text;

public class MergeCommand extends Command {
private Option verboseOpt, forceOpt, sizeOpt, allOpt;
private Option verboseOpt, forceOpt, sizeOpt, allOpt, dryRunOpt;

@Override
public int execute(final String fullCommand, final CommandLine cl, final Shell shellState)
throws Exception {
boolean verbose = shellState.isVerbose();
boolean force = false;
boolean all = false;
boolean dryRun = false;
long size = -1;
final String tableName = OptUtil.getTableOpt(cl, shellState);
final Text startRow = OptUtil.getStartRow(cl);
Expand All @@ -52,6 +57,9 @@ public int execute(final String fullCommand, final CommandLine cl, final Shell s
if (cl.hasOption(sizeOpt.getOpt())) {
size = ConfigurationTypeHelper.getFixedMemoryAsBytes(cl.getOptionValue(sizeOpt.getOpt()));
}
if (cl.hasOption(dryRunOpt)) {
dryRun = true;
}
if (startRow == null && endRow == null && size < 0 && !all) {
if (!shellState
.confirm(" Warning!!! Are you REALLY sure you want to merge the entire table { "
Expand All @@ -60,21 +68,7 @@ public int execute(final String fullCommand, final CommandLine cl, final Shell s
return 0;
}
}
if (size < 0) {
shellState.getAccumuloClient().tableOperations().merge(tableName, startRow, endRow);
} else {
final boolean finalVerbose = verbose;
final Merge merge = new Merge() {
@Override
protected void message(String fmt, Object... args) {
if (finalVerbose) {
shellState.getWriter().println(String.format(fmt, args));
}
}
};
merge.mergomatic(shellState.getAccumuloClient(), tableName, startRow, endRow, size, force);
}
return 0;
return executeMerge(shellState, tableName, startRow, endRow, size, verbose, force, dryRun);
}

@Override
Expand All @@ -96,18 +90,56 @@ public Options getOptions() {
forceOpt = new Option("f", "force", false,
"merge small tablets to large tablets, even if it goes over the given size");
// Using the constructor does not allow for empty option
Option.Builder builder = Option.builder().longOpt("all").hasArg(false)
Option.Builder allBuilder = Option.builder().longOpt("all").hasArg(false)
.desc("allow an entire table to be merged into one tablet without prompting"
+ " the user for confirmation");
allOpt = builder.build();
allOpt = allBuilder.build();
Option.Builder dryRunBuilder = Option.builder().longOpt("dry-run").hasArg(false)
.desc("print the ranges it will merge, but do not perform any merge operations");
dryRunOpt = dryRunBuilder.build();

o.addOption(OptUtil.startRowOpt());
o.addOption(OptUtil.endRowOpt());
o.addOption(OptUtil.tableOpt("table to be merged"));
o.addOption(verboseOpt);
o.addOption(sizeOpt);
o.addOption(forceOpt);
o.addOption(allOpt);
o.addOption(dryRunOpt);
return o;
}

// This method is stubbed out to allow for mock testing
int executeMerge(Shell shellState, String tableName, Text startRow, Text endRow, long size,
boolean verbose, boolean force, boolean dryRun) throws AccumuloException,
TableNotFoundException, AccumuloSecurityException, Merge.MergeException {
if (size < 0) {
if (dryRun) {
shellState.getWriter()
.println(String.format(
"dry-run would have started a Fate Merge for table %s tablet range (%s to %s]",
tableName,
startRow == null ? "-inf"
: Key.toPrintableString(startRow.getBytes(), 0, startRow.getLength(),
startRow.getLength()),
endRow == null ? "+inf" : Key.toPrintableString(endRow.getBytes(), 0,
endRow.getLength(), endRow.getLength())));
return 0;
}
shellState.getAccumuloClient().tableOperations().merge(tableName, startRow, endRow);
} else {
final boolean finalVerbose = verbose;
final Merge merge = new Merge() {
@Override
protected void message(String fmt, Object... args) {
if (finalVerbose) {
shellState.getWriter().println(String.format(fmt, args));
}
}
};
merge.mergomatic(shellState.getAccumuloClient(), tableName, startRow, endRow, size, force,
dryRun);
}
return 0;
}
}
Loading