Executor框架

3/8/2017来源:ASP.NET技巧人气:3522

结构组成

Executor主要由3个部分组成:

1. 任务。包括被执行任务需要的Runnable接口或者Callable接口 2. 任务的执行。包括执行任务的核心接口Executor以及继承自Executor的ExecutorService接口。Executor接口有两个关键类实现了ExecutorService接口的类:ThreadPoolExecutor和ScheduleThreadPoolExecutor 3. 异步计算的结果。包括接口Future和Future的实现类FutureTask 这里写图片描述 主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象:Executors.callable(Runnable task)或者Executors.callable(Runnable task,Object resule)

ExecutorService执行任务的方法: (1)ExecutorService.execute(Runnable command) (2)ExecutorService.submit(Runnable task)或者ExecutorService.submit(Callable tasl)

ExecutorService.submit(),将返回一个实现 Future接口的对象,由于FutureTask实现了Runnable接口,因此可以直接创建FutureTask交给ExecutorService执行。

最后,主线程可以执行FuturTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruPPTIfRunning)来取消任务。

成员

主要成员包括:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口和Executors

ThreadPoolExecutor ThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建3种类型的ThreadPoolExecutor:SingleThreadPoolExecutor、FixedTHreadPool和CachedThreadPool FixedThreadPool:创建固定线程数,适用于负载比较重的服务器 SingleThreadExecutor:创建单个线程,适用于需要保证顺序的执行各个任务;并且任意时间点,不会有多个线程的应用场景 CachedThreadPool:可以根据需要创建新线程。CachedThreadPool是无界的线程池,适用与执行很多的短期异步任务的小程序或者负重较轻的服务器 ScheduledThreadPoolExecutor

通常适用Executors来创建ScheduledThreadPoolExecutor,Executors可以创建2种类型的ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor

ScheduledThreadPoolExecutor:包含若干个线程ScheduledThreadPoolExecutor。适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需要而线程后台线程数量的应用场景 SingleThreadScheduledExecutor:只包含一个线程的ScheduledThreadPoolExecutor。适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。 Future接口

Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。当我们把Runnable接口或Callable接口的实现类提交(submit)给ThreadPoolExecutor或ScheduledThreadPoolExecutor时,会返回一个实现了Future接口的对象。到目前的JDK8为止,返回的是一个FutureTask对象,但是在未来的Jdk版本汇总,有可能返回的就不是FutureTask了

Runnable接口和Callable接口 Runnable接口和Callable接口的实现类都可以被ThreadPoolExecutor或者ScheduledThreadPoolExecutor执行。他们之间的区别是Runnable不会返回结果,而Callable可以返回结果。

ThreadPoolExecutor详解

Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类,主要有4个组件构成:

corePool:核心线程池的大小 maximumPool:最大线程池的大小 BlockingQueue:用来暂时保存任务的工作队列 RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和时(达到了最大线程池大小且工作队列已满),execute()方法将调用Handler。 工具类Executors可以创建3种类型的ThreadPoolExecutor:FixedThreadPool、SingleThreadExecutor、CachedThreadPool

FixedThreadPool详解

FixedThreadPool被称为可重用固定线程数的线程池。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }

当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后,多余的空闲线程将被终止。FixedThreadPool把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止

FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的 工作队列。使用无界队列后,当线程池的线程数达到corePoolSize后,新任务将在无界队列中等待,也就是说,来一个新任务就往无界队列中加,因此线程池的线程数不会超过corePoolSize。这将使maximumPoolSize和KeepAliveTime变成无效参数。

SingleThreadExecutor详解

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。SingleThreadExecutor跟FixedThreadPool一样都是使用无界队列LinkedBlockingQueue作为工作队列。

CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池。

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

从上面的代码可以看出,CachedThreadPool的corePoolSize被设置为0,maximumPoolSize设置为Integer.MAX_VALUE,keepAliveTime设置为60L,说明空闲线程等待新任务的最长时间为60秒。

CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPoolSize是无界的。这意味着,如果主线程提交任务的速度高于maximumPoolSize中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

CachedThreadPool的execute()方法执行示意图: 这里写图片描述

(1)首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPoolSize中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行 (2)当初始maximumPoolSize为空,或者maximumPoolSize中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.ANANOSECONDS)。这时CachedThreadPool会创建一个新线程执行任务,execute()方法来执行完成 (3)在步骤2中,新创建的线程将任务执行完后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS).这个poll操作会让空闲线程最多在SynchronousQueue中等待60s,如果60s内主线程提交了一个新任务,那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。。

ScheduledThreadPoolExecutor详解

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。主要用来在给定的延迟之后运行任务或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但ScheduledThreadPoolExecutor功能更强大更灵活。Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

运行机制

这里写图片描述 DelayQueue是一个无界队列,所以maximumPoolSize在ScheduledThreadPoolExecutor中没有什么意义。

ScheduledThreadPoolExecutor的执行主要分为两部分 (1)当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFutur接口的ScheduledFutureTask (2)线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。

实现

ScheduledFutureTask主要包含3个成员变量:

/** 表示这个任务被添加到ScheduledThreadPoolExecutor中的序号 */ PRivate final long sequenceNumber; /**任务要被执行的具体时间 */ private long time; /** *表示任务执行的间隔周期 */ private final long period;

DelayQueue封装了一个PriorityQueue,会对队列中的ScheduledFutureTask进行排序。排序时,time小的排在前面。如果time相同,就比较sequenceNumber,sequenceNumber小的排在前面,也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行

这里写图片描述

上图是ScheduledThreadPoolExecutor中的线程1执行某个周期任务的4个步骤。 步骤1:线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time大于当前时间。 步骤2:线程1执行ScheduledFutureTask 步骤3:线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。 步骤4:线程1把这个修改time之后的ScheduledFutureTask返回DelayQueue中(DelayQueue.add())

接下来看看DelayQueue.take()方法的实现:

/** * 检索并删除队列的头部,如有必要,直到具有可用的到期延迟的元素为止 * **/ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();//获取锁 try { for (;;) { E first = q.peek(); if (first == null) available.await();//如果PriorityQueue为空,则在当前线程的Condition中等待 else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); //获取PriorityQueue中的头元素 first = null; //在等待时不保留引用 if (leader != null)//领导线程不为空,当前线程只能继续等待 available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread;//当前线程被确认为领导线程 try { available.awaitNanos(delay);//等待到延迟时间到达 } finally { if (leader == thisThread)//当前线程被唤醒后,交出leader地位,设置leader为空 leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal();//将等待时间最长的线程从Condition等待队列中移到获取锁的队列里,准备获取锁 lock.unlock(); } }

接下来看看DelayQueue.take()方法的实现:

public boolean add(E e) { return offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e);//把元素加入到PriorityQueue队列 if (q.peek() == e) {//如果PriorityQueue队列的头部元素是当前加入的元素,设置leader线程为null,唤醒Condition等待队列中的一个等待时间最长的线程进入到同步队列 leader = null; available.signal(); } return true; } finally { lock.unlock(); } }

PriorityQueue的offer()方法:

