JavaJava异步编程
DreamCollector一、Future
1. 使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class FutureTest { @Autowired privite UserService userService; public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); FutureTask<List<User>> userFutureTask = new FutureTask<>(new Callable<List<User>>() { @Override public List<User> call() throws Exception { return userService.getUserList(); } }); Thread.sleep(3000); executorService.submit(userFutureTask); List<User> userList = userFutureTask.get(); }
|
2. 缺点
- Future.get()是阻塞调用,在线程获取结果之前get方法会一直阻塞
- Future.isDone()可以在程序中轮询的方式查询执行结果,但会耗费无谓的CPU资源
二、CompletableFuture
CompletableFuture
实现Future
和CompletionStage
接口,提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方
1. 简单异步回调方法使用
supplyAsync()、runAsync()的用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class CompletableFutureTest { @Autowired privite UserService userService; public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<List<User>> useFutureTask = CompletableFuture.supplyAsync(() -> userService.getUserList(),executor); List<User> userList = useFutureTask.get();
Thread.sleep(3000);
CompletableFuture<Void> printFutureTask = CompletableFuture.runAsync(() -> System.out.println("打印信息"),executor);
}
|
thenRun()、thenRunAsync()的用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class CompletableFutureTest { @Autowired privite UserService userService; public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<List<User>> useFutureTask = CompletableFuture.supplyAsync(() -> userService.getUserList(),executor); CompletableFuture thenRunFuture = useFutureTask.thenRun(() ->{System.out.println("打印信息")}); System.out.println(thenRunFuture.get());
} public CompletableFuture<Void> thenRun(Runnable action) { return uniRunStage(null, action); }
public CompletableFuture<Void> thenRunAsync(Runnable action) { return uniRunStage(asyncPool, action); }
|
thenAccept()、thenAcceptAsync()的用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class CompletableFutureTest { @Autowired privite UserService userService; public static void main(String[] args) { CompletableFuture<String> futureTask = CompletableFuture.supplyAsync(() -> { return "打印信息"; }); CompletableFuture thenAcceptFuture = futureTask.thenAccept((res) ->{ if ("打印信息".equals(res)) { System.out.println("打印信息成功"); } }); System.out.println(thenAcceptFuture.get());
}
|
thenApply()、thenApplyAsync()的用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class CompletableFutureTest { @Autowired privite UserService userService; public static void main(String[] args) { CompletableFuture<String> futureTask = CompletableFuture.supplyAsync(() -> { return "打印信息"; }); CompletableFuture<String> thenApplyFuture = futureTask.thenApply((res) ->{ if ("打印信息".equals(res)) { return "打印信息成功"; } }); System.out.println(thenApplyFuture.get());
}
|
exceptionally()的用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class CompletableFutureTest { @Autowired privite UserService userService; public static void main(String[] args) { CompletableFuture<String> futureTask = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); throw new RuntimeException(); return "打印信息"; }); CompletableFuture<String> exceptionFuture = futureTask.exceptionally((e) ->{ e.printStackTrace(); return "程序异常"; }); System.out.println(thenApplyFuture.get());
}
|
whenComplete()的用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class CompletableFutureTest { @Autowired privite UserService userService; public static void main(String[] args) { CompletableFuture<String> futureTask = CompletableFuture.supplyAsync(() -> { return "打印信息"; }); CompletableFuture<String> whenCompleteFuture = futureTask.whenComplete((res,throwable) ->{ System.out.println("回调:"+res); }); System.out.println(thenApplyFuture.get());
}
|
handle()的用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class CompletableFutureTest { @Autowired privite UserService userService; public static void main(String[] args) { CompletableFuture<String> futureTask = CompletableFuture.supplyAsync(() -> { return "打印信息"; }); CompletableFuture<String> handleFuture = futureTask.handle((res,throwable) ->{ return "回调:"+res; }); System.out.println(thenApplyFuture.get()); } }
|
2. 多任务组合处理方法使用
AND组合:thenCombine()、thenCombineAsync()、thenAcceptBoth()、runAfterBoth()的用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class CompletableFutureTest { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> first = CompletableFuture.completedFuture("第一个异步任务"); CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> "第二个异步任务").thenCombineAsync(first, (s,f) -> { System.out.println(f); System.out.println(s); return "两个异步任务的组合"; }, executor); System.out.println(second.join()); } }
|
OR组合:applyToEither()、acceptEitherAsync()、acceptEither()、runAfterEither()的用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class CompletableFutureTest { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000L); System.out.println("执行完第一个异步任务"); } catch (Exception e) { return "第一个任务异常"; } return "第一个异步任务"; }); CompletableFuture<Void> second = CompletableFuture.supplyAsync(() -> "第二个异步任务").acceptEitherAsync(first, s -> { System.out.println(s); }, executor); System.out.println(second.join()); } }
|
allOf()的用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class CompletableFutureTest { public static void main(String[] args) { CompletableFuture<Void> a = CompletableFuture.runAsync(()->{ System.out.println("任务一完成"); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("任务二完成"); }); CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m,k)->{ System.out.println("finish"); }); } }
|
anyOf()的用法(只要其中一个任务完成,另一个就不会再继续)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class CompletableFutureTest { public static void main(String[] args) { CompletableFuture<Void> a = CompletableFuture.runAsync(()->{ try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务一完成"); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("任务二完成"); }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{ System.out.println("finish"); });
anyOfFuture.join(); } }
|
thenCompose()、thenCompose()的用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class CompletableFutureTest { public static void main(String[] args) { ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<String> first= CompletableFuture.completedFuture("第一个任务"); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> "第二个任务", executor) .thenComposeAsync(data -> { System.out.println(data); return first; }, executor);
System.out.println(future.join()); executor.shutdown();
} }
|
3. 总结
方法 |
描述 |
区别 |
supplyAsync(Supplier supplier); |
提供异步执行,支持返回值,默认线程池为ForkJoinPool.commonPool |
|
supplyAsync(Supplier supplier,Executor executor); |
提供异步执行,支持返回值,线程池为用户自定义线程池 |
|
runAsync(Runnable runnable); |
提供异步执行,不支持返回值,默认线程池也为ForkJoinPool.commonPool |
|
runAsync(Runnable runnable,Executor executor); |
提供异步执行,不支持返回值,线程池为用户自定义线程池 |
|
thenRun(Runnable action); |
默认没有返回值,待先任务执行完后再去执行后任务 |
当传入自定义线程池时,先后两个任务共用同一个线程池 |
thenRunAsync(Runnable action); |
默认没有返回值,待先任务执行完后再去执行后任务 |
当传入自定义线程池时,先任务使用自定义的线程池,后任务使用ForkJoin线程池 |
thenAccept(Consumer<? super T> action) |
默认没有返回值,把先任务执行完后的结果以参数的形式带入后任务中执行 |
同thenRun() |
thenAcceptAsync(Consumer<? super T> action, Executor executor) |
默认没有返回值,把先任务执行完后的结果以参数的形式带入后任务中执行 |
同thenRunAsync() |
thenApply(Function<? super T,? extends U> fn) |
默认有返回值,把先任务执行完后的结果以参数的形式带入后任务中执行 |
同thenRun() |
thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) |
默认有返回值,把先任务执行完后的结果以参数的形式带入后任务中执行 |
同thenRunAsync() |
exceptionally(Function fn) |
默认有返回值,当任务执行异常时,以异常为的参数传入回调并返回值 |
|
get() |
抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch) |
|
join() |
抛出的是uncheck异常(即RuntimeException),不会强制开发者抛出 |