个人随笔
目录
并发(十四)、Future类的使用和原理(转)
2021-05-17 10:36:38

java异步接口Future和Callable

Future是Concurrent包提供的一种异步得到结果的接口。
Future接口:

  1. public interface Future<V> {
  2. // 取消当前的计算
  3. boolean cancel(boolean mayInterruptIfRunning);
  4. // 计算是否被取消
  5. boolean isCancelled();
  6. // 计算是否已经结束
  7. boolean isDone();
  8. // 得到计算的结果
  9. V get() throws InterruptedException, ExecutionException;
  10. // 带有超时时间的get方法
  11. V get(long timeout, TimeUnit unit)
  12. throws InterruptedException, ExecutionException, TimeoutException;
  13. }

Future里面的任务,必须实现Callable接口。
Callable接口:

  1. public interface Callable<V> {
  2. // 要实现具体的计算逻辑
  3. V call() throws Exception;
  4. }

state状态
当前计算的情况有下面这些状态

  1. private volatile int state;
  2. private static final int NEW = 0;//初始
  3. private static final int COMPLETING = 1;//任务已经执行完或者出错,准备赋值
  4. private static final int NORMAL = 2;//任务正常执行完,并且已经赋值完
  5. private static final int EXCEPTIONAL = 3;//任务失败,把异常赋值回去
  6. private static final int CANCELLED = 4;//取消
  7. private static final int INTERRUPTING = 5;//准备中断计算过程
  8. private static final int INTERRUPTED = 6;//对计算进行中断

下面以FutureTask类为例,叙述一遍Future的使用方法和原理。

一、使用FutureTask
FutureTask的实现接口:

测试代码:

  1. public class TestMain {
  2. public static void main(String[] args) {
  3. long start = System.currentTimeMillis();
  4. Callable<Clothes> callable = new Callable<Clothes>() {
  5. @Override
  6. public Clothes call() throws Exception {
  7. Clothes clothes = new Clothes();
  8. clothes.washClothes();
  9. return clothes;
  10. }
  11. };
  12. FutureTask<Clothes> futureTask = new FutureTask<Clothes>(callable);
  13. new Thread(futureTask).start();
  14. Dishes dishes = new Dishes();
  15. dishes.washDishes();
  16. Clothes clothes = null;
  17. try {
  18. clothes = futureTask.get();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. } catch (ExecutionException e) {
  22. e.printStackTrace();
  23. }
  24. if (clothes != null) {
  25. System.out.println("===========over===========");
  26. }
  27. long end = System.currentTimeMillis();
  28. System.out.println("all tasks cost "+(end - start)+" millis");
  29. /*运行结果:
  30. start to wash dishes
  31. machine starts to wash cloths
  32. all dishes are cleaned!!!
  33. all clothes are cleaned!!!
  34. ===========over===========
  35. all tasks cost 3003 millis
  36. */
  37. }
  38. static class Clothes {
  39. public void washClothes() {
  40. System.out.println("machine starts to wash cloths");
  41. try {
  42. Thread.sleep(3000);
  43. } catch (InterruptedException e) {
  44. e.printStackTrace();
  45. }
  46. System.out.println("all clothes are cleaned!!!");
  47. }
  48. }
  49. static class Dishes {
  50. public void washDishes() {
  51. System.out.println("start to wash dishes");
  52. try {
  53. Thread.sleep(2000);
  54. } catch (InterruptedException e) {
  55. e.printStackTrace();
  56. }
  57. System.out.println("all dishes are cleaned!!!");
  58. }
  59. }
  60. }

由结果可看出Future的方法是异步的,所以总耗时是3s,如果同步的话应该是5s。
在代码的最开头,我们new了一个Callable对象,Callable对象会被当做任务丢到FutureTask里面执行。
启动线程就可以开始执行任务了。
下面具体说一下Future帮我们干了什么,为什么他是异步的。

二、 FutureTask的run()方法

  1. public void run() {
  2. if (state != NEW ||
  3. !UNSAFE.compareAndSwapObject(this, runnerOffset,
  4. null, Thread.currentThread()))
  5. return;
  6. try {
  7. Callable<V> c = callable;// 全局变量,构造函数赋值
  8. if (c != null && state == NEW) {
  9. V result;
  10. boolean ran;
  11. try {
  12. result = c.call();// callable的返回值
  13. ran = true;// Callable计算已经结束了
  14. } catch (Throwable ex) {
  15. result = null;
  16. ran = false;
  17. setException(ex);
  18. }
  19. if (ran)
  20. set(result);
  21. }
  22. } finally {
  23. // runner must be non-null until state is settled to
  24. // prevent concurrent calls to run()
  25. runner = null;
  26. // state must be re-read after nulling runner to prevent
  27. // leaked interrupts
  28. int s = state;
  29. if (s >= INTERRUPTING)
  30. handlePossibleCancellationInterrupt(s);
  31. }
  32. }

