Skip to content

Cds #308

@siddharthrgade21-a11y

Description

@siddharthrgade21-a11y

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
 * Demonstrates a fan-out/fan-in pipeline built on {@link CompletableFuture}.
 *
 * <p>Each input is processed on a dedicated, bounded thread pool. A failure or
 * timeout of an individual task is converted into a per-item message instead of
 * failing the whole pipeline, and the results are printed in input order.
 */
public class AdvancedTaskProcessor {

    /** Seconds a single task's future may stay pending before it is failed with a timeout. */
    private static final long TASK_TIMEOUT_SECONDS = 3;

    // Configure a dedicated thread pool for controlled resource management
    private static final ExecutorService executor = new ThreadPoolExecutor(
            4,                                 // Core pool size
            10,                                // Maximum pool size
            60L, TimeUnit.SECONDS,             // Keep-alive time for idle threads
            new LinkedBlockingQueue<>(100),    // Bounded queue to prevent OOM
            new ThreadPoolExecutor.CallerRunsPolicy() // Throttling mechanism
    );

    /**
     * Runs the sample inputs through the pipeline, prints one line per input and
     * shuts the executor down afterwards.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        List<String> inputs = List.of("DataA", "DataB", "DataC", "ErrorTrigger", "DataE");

        System.out.println("Starting asynchronous processing parallel pipeline...");

        // Submission happens inside the try so the pool is shut down even if it fails;
        // otherwise the non-daemon worker threads would keep the JVM alive.
        try {
            // Trigger tasks asynchronously
            List<CompletableFuture<String>> futures = inputs.stream()
                    .map(input -> CompletableFuture
                            .supplyAsync(() -> performHeavyTask(input), executor)
                            // Fails the future after the timeout; it does NOT interrupt
                            // the worker thread that is still running the task.
                            .orTimeout(TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                            .exceptionally(throwable -> describeFailure(input, throwable)))
                    .collect(Collectors.toList());

            // Completes once every per-item future has completed
            CompletableFuture<Void> allFutures =
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));

            // Extract outcomes when ready; join() cannot block here because all futures are done
            CompletableFuture<List<String>> resultsFuture = allFutures.thenApply(v ->
                    futures.stream()
                           .map(CompletableFuture::join)
                           .collect(Collectors.toList())
            );

            List<String> finalResults = resultsFuture.get(); // Block safely at the final gate
            finalResults.forEach(System.out::println);
        } catch (InterruptedException e) {
            // Restore the flag only when this thread really was interrupted
            Thread.currentThread().interrupt();
            System.err.println("Pipeline execution interrupted: " + e.getMessage());
        } catch (ExecutionException e) {
            System.err.println("Pipeline execution failed: " + e.getCause());
        } finally {
            shutdownExecutor();
        }
    }

    /**
     * Builds the per-item failure line used when a task fails or times out.
     *
     * <p>{@link CompletionException}/{@link ExecutionException} wrappers are peeled off so the
     * original cause is reported, and exceptions without a message (for example the
     * {@link TimeoutException} raised by {@code orTimeout}) fall back to their simple class
     * name instead of printing {@code null}. Package-private so it can be tested directly.
     *
     * @param input     the input whose processing failed
     * @param throwable the failure handed to the {@code exceptionally} stage
     * @return a line of the form {@code "Failed processing for: <input> (<detail>)"}
     */
    static String describeFailure(String input, Throwable throwable) {
        Throwable cause = throwable;
        while ((cause instanceof CompletionException || cause instanceof ExecutionException)
                && cause.getCause() != null) {
            cause = cause.getCause();
        }
        String detail = cause.getMessage();
        if (detail == null || detail.isEmpty()) {
            detail = cause.getClass().getSimpleName();
        }
        return "Failed processing for: " + input + " (" + detail + ")";
    }

    /**
     * Simulates a slow unit of work.
     *
     * @param data the item to process
     * @return a success message for {@code data}
     * @throws RuntimeException      if {@code data} is {@code "ErrorTrigger"}
     * @throws IllegalStateException if the thread is interrupted while working
     */
    private static String performHeavyTask(String data) {
        if ("ErrorTrigger".equals(data)) {
            throw new RuntimeException("Simulated processing error.");
        }
        try {
            Thread.sleep(1000); // Simulating IO network lag or compute overhead
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // An interrupted task did not finish; do not report it as a success.
            throw new IllegalStateException("Interrupted while processing: " + data, e);
        }
        return "Processed successfully: " + data;
    }

