CompletableFuture超时处理 配置线程池

CompletableFuture

  • 简介
  • 使用方法
  • 代码

简介

  项目中一个统计的业务场景,使用原生的CompletableFuture异步多个任务查询mysql数据,少量请求无问题,但是测试过程中大量请求进来,线程没有设置超时时间,导致大量线程处于等待状态,接口响应缓慢。
  因此需要在原生的CompletableFuture中封装,使用自定义线程池、设置超时时间保证接口稳定性。

使用方法

  工具类中主要封装了 supplyAsync()、runAsync()、allOf()这三个方法,目前我的项目中业务场景这三个方法比较常用。
  将工具类引入项目中,CompletableFuture.supplyAsync() 可直接替换为FutureUtil.supplyAsync(),其他方法同理,替换类名即可,示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 有返回值的异步任务
FutureUtil.supplyAsync(() -> {
// 逻辑代码
})

// 无返回值的异步任务
FutureUtil.runAsync(() -> {
// 逻辑代码
})

// 统一等待异步任务执行完成
FutureUtil.allOf(() -> {
// 逻辑代码
})

默认超时时间为可在工具类中设置,可手动设置超时时间,示例如下:

1
2
3
4
5
6
// 前两个参数分别为 超时时间、时间单位
FutureUtil.allOf(1000, TimeUnit.MILLISECONDS,() -> { // 异步任务 });

FutureUtil.runAsync(1000, TimeUnit.MILLISECONDS,() -> { // 逻辑代码 });
FutureUtil.supplyAsync(1000, TimeUnit.MILLISECONDS,() -> { // 逻辑代码 });

详细使用可自行查看工具类中源码,代码贴下面了👇👇👇

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* 多任务处理工具类
* @Author WhHao
* @Date 2022/8/10 16:04
* @Return
*/
@Slf4j
public class FutureUtil {

/**
* cpu 核心数
*/
private static final int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

// 最大超时时间
private static final int TIMEOUT_VALUE = 1500;
// 时间单位
private static final TimeUnit TIMEOUT_UNIT = TimeUnit.MILLISECONDS;


/**
* Singleton delay scheduler, used only for starting and * cancelling tasks.
*/
public static final class Delayer {

static final ScheduledThreadPoolExecutor delayer;

/**
* 异常线程,不做请求处理,只抛出异常
*/
static {
delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
delayer.setRemoveOnCancelPolicy(true);
}

static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
return delayer.schedule(command, delay, unit);
}

static final class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureScheduler");
return t;
}
}
}

/**
* 根据服务器cpu自定义线程池
*/
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
AVALIABLE_PROCESSORS,
3 * AVALIABLE_PROCESSORS,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(20),
new ThreadPoolExecutor.CallerRunsPolicy()
);

/**
* 有返回值的异步
* @param supplier
* @param <T>
* @return
*/
public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier){
return supplyAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,supplier);
}

/**
* 有返回值的异步 - 可设置超时时间
* @param timeout
* @param unit
* @param supplier
* @param <T>
* @return
*/
public static <T> CompletableFuture<T> supplyAsync(long timeout, TimeUnit unit,Supplier<T> supplier){
return CompletableFuture.supplyAsync(supplier, threadPoolExecutor)
.applyToEither(timeoutAfter(timeout,unit), Function.identity())
.exceptionally(throwable -> {
throwable.printStackTrace();
log.error(throwable.getMessage());
return null;
});
}

/**
* 无返回值的异步
* @param runnable
* @return
*/
public static CompletableFuture runAsync(Runnable runnable){
return runAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,runnable);
}

/**
* 无返回值的异步 - 可设置超时时间
* @param runnable
* @return
*/
public static CompletableFuture runAsync(long timeout, TimeUnit unit,Runnable runnable){
return CompletableFuture.runAsync(runnable,threadPoolExecutor)
.applyToEither(timeoutAfter(timeout,unit), Function.identity())
.exceptionally(throwable -> {
throwable.printStackTrace();
log.error(throwable.getMessage());
return null;
});
}

/**
* 统一处理异步结果
* @param futures
* @return
*/
public static CompletableFuture allOf(CompletableFuture... futures){
return allOf(TIMEOUT_VALUE,TIMEOUT_UNIT,futures);
}

/**
* 统一处理异步结果 - 可设置超时时间
* @param futures
* @return
*/
public static CompletableFuture allOf(long timeout, TimeUnit unit,CompletableFuture... futures){
return CompletableFuture.allOf(futures)
.applyToEither(timeoutAfter(timeout,unit), Function.identity())
.exceptionally(throwable -> {
throwable.printStackTrace();
log.error(throwable.getMessage());
return null;
});
}

/**
* 异步超时处理
* @param timeout
* @param unit
* @param <T>
* @return
*/
public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
CompletableFuture<T> result = new CompletableFuture<T>();
// timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
return result;
}

public static <T> CompletableFuture<T> timeoutAfter() {
CompletableFuture<T> result = new CompletableFuture<T>();
// timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), TIMEOUT_VALUE, TIMEOUT_UNIT);
return result;
}

}

CompletableFuture超时处理 配置线程池
http://www.codersand.fun/2023/04/21/course/CompletableFuture超时处理/
作者
吴昊
发布于
2023年4月21日
许可协议