run方法很直接,直接调用Callable的call方法获取值,获取到值了,就令ran为true,然后调用set方法。

三、 set()方法
set方法会调用finishCompletion,

  1. protected void set(V v) {
  2. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  3. outcome = v;
  4. // outcome是计算出来的结果,或者抛出的异常
  5. UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
  6. finishCompletion();
  7. }
  8. }

3.1、 finishCompletion()方法
WaitNode节点是Treiber stack的节点,就是一个排队等待的线程队列。

  1. private void finishCompletion() {
  2. // assert state > COMPLETING;
  3. for (WaitNode q; (q = waiters) != null;) {
  4. if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
  5. for (;;) {
  6. //唤醒队列中等待的线程
  7. Thread t = q.thread;
  8. if (t != null) {
  9. q.thread = null;
  10. LockSupport.unpark(t);//唤醒线程,加锁在get里面,如果没有加锁,正常向下执行
  11. }
  12. WaitNode next = q.next;
  13. if (next == null)
  14. break;
  15. q.next = null; // unlink to help gc
  16. q = next;
  17. }
  18. break;
  19. }
  20. }
  21. done();// protected方法,钩子函数,留给程序员自己实现
  22. callable = null; // to reduce footprint
  23. }

四、 get()方法

  1. public V get() throws InterruptedException, ExecutionException {
  2. int s = state;
  3. if (s <= COMPLETING)
  4. //计算没完成,进入awaitDone
  5. s = awaitDone(false, 0L);
  6. return report(s);
  7. }

4.1 awaitDone()
awaitDone用死循环等待结果,也就是说会阻塞在这里。

  1. private int awaitDone(boolean timed, long nanos)
  2. throws InterruptedException {
  3. final long deadline = timed ? System.nanoTime() + nanos : 0L;
  4. WaitNode q = null;
  5. boolean queued = false;
  6. for (;;) {
  7. //死循环等待
  8. if (Thread.interrupted()) {
  9. removeWaiter(q);
  10. throw new InterruptedException();
  11. }
  12. int s = state;
  13. if (s > COMPLETING) {
  14. // 计算完成了就返回
  15. if (q != null)
  16. q.thread = null;
  17. return s;
  18. }
  19. // COMPLETING是一个很短暂的状态,调用Thread.yield期望让出时间片,之后重试循环
  20. else if (s == COMPLETING) // cannot time out yet
  21. Thread.yield();
  22. else if (q == null)
  23. q = new WaitNode();
  24. else if (!queued)
  25. /* 当前节点未入栈
  26. * 这是Treiber Stack算法入栈的逻辑。
  27. * Treiber Stack是一个基于CAS的无锁并发栈实现,
  28. * 更多可以参考https://en.wikipedia.org/wiki/Treiber_Stack
  29. */
  30. queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
  31. q.next = waiters, q);
  32. else if (timed) {
  33. nanos = deadline - System.nanoTime();
  34. if (nanos <= 0L) {
  35. // 超时,移除节点
  36. removeWaiter(q);
  37. return state;
  38. }
  39. LockSupport.parkNanos(this, nanos);
  40. }
  41. else //这里阻塞住
  42. LockSupport.park(this);
  43. }
  44. }

4.2 report()
report把结果赋值回去

  1. private V report(int s) throws ExecutionException {
  2. Object x = outcome;
  3. if (s == NORMAL)//正常得到了结果
  4. return (V)x;
  5. if (s >= CANCELLED)
  6. throw new CancellationException();
  7. throw new ExecutionException((Throwable)x);
  8. }

总结
FutureTask在底层开了一个死循环用于等待结果,当线程得到结果时,跳出循环,借此实现的异步操作。注意,我们一般在使用Future的时候,都不会用本文中new线程的方式,而是采用连接池中的线程。
————————————————
版权声明:本文为CSDN博主「CPeony」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_15764477/article/details/109366729

 327

啊!这个可能是世界上最丑的留言输入框功能~


当然,也是最丑的留言列表

有疑问发邮件到 : suibibk@qq.com 侵权立删
Copyright : 个人随笔   备案号 : 粤ICP备18099399号-2