java异步接口Future和Callable
Future是Concurrent包提供的一种异步得到结果的接口。
Future接口:
public interface Future<V> {
// 取消当前的计算
boolean cancel(boolean mayInterruptIfRunning);
// 计算是否被取消
boolean isCancelled();
// 计算是否已经结束
boolean isDone();
// 得到计算的结果
V get() throws InterruptedException, ExecutionException;
// 带有超时时间的get方法
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future里面的任务,必须实现Callable接口。
Callable接口:
public interface Callable<V> {
// 要实现具体的计算逻辑
V call() throws Exception;
}
state状态
当前计算的情况有下面这些状态
private volatile int state;
private static final int NEW = 0;//初始
private static final int COMPLETING = 1;//任务已经执行完或者出错,准备赋值
private static final int NORMAL = 2;//任务正常执行完,并且已经赋值完
private static final int EXCEPTIONAL = 3;//任务失败,把异常赋值回去
private static final int CANCELLED = 4;//取消
private static final int INTERRUPTING = 5;//准备中断计算过程
private static final int INTERRUPTED = 6;//对计算进行中断
下面以FutureTask类为例,叙述一遍Future的使用方法和原理。
一、使用FutureTask
FutureTask的实现接口:
测试代码:
public class TestMain {
public static void main(String[] args) {
long start = System.currentTimeMillis();
Callable<Clothes> callable = new Callable<Clothes>() {
@Override
public Clothes call() throws Exception {
Clothes clothes = new Clothes();
clothes.washClothes();
return clothes;
}
};
FutureTask<Clothes> futureTask = new FutureTask<Clothes>(callable);
new Thread(futureTask).start();
Dishes dishes = new Dishes();
dishes.washDishes();
Clothes clothes = null;
try {
clothes = futureTask.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
if (clothes != null) {
System.out.println("===========over===========");
}
long end = System.currentTimeMillis();
System.out.println("all tasks cost "+(end - start)+" millis");
/*运行结果:
start to wash dishes
machine starts to wash cloths
all dishes are cleaned!!!
all clothes are cleaned!!!
===========over===========
all tasks cost 3003 millis
*/
}
static class Clothes {
public void washClothes() {
System.out.println("machine starts to wash cloths");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("all clothes are cleaned!!!");
}
}
static class Dishes {
public void washDishes() {
System.out.println("start to wash dishes");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("all dishes are cleaned!!!");
}
}
}
由结果可看出Future的方法是异步的,所以总耗时是3s,如果同步的话应该是5s。
在代码的最开头,我们new了一个Callable对象,Callable对象会被当做任务丢到FutureTask里面执行。
启动线程就可以开始执行任务了。
下面具体说一下Future帮我们干了什么,为什么他是异步的。
二、 FutureTask的run()方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;// 全局变量,构造函数赋值
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();// callable的返回值
ran = true;// Callable计算已经结束了
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
run方法很直接,直接调用Callable的call方法获取值,获取到值了,就令ran为true,然后调用set方法。
三、 set()方法
set方法会调用finishCompletion,
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
// outcome是计算出来的结果,或者抛出的异常
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
3.1、 finishCompletion()方法
WaitNode节点是Treiber stack的节点,就是一个排队等待的线程队列。
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
//唤醒队列中等待的线程
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);//唤醒线程,加锁在get里面,如果没有加锁,正常向下执行
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();// protected方法,钩子函数,留给程序员自己实现
callable = null; // to reduce footprint
}
四、 get()方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//计算没完成,进入awaitDone
s = awaitDone(false, 0L);
return report(s);
}
4.1 awaitDone()
awaitDone用死循环等待结果,也就是说会阻塞在这里。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//死循环等待
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
// 计算完成了就返回
if (q != null)
q.thread = null;
return s;
}
// COMPLETING是一个很短暂的状态,调用Thread.yield期望让出时间片,之后重试循环
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
/* 当前节点未入栈
* 这是Treiber Stack算法入栈的逻辑。
* Treiber Stack是一个基于CAS的无锁并发栈实现,
* 更多可以参考https://en.wikipedia.org/wiki/Treiber_Stack
*/
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
// 超时,移除节点
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else //这里阻塞住
LockSupport.park(this);
}
}
4.2 report()
report把结果赋值回去
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)//正常得到了结果
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
总结
FutureTask在底层开了一个死循环用于等待结果,当线程得到结果时,跳出循环,借此实现的异步操作。注意,我们一般在使用Future的时候,都不会用本文中new线程的方式,而是采用连接池中的线程。
————————————————
版权声明:本文为CSDN博主「CPeony」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_15764477/article/details/109366729