HIVE-29647: Parallelize Parquet split generation directory listing on blob storage #6526

Open
deniskuzZ wants to merge 2 commits into apache:master from deniskuzZ:HIVE-29647

deniskuzZ (Member) commented Jun 4, 2026

What changes were proposed in this pull request?

Override MapredParquetInputFormat.listStatus to list the input directories in parallel when there is more than one recursive input dir on a blob filesystem; all other cases defer to the default listing.

Why are the changes needed?

Parquet split generation lists each input directory (typically one per partition) serially, which dominates planning time on object stores where every listing is a network round trip.
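To make the cost argument concrete, here is a back-of-the-envelope model, not a measurement from this PR. It assumes one round trip per directory, uniform latency and no throttling; a real recursive listing can take more than one request per partition. The class name, method name and the numbers in the usage note are illustrative only.

```java
final class ListingCostModel {

  /**
   * Estimated wall-clock milliseconds to list {@code dirs} directories when
   * {@code threads} listings can be in flight at once. Simplified model:
   * one round trip per directory, all round trips equally slow.
   */
  static long estimateMillis(int dirs, long roundTripMillis, int threads) {
    if (dirs < 0 || roundTripMillis < 0 || threads < 1) {
      throw new IllegalArgumentException("dirs and latency must be >= 0, threads >= 1");
    }
    long waves = (dirs + threads - 1L) / threads; // ceil(dirs / threads)
    return waves * roundTripMillis;
  }
}
```

Under this model, 1000 partitions at a hypothetical 50 ms per listing cost 50,000 ms serially (`threads = 1`) and 6,250 ms with 8 threads.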

Does this PR introduce any user-facing change?

No

How was this patch tested?

Cluster (10 workers), TPC-DS at 1 TB scale, external, Parquet.
query2: 150 s (before) / 70 s (after), measured without the semijoins (see #6525).

Copilot AI left a comment

Pull request overview

This PR (HIVE-29647) speeds up Parquet split generation on object/blob stores by overriding MapredParquetInputFormat.listStatus to list multiple recursive input directories concurrently, falling back to the default FileInputFormat listing for other scenarios.

Changes:

  • Added a listStatus(JobConf) override that parallelizes recursive directory listing when multiple input dirs are present on blob storage.
  • Introduced a dedicated worker thread pool and completion-based result collection for per-directory listings.
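The pattern described in the two bullets above can be sketched with JDK classes only. This is not the PR's code: `java.nio.file` stands in for the Hadoop `FileSystem` API, and `ParallelListingSketch`, `listRecursively` and `listAll` are hypothetical names. One task is submitted per directory and results are merged in completion order.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ParallelListingSketch {

  /** Recursively lists regular files under one directory (stand-in for one partition listing). */
  static List<Path> listRecursively(Path dir) throws IOException {
    try (Stream<Path> walk = Files.walk(dir)) {
      return walk.filter(Files::isRegularFile).collect(Collectors.toList());
    }
  }

  /** Lists all directories concurrently; result order follows task completion, not input order. */
  static List<Path> listAll(List<Path> dirs, int numThreads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    CompletionService<List<Path>> completionService = new ExecutorCompletionService<>(pool);
    try {
      for (Path dir : dirs) {
        completionService.submit(() -> listRecursively(dir));
      }
      List<Path> result = new ArrayList<>();
      for (int i = 0; i < dirs.size(); i++) {
        // take() blocks until the next listing finishes, whichever directory it was.
        result.addAll(completionService.take().get());
      }
      return result;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      throw new IOException("Interrupted while listing input directories", e);
    } catch (ExecutionException e) {
      throw new IOException("Failed to list input directories", e.getCause());
    } finally {
      pool.shutdownNow(); // also cancels listings still running after a failure
    }
  }
}
```

Because results arrive in completion order, callers that need a deterministic file order would have to sort the merged list themselves.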

Comment on lines +83 to +90
Path[] dirs = getInputPaths(job);
// Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has
// subtler sub-directory semantics, so defer to the default.
if (dirs.length <= 1
    || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
    || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) {
  return super.listStatus(job);
}
Comment on lines +96 to +99
int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS));
ExecutorService pool = newWorkerPool(numThreads);
CompletionService<List<FileStatus>> completionService = new ExecutorCompletionService<>(pool);
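The body of `newWorkerPool` is not shown in this thread, so the following is only a guess at its shape: a fixed-size pool of named daemon threads, plus the `Math.max(2, ...)` clamp from the snippet above. The thread-name prefix and the `WorkerPoolSketch` class are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class WorkerPoolSketch {

  /** Same clamp as the snippet above: never fewer than two listing threads. */
  static int effectiveThreads(int configured) {
    return Math.max(2, configured);
  }

  /** Hypothetical stand-in for newWorkerPool: fixed-size pool of named daemon threads. */
  static ExecutorService newWorkerPool(int numThreads) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = runnable -> {
      // The name prefix is an assumption, chosen only to make thread dumps readable.
      Thread thread = new Thread(runnable, "parquet-listing-" + counter.incrementAndGet());
      thread.setDaemon(true); // do not keep the JVM alive for listing threads
      return thread;
    };
    return Executors.newFixedThreadPool(numThreads, factory);
  }
}
```

The clamp means a configured value of 0 or 1 still yields two threads, so the parallel path never degenerates into a serial one.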

Comment on lines +105 to +107
FileSystem dirFs = dir.getFileSystem(job);
List<FileStatus> dirFiles = new ArrayList<>();
FileUtils.listStatusRecursively(dirFs, new FileStatus(0, true, 0, 0, 0, dir), dirFiles);
Member Author

We are trying to save a HEAD request per partition, and we expect every input to be a directory.
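The trade-off in this reply can be illustrated with a toy model. The `CountingStore` below is a hypothetical in-memory stand-in, not a Hadoop or S3 API: `isDirectory` plays the role of a metadata probe (a HEAD-like request on an object store) and `list` plays the role of a listing request. Skipping the probe is only safe under the stated assumption that every input is a directory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HeadSavingSketch {

  /** Hypothetical in-memory store that counts metadata probes and listing calls. */
  static final class CountingStore {
    private final Map<String, List<String>> filesByDir = new HashMap<>();
    int probeCalls;
    int listCalls;

    void putDir(String dir, List<String> files) {
      filesByDir.put(dir, files);
    }

    /** Stand-in for a metadata lookup on the path. */
    boolean isDirectory(String path) {
      probeCalls++;
      return filesByDir.containsKey(path);
    }

    /** Stand-in for a directory listing request. */
    List<String> list(String dir) {
      listCalls++;
      return filesByDir.getOrDefault(dir, List.of());
    }
  }

  /** Probes each input first, then lists it: two requests per directory. */
  static List<String> listWithProbe(CountingStore store, List<String> inputs) {
    List<String> out = new ArrayList<>();
    for (String input : inputs) {
      if (store.isDirectory(input)) {
        out.addAll(store.list(input));
      } else {
        out.add(input); // a plain file is returned as-is
      }
    }
    return out;
  }

  /** Assumes every input is a directory and lists it directly: one request per directory. */
  static List<String> listAssumingDirs(CountingStore store, List<String> inputs) {
    List<String> out = new ArrayList<>();
    for (String input : inputs) {
      out.addAll(store.list(input));
    }
    return out;
  }
}
```

In this model the two approaches return the same files when all inputs are directories, but the second one issues no probe calls at all. If an input were a plain file, the second approach would silently return nothing for it, which is the risk the assumption carries.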

Comment on lines +118 to +124
} catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof IOException) {
    throw (IOException) cause;
  }
  throw new IOException("Failed to list input directories", cause);
} finally {
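The unwrapping in the catch block above can be read as a small helper. The sketch below restates the same logic in isolation so its two branches are easy to see; `ListingFailureSketch` and `toIOException` are hypothetical names, not part of the patch.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

final class ListingFailureSketch {

  /**
   * Converts a task failure into the IOException that listStatus is declared to throw.
   * An IOException cause is rethrown unchanged so callers see the original error;
   * anything else is wrapped with a generic message and kept as the cause.
   */
  static IOException toIOException(ExecutionException e) {
    Throwable cause = e.getCause();
    if (cause instanceof IOException) {
      return (IOException) cause;
    }
    return new IOException("Failed to list input directories", cause);
  }
}
```

A caller would write `throw toIOException(e);` inside the `catch (ExecutionException e)` block.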
Comment on lines +81 to +83
@Override
protected FileStatus[] listStatus(JobConf job) throws IOException {
  Path[] dirs = getInputPaths(job);
  UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

  int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS));
  ExecutorService pool = newWorkerPool(numThreads);
Contributor

A new ExecutorService is created on every listStatus call. Can't we make it static, as OrcInputFormat does?

Member Author

@deniskuzZ deniskuzZ Jun 4, 2026

IcebergInputFormat does the same. It allows changing the pool size at runtime. ORC adds a cap on the number of listing threads. So it's not uniform even now. ...
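The two options in this exchange differ mainly in lifecycle. The sketch below is a generic illustration, not code from Hive, ORC or Iceberg: a static pool is created once and keeps its original size, while a per-call pool picks up whatever size the current call asks for and is shut down afterwards. All names and the shared pool size of 4 are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;

final class PoolLifecycleSketch {

  /** Option A: one process-wide pool, sized once (4 is an arbitrary illustrative value). */
  static final ExecutorService SHARED_POOL = Executors.newFixedThreadPool(4, runnable -> {
    Thread thread = new Thread(runnable, "shared-listing");
    thread.setDaemon(true);
    return thread;
  });

  /** Option B: a pool per call, sized from the caller's current setting and always shut down. */
  static <T> T withPerCallPool(int numThreads, Function<ExecutorService, T> work) {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    try {
      return work.apply(pool);
    } finally {
      pool.shutdown(); // no threads outlive the call
    }
  }

  /** Reads the configured size; newFixedThreadPool returns a ThreadPoolExecutor. */
  static int poolSize(ExecutorService pool) {
    return ((ThreadPoolExecutor) pool).getCorePoolSize();
  }
}
```

The per-call variant pays thread start-up cost on every invocation but honors a changed thread count immediately; the static variant avoids that cost but needs extra logic if the size should follow configuration changes.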

Member Author

changed
