Java异步编程

一、Future

1. 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class FutureTest {
@Autowired
privite UserService userService;
public static void main(String[] args) throws ExecutionException, InterruptedException {
//使用内部线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//创建新的线程去异步执行查询用户列表
FutureTask<List<User>> userFutureTask = new FutureTask<>(new Callable<List<User>>() {
@Override
public List<User> call() throws Exception {
return userService.getUserList();
}
});
//模拟主线程其它操作耗时
Thread.sleep(3000);
executorService.submit(userFutureTask);
//获取用户列表信息结果
List<User> userList = userFutureTask.get();
}

2. 缺点

  • Future.get()是阻塞调用,在线程获取结果之前get方法会一直阻塞
  • Future.isDone()可以在程序中轮询的方式查询执行结果,但会耗费无谓的CPU资源

二、CompletableFuture

CompletableFuture实现FutureCompletionStage接口,提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方

1. 简单异步回调方法使用

supplyAsync()runAsync()的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class CompletableFutureTest {
@Autowired
privite UserService userService;
public static void main(String[] args) throws ExecutionException, InterruptedException {
//使用内部线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
//创建新的线程去异步执行查询用户列表
CompletableFuture<List<User>> useFutureTask = CompletableFuture.supplyAsync(() -> userService.getUserList(),executor);
//获取用户列表信息结果
List<User> userList = useFutureTask.get();

//模拟主线程其它操作耗时
Thread.sleep(3000);

//创建新的线程去打印信息
CompletableFuture<Void> printFutureTask = CompletableFuture.runAsync(() -> System.out.println("打印信息"),executor);


}

thenRun()thenRunAsync()的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CompletableFutureTest {
@Autowired
privite UserService userService;
public static void main(String[] args) {
//使用内部线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
//创建新的线程去异步执行查询用户列表
CompletableFuture<List<User>> useFutureTask = CompletableFuture.supplyAsync(() -> userService.getUserList(),executor);
//等待查询用户列表线程完成后再去去打印信息
CompletableFuture thenRunFuture = useFutureTask.thenRun(() ->{System.out.println("打印信息")});
//打印:null(thenRun()默认没有返回值)
System.out.println(thenRunFuture.get());

}
//方法源码:
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}

thenAccept()thenAcceptAsync()的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CompletableFutureTest {
@Autowired
privite UserService userService;
public static void main(String[] args) {
//创建新的线程去异步执行查询用户列表
CompletableFuture<String> futureTask = CompletableFuture.supplyAsync(() -> {
return "打印信息";
});
//将futureTask中返回到信息作为参数带入到thenAcceptFuture中
CompletableFuture thenAcceptFuture = futureTask.thenAccept((res) ->{
if ("打印信息".equals(res)) {
System.out.println("打印信息成功");
}
});
//打印:null(thenAccept()默认没有返回值)
System.out.println(thenAcceptFuture.get());

}

thenApply()thenApplyAsync()的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CompletableFutureTest {
@Autowired
privite UserService userService;
public static void main(String[] args) {
//创建新的线程去异步执行查询用户列表
CompletableFuture<String> futureTask = CompletableFuture.supplyAsync(() -> {
return "打印信息";
});
//将futureTask中返回到信息作为参数带入到thenApplyFuture中
CompletableFuture<String> thenApplyFuture = futureTask.thenApply((res) ->{
if ("打印信息".equals(res)) {
return "打印信息成功";
}
});
//打印:打印信息成功
System.out.println(thenApplyFuture.get());

}

exceptionally()的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CompletableFutureTest {
@Autowired
privite UserService userService;
public static void main(String[] args) {
//创建新的线程去异步执行查询用户列表
CompletableFuture<String> futureTask = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
throw new RuntimeException();
return "打印信息";
});
//将futureTask中返回到信息作为参数带入到thenApplyFuture中
CompletableFuture<String> exceptionFuture = futureTask.exceptionally((e) ->{
e.printStackTrace();
return "程序异常";
});
//打印:当前线程名称:ForkJoinPool.commonPool-worker-1
//java.util.concurrent.CompletionException: java.lang.RuntimeException
//at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
//...
//Caused by: java.lang.RuntimeException
//...
//程序异常
System.out.println(thenApplyFuture.get());

}

whenComplete()的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CompletableFutureTest {
@Autowired
privite UserService userService;
public static void main(String[] args) {
//创建新的线程去异步执行查询用户列表
CompletableFuture<String> futureTask = CompletableFuture.supplyAsync(() -> {
return "打印信息";
});
//将futureTask中返回到信息作为参数带入到thenApplyFuture中
CompletableFuture<String> whenCompleteFuture = futureTask.whenComplete((res,throwable) ->{
System.out.println("回调:"+res);
});
//打印:
//回调:打印信息
//打印信息
System.out.println(thenApplyFuture.get());

}

handle()的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CompletableFutureTest {
@Autowired
privite UserService userService;
public static void main(String[] args) {
//创建新的线程去异步执行查询用户列表
CompletableFuture<String> futureTask = CompletableFuture.supplyAsync(() -> {
return "打印信息";
});
//将futureTask中返回到信息作为参数带入到thenApplyFuture中
CompletableFuture<String> handleFuture = futureTask.handle((res,throwable) ->{
return "回调:"+res;
});
//打印:
//回调:打印信息
System.out.println(thenApplyFuture.get());
}
}

2. 多任务组合处理方法使用

AND组合:thenCombine()thenCombineAsync()thenAcceptBoth()runAfterBoth()的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CompletableFutureTest {
public static void main(String[] args) {
//使用内部线程池
ExecutorService executor = Executors.newFixedThreadPool(10);

CompletableFuture<String> first = CompletableFuture.completedFuture("第一个异步任务");
CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> "第二个异步任务").thenCombineAsync(first, (s,f) -> {
System.out.println(f);
System.out.println(s);
return "两个异步任务的组合";
}, executor);
//打印:
//第一个异步任务
//第二个异步任务
//两个异步任务的组合
System.out.println(second.join());
}
}

OR组合:applyToEither()acceptEitherAsync()acceptEither()runAfterEither()的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CompletableFutureTest {
public static void main(String[] args) {
//使用内部线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
//第一个异步任务,休眠2秒,保证它执行晚点
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000L);
System.out.println("执行完第一个异步任务");
} catch (Exception e) {
return "第一个任务异常";
}
return "第一个异步任务";
});
CompletableFuture<Void> second = CompletableFuture.supplyAsync(() -> "第二个异步任务").acceptEitherAsync(first, s -> {
System.out.println(s);
}, executor);
//打印:
//第二个异步任务
//null
//执行完第一个异步任务
System.out.println(second.join());
}
}

