java并发-线程池Executor框架

更新时间:2024-01-22 00:23:01 阅读量: 教育文库 文档下载

说明:文章内容仅供预览,部分内容可能不全。下载后的文档,内容与下面显示的完全一致。下载之前请确认下面内容是否您想要的,是否完整无缺。

在面向对象编程中,创建和销毁对象是很浪费时间的,因为创建一个对象要获取内存资源或者更多资源。在java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。

所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些池化资源技术产生的原因 线程池的优点:

1.重用线程池中的线程,减少因对象创建,销毁所带来的性能开销

2.能有效控制线程的最大并发数,提高系统资源利用率,同时避免过多的资源竞争,避免堵塞 3.能够对多线程进行简单有效的管理,使线程的使用简单、高效

若采用\为每个任务分配一个线程\的方式会存在一些缺陷,尤其是当需要创建大量线程时: 线程生命周期的开销非常高、资源消耗、稳定性

任务是一组逻辑工作单元,线程则是使任务异步执行的机制。当存在大量并发任务时,创建、销毁线程需要很大的开销,运用线程池可以大大减小开销

线程池框架Executor

Java中的线程池是通过Executor框架实现的,Executor框架包括类:

Executor、Executors、ExecutorService、ThreadPoolExecutor、Callable、Future和Future Task 关系如下:

Executor所有线程池的接口,只有一个方法,该接口定义执行Runnable任务的方式 Public interface Executor{

Void execute(Runnable command); }

ExecutorService 增加Executor的行为,是Executor实现类的最直接的接口,该接口定义提供对Executor的服务

Executors 线程池工厂类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口

ScheduledExecutorService 定时调度接口。

AbstractExecutorService 执行框架抽象类。

ThreadPoolExecutor JDK中线程池的具体实现,一般用的各种线程池都是基于这个类实现的

ThreadPoolExecutor 线程池类

线程池是一个复杂的任务调度工具,它涉及到任务、线程池等的生命周期问题。要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的。

JDK中的线程池均由ThreadPoolExecutor类实现。其构造方法如下: Public ThreadPoolExecutor(int corePoolSIze,

Int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue workQueue){

This(corePoolSize,maxmumPoolSize,keepAliveTime,unit,workQueue, Executors.defaultThreaFactory(),defaultHandler);

}

参数说明: corePoolSize:核心线程数,线程池中运行的线程数永远不会超过corePoolSize个,默认情况下一直存活,可以设置allowCoreThreadTimeOut为true,此时核心线程数就是0,此时keepAliveTime控制所有线程的超时时间

maximumPoolSize:线程池允许的最大大线程数

keepAliveTime:空闲线程结束的超时时间也叫线程存活时间,当线程数大于core数,那么超过该时间的线程将会被终结

Unit:是一个枚举,表示keepAliveTime的单位

keepAliveTime的单位, java.util.concurrent.TimeUnit类存在静态静态属性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS

workQueue: 阻塞队列是java.util.concurrent下的主要用来控制线程同步的工具,如果BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒。同样,如果BlockingQueue是满的,任何试图往里面存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作。

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程,阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。具体实现类有:LinkedBlickingQueue,ArrayBlockingQueued等。一般其内部的都是通过Lock和Condition(显示锁(Lock)及Condition的学习与使用)来实现阻塞和唤醒

ThreadPoolExecutor的使用需要注意以下概念: 若线程池中的线程数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

若线程池中的线程数量等于 corePoolSize且缓冲队列 workQueue未满,则任务被放入缓冲队列。

若线程池中线程的数量大于corePoolSize且缓冲队列workQueue满,且线程池中的数量小于maximumPoolSize,则建新的线程来处理被添加的任务。

若线程池中线程的数量大于corePoolSize且缓冲队列workQueue满,且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。

当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止

线程池的工作过程:

1.线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行他们

2.当调用executor()方法添加一个任务时,线程池会做如下判断:

A.如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务

B.如果正在运行的线程数量大于或小于或等于corePoolSize,那么将这个任务放入队列

C.如果这时候队列满了,而且正在运行的数量小于maximumPoolSize,那么还是要创建非核心线 程立刻运行这个任务

D.如果队列满了,而且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会抛出异 常RejectExecutionException

E.当一个线程完成任务时,他会从队列中取下一个任务来执行

F.当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程会判断,如果当前运行的线程数 大于corePoolSize,那么这个线程就被停掉,所以线程池的所有任务完成后,它最终会收缩到 corePoolSize大小

Executors 工厂方法

JDK内部提供了五种最常见的线程池。由Executors类的五个静态工厂方法创建。

newFixedThreadPool(...)

