线程和线程池的理解
线程有个异常叫InterruptedException
,这个异常是在调用方法interrupt
方法触发的。不是每次调用这个方法都能触发这个异常,条件是:
- 线程处于阻塞态
- 线程处于限期等待
- 线程处于无限期等待
不能中断I/O阻塞和synchroized锁阻塞。还有就是一个线程你调用了无限循环,这个时候你调用interrupt方法也是没办法使线程提前结束的。但是你可以使用interrupt给线程设上的标记用来停止线程。比如这样:
1 | public class InterruptExample { |
线程池的中断操作:
调用shutDown()
方法会阻止接收新的任务,但是会继续执行已经提交的任务。但是调用shutDownNow
方法,则相当于调用线程的interrupt
方法。
使用锁有synchronized
和ReentrantLock
,两者区别是RenntrantLock
可以中断,还是公平锁。
线程池
是否新建线程的流程:
graph TD 提交任务 ==> 核心线程池是否已经满 核心线程池是否已经满 ==否==> 创建线程执行任务 核心线程池是否已经满 ==是==> 队列是否已经满了 队列是否已经满了 ==否==> 将任务存储在队列里面 队列是否已经满了 ==是==> 线程池是否已经满了 线程池是否已经满了 ==否==> 创建线程执行任务 线程池是否已经满了 ==是==> 按照策略处理无法执行的任务
线程池可以设定线程的空闲的存活时间keepAliveTime
,当一个线程空闲下来后,超过keepAliveTime
时间,线程池会判断当前运行的线程数量是否大于核心线程数量,如果超过了,那么空闲的线程就会被停掉。所以所有任务都完成后,线程池里面的线程数都会收缩到核心线程数大小。注意:当设置了allowCoreThreadTimeOut
不关心线程存活时间,那么在线程空闲下来后,不会判读空闲线程是否超过核心线程数就直接被停掉。所以最终线程数=0
线程池里面的BlockingQueue
有几种往队列加数据的方法:
抛出异常 | 特殊值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add | offer | put | offer(e,time.unit) |
移除 | remove | poll | take | poll(time,unit) |
检测 | element | peek | 不可用 | 不可用 |
- 抛出异常:当队列满了,插入的新值插不进去了就会立马抛出一个异常。
- 特殊值:当插入数据失败,返回一个boolean值,true成功,false失败。然后拿SynchronousQueue调用这个方法的时候,必须有另一个线程执行了取值的操作,这个offer才会返回true。
- 阻塞:一个线程插入一个数据的时候,必须等到另一个线程执行取数据的操作,不然就一直阻塞当前线程。
- 超时:插入数据失败在阻塞一个规定的时间。
关于使用SynchronousQueue.offer
演示:
1 | SynchronousQueue<String> queue = new SynchronousQueue<>(); |
运行这个代码你可以发现,当没调用take这个取值的操作,我们offer返回的结果一直都是false(这个现象只针对SynchronousQueue阻塞队列)。
讲一下我对线程池实现的思路分析,线程池里面有个核心线程,非核心线程和任务存储的数据结构。为什么会出现核心线程这个东西,因为线程的开启和关闭开销是很大的,所以在有很多任务需要执行的时候,我们应该有一个办法来实现线程的重复使用。降低线程的频繁创建和关闭的次数。核心线程就是这个作用,核心线程是不会停止的,用阻塞的方式实现线程不会停止,然后当核心线程都在运行的时候,就把任务添加到我们的任务存储的数据结构里面,核心线程一直从任务存储的数据结构里面拿去新任务。当任务存储结构满了,就需要开辟临时线程了,临时线程我们可以给他设置过期时间,当处于空闲后超过过期时间那么这个临时线程就可以停止了。
现在看下基于上面的思路是怎么实现的:
现在来分析ThreadPoolExecutor
线程池源码:
1 | public void execute(Runnable command) { |
现在进入只要的addWorker方法:
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
这里很有必要看下我们的Worker
类:
可以看Worker
类继承了Runable
,然后在构造函数里面创建了线程Thread
并且将自己传了进去自己,一个worker就是一个任务。所以在调用Thread.start的时候,会调用到Worker
的run
方法。
1 | private final class Worker |
在runWorker
方法里面实现了核心线程的不停运行:
可以看到在runWorker
方法里面有一个while
循环,有了while循环我们就知道了,如果getTask方法返回的值如果不是null,那么这个while循环就会一直为true,那我们的线程不就一直运行下去了吗。哈哈
1 | final void runWorker(Worker w) { |
现在来看下getTask
方法:
在下面我标记1和2的位置,1的位置用来判断当前线程数是否大于核心线程数:
- 小于的话代表当前线程是核心线程。那么就调用workQueue.take()方法。(在最上面我写了take方法是一个阻塞方法,他会阻塞当前的线程直到其他线程调用了put或者offer方法)。这也就实现了核心线程的永不死亡。
- 如果不是核心线程就调用workQueue.poll(timeOut,TIME_TYPE)。这个方法实现了延迟线程的死亡。timeOut的值是我们在创建线程池的时候自己设定的。所以非核心线程是会死亡的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//1
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//2 这里是关键。实现了核心线程不死,非核心线程可以死亡。
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
至此已经分析完了,核心线程不会死亡的原因。
在看了整个任务添加流程(addWork)方法。我们可以看到这个方法返回的是一个boolean变量。用来判读任务添加失败和成功。那什么情况会失败呢?
线程池有五种状态:他们数值依次变大
int COUNT__BITS = Interger.SIZE - 3
- RUNNING (int RUNNIT = -1 << COUNT_BITS)
- SHUTDOWN (int SHUTDOWN = 0 << COUNT_BITS)
- STOP (int STOP = 1 << COUNT_BITS)
- TIDYING (int TIDYING = 2 << COUNT_BITS)
- TERMINATED (int TERMINATED = 3 << COUNT_BITS)
addWorker
方法false
的情况:
- 线程池当前状态 大于等于SHUTDOWN状态 并且 线程池当前状态大于等于STOP状态 或者 是任务不等于null 或者 队列为空 就返回false。返回false就是拒绝的意思。
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
- 线程数超过设置的最大线程数,就拒绝。
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
- 当线程池处于SHUTDOWN状态并且队列里面已经存在之前已经添加的但还没执行的相同的任务。
1 | if (! isRunning(recheck) && remove(command)) |
- 事物存储的数据结构存不下新的事物的时候,然后非核心线程也到了上限值。
1 | // offer:往队列里面添加数据,添加数据失败则返回false |
这里讲到了拒绝,那就在分析下线程池的拒绝策略:
java官方实现的拒绝策略有四种:
classDiagram RejectedExecutionHandler <|-- abortpolicy rejectedexecutionhandler <|-- callerrunspolicy discardoldestpolicy discardpolicy rejectedexecutionhandler: +regectedexecution()< pre>
- AbortPolicy终止策略:直接抛出异常RejectedxecutionException
- CallerRunsPolicy调用者运行策略:除非线程池现在处于shutDown状态,那么被拒绝的任务将由调用者线程自己处理。
- DiscardOldestPolicy如果线程池没shutDown,就丢弃队列里面最久没执行的事物,然后将当前任务添加进线程池里面。
- DiscardPolicy默默丢弃任务无任何感知。
当然我们可以继承自
RejectedExecutionHandler
实现recectedExecution
方法。
shutDown和shutDownNow的区别:
- shutDown中断空前的workers。没有任何返回。
- showDownNow中断所有的workders,并且返回所有的任务队列里面的任务
java提供了几个默认的线程池:
- newCachedThreadPool:该线程池可以无限扩展,当需求增加时会自动添加新的线程,当需求降低时,会自动回收空闲线程。通常用来执行许多短期异步任务的程序性能,
1
2
3
4
5public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
} - newScheduledThreadPool创建一个以延迟或者定时来执行任务的线程池,工作队列为DelayedWorkQueue.适用于需要多个后台线程执行的周期性任务。
- newSingleThreadExecutor只有一个线程的线程池,用来执行需要要保证顺序的任务场景。
一篇挺好的分析线程池源码的文章:文章1
创建线程的几种方式:
- 继承thread
1
2
3
4
5
6
7
8
9
10
11
12public class MyThread extends Thread{//继承Thread类
public void run(){
//重写run方法
}
}
public class Main {
public static void main(String[] args){
new MyThread().start();//创建并启动线程
}
} - 继承Runable
1
2
3
4
5
6
7
8
9
10
11
12
13public class MyRunable implements Runnable {//实现Runnable接口
public void run(){
//重写run方法
}
}
public class Main {
public static void main(String[] args){
//创建并启动线程
MyRunable myRunable=new MyRunable();
Thread thread=new Thread(myRunable);
thread().start();
} - 使用Callable和FutureTask来创建futureTask.get方法会阻塞当前正在运行的线程.
1
2
3
4
5
6
7
8
9
10
11
12FutureTask futureTask = new FutureTask(new Callable() {
public Object call() throws Exception {
Thread.sleep(1000);
System.out.println("执行任务");
return "线程里的返回值";
}
});
Thread thread = new Thread(futureTask);
thread.start();
System.out.println("==:" + futureTask.get());
System.out.println("结束了"); - 使用线程池:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17ExecutorService mFixedPool = Executors.newFixedThreadPool(4);
Future future = mFixedPool.submit(new Callable<Object>() {
public Object call() throws Exception {
System.out.println("在子线程执行任务");
return "我是返回值";
}
});
try {
System.out.println("拿到返回值:"+future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("一切都结束了");
mFixedPool.shutdown();
实现等待所有线程结束在继续运行的实现有如下几种方式:
- 使用CountDownLatch
- 使用join
比如main线程想等thread1线程结束了,main在结束,那么在main线程只需要调用thread1.join就行.
关于线程状态控制的方法的记忆要这么记忆:
第一组: wait / notify / notifyAll
第二组: join / sleep / yield
join / sleep / yield : 他们会自动到达某个条件就苏醒过来.而 wait 不会,他需要你触发 notify notifyAll 才可以. 那 sleep 和 wait 的区别是,sleep他会阻塞当前线程,并且不会释放对象锁,而wait让当前线程处于等待状态,并且释放对象锁.
1 | public class SleepDemo { |