CompletionService:如何批量执行异步任务?
# 25 | CompletionService:如何批量执行异步任务?
在《23 | Future:如何用多线程实现最优的“烧水泡茶”程序?》的最后,我给你留了道思考题,如何优化一个询价应用的核心代码?如果采用“ThreadPoolExecutor+Future”的方案,你的优化结果很可能是下面示例代码这样:用三个线程异步执行询价,通过三次调用 Future 的 get() 方法获取询价结果,之后将询价结果保存在数据库中。
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 异步向电商 S1 询价
Future<Integer> f1 =
executor.submit(
()->getPriceByS1());
// 异步向电商 S2 询价
Future<Integer> f2 =
executor.submit(
()->getPriceByS2());
// 异步向电商 S3 询价
Future<Integer> f3 =
executor.submit(
()->getPriceByS3());
// 获取电商 S1 报价并保存
r=f1.get();
executor.execute(()->save(r));
// 获取电商 S2 报价并保存
r=f2.get();
executor.execute(()->save(r));
// 获取电商 S3 报价并保存
r=f3.get();
executor.execute(()->save(r));
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
上面的这个方案本身没有太大问题,但是有个地方的处理需要你注意,那就是如果获取电商 S1 报价的耗时很长,那么即便获取电商 S2 报价的耗时很短,也无法让保存 S2 报价的操作先执行,因为这个主线程都阻塞在了 f1.get()
操作上。这点小瑕疵你该如何解决呢?
估计你已经想到了,增加一个阻塞队列,获取到 S1、S2、S3 的报价都进入阻塞队列,然后在主线程中消费阻塞队列,这样就能保证先获取到的报价先保存到数据库了。下面的示例代码展示了如何利用阻塞队列实现先获取到的报价先保存到数据库。
// 创建阻塞队列
BlockingQueue<Integer> bq =
new LinkedBlockingQueue<>();
// 电商 S1 报价异步进入阻塞队列
executor.execute(()->
bq.put(f1.get()));
// 电商 S2 报价异步进入阻塞队列
executor.execute(()->
bq.put(f2.get()));
// 电商 S3 报价异步进入阻塞队列
executor.execute(()->
bq.put(f3.get()));
// 异步保存所有报价
for (int i=0; i<3; i++) {
Integer r = bq.take();
executor.execute(()->save(r));
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 利用 CompletionService 实现询价系统
不过在实际项目中,并不建议你这样做,因为 Java SDK 并发包里已经提供了设计精良的 CompletionService。利用 CompletionService 不但能帮你解决先获取到的报价先保存到数据库的问题,而且还能让代码更简练。
CompletionService 的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是 CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中,而上面的示例代码是把任务最终的执行结果放入了阻塞队列中。
那到底该如何创建 CompletionService 呢?
CompletionService 接口的实现类是 ExecutorCompletionService,这个实现类的构造方法有两个,分别是:
ExecutorCompletionService(Executor executor)
;ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
。
这两个构造方法都需要传入一个线程池,如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到 completionQueue 中。
下面的示例代码完整地展示了如何利用 CompletionService 来实现高性能的询价系统。其中,我们没有指定 completionQueue,因此默认使用无界的 LinkedBlockingQueue。之后通过 CompletionService 接口提供的 submit() 方法提交了三个询价操作,这三个询价操作将会被 CompletionService 异步执行。最后,我们通过 CompletionService 接口提供的 take() 方法获取一个 Future 对象(前面我们提到过,加入到阻塞队列中的是任务执行结果的 Future 对象),调用 Future 对象的 get() 方法就能返回询价操作的执行结果了。
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new
ExecutorCompletionService<>(executor);
// 异步向电商 S1 询价
cs.submit(()->getPriceByS1());
// 异步向电商 S2 询价
cs.submit(()->getPriceByS2());
// 异步向电商 S3 询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute(()->save(r));
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# CompletionService 接口说明
下面我们详细地介绍一下 CompletionService 接口提供的方法,CompletionService 接口提供的方法有 5 个,这 5 个方法的方法签名如下所示。
其中,submit() 相关的方法有两个。一个方法参数是Callable<V> task
,前面利用 CompletionService 实现询价系统的示例代码中,我们提交任务就是用的它。另外一个方法有两个参数,分别是Runnable task
和V result
,这个方法类似于 ThreadPoolExecutor 的 <T> Future<T> submit(Runnable task, T result)
,这个方法在《23 | Future:如何用多线程实现最优的“烧水泡茶”程序?》中我们已详细介绍过,这里不再赘述。
CompletionService 接口其余的 3 个方法,都是和阻塞队列相关的,take()、poll() 都是从阻塞队列中获取并移除一个元素;它们的区别在于如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞,而 poll() 方法会返回 null 值。 poll(long timeout, TimeUnit unit)
方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值。
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take()
throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException;
2
3
4
5
6
7
8
# 利用 CompletionService 实现 Dubbo 中的 Forking Cluster
Dubbo 中有一种叫做Forking 的集群模式,这种集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。例如你需要提供一个地址转坐标的服务,为了保证该服务的高可用和性能,你可以并行地调用 3 个地图服务商的 API,然后只要有 1 个正确返回了结果 r,那么地址转坐标这个服务就可以直接返回 r 了。这种集群模式可以容忍 2 个地图服务商服务异常,但缺点是消耗的资源偏多。
geocoder(addr) {
// 并行执行以下 3 个查询服务,
r1=geocoderByS1(addr);
r2=geocoderByS2(addr);
r3=geocoderByS3(addr);
// 只要 r1,r2,r3 有一个返回
// 则返回
return r1|r2|r3;
}
2
3
4
5
6
7
8
9
10
利用 CompletionService 可以快速实现 Forking 这种集群模式,比如下面的示例代码就展示了具体是如何实现的。首先我们创建了一个线程池 executor 、一个 CompletionService 对象 cs 和一个Future<Integer>
类型的列表 futures,每次通过调用 CompletionService 的 submit() 方法提交一个异步任务,会返回一个 Future 对象,我们把这些 Future 对象保存在列表 futures 中。通过调用 cs.take().get()
,我们能够拿到最快返回的任务执行结果,只要我们拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果了。
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs =
new ExecutorCompletionService<>(executor);
// 用于保存 Future 对象
List<Future<Integer>> futures =
new ArrayList<>(3);
// 提交异步任务,并保存 future 到 futures
futures.add(
cs.submit(()->geocoderByS1()));
futures.add(
cs.submit(()->geocoderByS2()));
futures.add(
cs.submit(()->geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
// 只要有一个成功返回,则 break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
// 简单地通过判空来检查是否成功返回
if (r != null) {
break;
}
}
} finally {
// 取消所有任务
for(Future<Integer> f : futures)
f.cancel(true);
}
// 返回结果
return r;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 总结
当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。
CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
# 课后思考
本章使用 CompletionService 实现了一个询价应用的核心功能,后来又有了新的需求,需要计算出最低报价并返回,下面的示例代码尝试实现这个需求,你看看是否存在问题呢?
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new
ExecutorCompletionService<>(executor);
// 异步向电商 S1 询价
cs.submit(()->getPriceByS1());
// 异步向电商 S2 询价
cs.submit(()->getPriceByS2());
// 异步向电商 S3 询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
// 并计算最低报价
AtomicReference<Integer> m =
new AtomicReference<>(Integer.MAX_VALUE);
for (int i=0; i<3; i++) {
executor.execute(()->{
Integer r = null;
try {
r = cs.take().get();
} catch (Exception e) {}
save(r);
m.set(Integer.min(m.get(), r));
});
}
return m;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
欢迎在留言区与我分享你的想法,也欢迎你在留言区记录你的思考过程。感谢阅读,如果你觉得这篇文章对你有帮助的话,也欢迎把它分享给更多的朋友。
# 精选评论
点击查看
我觉得问题出在return m这里需要等待三个线程执行完成,但是并没有。
...
AtomicReference<Integer> m = new AtomicReference<>(Integer.MAX_VALUE);
CountDownLatch latch = new CountDownLatch(3);
for(int i=0; i<3; i++) {
executor.execute(()->{
Integer r = null;
try {
r = cs.take().get();
} catch(Exception e) {}
save(r);
m.set(Integer.min(m.get(), r));
latch.countDown();
});
latch.await();
return m;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
试过返回值是2147483647,也就是int的最大值。没有等待操作完成就猴急的返回了。 m.set(Integer.min(m.get(), r)... 这个操作也不是原子操作。
试着自己弄了一下:
public Integer run(){
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
AtomicReference<Integer> m = new AtomicReference<>(Integer.MAX_VALUE);
// 异步向电商 S1 询价
cs.submit(()->getPriceByS1());
// 异步向电商 S2 询价
cs.submit(()->getPriceByS2());
// 异步向电商 S3 询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
// 并计算最低报价
for (int i=0; i<3; i++) {
Integer r = logIfError(()->cs.take().get());
executor.execute(()-> save(r));
m.getAndUpdate(v->Integer.min(v, r));
}
return m.get();
}
不知道可不可行
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
m.get()和m.set()不是原子性操作,正确代码是:do{int expect = m.get();int min= Integer.min(expect,r);}while(!m.compareAndSet(expect,min))。老师,是这样吗?
先调用m.get()并跟r比较,再调用m.set(),这里存在竞态条件,线程并不安全
executor.execute(Callable)提交任务是非阻塞的 return m;很大概率返回 Integer.Maxvalue,而且老师为了确保返回这个max还特意加入了save这个阻塞的方法
我实际测试了第一段代码,确实是异步的,f1.get不会阻塞主线程。。。
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
Future<Integer> f1 = executor.submit(()->getPriceByS1());
Future<Integer> f2 = executor.submit(()->getPriceByS2());
Future<Integer> f3 = executor.submit(()->getPriceByS3());
executor.execute(()-> {
try {
save(f1.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
executor.execute(()-> {
try {
save(f2.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
executor.execute(()-> {
try {
save(f3.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
}
private static Integer getPriceByS1() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
private static Integer getPriceByS2() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2;
}
private static Integer getPriceByS3() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 3;
}
private static void save(Integer i) {
System.out.println("save " + i);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
最后的问题 Math.min不是原子操作 会出现竞态条件
老师,我看到几节中的demo都有在线程池里面取消任务执行,我之前看源码了解到,调用cancel时,如果线程已经在执行任务了,是没得办法终止这个任务的运行的,这种情况有没办法处理呢
置顶的留言,return是不是应该在for外面?
// 获取电商 S1 报价并保存
r=f1.get();
executor.execute(()->save(r));
如果把r=f1.get()放进execute里应该是也能保证先执行完的先保存
2
3
4
5
使用相同的线程池,会导致前面查询报价线程池不够使用。应该使用两个线程
重新发过,刚刚的代码有误!
1.for循环线程池执行属于异步导致未等比价结果就 return了,需要等待三次比价结果才能 return,可以用 CountDownLatch
2. m. set( Integer. min( m. get(), r))存在竞态条件,可以更改为
Integer o;
do{
o= m. get();
if(o<=r){ break;}
}
while(! m. compareAndSet( o, r));
3.还有一个小问题就是 try- catch捕获异常后的处理,提高程序鲁棒性
2
3
4
5
6
7
8
9
10
这一期的评论把我看懵了,future.get()就是阻塞当前线程啊
老师stampedLock的获取锁源码,老师能帮忙解惑下么?阻塞的读线程cowait是挂在写节点的下方么?老师能解惑下基于的理论模型
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) { // spin while enqueuing
long m, s, ns;
//如果当前的state是无锁状态即100000000
if ((m = (s = state) & ABITS) == 0L) {
//设置成写锁
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
return ns;
}
else if (spins < 0)
//当前锁状态为写锁状态,并且队列为空,设置自旋值
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
//自旋操作,就是让线程在此自旋
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
//如果队列尾元素为空,初始化队列
else if ((p = wtail) == null) { // initialize queue
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
//当前要加入的元素为空,初始化当前元素,前置节点为尾节点
else if (node == null)
node = new WNode(WMODE, p);
//队列的稳定性判断,当前的前置节点是否改变,重新设置
else if (node.prev != p)
node.prev = p;
//将当前节点加入尾节点中
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
好多说第一个例子不阻塞的,都是没有实际运行过的,凭感觉臆测