营销型网站开发流程包括,wordpress百家主题,青海青海西宁网站建设,青海最好的网站建设公司原创/朱季谦
在并发多线程场景下#xff0c;存在需要获取各线程的异步执行结果#xff0c;这时#xff0c;就可以通过ExecutorService线程池结合Callable、Future来实现。
我们先来写一个简单的例子——
public class ExecutorTest {public static void main(String[] ar…
原创/朱季谦
在并发多线程场景下存在需要获取各线程的异步执行结果这时就可以通过ExecutorService线程池结合Callable、Future来实现。
我们先来写一个简单的例子——
public class ExecutorTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executor Executors.newSingleThreadExecutor();Callable callable new MyCallable();Future future executor.submit(callable);System.out.println(打印线程池返回值 future.get());}
}class MyCallable implements CallableString{Overridepublic String call() throws Exception {return 测试返回值;}
}执行完成后会打印出以下结果
打印线程池返回值测试返回值可见线程池执行完异步线程任务我们是可以获取到异步线程里的返回值。
那么ExecutorService、Callable、Future实现有返回结果的多线程是如何实现的呢
首先我们需要创建一个实现函数式接口Callable的类该Callable接口只定义了一个被泛型修饰的call方法这意味着需要返回什么类型的值可以由具体实现类来定义——
FunctionalInterface
public interface CallableV {V call() throws Exception;
}因此我自定义了一个实现Callable接口的类该类的重写了call方法我们在执行多线程时希望返回什么样的结果就可以在该重写的call方法定义。
class MyCallable implements CallableString{Overridepublic String call() throws Exception {return 测试返回值;}
}在自定义的MyCallable类中我在call方法里设置一个很简单的String返回值 “测试返回值”这意味着我是希望在线程池执行完异步线程任务时可以返回“测试返回值”这个字符串给我。
接下来我们就可以创建该MyCallable类的对象然后通过executor.submit(callable)丢到线程池里线程池里会利用空闲线程来帮我们执行一个异步线程任务。
ExecutorService executor Executors.newSingleThreadExecutor();
Callable callable new MyCallable();
Future future executor.submit(callable);值得注意一点是若需要实现获取线程返回值的效果只能通过executor.submit(callable)去执行而不能通过executor.execute(Runnable command)执行因为executor.execute(Runnable command)只能传入实现Runnable 接口的对象但这类对象是不具备返回线程效果的功能。
进入到executor.submit(callable)底层具体实现在AbstractExecutorService类中。可以看到执行到submit方法内部时会将我们传进来的new MyCallable()对象作为参数传入到newTaskFor(task)方法里——
public T FutureT submit(CallableT task) {if (task null) throw new NullPointerException();RunnableFutureT ftask newTaskFor(task);execute(ftask);return ftask;
}这个newTaskFor(task)方法内部具体实现是将new MyCallable()对象传入构造器中生成了一个FutureTask对象。
protected T RunnableFutureT newTaskFor(CallableT callable) {return new FutureTaskT(callable);
}这个FutureTask对象实现RunableFuture接口这个RunableFuture接口又继承了Runnable说明FutureTask类内部会实现一个run方法然后本身就可以当做一个Runnable线程任务借助线程Thread(new FutureTask(.....)).start()方式开启一个新线程去异步执行其内部实现的run方法逻辑。
public class FutureTaskV implements RunnableFutureV{.....}public interface RunnableFutureV extends Runnable, FutureV {/*** Sets this Future to the result of its computation* unless it has been cancelled.*/void run();
}分析到这里可以知道FutureTask的核心方法一定是run方法线程执行start方法后最后会去调用FutureTask的run方法。在讲解这个run方法前我们先去看一下创建FutureTask的初始化构造方法底层逻辑new FutureTask(callable) ——
public class FutureTaskV implements RunnableFutureV {private CallableV callable;......//省略其余源码public FutureTask(CallableV callable) {if (callable null)throw new NullPointerException();//通过构造方法初始化CallableV callable赋值this.callable callable;this.state NEW; // ensure visibility of callable
}......//省略其余源码
}可以看到FutureTask(Callable callable)构造器主要是将我们先前创建的new MyCallable()对象传进来赋值给FutureTask内部定义的Callable callable引用实现子类对象指向父类引用。这一点很关键这就意味着在初始化创建FutureTask对象后我们是可以通过callable.call()来调用我们自定义设置可以返回“测试返回值”的call方法这不就是我们希望在异步线程执行完后能够返回的值吗
我们不妨猜测一下整体返数主流程在Thread(new FutureTask(.....)).start()开启一个线程后当线程获得了CPU时间片就会去执行FutureTask对象里的run方法这时run方法里可以通过callable.call()调用到我们自定义的MyCallable#call()方法进而得到方法返回值 “测试返回值”——到这一步只需要将这个返回值赋值给FutureTask里某个定义的对象属性那么在主线程在通过获取FutureTask里被赋值的X对象属性值不就可以拿到返回字符串值 “测试返回值”了吗
实现上主体流程确实是这样只不过忽略了一些细节而已。 接下来让我们看一下FutureTask的run方法——
public void run() {//如果状态不是NEW或者设置runner为当前线程时说明FutureTask任务已经取消无法继续执行if (state ! NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {//在该文中callable被赋值为指向我们定义的new MyCallable()对象引用CallableV c callable;if (c ! null state NEW) {V result;boolean ran;try {//c.call最后会调用new MyCallable()的call()方法得到字符串返回值“测试返回值”给resultresult c.call();ran true;} catch (Throwable ex) {result null;ran false;setException(ex);}//正常执行完c.call()方法时ran值为true,说明会执行set(result)方法。if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner null;// state must be re-read after nulling runner to prevent// leaked interruptsint s state;if (s INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}根据以上源码简单分析可以看到run方法当中最终确实会执行new MyCallable()的call()方法得到字符串返回值“测试返回值”给result然后执行set(result)方法根据set方法名就不难猜出这是一个会赋值给某个字段的方法。
这里分析会忽略一些状态值的讲解这块会包括线程的取消、终止等内容后面我会出一片专门针对FutureTask源码分析的文章再介绍本文主要还是介绍异步线程返回结果的主要原理。
沿着以上分析追踪至set(result)方法里——
protected void set(V v) {//通过CAS原子操作将运行的线程设置为COMPLETING说明线程已经执行完成中if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//若CAS原子比较赋值成功说明线程可以被正常执行完成的话然后将result结果值赋值给outcomeoutcome v;//线程正常完成结束UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}这个方法的主要是若该线程执行能够正常完成话就将得到的返回值赋值给outcome这个outcome是FutureTask的一个Object变量——
private Object outcome;至此就完成了流程的这一步—— 最后就是执行主线程的根据ftask.get()获取执行完成的值这个get可以设置超时时间例如 ftask.get(2,TimeUnit.SECONDS)表示超过2秒还没有获取到线程返回值的话就直接结束该get方法继续主线程往下执行。
System.out.println(打印线程池返回值 ftask.get(2,TimeUnit.SECONDS));进入到get方法可以看到当状态在s COMPLETING时表示任务还没有执行完就会去执行awaitDone(false, 0L)方法这个方法表示将一直做死循环等待线程执行完成才会跳出等待循环继续往下走。若设置了超时时间例如ftask.get(2,TimeUnit.SECONDS))就会在awaitDone方法循环至2秒在2秒内发现线程状态被设置为正常完成时就会跳出循环若2秒后线程没有执行完成也会强制跳出循环了但这种情况将无法获取到线程结果值。
public V get() throws InterruptedException, ExecutionException {int s state;if (s COMPLETING)//循环等待线程执行状态s awaitDone(false, 0L);return report(s);
}最后就是report(s)方法可以看到outcome值最终赋值给Object x若sNORMAL表示线程任务已经正常完成结束就可以根据我们定义的类型进行泛型转换返回我们定义的是String字符串类型故而会返回字符串值也就是 “测试返回值”。
private V report(int s) throws ExecutionException {Object x outcome;if (s NORMAL)//返回线程任务执行结果return (V)x;if (s CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);
}你看最后就能获取到了异步线程执行的结果返回给main主线程—— 以上就是执行线程任务run方法后如何将线程任务结果返回给主线程其实还少一个地方补充就是如何将FutureTask任务丢给线程执行我们这里用到了线程池 但是execute(ftask)底层同样是使用一个了线程通过执行start方法开启一个线程这个新运行的线程最终会执行FutureTask的run方法。
public T FutureT submit(CallableT task) {if (task null) throw new NullPointerException();RunnableFutureT ftask newTaskFor(task);execute(ftask);return ftask;
}可以简单优化下直接用一个线程演示该案例这样看着更好理解些当时生产上是不会有这样直接用一个线程来执行的更多是通过原生线程池——
public static void main(String[] args) throws Exception{Callable callable new MyCallable();RunnableFutureString ftask new FutureTaskString(callable);new Thread(ftask).start();System.out.println(打印线程池返回值 ftask.get());
}