1 Star 1 Fork 1.2K

openharmony_ci / applications_app_samples

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
ParallelTaskExecuteUtil.java 6.94 KB
一键复制 编辑 原始数据 按行查看 历史
openharmony_ci 提交于 2024-01-13 00:22 . CI测试,请勿合入!!!
package com.huawei.utils;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* @description juc并发执行工具类
* @author developer-ci
* @version update at 2024-01-09 15:27:20
*/
@Slf4j
@Component
public class ParallelTaskExecuteUtil {
/**
* 超时限制
*/
private static final int TIME_OUT_LIMIT = 60;
/**
* 核心线程数
*/
private static final int CORE_POOL_CORE_SIZE = 8;
/**
* 最大线程数
*/
private static final int CORE_POOL_MAX_SIZE = 16;
/**
* 线程存活时间
*/
private static final long CORE_POOL_KEEP_ALIVE_TIME = 120;
/**
* LinkedBlockingQueue size
*/
private static final int THREAD_POOL_QUEUE_CAPACITY = 8;
/**
* 独立线程池
*/
private static final ThreadPoolExecutor parallelUtilThreadPool = new ThreadPoolExecutor(CORE_POOL_CORE_SIZE,
CORE_POOL_MAX_SIZE, CORE_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(THREAD_POOL_QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy());
/**
* runnable 实现
*/
@FunctionalInterface
public interface IExecutorConsumerTask {
void doRunExecuteTask();
}
/**
* runnable 实现
*/
@FunctionalInterface
public interface IExecutorProviderTask {
Object doRunExecuteTask();
}
/**
* @param task 任务
* @param executorService 自定义
* @return 具有返回类型的CompletableFuture<Object> 对象
*/
public CompletableFuture<Object> addProviderExecuteTask(IExecutorProviderTask task,
ExecutorService executorService) {
return CompletableFuture.supplyAsync(task::doRunExecuteTask, executorService).handle((result, exception) -> {
if (Objects.nonNull(exception)) {
log.error("Failed to do execute current task, caused by: {}", exception.getMessage());
return CompletableFuture.<Object>completedFuture(exception);
} else {
return CompletableFuture.completedFuture(result);
}
}).thenApplyAsync(future -> {
future.whenComplete((res, throwable) -> {
if (Objects.nonNull(throwable)) {
throw new RuntimeException("Task execution failed", throwable);
}
});
return future.join();
});
}
/**
* 无返回类型 CompletableFuture<Void> 对象
*
* @param task IExecutorVoidTask
* @param executorService 自定义
*/
public CompletableFuture<Void> addConsumerTask(IExecutorConsumerTask task, ExecutorService executorService) {
return runAsync(task::doRunExecuteTask, executorService).whenComplete((result, ex) -> {
if (Objects.nonNull(ex)) {
log.error("Failed to do execute current task, caused by: {}", ex.getMessage());
throw new RuntimeException(ex.getMessage());
}
});
}
/**
* 任务执行完毕
*
* @param task 任务执行
* @return 具有返回类型的CompletableFuture<Object> 对象
*/
public CompletableFuture<Object> ExecuteTaskWithThreadPool(IExecutorProviderTask task) {
return supplyAsync(task::doRunExecuteTask, parallelUtilThreadPool).handle((result, exception) -> {
if (Objects.nonNull(exception)) {
log.error("Failed to do execute current task, caused by: {}", exception.getMessage());
return CompletableFuture.<Object>completedFuture(exception);
} else {
return CompletableFuture.completedFuture(result);
}
}).thenApplyAsync(future -> {
future.whenComplete((res, throwable) -> {
if (Objects.nonNull(throwable)) {
throw new RuntimeException("Task execution failed", throwable);
}
});
return future.join();
});
}
/**
* @param task IExecutorVoidTask
* @return {@link CompletableFuture}<{@link Void}>
*/
public CompletableFuture<Void> ExecuteVoidTaskWithThreadPool(IExecutorConsumerTask task) {
return runAsync(task::doRunExecuteTask, parallelUtilThreadPool).whenComplete((result, ex) -> {
if (Objects.nonNull(ex)) {
log.error("Failed to do execute current task, caused by: {}", ex.getMessage());
throw new RuntimeException(ex.getMessage());
}
});
}
/**
* <p>
* 聚合
* </p>
*
* @param providerTaskList {@linkplain CompletableFuture<Object>}
* @return List<Object>{@linkplain List<CompletableFuture>}
*/
public List<CompletableFuture<?>> doProviderTasksCollect(List<CompletableFuture<Object>> providerTaskList) {
return providerTaskList.stream().peek(CompletableFuture::join).collect(Collectors.toList());
}
/**
* 任务完成。
*
* @param consumerTaskList {@linkplain CompletableFuture<Object>}
*/
public void doConsumerTask(List<CompletableFuture<Object>> consumerTaskList)
throws ExecutionException, InterruptedException, TimeoutException {
checkTaskComplete(consumerTaskList);
}
/**
* 任务完成
* <p>
*
* @param consumerTaskList {@linkplain CompletableFuture<Void>}
*/
public void doConsumerVoidTask(List<CompletableFuture<Void>> consumerTaskList)
throws ExecutionException, InterruptedException, TimeoutException {
checkTaskComplete(consumerTaskList);
}
/**
* 任务聚合
*
* @param consumerTaskList CompletableFuture<T>
* @throws ExecutionException 执行异常
* @throws InterruptedException 打断异常
* @throws TimeoutException 超时异常
*/
private <T> void checkTaskComplete(List<CompletableFuture<T>> consumerTaskList)
throws ExecutionException, InterruptedException, TimeoutException {
if (CollectionUtils.isNotEmpty(consumerTaskList)) {
allOf(consumerTaskList.toArray(new CompletableFuture[0])).get(TIME_OUT_LIMIT, TimeUnit.SECONDS);
}
}
}
1
https://gitee.com/openharmony_ci/applications_app_samples.git
git@gitee.com:openharmony_ci/applications_app_samples.git
openharmony_ci
applications_app_samples
applications_app_samples
master

搜索帮助