public boolean offer(E e) { if (e == null) throw new NullPointerException(); modCount++; int i = size;//PriorityQueue队列的元素数量 if (i >= queue.length) grow(i + 1);//如果很小时,length是size的双倍,如果size增大到大于等于length时,需要扩容50% size = i + 1; if (i == 0) queue[0] = e; else siftUp(i, e); return true; }

FutureTask详解

简介

FutureTask除了实现Future接口外,还实现了Runnable接口,因此,FutureTask可以交给Executor执行。也可以调用线程执行FutureTask.run()。根据FutureTask.run()方法被执行的时机,FutureTask可以处于下面3中状态:

1. 未启动。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。 2. 已启动。FutureTask.run()方法被执行的过程中 3. 已完成。FutureTask.run()方法执行完后正常结束,或被取消FutureTask.cancel()或者执行run方法抛出异常而结束

当FuturTask处于未启动或已启动时,执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成时,执行FutureTask.get()方法时,将导致调用线程立即返回结果或者抛出异常。

当FutureTask处于未启动时,执行FutureTask.cancel()方法将导致任务永远不会被执行;如果是在已启动时,调用FutureTask.cancle(true)方法将已中断执行此任务线程的方式来试图停止任务;当FutureTask处于已启动状态时,执行FutureTask.cancle(false)不会对正在执行此任务的线程产生影响,当FutureTask处于已完成状态时,执行FutureTask.cancel方法将返回false

使用

应用场景:

1. Future用于异步获取执行结果或者取消任务。 2. 在高并发场景下确保任务只执行一次。

实例如下:

import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class FutureTaskDemo { private final FutureTask<Long> future = new FutureTask<Long>(new Callable<Long>() { @Override public Long call() throws Exception { Thread.currentThread().setName("Thread(3)"); System.out.println(Thread.currentThread().getName() + ":开始业务操作! "); try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ":业务逻辑执行结束!"); return Math.round(Math.random() * 1000);//返回一个随机数 } }); private final Thread loader = new Thread(future); public void start() { System.out.println(Thread.currentThread().getName() + ": 启动loader线程!"); loader.start();//启动loader线程 System.out.println(Thread.currentThread().getName() + ": loader线程已启动!"); } public Long get() { try { System.out.println(Thread.currentThread().getName() + ": 开始调用get方法"); long start = System.currentTimeMillis(); Long result = future.get();//调用FutureTask.get()方法 System.out.println(Thread.currentThread().getName() + ": 获取的结果为: " + result); System.out.println(Thread.currentThread().getName() + ": 消耗的时间为: " + (System.currentTimeMillis() - start)); return result; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ": got nothing"); return null; } public static void main(String[] args) { Thread.currentThread().setName("Thread(main)"); final FutureTaskDemo demo = new FutureTaskDemo(); demo.start(); new Thread(new Runnable() { @Override public void run() { Thread.currentThread().setName("Thread(1)"); System.out.println("尝试在延迟时间到达之前获取结果"); demo.get(); } }).start(); new Thread(new Runnable() { @Override public void run() { Thread.currentThread().setName("Thread(2)"); try { Thread.sleep(6000); System.out.println("尝试在延迟时间到达后获取结果 "); demo.get(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }

输出结果如下:

Thread(main): 启动loader线程! Thread(main): loader线程已启动! Thread(3):开始业务操作! 尝试在延迟时间到达之前获取结果 Thread(1): 开始调用get方法 Thread(3):业务逻辑执行结束! Thread(1): 获取的结果为: 580 Thread(1): 消耗的时间为: 4999 尝试在延迟时间到达后获取结果 Thread(2): 开始调用get方法 Thread(2): 获取的结果为: 580 Thread(2): 消耗的时间为: 0

通过上面的执行结果我们可以看出,当Thread3开始执行任务,但是还未执行完成时,去尝试获取执行结果,Thread1会阻塞直到Thread3执行完成后;如果Thread3执行完成后,去Thread2尝试获取执行结果,会立即返回

源码实现

成员变量:

/** * 此任务的运行状态,最初为NEW。 运行状态只在方法set,setException和cancel中转换到终端状态。 * 在完成期间,状态可以接受COMPLETING(当结果被设置时)或INTERRUPTING(仅在中断运行器以满足 * 取消(真)时)的瞬时值。 从这些中间状态到最终状态的转换使用更便宜的有序/延迟写入,因为值是唯一的,不能进一步修改。 **/ private volatile int state;

state的值如下:

private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;

可能的状态值变化: NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED *NEW -> INTERRUPTING -> INTERRUPTED FutureTask.run()方法:

public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }

FuturTask.get()方法的实现:

public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }

FuturTask.cancel()方法的实现:

public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }