CompletionService

1/13/2023 异步多线程

# CompletionService介绍

CompletionService 的实现目标是任务先完成可优先获取到,即结果按照完成先后顺序排序。

# ExecutorService和CompletionService对比

# ExecutorService执行4个任务

ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));

// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (Future future:futures) {
    Integer result = future.get();
    // 其他业务逻辑
}
1
2
3
4
5
6
7
8
9
10
11
12

# CompletionService 执行4个任务

ExecutorService executorService = Executors.newFixedThreadPool(4);

// ExecutorCompletionService 是 CompletionService 唯一实现类
CompletionService executorCompletionService= new ExecutorCompletionService<>(executorService );

List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));

// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (int i=0; i<futures.size(); i++) {
    Integer result = executorCompletionService.take().get();
    // 其他业务逻辑
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# Future.get方法缺陷

  • 如果 Future 结果没有完成,调用 get() 方法,程序会阻塞在那里,直至获取返回结果
  • 第一种实现方式,假设任务A 由于参数原因,执行时间相对任务 B,C,D 都要长很多,但是按照程序的执行顺序,程序在 get() 任务 A 的执行结果会阻塞在那里,导致任务 B,C,D 的后续任务没办法执行。又因为每个任务执行时间是不固定的,所以无论怎样调整将任务放到 List 的顺序,都不合适.

# CompletionService 如何获取最先执行完的任务

# CompletionService 实现原理

canal3

哪个任务执行完了,就直接将执行结果放到队列中,这样消费者拿到的结果自然就是最早拿到的那个了 从上图中看到,有任务,有结果队列,那 CompletionService 自然也就围绕着几个关键字来实现

  • 既然是异步任务,自然可能用到 RunnableCallable
  • 既然能获取到结果,自然也会用到 Future
  • 实际上就是一个将异步任务的生产和任务完成结果的消费解耦的服务

# CompletionService 源码

public interface CompletionService<V> {

    Future<V> submit(Callable<V> task);

    Future<V> submit(Runnable task, V result);

    Future<V> take() throws InterruptedException;

    Future<V> poll();

    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
1
2
3
4
5
6
7
8
9
10
11
12
  • Take: 如果队列为空,那么调用 take() 方法的线程会被阻塞
  • Poll: 如果队列为空,那么调用 poll() 方法的线程会返回 null
  • Poll-timeout: 以超时的方式获取并移除阻塞队列中的第一个元素,如果超时时间到,队列还是空,那么该方法会返回 null

所以说,按大类划分上面5个方法,其实就是两个功能

  • 提交异步任务 (submit)
  • 从队列中拿取并移除第一个元素 (take/poll)

注意:CompletionService 只是接口,ExecutorCompletionService 是该接口的唯一实现类

# ExecutorCompletionService 源码分析

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    /**
    * 构造函数,默认队列为 LinkedBlockingQueue
    **/
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
    * 构造函数
    **/
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    /**
    * execute 是提交 Runnable 类型的任务,本身得不到返回值,但又可以将执行结果放到阻塞队列里面,
    * 所以这里应该仔细查看下QueueingFuture这个方法
    */
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

canal3

# 如何将异步任务结果放到这个阻塞队列中

/**
* execute 是提交 Runnable 类型的任务,本身得不到返回值,但又可以将执行结果放到阻塞队列里面,
* 所以这里应该仔细查看下QueueingFuture这个方法
*/
public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}
1
2
3
4
5
6
7
8
9
10

canal3

QueueingFuture 继承了 FutureTaskFutureTask 重写了 Runnablerun() 方法,无论是set() 正常结果,还是setException() 结果,都会调用 finishCompletion() 方法:

public class FutureTask<V> implements RunnableFuture<V> {
    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

上述方法会执行 done() 方法,而 QueueingFuture 恰巧重写了 FutureTaskdone() 方法,方法实现很简单,就是将 task 放到阻塞队列中:

private class QueueingFuture extends FutureTask<Void> {
    // 执行到此的 task 已经是前序步骤 set 过结果的 task,
    // 所以就可以通过消费阻塞队列获取相应的结果了
    protected void done() { completionQueue.add(task); }
}
1
2
3
4
5

# CompletionService 的主要用途

1、假设你有一组针对某个问题的 solvers,每个都返回一个类型为 Result 的值,并且想要并发地运行它们,处理每个返回一个非空值的结果,在某些方法使用(Result r)

void solve(Executor e,Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException
{
     CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
     for (Callable<Result> s : solvers)
         ecs.submit(s);
     int n = solvers.size();
     for (int i = 0; i < n; ++i) {
         Result r = ecs.take().get();
         if (r != null)
             use(r);
     }
 }
1
2
3
4
5
6
7
8
9
10
11
12

2、假设你想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException 
{
     CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
     int n = solvers.size();
     List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
     Result result = null;
     try {
         for (Callable<Result> s : solvers)
             futures.add(ecs.submit(s));
         for (int i = 0; i < n; ++i) {
             try {
                 Result r = ecs.take().get();
                 if (r != null) {
                     result = r;
                     break;
                 }
             } catch (ExecutionException ignore) {}
         }
     }
     finally {
         for (Future<Result> f : futures)
             // 注意这里的参数给的是 true
             f.cancel(true);
     }

     if (result != null)
         use(result);
 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

使用 ExecutorCompletionService,需要自己创建线程池,好处就是可以让多个ExecutorCompletionService的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

# 总结

CompletionService 的应用场景还是非常多的,比如

  • Dubbo 中的 Forking Cluster
  • 多仓库文件/镜像下载(从最近的服务中心下载后终止其他下载过程)
  • 多服务调用(天气预报服务,最先获取到的结果)

CompletionService 不但能满足获取最快结果,还能起到一定 load balancer 作用,获取可用服务的结果,使用也非常简单,只需要遵循范式即可