抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

线程池的使用场景

1.服务器接收大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁的次数
从而提高服务器的工作效率
2.在实际开发中,如果需要创建5个以上的线程,就可以使用线程池了

线程池构造方法参数

avatar
corePoolSize指的是核心线程数
keepAliveTime:如果线程池当前线程数多于corePoolSize,那么如果多余的线程空闲时间超过keepAliveTime,他们就会终止
ThreadFactory:用来创建线程
1.默认使用Executors.defaultThreadFactory()
2.创建出来的线程都在同一个线程组里
3.如果自己指定ThreadFactory,那么就可以改变线程名,线程组,优先级,是否守护线程等。
workQueue:工作队列
有三种常见的队列类型
1.直接交换:SynchronousQueue 这种队列不存储任务,所以尽量把maxPoolSize设置大一些
2.无界队列:LinkedBlockingQueue 队列存储没有界限
3.有界队列:ArrayBlockingQueue

corePoolSize和maxPoolSize的关系

corePoolSize是核心线程数量,maxPoolSize是最大线程数量,当核心线程数量达到
核心线程数的阈值时会继续创建线程来执行任务,直到达到最大线程数量。
他们之间线程的添加的规则:
1.如果线程数少于核心线程数,会创建一个线程执行任务。
2.当线程数达到corePoolSize但是少于maxPoolSize,则将任务存储在workQueue中
3.如果队列已经满了,并且线程数少于maxPoolSize,则会创建一个新的线程数
4.如果队列已经满,并且线程数大于等于maxPoolSize,开始拒绝策略。

newFixedThreadPool

1.newFixedThreadPool易造成大量内存占用,可能导致OOM

1
2
3
4
5
6
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

newFixedThreadPool线程池的核心线程数是固定的,它使用了近乎于无界的LinkedBlockingQueue阻塞队列。当核心线程用完后,任务会入队到阻塞队列,如果任务执行的时间比较长,没有释放,会导致越来越多的任务堆积到阻塞队列,最后导致机器的内存使用不停的飙升,造成JVM OOM。

newSingleThreadExecutor

newSingleThreadExecutor 当请求堆积的时候,可能会造成占用大量内存(和newFixedThreadPool相似)

newCacheThreadPool

源码:

1
2
3
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>());
}

弊端在于第二个参数maximumPoolSize被设置了Integer.MAX_VALUE,这可能会创建数量非常多的线程甚至导致OOM

ScheduledThreadPool

支持定时及周期性任务执行的线程池
源码

1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

停止线程池

1.shutdown
特点:正在执行的任务和存储在队列的任务执行完后结束
2.isShutdown
判读线程是否进入停止状态
3.isTerminated
判读整个程序不仅开始停止而且已经执行完毕
4.awaitTermination
测试一段时间检测线程的任务是否执行完毕
5.shutdownNow

线程池的拒绝时机

1.当Executor关闭时,提交新任务会被拒绝(当已经执行力shutdown后)
2.以及当前Executor对最大线程的工作队列容量使用有界边界并且饱和
拒绝策略:
AbortPolicy(抛出异常)
DiscardPolicy(任务丢弃)
DiscardOldestPolicy(丢弃最老最久的任务)
CallerRunsPolicy(谁提交谁执行)

线程池的钩子方法

线程池的钩子方法的作用主要用于:每个任务执行前后,做日志处理和统计
钩子方法:

1
2
3
protected void beforeExecute(Thread t, Runnable r) { } // 任务执行前
protected void afterExecute(Runnable r, Throwable t) { } // 任务执行后
protected void terminated() { } // 线程池执行结束后

示例:使用beforeExecute()辅助实现线程池的暂停与恢复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.silkage.threadpool;

import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/*
*描述:演示每个任务执行前后放钩子函数
*
* */
public class PauseableThreadPool extends ThreadPoolExecutor {

private boolean isPaused;
private final ReentrantLock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();

public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}

public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
//钩子函数
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
lock.unlock();
}

}

private void pause(){
lock.lock();
try {
isPaused = true;
}finally {
lock.unlock();
}
}
public void resume(){
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("恢复了");
}
}

线程池的状态

1.running 接收新任务并处理排队任务
2.shutdown 不接受新任务,但处理排队任务
3.stop 不接受新任务,也不处理排队任务,并中断进行的任务
4.tidying 所有任务都已经终止,workerCount为零时,线程会转换到tidying状态,并将运行terminate()钩子方法
5.terminated terminate()运行完成

线程池的组成部分

线程池管理器
工作线程
任务队列
任务接口

线程池、ThreadPoolExecutor、ExecutorService、Executor、Executors的关系

ThreadPoolExecutor继承于AbstractExecutorService
而AbstractExecutorService继承于ExecutorService(interface)
ExecutorService(interface)继承于Executor(interface)

使用线程池

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/*
* 描述:演示newFixedThreadPool
* */
public class FixedThreadPoolTest {
public static void main(String[] args) {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int i = 0; i < 1000; i++) {
//执行线程
executorService.execute(new Task());
}
}
}
class Task implements Runnable{

@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
}
}