import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
 * Small demonstration of an asynchronous processing pipeline built on
 * {@link CompletableFuture} and a dedicated, bounded thread pool.
 *
 * <p>Every input is processed on the pool with a per-task timeout. A failing or timed-out
 * task is converted into a descriptive result string, so one bad input never fails the
 * whole batch.
 */
public class AdvancedTaskProcessor {

    /** Maximum time a single task may take before its future is failed with a timeout. */
    private static final long TASK_TIMEOUT_SECONDS = 3;

    /** Grace period granted to in-flight tasks when the pool is shut down. */
    private static final long SHUTDOWN_GRACE_SECONDS = 5;

    // Configure a dedicated thread pool for controlled resource management
    private static final ExecutorService executor = new ThreadPoolExecutor(
            4, // Core pool size
            10, // Maximum pool size
            60L, TimeUnit.SECONDS, // Keep-alive time for idle threads
            new LinkedBlockingQueue<>(100), // Bounded queue to prevent OOM
            new ThreadPoolExecutor.CallerRunsPolicy() // Throttling: caller runs the task when the queue is full
    );

    /**
     * Runs the demo pipeline over a fixed set of inputs, prints one result line per input
     * and always shuts the thread pool down afterwards.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        List<String> inputs = List.of("DataA", "DataB", "DataC", "ErrorTrigger", "DataE");
        System.out.println("Starting asynchronous processing parallel pipeline...");
        // Submission happens inside the try so the pool is shut down even if it fails.
        try {
            List<String> finalResults = processAll(inputs).get(); // Block only at the final gate
            finalResults.forEach(System.out::println);
        } catch (InterruptedException e) {
            // Only an actual interruption may restore the interrupt flag.
            Thread.currentThread().interrupt();
            System.err.println("Pipeline execution interrupted: " + e.getMessage());
        } catch (ExecutionException e) {
            System.err.println("Pipeline execution failed: " + e.getCause());
        } finally {
            shutdownExecutor();
        }
    }

    /**
     * Submits every input to the pool and combines the individual outcomes.
     *
     * <p>Package-private so the pipeline can be exercised without going through
     * {@link #main(String[])}.
     *
     * @param inputs values to process; may be empty
     * @return a future that completes with one result string per input, in input order;
     *         failed or timed-out inputs yield a {@code "Failed processing for: ..."} entry
     */
    static CompletableFuture<List<String>> processAll(List<String> inputs) {
        List<CompletableFuture<String>> futures = inputs.stream()
                .map(AdvancedTaskProcessor::processOne)
                .collect(Collectors.toList());
        // allOf completes once every task has finished, so the join() calls below never block.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    /**
     * Starts the heavy task for one input on the pool.
     *
     * @param input value to process
     * @return a future that never completes exceptionally: errors and timeouts are mapped
     *         to a failure description
     */
    private static CompletableFuture<String> processOne(String input) {
        return CompletableFuture.supplyAsync(() -> performHeavyTask(input), executor)
                // Fails the future after the timeout; the worker thread itself is not interrupted.
                .orTimeout(TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .exceptionally(failure ->
                        "Failed processing for: " + input + " (" + describeFailure(failure) + ")");
    }

    /**
     * Produces a short human-readable description of a task failure.
     *
     * @param failure throwable handed to the {@code exceptionally} stage
     * @return the message of the underlying cause, or its simple class name when it has
     *         no message (e.g. {@link TimeoutException})
     */
    private static String describeFailure(Throwable failure) {
        Throwable root = failure;
        // Exceptions thrown inside supplyAsync arrive wrapped in a CompletionException.
        if (failure instanceof CompletionException && failure.getCause() != null) {
            root = failure.getCause();
        }
        String message = root.getMessage();
        return message != null ? message : root.getClass().getSimpleName();
    }

    /**
     * Simulates an expensive operation for a single input.
     *
     * @param data input value; {@code "ErrorTrigger"} simulates a processing failure
     * @return a success message containing {@code data}
     * @throws RuntimeException if {@code data} is {@code "ErrorTrigger"}
     * @throws IllegalStateException if the thread is interrupted before the work finishes
     */
    private static String performHeavyTask(String data) {
        if ("ErrorTrigger".equals(data)) {
            throw new RuntimeException("Simulated processing error.");
        }
        try {
            Thread.sleep(1000); // Simulating IO network lag or compute overhead
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // The work was cut short, so it must not be reported as a success.
            throw new IllegalStateException("Interrupted while processing: " + data, e);
        }
        return "Processed successfully: " + data;
    }

    /**
     * Shuts the pool down gracefully, forcing termination if running tasks do not finish
     * within the grace period or the waiting thread is interrupted.
     *
     * <p>Package-private so callers of {@link #processAll(List)} can release the pool.
     */
    static void shutdownExecutor() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(SHUTDOWN_GRACE_SECONDS, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
 * Small demonstration of an asynchronous processing pipeline built on
 * {@link CompletableFuture} and a dedicated, bounded thread pool.
 *
 * <p>Every input is processed on the pool with a per-task timeout. A failing or timed-out
 * task is converted into a descriptive result string, so one bad input never fails the
 * whole batch.
 */
public class AdvancedTaskProcessor {

    /** Maximum time a single task may take before its future is failed with a timeout. */
    private static final long TASK_TIMEOUT_SECONDS = 3;

    /** Grace period granted to in-flight tasks when the pool is shut down. */
    private static final long SHUTDOWN_GRACE_SECONDS = 5;

    // Configure a dedicated thread pool for controlled resource management
    private static final ExecutorService executor = new ThreadPoolExecutor(
            4, // Core pool size
            10, // Maximum pool size
            60L, TimeUnit.SECONDS, // Keep-alive time for idle threads
            new LinkedBlockingQueue<>(100), // Bounded queue to prevent OOM
            new ThreadPoolExecutor.CallerRunsPolicy() // Throttling: caller runs the task when the queue is full
    );

    /**
     * Runs the demo pipeline over a fixed set of inputs, prints one result line per input
     * and always shuts the thread pool down afterwards.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        List<String> inputs = List.of("DataA", "DataB", "DataC", "ErrorTrigger", "DataE");
        System.out.println("Starting asynchronous processing parallel pipeline...");
        // Submission happens inside the try so the pool is shut down even if it fails.
        try {
            List<String> finalResults = processAll(inputs).get(); // Block only at the final gate
            finalResults.forEach(System.out::println);
        } catch (InterruptedException e) {
            // Only an actual interruption may restore the interrupt flag.
            Thread.currentThread().interrupt();
            System.err.println("Pipeline execution interrupted: " + e.getMessage());
        } catch (ExecutionException e) {
            System.err.println("Pipeline execution failed: " + e.getCause());
        } finally {
            shutdownExecutor();
        }
    }

    /**
     * Submits every input to the pool and combines the individual outcomes.
     *
     * <p>Package-private so the pipeline can be exercised without going through
     * {@link #main(String[])}.
     *
     * @param inputs values to process; may be empty
     * @return a future that completes with one result string per input, in input order;
     *         failed or timed-out inputs yield a {@code "Failed processing for: ..."} entry
     */
    static CompletableFuture<List<String>> processAll(List<String> inputs) {
        List<CompletableFuture<String>> futures = inputs.stream()
                .map(AdvancedTaskProcessor::processOne)
                .collect(Collectors.toList());
        // allOf completes once every task has finished, so the join() calls below never block.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    /**
     * Starts the heavy task for one input on the pool.
     *
     * @param input value to process
     * @return a future that never completes exceptionally: errors and timeouts are mapped
     *         to a failure description
     */
    private static CompletableFuture<String> processOne(String input) {
        return CompletableFuture.supplyAsync(() -> performHeavyTask(input), executor)
                // Fails the future after the timeout; the worker thread itself is not interrupted.
                .orTimeout(TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .exceptionally(failure ->
                        "Failed processing for: " + input + " (" + describeFailure(failure) + ")");
    }

    /**
     * Produces a short human-readable description of a task failure.
     *
     * @param failure throwable handed to the {@code exceptionally} stage
     * @return the message of the underlying cause, or its simple class name when it has
     *         no message (e.g. {@link TimeoutException})
     */
    private static String describeFailure(Throwable failure) {
        Throwable root = failure;
        // Exceptions thrown inside supplyAsync arrive wrapped in a CompletionException.
        if (failure instanceof CompletionException && failure.getCause() != null) {
            root = failure.getCause();
        }
        String message = root.getMessage();
        return message != null ? message : root.getClass().getSimpleName();
    }

    /**
     * Simulates an expensive operation for a single input.
     *
     * @param data input value; {@code "ErrorTrigger"} simulates a processing failure
     * @return a success message containing {@code data}
     * @throws RuntimeException if {@code data} is {@code "ErrorTrigger"}
     * @throws IllegalStateException if the thread is interrupted before the work finishes
     */
    private static String performHeavyTask(String data) {
        if ("ErrorTrigger".equals(data)) {
            throw new RuntimeException("Simulated processing error.");
        }
        try {
            Thread.sleep(1000); // Simulating IO network lag or compute overhead
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // The work was cut short, so it must not be reported as a success.
            throw new IllegalStateException("Interrupted while processing: " + data, e);
        }
        return "Processed successfully: " + data;
    }

    /**
     * Shuts the pool down gracefully, forcing termination if running tasks do not finish
     * within the grace period or the waiting thread is interrupted.
     *
     * <p>Package-private so callers of {@link #processAll(List)} can release the pool.
     */
    static void shutdownExecutor() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(SHUTDOWN_GRACE_SECONDS, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
 * Empty declaration of {@code AdvancedTaskProcessor}: it defines no fields or methods.
 *
 * <p>NOTE(review): this file declares a top-level class with this name more than once,
 * which a single Java compilation unit does not allow. This stub looks like a paste
 * artifact - confirm before relying on it.
 */
public class AdvancedTaskProcessor {
}
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
 * Empty declaration of {@code AdvancedTaskProcessor}: it defines no fields or methods.
 *
 * <p>NOTE(review): this file declares a top-level class with this name more than once,
 * which a single Java compilation unit does not allow. This stub looks like a paste
 * artifact - confirm before relying on it.
 */
public class AdvancedTaskProcessor {
}