newSingleThreadExecutor(...) newCachedThreadPool(...) newScheduledThreadPool(...)

newSingleThreadScheduledExecutor()

单线程的线程池 newSingleThreadExecutor:单个后台线程(其缓冲队列是无界的) 这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务 Public static ExecutorService newSingleThreadExecutor(){

Return new FinalizableDelegatedExecutorService(

new ThreadPoolExecutor(1,1,0L,TimeOut.MILLISECONDS, new LinkedBlockingQueue())); }

返回单线程的Executor,将多个任务交给此Exector时,这个线程处理完一个任务后接着处理下一个任务,若该线程出现异常,将会有一个新的线程来替代。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

创建固定大小的线程池 newFixedThreadPool:只有核心线程的线程池,大小固定(其缓冲队列是无界的)

Public static ExecutorService newFixedThreadPool(){

Return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(Runnable));

}

每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。 返回一个包含指定数目线程的线程池,如果任务数量多于线程数目,那么没有执行的任务必须等待,直到有任务完成为止。

可缓存的线程池 newCachedThreadPool:无界线程池,可以进行自动线程回收 Public static ExecutorService new CachedThreadPool(){

Return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS, new SyschronousQueue() ); }

如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。 SynchronousQueue是一个缓冲去为1的阻塞队列。

newCachedThreadPool方法创建的线程池可以自动的扩展线程池的容量。核心线程数量为0。 SynchronousQueue是个特殊的队列。 SynchronousQueue队列的容量为0。当试图为SynchronousQueue添加Runnable,则执行会失败。只有当一边从SynchronousQueue取数据,一边向SynchronousQueue添加数据才可以成功。SynchronousQueue仅仅起到数据交换的作用,并不保存线程。但newCachedThreadPool()方法没有线程上限。Runable添加到SynchronousQueue会被立刻取出。

根据用户的任务数创建相应的线程来处理,该线程池不会对线程数目加以限制,完全依赖于JVM能创建线程的数量,可能引起内存不足。

定时任务调度的线程池 newScheduledThreadPool:

创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。 例:

public class ScheduledThreadPoolTest { public static void main(String[] args) {

ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); ses.scheduleWithFixedDelay(new Runnable() {

@Override

public void run() { try {

Thread.sleep(1000);

} catch (InterruptedException e) { e.printStackTrace(); }

System.out.println(new Date()); }

}, 1000, 2000, TimeUnit.MILLISECONDS); } }

单线程的定时任务调度线程池 newSingleThreadScheduledExecutor

此线程池支持定时以及周期性执行任务的需求

线程池最常用的提交任务的方法有两种: executor:

ExecutorService.executor(Runnable runnable); submit:

FutureTask task=ExecutorService.submit(Runnable runnable); FutureTasktask=ExecutorService.submit(Runnable runnable); FuturnTasktask=ExecutorService.submit(Callable callable); Submit实现:

public Futuresubmit(Callabletask){

If(task==null)thow new NullPointerException(){

FutureTaskftask =newTaskFor(task); executor(ftask); Return ftask; } }

可以看出submit开启的是有返回结果的任务,会返回一个FutureTask对象,这样就能通过get()方法得到结果。Submit最终调用的也是executor(Runnable runnable),submit只是将callable对象或Runnable封装成一个FutureTask对象,因为FutureTask是个Runnable,所以可以在executor中执行。关于Callable对象和Runnable怎么封装成FuturnTask对象,见Callable和Future、FutureTask的使用。

Executor接口

Executor是一个线程执行接口。任务执行的主要抽象不是Thead,而是Executor。 public interface Executor{

void executor(Runnable command); }

Executor将任务的提交过程与执行过程解耦,并用Runnable来表示任务。执行的任务放入run方法中即可,将Runnable接口的实现类交给线程池的execute方法,作为它的一个参数。如果需要给任务传递参数,可以通过创建一个Runnable接口的实现类来完成。

Executor可以支持多种不同类型的任务执行策略。

Executor基于生产者消费者模式,提交任务的操作相当于生产者,执行任务的线程则相当于消费者

ExecutorService接口

线程池接口。ExecutorService在Executor的基础上增加了一些方法,其中有两个核心的方法: Future submit(Runnable task)

Future submit(Callable task)

这两个方法都是向线程池中提交任务,它们的区别在于Runnable在执行完毕后没有结果,Callable执行完毕后有一个结果。这在多个线程中传递状态和结果是非常有用的。另外他们的相同点在于都返回一个Future对象。Future对象可以阻塞线程直到运行完毕(获取结果,如果有的话),也可以取消任务执行,当然也能够检测任务是否被取消或者是否执行完毕。

本文来源:https://www.bwwdw.com/article/uypo.html

Top