diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index f29214b458..30052f6164 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -67,6 +67,7 @@ public class ParquetProperties { public static final boolean DEFAULT_STATISTICS_ENABLED = true; public static final boolean DEFAULT_SIZE_STATISTICS_ENABLED = true; + public static final long DEFAULT_DICTIONARY_CHECK_THRESHOLD_RAW_SIZE_BYTES = 0; public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true; /** @@ -131,6 +132,7 @@ public static WriterVersion fromString(String name) { private final int rowGroupRowCountLimit; private final int pageRowCountLimit; private final boolean pageWriteChecksumEnabled; + private final long dictionaryCheckThresholdRawSizeBytes; private final ColumnProperty byteStreamSplitEnabled; private final Map extraMetaData; private final ColumnProperty statistics; @@ -163,6 +165,7 @@ private ParquetProperties(Builder builder) { this.rowGroupRowCountLimit = builder.rowGroupRowCountLimit; this.pageRowCountLimit = builder.pageRowCountLimit; this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled; + this.dictionaryCheckThresholdRawSizeBytes = builder.dictionaryCheckThresholdRawSizeBytes; this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build(); this.extraMetaData = builder.extraMetaData; this.statistics = builder.statistics.build(); @@ -322,6 +325,17 @@ public boolean getPageWriteChecksumEnabled() { return pageWriteChecksumEnabled; } + /** + * Returns the byte threshold after which the dictionary compression check is performed. + * A value of 0 means check on the first page. Higher values delay the check until that + * many raw bytes have been accumulated across pages. + * + * @return the byte threshold for the dictionary compression check + */ + public long getDictionaryCheckThresholdRawSizeBytes() { + return dictionaryCheckThresholdRawSizeBytes; + } + public OptionalLong getBloomFilterNDV(ColumnDescriptor column) { Long ndv = bloomFilterNDVs.getValue(column); return ndv == null ? OptionalLong.empty() : OptionalLong.of(ndv); @@ -415,6 +429,7 @@ public static class Builder { private int rowGroupRowCountLimit = DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT; private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT; private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; + private long dictionaryCheckThresholdRawSizeBytes = DEFAULT_DICTIONARY_CHECK_THRESHOLD_RAW_SIZE_BYTES; private final ColumnProperty.Builder byteStreamSplitEnabled; private Map extraMetaData = new HashMap<>(); private final ColumnProperty.Builder statistics; @@ -450,6 +465,7 @@ private Builder(ParquetProperties toCopy) { this.allocator = toCopy.allocator; this.pageRowCountLimit = toCopy.pageRowCountLimit; this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled; + this.dictionaryCheckThresholdRawSizeBytes = toCopy.dictionaryCheckThresholdRawSizeBytes; this.bloomFilterNDVs = ColumnProperty.builder(toCopy.bloomFilterNDVs); this.bloomFilterFPPs = ColumnProperty.builder(toCopy.bloomFilterFPPs); this.bloomFilterEnabled = ColumnProperty.builder(toCopy.bloomFilterEnabled); @@ -709,6 +725,20 @@ public Builder withPageWriteChecksumEnabled(boolean val) { return this; } + /** + * Set the raw data byte threshold after which the dictionary compression check is performed. + * A value of 0 means check on the first page. Higher values delay the check until that + * many raw bytes have been accumulated across pages. + * + * @param val byte threshold (default: 0) + * @return this builder for method chaining + */ + public Builder withDictionaryCheckThresholdRawSizeBytes(long val) { + Preconditions.checkArgument(val >= 0, "dictionaryCheckThresholdRawSizeBytes must be >= 0"); + this.dictionaryCheckThresholdRawSizeBytes = val; + return this; + } + public Builder withExtraMetaData(Map extraMetaData) { this.extraMetaData = extraMetaData; return this; diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java index 4c03e6b65e..0cb45d6269 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java @@ -111,7 +111,9 @@ static ValuesWriter dictWriterWithFallBack( ValuesWriter writerToFallBackTo) { if (parquetProperties.isDictionaryEnabled(path)) { return FallbackValuesWriter.of( - dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding), writerToFallBackTo); + dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding), + writerToFallBackTo, + parquetProperties.getDictionaryCheckThresholdRawSizeBytes()); } else { return writerToFallBackTo; } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java index 7f56ef2192..9ea7f886f3 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java @@ -30,7 +30,12 @@ public class FallbackValuesWriter FallbackValuesWriter of( I initialWriter, F fallBackWriter) { - return new FallbackValuesWriter<>(initialWriter, fallBackWriter); + return new FallbackValuesWriter<>(initialWriter, fallBackWriter, /*dictionaryCheckThresholdRawSizeBytes=*/ 0); + } + + public static FallbackValuesWriter of( + I initialWriter, F fallBackWriter, long dictionaryCheckThresholdRawSizeBytes) { + return new FallbackValuesWriter<>(initialWriter, fallBackWriter, dictionaryCheckThresholdRawSizeBytes); } /** @@ -43,6 +48,17 @@ public static = dictionaryCheckThresholdRawSizeBytes) { + compressionChecked = true; BytesInput bytes = initialWriter.getBytes(); - if (!initialWriter.isCompressionSatisfying(rawDataByteSize, bytes.size())) { + try { + cumulativeEncodedBytes = Math.addExact(cumulativeEncodedBytes, bytes.size()); + } catch (ArithmeticException e) { + // overflow, keep the previous value + } + // Compare cumulative raw vs cumulative encoded so the column-chunk dictionary + // (which is itself cumulative) is amortized over all pages it covers, not charged + // against a single page. + if (!initialWriter.isCompressionSatisfying(cumulativeRawBytes, cumulativeEncodedBytes)) { fallBack(); } else { return bytes; } } - return currentWriter.getBytes(); + BytesInput result = currentWriter.getBytes(); + if (!fellBackAlready && !compressionChecked) { + // Accumulate dictionary-encoded size for pages flushed before the check fires. + try { + cumulativeEncodedBytes = Math.addExact(cumulativeEncodedBytes, result.size()); + } catch (ArithmeticException e) { + // overflow, keep the previous value + } + } + return result; } @Override @@ -103,7 +141,6 @@ public Encoding getEncoding() { @Override public void reset() { rawDataByteSize = 0; - firstPage = false; currentWriter.reset(); } @@ -131,8 +168,10 @@ public void resetDictionary() { } currentWriter = initialWriter; fellBackAlready = false; + compressionChecked = false; + cumulativeRawBytes = 0; + cumulativeEncodedBytes = 0; initialUsedAndHadDictionary = false; - firstPage = true; } @Override diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/fallback/TestFallbackCumulativeBias.java b/parquet-column/src/test/java/org/apache/parquet/column/values/fallback/TestFallbackCumulativeBias.java new file mode 100644 index 0000000000..2a4583ed40 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/fallback/TestFallbackCumulativeBias.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.fallback; + +import static org.junit.Assert.assertTrue; + +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.apache.parquet.bytes.TrackingByteBufferAllocator; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter; +import org.apache.parquet.column.values.plain.PlainValuesWriter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Demonstrates the bias in PARQUET-3479: when the dictionary check fires on a later page, + * it compares the CUMULATIVE dictionary size against only the CURRENT page's raw bytes, + * causing spurious fallback even when dictionary encoding is cumulatively beneficial. + * + *

Scenario: 100 ints per page, 50 new distinct values per page, 2 pages before check. + *

    + *
  • dictionaryByteSize = 100 distinct × 4 bytes = 400 bytes (cumulative after 2 pages)
  • + *
  • Per-page rawDataByteSize = 100 × 4 = 400 bytes
  • + *
  • Page 0 encoded: 80 bytes (RLE, 6-bit width for values 0..49)
  • + *
  • Page 1 encoded: 93 bytes (RLE, 7-bit width for values 50..99)
  • + *
+ * + *

BUG (per-page on page 1): encoded(93) + dict(400) = 493 >= pageRaw(400) → FALLBACK + *

CORRECT (cumulative): totalEncoded(173) + dict(400) = 573 < totalRaw(800) → KEEP + */ +public class TestFallbackCumulativeBias { + + private TrackingByteBufferAllocator allocator; + + @Before + public void initAllocator() { + allocator = TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator()); + } + + @After + public void closeAllocator() { + allocator.close(); + } + + /** + * Baseline: With threshold=0 and low cardinality data repeated on page 1, + * dictionary IS kept because the full dictionary cost is small relative to raw data. + * This proves the dictionary is genuinely beneficial for this type of data. + */ + @Test + public void testBaselineDictionaryKeptWithThresholdZero() throws Exception { + FallbackValuesWriter writer = createWriter(0); + try { + // 100 values, all mod 5 → only 5 distinct values, dict = 20 bytes, raw = 400 bytes + for (int i = 0; i < 100; i++) { + writer.writeInteger(i % 5); + } + writer.getBytes(); + assertTrue( + "Baseline: dictionary should be kept with low cardinality (threshold=0)", + writer.getEncoding().usesDictionary()); + } finally { + writer.close(); + } + } + + /** + * BUG DEMONSTRATION: Even with genuinely-beneficial dictionary data across all pages, + * a delayed threshold check causes spurious fallback because the cumulative dictionary + * size is compared against only a single page's raw bytes. + * + *

This test uses moderate-cardinality data where each page introduces new distinct values. + * The dictionary grows cumulatively but encoding is still more efficient than raw overall. + * The test asserts CORRECT behavior (dictionary kept) and FAILS on current buggy code. + */ + @Test + public void testDelayedCheckShouldNotCauseFallbackWhenDictionaryIsBeneficial() throws Exception { + // Threshold = 800 bytes. Each page has 100 ints × 4 = 400 raw bytes. + // Cumulative crosses 800 on page 2's getBytes() (page0=400, page1=800 >= 800). + // Actually: cumulativeRawBytes is updated INSIDE getBytes() via addExact, so: + // After page 0 getBytes(): cumulative = 400 (< 800, no check) + // After page 1 getBytes(): cumulative = 800 (>= 800, CHECK FIRES) + long threshold = 800; + FallbackValuesWriter writer = createWriter(threshold); + try { + // Page 0: 100 values cycling 0..49 (50 distinct). Dict = 200 bytes. + for (int i = 0; i < 100; i++) { + writer.writeInteger(i % 50); + } + writer.getBytes(); // cumulative = 400, < 800, no check + writer.reset(); + + // Page 1: 100 values cycling 50..99 (50 new distinct). Dict grows to 400 bytes. + for (int i = 0; i < 100; i++) { + writer.writeInteger(50 + (i % 50)); + } + writer.getBytes(); // cumulative = 800, >= 800, CHECK FIRES HERE + // At this point: + // dictionaryByteSize = 100 × 4 = 400 (cumulative, all distinct values seen so far) + // rawDataByteSize (current page) = 100 × 4 = 400 + // bytes.size() (current page encoded) ≈ ~115 bytes (RLE, 7-bit width for 100 values) + // + // BUG comparison: encoded(~115) + dict(400) = ~515 >= pageRaw(400) → FALLBACK! + // CORRECT comparison: totalEncoded(~230) + dict(400) = ~630 < totalRaw(800) → KEEP! + Encoding enc = writer.getEncoding(); + + assertTrue( + "Dictionary encoding should be preserved when it is cumulatively beneficial, " + + "even when the check fires on a later page (PARQUET-3479 bias bug). " + + "The per-page comparison incorrectly charges the full cumulative dictionary " + + "against a single page's raw bytes.", + enc.usesDictionary()); + } finally { + writer.close(); + } + } + + private FallbackValuesWriter createWriter(long threshold) { + int dictPageSize = 1024 * 1024; // large enough to not trigger shouldFallBack() + PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter( + dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator); + PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator); + return FallbackValuesWriter.of(dictWriter, plainWriter, threshold); + } + + /** + * Threshold=0: a non-beneficial dictionary still falls back on page 1 (backward compat). + * High cardinality (all unique) means encodedSize + dictSize >= rawSize → FALLBACK. + */ + @Test + public void testThresholdZeroNonBeneficialFallsBack() throws Exception { + FallbackValuesWriter writer = createWriter(0); + try { + // 100 unique values → dict = 400 bytes, encoded ≈ 100 bytes, raw = 400. + // encoded + dict = ~500 >= 400 → fallback + for (int i = 0; i < 100; i++) { + writer.writeInteger(i); + } + writer.getBytes(); + assertTrue( + "Threshold=0: non-beneficial dictionary should fall back on page 1", + !writer.getEncoding().usesDictionary()); + } finally { + writer.close(); + } + } + + /** + * Adversarial: high-cardinality data with a delayed threshold STILL correctly falls back. + * Proves the cumulative fix doesn't blanket-keep dictionaries. + */ + @Test + public void testHighCardinalityWithDelayedThresholdStillFallsBack() throws Exception { + // Threshold = 800. Each page: 100 unique ints × 4 = 400 raw bytes. + long threshold = 800; + FallbackValuesWriter writer = createWriter(threshold); + try { + // Page 0: 100 unique values 0..99 + for (int i = 0; i < 100; i++) { + writer.writeInteger(i); + } + writer.getBytes(); // cumulative = 400 < 800, no check + writer.reset(); + + // Page 1: 100 unique values 100..199, dict now has 200 entries = 800 bytes + for (int i = 100; i < 200; i++) { + writer.writeInteger(i); + } + writer.getBytes(); // cumulative = 800, check fires + // cumulativeEncoded(page0 + page1) + dict(800) vs cumulativeRaw(800) + // Even cumulatively: encoded(~200) + dict(800) = ~1000 >= 800 → FALLBACK + assertTrue( + "High-cardinality data should still trigger fallback even with cumulative comparison", + !writer.getEncoding().usesDictionary()); + } finally { + writer.close(); + } + } + + /** + * resetDictionary() zeroes the accumulator: second column chunk's decision + * is independent of the first chunk. + */ + @Test + public void testResetDictionaryIsolatesChunks() throws Exception { + long threshold = 800; + FallbackValuesWriter writer = createWriter(threshold); + try { + // Chunk 1: low cardinality → keeps dictionary (so initialWriter.resetDictionary is called) + for (int i = 0; i < 100; i++) { + writer.writeInteger(i % 5); + } + writer.getBytes(); + writer.reset(); + for (int i = 0; i < 100; i++) { + writer.writeInteger(i % 5); + } + writer.getBytes(); // check fires: cumulative=800, dict=20, encoded tiny → keep + assertTrue( + "Chunk 1 should keep dictionary (low cardinality)", + writer.getEncoding().usesDictionary()); + + // Reset for chunk 2 (reset per-page state, then reset dictionary for new column chunk) + writer.reset(); + writer.resetDictionary(); + + // Chunk 2: high cardinality → should fall back (proves accumulator was zeroed, + // not left with chunk 1's favorable cumulative values) + for (int i = 0; i < 100; i++) { + writer.writeInteger(i); + } + writer.getBytes(); + writer.reset(); + for (int i = 100; i < 200; i++) { + writer.writeInteger(i); + } + writer.getBytes(); // check fires with fresh accumulators + assertTrue( + "Chunk 2 should fall back (high cardinality, accumulators were reset)", + !writer.getEncoding().usesDictionary()); + } finally { + writer.close(); + } + } + + /** + * Helper test that prints exact encoded sizes to verify arithmetic witness. + * Uses DictionaryValuesWriter directly (not via FallbackValuesWriter) to measure + * without triggering fallback logic. + */ + @Test + public void testArithmeticWitness() throws Exception { + int dictPageSize = 1024 * 1024; + PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter( + dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator); + try { + // Page 0: 100 values cycling 0..49 + for (int i = 0; i < 100; i++) { + dictWriter.writeInteger(i % 50); + } + long page0Encoded = dictWriter.getBytes().size(); + dictWriter.reset(); + + // Page 1: 100 values cycling 50..99 + for (int i = 0; i < 100; i++) { + dictWriter.writeInteger(50 + (i % 50)); + } + long page1Encoded = dictWriter.getBytes().size(); + + long dictSize = 100 * 4; // 100 distinct ints × 4 bytes = 400 + long pageRaw = 100 * 4; // 100 ints × 4 bytes = 400 + long totalRaw = pageRaw * 2; // 800 + + // Verify the arithmetic demonstrates the bias + assertTrue( + "Bug comparison should trigger fallback: encoded(" + page1Encoded + ") + dict(" + dictSize + ") = " + + (page1Encoded + dictSize) + " >= pageRaw(" + pageRaw + ")", + (page1Encoded + dictSize) >= pageRaw); + assertTrue( + "Correct cumulative comparison should keep dictionary: totalEncoded(" + + (page0Encoded + page1Encoded) + ") + dict(" + dictSize + ") = " + + (page0Encoded + page1Encoded + dictSize) + " < totalRaw(" + totalRaw + ")", + (page0Encoded + page1Encoded + dictSize) < totalRaw); + } finally { + dictWriter.close(); + } + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/fallback/TestFallbackValuesWriter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/fallback/TestFallbackValuesWriter.java new file mode 100644 index 0000000000..b9373437b1 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/fallback/TestFallbackValuesWriter.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.fallback; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.apache.parquet.bytes.TrackingByteBufferAllocator; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter; +import org.apache.parquet.column.values.plain.PlainValuesWriter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestFallbackValuesWriter { + + private TrackingByteBufferAllocator allocator; + + @Before + public void initAllocator() { + allocator = TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator()); + } + + @After + public void closeAllocator() { + allocator.close(); + } + + /** + * With threshold=0, the check fires on the first page and falls back for high-cardinality data. + */ + @Test + public void testThresholdZeroFallsBackImmediately() throws Exception { + int dictPageSize = 1024 * 1024; + + PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter( + dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator); + PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator); + FallbackValuesWriter writer = + FallbackValuesWriter.of(dictWriter, plainWriter, 0); + + try { + for (int i = 0; i < 1000; i++) { + writer.writeInteger(i); + } + writer.getBytes(); + + assertFalse( + "Should fall back to plain encoding with threshold=0 and high cardinality", + writer.getEncoding().usesDictionary()); + } finally { + writer.close(); + } + } + + /** + * With a large threshold, the check never fires and dictionary encoding is preserved. + */ + @Test + public void testLargeThresholdPreservesDictionary() throws Exception { + int dictPageSize = 1024 * 1024; + + PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter( + dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator); + PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator); + FallbackValuesWriter writer = + FallbackValuesWriter.of(dictWriter, plainWriter, Long.MAX_VALUE); + + try { + for (int i = 0; i < 1000; i++) { + writer.writeInteger(i); + } + writer.getBytes(); + + assertTrue( + "Dictionary encoding should be preserved with large threshold", + writer.getEncoding().usesDictionary()); + } finally { + writer.close(); + } + } + + /** + * Threshold is crossed only after a reset() (page flush). cumulativeRawBytes accumulates + * across pages while rawDataByteSize resets per page. + */ + @Test + public void testThresholdCrossedAfterReset() throws Exception { + int dictPageSize = 1024 * 1024; + long threshold = 500; + + PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter( + dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator); + PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator); + FallbackValuesWriter writer = + FallbackValuesWriter.of(dictWriter, plainWriter, threshold); + + try { + // Write ~300 bytes (75 ints * 4 bytes = 300) — below threshold + for (int i = 0; i < 75; i++) { + writer.writeInteger(i); + } + // Simulate page flush — check should NOT fire (cumulative = 300 < 500) + writer.getBytes(); + assertTrue( + "Should still use dictionary before threshold is crossed", + writer.getEncoding().usesDictionary()); + writer.reset(); + + // Write another ~300 bytes (75 ints * 4 = 300, cumulative now 600 > 500) + for (int i = 75; i < 150; i++) { + writer.writeInteger(i); + } + // Check SHOULD fire now and fall back (high cardinality, bad compression) + writer.getBytes(); + assertFalse( + "Should fall back after cumulative bytes cross threshold", + writer.getEncoding().usesDictionary()); + } finally { + writer.close(); + } + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 868ae634c1..64e5d47ac0 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -161,6 +161,15 @@ public static enum JobSummaryLevel { public static final String BLOCK_ROW_COUNT_LIMIT = "parquet.block.row.count.limit"; public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit"; public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; + /** + * Raw data byte threshold after which the dictionary compression check is performed. + * Once cumulative raw bytes (excluding nulls) written to a column chunk reach this value, + * the writer evaluates whether dictionary encoding is effective. If not, it falls back to + * plain encoding. A value of 0 means check on the first page. + */ + public static final String DICTIONARY_CHECK_THRESHOLD_RAW_SIZE_BYTES = + "parquet.dictionary.check.threshold.raw.size.bytes"; + public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled"; public static final String SIZE_STATISTICS_ENABLED = "parquet.size.statistics.enabled"; @@ -412,6 +421,16 @@ public static boolean getPageWriteChecksumEnabled(Configuration conf) { return conf.getBoolean(PAGE_WRITE_CHECKSUM_ENABLED, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); } + public static void setDictionaryCheckThresholdRawSizeBytes(Configuration conf, long val) { + conf.setLong(DICTIONARY_CHECK_THRESHOLD_RAW_SIZE_BYTES, val); + } + + public static long getDictionaryCheckThresholdRawSizeBytes(Configuration conf) { + return conf.getLong( + DICTIONARY_CHECK_THRESHOLD_RAW_SIZE_BYTES, + ParquetProperties.DEFAULT_DICTIONARY_CHECK_THRESHOLD_RAW_SIZE_BYTES); + } + public static void setStatisticsEnabled(JobContext jobContext, boolean enabled) { getConfiguration(jobContext).setBoolean(STATISTICS_ENABLED, enabled); } @@ -526,6 +545,7 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withRowGroupRowCountLimit(getBlockRowCountLimit(conf)) .withPageRowCountLimit(getPageRowCountLimit(conf)) .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf)) + .withDictionaryCheckThresholdRawSizeBytes(getDictionaryCheckThresholdRawSizeBytes(conf)) .withStatisticsEnabled(getStatisticsEnabled(conf)); new ColumnConfigParser() .withColumnConfig( diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 8eb5f7f17b..d43fb0ebfb 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -771,6 +771,17 @@ public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) { return self(); } + /** + * Set the raw data byte threshold after which the dictionary compression check is performed. + * + * @param val byte threshold (0 means checking on the first page for every column chunk) + * @return this builder for method chaining. + */ + public SELF withDictionaryCheckThresholdRawSizeBytes(long val) { + encodingPropsBuilder.withDictionaryCheckThresholdRawSizeBytes(val); + return self(); + } + /** * Set max Bloom filter bytes for related columns. * diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDictionaryEarlyCheck.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDictionaryEarlyCheck.java new file mode 100644 index 0000000000..ee90835016 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDictionaryEarlyCheck.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.bytes.TrackingByteBufferAllocator; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Integration test verifying that the dictionary check threshold affects encoding in written Parquet files. + */ +public class TestDictionaryEarlyCheck { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private TrackingByteBufferAllocator allocator; + + @Before + public void initAllocator() { + allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator()); + } + + @After + public void closeAllocator() { + allocator.close(); + } + + @Test + public void testLargeThresholdPreservesDictionaryInFile() throws IOException { + Configuration conf = new Configuration(); + MessageType schema = MessageTypeParser.parseMessageType("message test { required int32 val; }"); + GroupWriteSupport.setSchema(schema, conf); + SimpleGroupFactory factory = new SimpleGroupFactory(schema); + + Path file = new Path(temp.getRoot().toString(), "large_threshold.parquet"); + + ParquetWriter writer = ExampleParquetWriter.builder(new Path(file.toString())) + .withAllocator(allocator) + .withCompressionCodec(UNCOMPRESSED) + .withDictionaryPageSize(1024 * 1024) + .enableDictionaryEncoding() + .withDictionaryCheckThresholdRawSizeBytes(Long.MAX_VALUE) + .withWriterVersion(org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0) + .withConf(conf) + .build(); + + // Write 1000 distinct values — would normally trigger fallback with threshold=0 + for (int i = 0; i < 1000; i++) { + writer.write(factory.newGroup().append("val", i)); + } + writer.close(); + + ParquetMetadata footer = readFooter(conf, file, NO_FILTER); + for (BlockMetaData block : footer.getBlocks()) { + for (ColumnChunkMetaData column : block.getColumns()) { + assertTrue( + "Dictionary encoding should be preserved with large threshold, got: " + column.getEncodings(), + column.getEncodings().contains(Encoding.PLAIN_DICTIONARY)); + } + } + } + + @Test + public void testZeroThresholdFallsBackInFile() throws IOException { + Configuration conf = new Configuration(); + MessageType schema = MessageTypeParser.parseMessageType("message test { required int32 val; }"); + GroupWriteSupport.setSchema(schema, conf); + SimpleGroupFactory factory = new SimpleGroupFactory(schema); + + Path file = new Path(temp.getRoot().toString(), "zero_threshold.parquet"); + + ParquetWriter writer = ExampleParquetWriter.builder(new Path(file.toString())) + .withAllocator(allocator) + .withCompressionCodec(UNCOMPRESSED) + .withDictionaryPageSize(1024 * 1024) + .enableDictionaryEncoding() + .withDictionaryCheckThresholdRawSizeBytes(0) + .withWriterVersion(org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0) + .withConf(conf) + .build(); + + for (int i = 0; i < 1000; i++) { + writer.write(factory.newGroup().append("val", i)); + } + writer.close(); + + ParquetMetadata footer = readFooter(conf, file, NO_FILTER); + for (BlockMetaData block : footer.getBlocks()) { + for (ColumnChunkMetaData column : block.getColumns()) { + assertFalse( + "Should fall back from dictionary with threshold=0, got: " + column.getEncodings(), + column.getEncodings().contains(Encoding.PLAIN_DICTIONARY)); + } + } + } +}