allOf()的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CompletableFutureTest {
public static void main(String[] args) {
CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
System.out.println("任务一完成");
});
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
System.out.println("任务二完成");
});
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m,k)->{
System.out.println("finish");
});
//打印:
//任务一完成
//任务二完成
//finish
}
}

anyOf()的用法(只要其中一个任务完成,另一个就不会再继续)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CompletableFutureTest {
public static void main(String[] args) {
CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务一完成");
});
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
System.out.println("任务二完成");
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{
System.out.println("finish");
// return "捡田螺的小男孩";
});

//打印
//任务二完成
//finish
anyOfFuture.join();
}
}

thenCompose()thenCompose()的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CompletableFutureTest {
public static void main(String[] args) {
//使用线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<String> first= CompletableFuture.completedFuture("第一个任务");
//第二个异步任务
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "第二个任务", executor)
.thenComposeAsync(data -> {
System.out.println(data);
//使用第一个任务作为返回
return first;
}, executor);

//第二个任务
//第一个任务
System.out.println(future.join());
executor.shutdown();

}
}

3. 总结

方法 描述 区别
supplyAsync(Supplier supplier); 提供异步执行,支持返回值,默认线程池为ForkJoinPool.commonPool
supplyAsync(Supplier supplier,Executor executor); 提供异步执行,支持返回值,线程池为用户自定义线程池
runAsync(Runnable runnable); 提供异步执行,不支持返回值,默认线程池也为ForkJoinPool.commonPool
runAsync(Runnable runnable,Executor executor); 提供异步执行,不支持返回值,线程池为用户自定义线程池
thenRun(Runnable action); 默认没有返回值,待先任务执行完后再去执行后任务 当传入自定义线程池时,先后两个任务共用同一个线程池
thenRunAsync(Runnable action); 默认没有返回值,待先任务执行完后再去执行后任务 当传入自定义线程池时,先任务使用自定义的线程池,后任务使用ForkJoin线程池
thenAccept(Consumer<? super T> action) 默认没有返回值,把先任务执行完后的结果以参数的形式带入后任务中执行 thenRun()
thenAcceptAsync(Consumer<? super T> action, Executor executor) 默认没有返回值,把先任务执行完后的结果以参数的形式带入后任务中执行 thenRunAsync()
thenApply(Function<? super T,? extends U> fn) 默认有返回值,把先任务执行完后的结果以参数的形式带入后任务中执行 thenRun()
thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 默认有返回值,把先任务执行完后的结果以参数的形式带入后任务中执行 thenRunAsync()
exceptionally(Function fn) 默认有返回值,当任务执行异常时,以异常为的参数传入回调并返回值
get() 抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)
join() 抛出的是uncheck异常(即RuntimeException),不会强制开发者抛出

image-20221020115047358

image-20221020115410017