    /**
     * Stops accepting new tasks, waits up to five seconds for running ones and then
     * forces termination by interrupting the remaining workers.
     */
    private static void shutdownExecutor() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

}

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
 * Demonstrates a fan-out/fan-in pipeline built on {@link CompletableFuture}.
 *
 * <p>Each input is processed on a dedicated, bounded thread pool. A failure or
 * timeout of an individual task is converted into a per-item message instead of
 * failing the whole pipeline, and the results are printed in input order.
 */
public class AdvancedTaskProcessor {

    /** Seconds a single task's future may stay pending before it is failed with a timeout. */
    private static final long TASK_TIMEOUT_SECONDS = 3;

    // Configure a dedicated thread pool for controlled resource management
    private static final ExecutorService executor = new ThreadPoolExecutor(
            4,                                 // Core pool size
            10,                                // Maximum pool size
            60L, TimeUnit.SECONDS,             // Keep-alive time for idle threads
            new LinkedBlockingQueue<>(100),    // Bounded queue to prevent OOM
            new ThreadPoolExecutor.CallerRunsPolicy() // Throttling mechanism
    );

    /**
     * Runs the sample inputs through the pipeline, prints one line per input and
     * shuts the executor down afterwards.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        List<String> inputs = List.of("DataA", "DataB", "DataC", "ErrorTrigger", "DataE");

        System.out.println("Starting asynchronous processing parallel pipeline...");

        // Submission happens inside the try so the pool is shut down even if it fails;
        // otherwise the non-daemon worker threads would keep the JVM alive.
        try {
            // Trigger tasks asynchronously
            List<CompletableFuture<String>> futures = inputs.stream()
                    .map(input -> CompletableFuture
                            .supplyAsync(() -> performHeavyTask(input), executor)
                            // Fails the future after the timeout; it does NOT interrupt
                            // the worker thread that is still running the task.
                            .orTimeout(TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                            .exceptionally(throwable -> describeFailure(input, throwable)))
                    .collect(Collectors.toList());

            // Completes once every per-item future has completed
            CompletableFuture<Void> allFutures =
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));

            // Extract outcomes when ready; join() cannot block here because all futures are done
            CompletableFuture<List<String>> resultsFuture = allFutures.thenApply(v ->
                    futures.stream()
                           .map(CompletableFuture::join)
                           .collect(Collectors.toList())
            );

            List<String> finalResults = resultsFuture.get(); // Block safely at the final gate
            finalResults.forEach(System.out::println);
        } catch (InterruptedException e) {
            // Restore the flag only when this thread really was interrupted
            Thread.currentThread().interrupt();
            System.err.println("Pipeline execution interrupted: " + e.getMessage());
        } catch (ExecutionException e) {
            System.err.println("Pipeline execution failed: " + e.getCause());
        } finally {
            shutdownExecutor();
        }
    }

    /**
     * Builds the per-item failure line used when a task fails or times out.
     *
     * <p>{@link CompletionException}/{@link ExecutionException} wrappers are peeled off so the
     * original cause is reported, and exceptions without a message (for example the
     * {@link TimeoutException} raised by {@code orTimeout}) fall back to their simple class
     * name instead of printing {@code null}. Package-private so it can be tested directly.
     *
     * @param input     the input whose processing failed
     * @param throwable the failure handed to the {@code exceptionally} stage
     * @return a line of the form {@code "Failed processing for: <input> (<detail>)"}
     */
    static String describeFailure(String input, Throwable throwable) {
        Throwable cause = throwable;
        while ((cause instanceof CompletionException || cause instanceof ExecutionException)
                && cause.getCause() != null) {
            cause = cause.getCause();
        }
        String detail = cause.getMessage();
        if (detail == null || detail.isEmpty()) {
            detail = cause.getClass().getSimpleName();
        }
        return "Failed processing for: " + input + " (" + detail + ")";
    }

    /**
     * Simulates a slow unit of work.
     *
     * @param data the item to process
     * @return a success message for {@code data}
     * @throws RuntimeException      if {@code data} is {@code "ErrorTrigger"}
     * @throws IllegalStateException if the thread is interrupted while working
     */
    private static String performHeavyTask(String data) {
        if ("ErrorTrigger".equals(data)) {
            throw new RuntimeException("Simulated processing error.");
        }
        try {
            Thread.sleep(1000); // Simulating IO network lag or compute overhead
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // An interrupted task did not finish; do not report it as a success.
            throw new IllegalStateException("Interrupted while processing: " + data, e);
        }
        return "Processed successfully: " + data;
    }

    /**
     * Stops accepting new tasks, waits up to five seconds for running ones and then
     * forces termination by interrupting the remaining workers.
     */
    private static void shutdownExecutor() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions