HIVE-29647: Parallelize Parquet split generation directory listing on blob storage#6526
HIVE-29647: Parallelize Parquet split generation directory listing on blob storage#6526deniskuzZ wants to merge 2 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
This PR (HIVE-29647) speeds up Parquet split generation on object/blob stores by overriding MapredParquetInputFormat.listStatus to list multiple recursive input directories concurrently, falling back to the default FileInputFormat listing for other scenarios.
Changes:
- Added a
listStatus(JobConf)override that parallelizes recursive directory listing when multiple input dirs are present on blob storage. - Introduced a dedicated worker thread pool and completion-based result collection for per-directory listings.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Path[] dirs = getInputPaths(job); | ||
| // Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has | ||
| // subtler sub-directory semantics, so defer to the default. | ||
| if (dirs.length <= 1 | ||
| || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false) | ||
| || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) { | ||
| return super.listStatus(job); | ||
| } |
| int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS)); | ||
| ExecutorService pool = newWorkerPool(numThreads); | ||
| CompletionService<List<FileStatus>> completionService = new ExecutorCompletionService<>(pool); | ||
|
|
| FileSystem dirFs = dir.getFileSystem(job); | ||
| List<FileStatus> dirFiles = new ArrayList<>(); | ||
| FileUtils.listStatusRecursively(dirFs, new FileStatus(0, true, 0, 0, 0, dir), dirFiles); |
There was a problem hiding this comment.
we tried to save on HEAD requested per partition and we expect every input to be a dir
| } catch (ExecutionException e) { | ||
| Throwable cause = e.getCause(); | ||
| if (cause instanceof IOException) { | ||
| throw (IOException) cause; | ||
| } | ||
| throw new IOException("Failed to list input directories", cause); | ||
| } finally { |
| @Override | ||
| protected FileStatus[] listStatus(JobConf job) throws IOException { | ||
| Path[] dirs = getInputPaths(job); |
| UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); | ||
|
|
||
| int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS)); | ||
| ExecutorService pool = newWorkerPool(numThreads); |
There was a problem hiding this comment.
For every listStatus call ExecutorService is getting created. Can't we make it static similar to OrcInputFormat has done?
There was a problem hiding this comment.
IcebergInputFormat does the same. It allows changing the pool size in runtime. ORC adds cap on number of listing threads. So it's not uniform even now. ...
|



What changes were proposed in this pull request?
Override
MapredParquetInputFormat.listStatusto list the input directories in parallel when there is more than one recursive input dir on a blob filesystem; all other cases defer to the default listing.Why are the changes needed?
Parquet split generation lists each input directory (typically one per partition) serially, which dominates planning time on object stores where every listing is a network round trip.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Cluster (10 workers) TPC-DS scale 1Tb, external, parquet
query2 150sec (before) / 70 (after) # without the semijoins (see #6525)