企业彩铃制作网站,环境设计专业就业方向,天津市今天新闻头条,做网站ddos攻击文章目录 一、Executor接口二、ExecutorService接口三、ThreadPoolExecutor类1、状态2、Worker3、扩展 四、ForkJoinPool类1、工作窃取算法2、Fork/Join的设计3、执行原理 五、ScheduledThreadPool类1、ScheduledExecutorService2、比较Timer 六、Executors类 Executor 框架是 … 文章目录 一、Executor接口二、ExecutorService接口三、ThreadPoolExecutor类1、状态2、Worker3、扩展 四、ForkJoinPool类1、工作窃取算法2、Fork/Join的设计3、执行原理 五、ScheduledThreadPool类1、ScheduledExecutorService2、比较Timer 六、Executors类 Executor 框架是 Java5 之后引进的在 Java 5 之后通过 Executor 来启动线程比使用 Thread 的 start 方法更好除了更易管理效率更好用线程池实现节约开销。 Executor 框架不仅包括了线程池的管理还提供了线程工厂、队列以及拒绝策略等Executor 框架让并发编程变得更加简单。
Executor框架的组成
任务Runnable/Callable任务通过Runnable接口或Callable接口进行定义。Runnable接口或Callable接口实现类都可以被 ThreadPoolExecutor执行任务的执行Executor任务执行机制的核心接口 Executor以及继承自Executor接口的ExecutorService接口。ThreadPoolExecutor实现了ExecutorService接口异步的计算结果FutureFuture接口以及Future接口的实现类FutureTask类都可以代表异步计算的结果。当我们把Runnable接口或Callable接口的实现类提交给ThreadPoolExecutor执行后就会返回一个Future对象
一、Executor接口
线程池简化了线程的管理工作 并且JUC提供了一种灵活的线程池实现来作为Executor框架的一部分。在Java类库中任务执行的主要抽象不是Thread而是ExecutorExecutor只定义了execute一个方法是最顶层的接口。 /*** Executes the given command at some time in the future. The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {code Executor} implementation.** param command the runnable task* throws RejectedExecutionException if this task cannot be* accepted for execution* throws NullPointerException if command is null*/void execute(Runnable command);execute方法接受一个Runnable参数这个方法定义为在未来的某个时间执行传入的方法方法的运行可以在一个新的线程在线程池或者在调用的线程中该方法无法接收到线程的执行结果当然Runnable接口本身也没有返回值。
虽然Executor是个简单的接口但它却为灵活且强大的异步任务执行框架提供了基础该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来并用Runnable来表示任务。
Executor的实现还提供了对生命周期的支持以及统计信息收集、应用程序管理机制和性能监视等机制。
**Executor基于生产者-消费者模式提交任务的操作相当于生产者执行任务的线程相当于消费者。**如果要在程序中实现一个生产者-消费者的设计那么最简单的方式就是使用Executor。
二、ExecutorService接口
Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象它的run方法执行任务后不能返回一个值或者抛出一个受检查的异常。许多任务实际上都是存在延迟的计算——执行数据库查询从网络上获取资源或者计算某个复杂的功能。对于这些任务Callable是一种更好的抽象它认为主入口点(call)将返回一个值并可能抛出一个异常。
因此引入了ExecutorService它继承自Executor并且在其基础上增加了很多功能可以生成用于跟踪一个或多个异步任务进度的Future的方法以解决Executor的局限性。 Future表示一个任务的生命周期并且提供了响应的方法来判断是否已经完成或取消以及获取任务的结果和取消任务等。 在Future规范中包含的隐含意义是任务的生命周期只能前进不能后退就像ExecutorService的生命周期一样。当某个任务完成后它就永远停留在完成状态。 get方法的行为取决于任务的状态尚未开始、正在运行、已完成。如果任务已完成那么get会立即返回或者抛出一个Exception如果任务没有完成那么get将阻塞并直到任务完成。如果任务抛出了一场那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消那么get将抛出CancellationException。如果get抛出了ExecutionException那么可以通过getCause来获得被封装的初始异常。 定义了以下方法 void shutdown()启动有序关机其中执行先前提交的任务但不接受新任务。如果调用已经关闭则没有额外的效果。不会阻塞等待先前提交的任务执行完成 ListRunnable shutdownNow()尝试停止所有正在执行的任务停止对等待任务的处理并返回等待执行的任务列表。不会阻塞等待正在执行的任务终止 只是尽最大努力停止处理正在执行的任务之外没有任何保证。例如典型的实现将通过Thread.interrupt取消因此任何未能响应中断的任务可能永远不会终止 boolean isShutdown()如果此执行器已关闭调用了关闭方法则返回true boolean isTerminated()如果关闭后所有任务都已完成则返回true。注意除非先调用shutdown或shutdownNow否则isTerminated永远不会为真 boolean awaitTermination(long timeout, TimeUnit unit)阻塞直到所有任务在关机请求后完成执行或者超时发生或者当前线程被中断以先发生的为准 T FutureT submit(Callablet task)提交一个带返回值的任务以供执行并返回表示该任务的挂起结果的Future。Future的get方法将在成功完成任务时返回任务的结果 T FutureT submit(Runnable task, T result)提交可运行任务以供执行并返回表示该任务的Future。Future的get方法将在成功完成时返回给定的结果 Future? submit(Runnable task)提交可运行任务以供执行并返回表示该任务的Future。Future的get方法将在成功完成时返回null T Listfuturet invokeAll(Collection? extends CallableT tasks)执行给定的任务并在所有任务完成时返回保存其状态和结果的future列表。每个Future对象的isDone方法都会返回true。请注意已完成的任务可以正常终止也可以抛出异常终止。如果在执行此操作时修改了给定的集合则此方法的结果是未定义的 T T invokeAny(Collection? extends CallableT tasks)执行给定的任务如果有成功完成的任务则返回成功完成的任务的结果(即不抛出异常)。在正常或异常返回时未完成的任务将被取消。如果在执行此操作时修改了给定的集合则此方法的结果是未定义的。
三、ThreadPoolExecutor类
ThreadPoolExecutor实现了ExecutorService实际上是继承了AbstractExecutorService为了在广泛的上下文中发挥作用该类提供了许多可调参数和可扩展性挂钩
corePoolSize指定了线程池中的线程数量它的数量决定了添加的任务是开辟新的线程去执行还是放到workQueue任务队列中去maximumPoolSize指定了线程池中的最大线程数量这个参数会根据你使用的workQueue任务队列的类型决定线程池会开辟的最大线程数量keepAliveTime当线程池中空闲线程数量超过corePoolSize时多余的线程救急线程会在多长时间内被销毁unitkeepAliveTime的单位workQueue任务队列被添加到线程池中但尚未被执行的任务它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种threadFactory线程工厂用于创建线程一般用默认即可handler拒绝策略当任务太多来不及处理时如何拒绝任务
1、状态
RUNNING(-129)接受新任务并且处理排队任务SHUTDOWN(029)不接受新任务但是处理排队任务STOP(129)不接受新任务也不处理排队任务同时中断处理中的任务TYDING(229)所有任务被终止工作线程数为0之后进入该状态并且会执行terminated()钩子函数TERMINATED(329)terminated()钩子函数执行完毕后
状态单调地随时间增加但不需要达到每个状态。
RUNNING-SHUTDOWN调用shutdown()RUNNING/SHUTDOWN-STOP调用shutdownNow()STOP-TYDING当队列和池都为空时TIDYING - TERMINATEDterminated()钩子函数执行完毕后
ThreadPoolExecutor中对于状态的记录保存在一个AtomicInteger类型的变量中其中高三位就是用于记录线程池的状态而低的29位用于记录线程数量。
2、Worker
ThreadPoolExecutor中定义了一个私有静态类Worker其继承自AbstractQueuedSynchronizer类并实现了Runnable接口。其中维护了线程实例Thread、任务实例Runnable、线程任务计数器long变量。
这个类适当地扩展了AbstractQueuedSynchronizer以简化获取和释放围绕每个任务执行的锁。这可以防止中断这些中断旨在唤醒等待任务的工作线程而不是中断正在运行的任务。我们实现了一个简单的不可重入互斥锁而不是使用ReentrantLock因为我们不希望工作任务在调用setCorePoolSize等池控制方法时能够重新获得锁。此外为了在线程实际开始运行任务之前抑制中断我们将锁状态初始化为负值并在启动时(在runWorker中)清除它。
3、扩展
该类还定义了三个protected类型的钩子函数
beforeExecute线程池任务运行前执行afterExecute线程池任务运行后执行terminated线程池退出后执行
这几个方法在ThreadPoolExecutor中为空实现。
四、ForkJoinPool类
Fork/Join框架是Java7提供的一个用于并行执行任务的框架是一个把大任务分割成若干个小任务最终汇总每个小任务结果后得到大任务结果的框架。
Fork就是把一个大任务切分为若干个子任务并行的执行Join就是合并这些子任务的执行结果最后得到这个大任务的结果。比如计算12…10000可以分割成10个子任务每个子任务分别对1000个数进行求和最终汇总这10个子任务的结果。
1、工作窃取算法
ForkJoinPool是运行ForkJoinTasks的ExecutorService。**ForkJoinPool与其他类型的ExecutorService的区别主要在于使用了工作窃取。工作窃取算法是指某个线程从其他队列里窃取任务来执行。**那么为什么要使用工作窃取算法呢**假如我们需要做一个比较大的任务可以把这个任务分割为若干个互不干扰的子任务为了减少线程间的竞争把这些子任务分别放到不同的队列里并为每个队列创建一个单独的线程来执行队列里的任务线程和队列一一对应。**比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着不如去帮其他线程干活于是它就去其他线程的队列里窃取一个任务来执行。而这时它们会访问同一个队列所以为了减少窃取任务线程之间的竞争通常会使用双端队列被窃取任务线程永远从双端队列的头部拿任务而窃取任务的线程永远从双端队列的尾部拿任务执行。 工作窃取算法的优点充分利用线程进行并行计算减少了线程间的竞争 工作窃取算法的缺点在某些情况下还是存在竞争比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源比如创建多个线程和多个双端队列。 2、Fork/Join的设计
想要设计一个Fork/Join框架需要完成两个步骤
分割任务需要有一个fork类来把大任务分割成子任务有可能子任务还是很大所以还需要不停地分割直到分割出的子任务足够小执行任务并合并结果分割的子任务分别放在双端队列里然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里启动一个线程从队列里拿数据然后合并这些数据。
Fork/Join使用两个类来完成以上两件事情 ForkJoinTask抽象类我们要使用ForkJoin框架必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下我们不需要直接继承ForkJoinTask类只需要继承它的子类Fork/Join框架提供了以下两个子类 RecursiveAction抽象类用于没有返回结果的任务RecursiveTask抽象类用于有返回结果的任务 ForkJoinPoolForkJoinTask需要通过ForkJoinPool来执行
任务分割出的子任务会添加到当前工作线程所维护的双端队列中进入队列的头部。当一个工作线程的队列里暂时没有任务时它会随机从其他工作线程的队列的尾部获取一个线程。
public class CountTask extends RecursiveTaskInteger {private int start;private int end;public CountTask(int start, int end) {this.start start;this.end end;}Overrideprotected Integer compute() {int sum 0;// 如果任务足够小就计算任务boolean canCompute (end - start) THRESHOLD;if (canCompute) {for (int i start; i end; i) {sum i;}} else {// 如果任务大于阈值就分裂成两个子任务计算int middle (start end) / 2;CountTask leftTask new CountTask(start, middle);CountTask rightTask new CountTask(middle 1, end);// 执行子任务leftTask.fork();rightTask.fork();// 等待子任务执行完并得到其结果int leftResult leftTask.join();int rightResult rightTask.join();// 合并子任务sum leftResult rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkJoinPool new ForkJoinPool();CountTask task new CountTask(1, 4);// 执行一个任务FutureInteger result forkJoinPool.submit(task);try {System.out.println(result.get());} catch (InterruptedException e) {} catch (ExecutionException e) {}}
}RecursiveTask需要实现compute方法在这个方法里首先需要判断任务是否足够小如果足够小就直接执行任务。如果不足够小就必须分割成两个子任务每个子任务在调用fork方法时又会进入compute方法。最后使用join方法等待子任务执行完成并得到其结果。 ForkJoinTask在执行的时候可能会抛出异常但是我们没办法在主线程里直接捕获异常所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了并且可以通过ForkJoinTask的getException方法来获取异常。 getException方法返回Throwable对象如果任务被取消了则返回CancellationException如果任务没有完成或者没有抛出异常则返回null。 3、执行原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务而ForkJoinWorkerThread数组负责执行这些任务。
当我们调用ForkJoinTask的fork方法时程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务然后立即返回结果。
pushTask方法把当前任务存放在ForkJoinTask数组队列里然后再调用ForkJoinPool的signalWork反复噶唤醒或创建一个工作线程来执行任务。
五、ScheduledThreadPool类
ScheduledThreadPoolExecutor主要用来在给定的延迟后运行任务或者定期执行任务。ScheduledThreadPoolExecutor使用任务队列DelayQueue封装了一个PriorityQueuePriorityQueue会对队列中的任务进行排序执行所需时间短的放在前面先被执行(ScheduledFutureTask的time变量小的先执行)如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask的squenceNumber变量小的先执行)。
1、ScheduledExecutorService
ScheduledThreadPool类继承自ThreadPoolExecutor并且实现了ScheduledExecutorService接口。
ScheduledExecutorService接口定义了几个方法
ScheduleFuture? schedule(Runnable command, long delay, TimeUnit unit)提交在给定延迟后启用的一次性任务ScheduleFuture schedule(Callable command, long delay, TimeUnit unit)提交在给定延迟之后启用的带返回值的一次性任务ScheduleFuture? scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)提交一个周期性任务任务开始时进行计时如果任务执行时间过长甚至超过period时间会导致任务连续执行ScheduleFuture? scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)提交一个周期性任务任务执行完成之后才进行计时 ScheduledFuture继承自Delayed接口和Future接口自己本身没有定义新的方法。 Delayed接口是一个混合风格的接口用于标记应该在给定延迟后执行的对象。这个接口定义了一个getDelay方法用于返回剩余的延迟时间。此外这个接口继承了Comparable接口意味着此接口的实现必须定义一个compareTo方法该方法提供与其getDelay方法一致的排序 2、比较Timer
Timer对系统时钟的变化敏感ScheduledThreadPoolExecutor不是Timer只有一个执行线程因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor可以配置任意数量的线程。 此外如果你想通过提供 ThreadFactory你可以完全控制创建的线程在TimerTask中抛出的运行时异常会杀死一个线程从而导致 Timer 死机即计划任务将不再运行。ScheduledThreadExecutor不仅捕获运行时异常还允许您在需要时处理它们通过重写afterExecute方法ThreadPoolExecutor。抛出异常的任务将被取消但其他任务将继续运行
六、Executors类
Executors是Java中用于创建线程池的工厂类它提供了一系列的静态工厂方法用于创建不同类型的线程池。这些工厂方法隐藏了线程池的复杂性使得线程池的创建变得非常简单。Executors工厂类提供的线程池有以下几种类型
newCachedThreadPool()CachedThreadPool的corePoolSize 被设置为0maximumPoolSize被设置为Integer.MAX.VALUE即它是无界的这也就意味着如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时CachedThreadPool会不断创建新的线程。极端情况下这样会导致耗尽 cpu和内存资源newFixedThreadPool(int nThreads)创建一个固定大小的线程池其中包含指定数量的线程。线程数量是固定的不会自动扩展即没有救急线程newSingleThreadExecutor()创建一个单线程的线程池。这个线程池中只包含一个线程用于串行执行任务。适用于需要按顺序执行任务的场景newScheduledThreadPool(int corePoolSize)创建一个固定大小的线程池用于定时执行任务。线程数量固定不会自动扩展。适用于定时执行任务的场景newSingleThreadScheduledExecutor()创建一个单线程的定时执行线程池。只包含一个线程用于串行定时执行任务newWorkStealingPool(int parallelism)该线程池维护足够的线程以支持给定的并行级别并且可以使用多个队列来减少争用。并行性级别对应于积极参与或可用参与任务处理的最大线程数。实际的线程数可以动态地增加和减少。工作窃取池不能保证所提交任务的执行顺序
除此之外还提供了创建ThreadFactory实例将Runnable实例转换为Callable实例等方法。