线程和线程池的理解

线程有个异常叫InterruptedException,这个异常是在调用方法interrupt方法触发的。不是每次调用这个方法都能触发这个异常,条件是:

  1. 线程处于阻塞态
  2. 线程处于限期等待
  3. 线程处于无限期等待

不能中断I/O阻塞和synchroized锁阻塞。还有就是一个线程你调用了无限循环,这个时候你调用interrupt方法也是没办法使线程提前结束的。但是你可以使用interrupt给线程设上的标记用来停止线程。比如这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class InterruptExample {

private static class MyThread2 extends Thread {
@Override
public void run() {
while (!interrupted()) {
// ..
}
System.out.println("Thread end");
}
}
}

线程池的中断操作:
调用shutDown()方法会阻止接收新的任务,但是会继续执行已经提交的任务。但是调用shutDownNow方法,则相当于调用线程的interrupt方法。


使用锁有synchronizedReentrantLock,两者区别是RenntrantLock可以中断,还是公平锁。

线程池

是否新建线程的流程:

graph TD
提交任务 ==> 核心线程池是否已经满 
核心线程池是否已经满 ==否==> 创建线程执行任务
核心线程池是否已经满 ==是==> 队列是否已经满了
队列是否已经满了 ==否==> 将任务存储在队列里面
队列是否已经满了 ==是==> 线程池是否已经满了
线程池是否已经满了 ==否==> 创建线程执行任务
线程池是否已经满了 ==是==> 按照策略处理无法执行的任务

线程池可以设定线程的空闲的存活时间keepAliveTime,当一个线程空闲下来后,超过keepAliveTime时间,线程池会判断当前运行的线程数量是否大于核心线程数量,如果超过了,那么空闲的线程就会被停掉。所以所有任务都完成后,线程池里面的线程数都会收缩到核心线程数大小。注意:当设置了allowCoreThreadTimeOut不关心线程存活时间,那么在线程空闲下来后,不会判读空闲线程是否超过核心线程数就直接被停掉。所以最终线程数=0

线程池里面的BlockingQueue有几种往队列加数据的方法:

抛出异常 特殊值 阻塞 超时
插入 add offer put offer(e,time.unit)
移除 remove poll take poll(time,unit)
检测 element peek 不可用 不可用
  • 抛出异常:当队列满了,插入的新值插不进去了就会立马抛出一个异常。
  • 特殊值:当插入数据失败,返回一个boolean值,true成功,false失败。然后拿SynchronousQueue调用这个方法的时候,必须有另一个线程执行了取值的操作,这个offer才会返回true。
  • 阻塞:一个线程插入一个数据的时候,必须等到另一个线程执行取数据的操作,不然就一直阻塞当前线程。
  • 超时:插入数据失败在阻塞一个规定的时间。

关于使用SynchronousQueue.offer演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
SynchronousQueue<String> queue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println("取值" + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("put 1");
System.out.println( "结果:"+queue.offer("hahah"));
System.out.println("hahaha");
}).start();

运行这个代码你可以发现,当没调用take这个取值的操作,我们offer返回的结果一直都是false(这个现象只针对SynchronousQueue阻塞队列)。


讲一下我对线程池实现的思路分析,线程池里面有个核心线程,非核心线程和任务存储的数据结构。为什么会出现核心线程这个东西,因为线程的开启和关闭开销是很大的,所以在有很多任务需要执行的时候,我们应该有一个办法来实现线程的重复使用。降低线程的频繁创建和关闭的次数。核心线程就是这个作用,核心线程是不会停止的,用阻塞的方式实现线程不会停止,然后当核心线程都在运行的时候,就把任务添加到我们的任务存储的数据结构里面,核心线程一直从任务存储的数据结构里面拿去新任务。当任务存储结构满了,就需要开辟临时线程了,临时线程我们可以给他设置过期时间,当处于空闲后超过过期时间那么这个临时线程就可以停止了。

现在看下基于上面的思路是怎么实现的:

现在来分析ThreadPoolExecutor线程池源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1 创建我们的核心线程数量
if (workerCountOf(c) < corePoolSize) {
// addWorker的返回值用来判断新的任务是否添加成功
if (addWorker(command, true))
return;
c = ctl.get();
}
/**
*2 调用offer他是不会阻塞线程的,这个方法的返回值要看具体的实现类。
* 比如拿SyncronousQueue来说,这个队列比较特殊,他不存储任何数据,所以如果你只
* 调用offer,他的返回值只会是false。只有当你在其他线程调用了workQueue.take或者poll。这里才会返回true。
* workQueue.offer返回值的含义只代表任务是否添加进队列。添加不进去就代表需要开辟非核心线程了。
**/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3 线程池第一次提交任务并且线程数小于核心线程数的时候走的是这里。
else if (!addWorker(command, false))
reject(command);
}

现在进入只要的addWorker方法:

// core:是核心线程的话就为true 否则为false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
/**
*这里有个双循环,外层的循环是判断线程池是否是可运行状态
*第二层循环判断线程数量是否已经超过线程数量设定。
**/
for (int c = ctl.get();;) {
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;

for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 这里创建了Worker对象,你查看worker的构造函数,
// 你会发现在构造函数里面创建了线程thread。
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();

if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 开启了线程 这里很关键,哈哈。你第一次看到这里的时候一定蒙蔽,
// 这里直接调用了thread.start.那核心线程是怎么一直运行下去的还不会停止?
// 下面我会带你看worker类
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

这里很有必要看下我们的Worker类:

可以看Worker类继承了Runable,然后在构造函数里面创建了线程Thread并且将自己传了进去自己,一个worker就是一个任务。所以在调用Thread.start的时候,会调用到Workerrun方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}

runWorker方法里面实现了核心线程的不停运行:

可以看到在runWorker方法里面有一个while循环,有了while循环我们就知道了,如果getTask方法返回的值如果不是null,那么这个while循环就会一直为true,那我们的线程不就一直运行下去了吗。哈哈

1
2
3
4
5
6
7
8
9
10
final void runWorker(Worker w) {
Runnable task = w.firstTask;
try {
while (task != null || (task = getTask()) != null) {

}
} finally {
processWorkerExit(w, completedAbruptly);
}
}

现在来看下getTask方法:

在下面我标记1和2的位置,1的位置用来判断当前线程数是否大于核心线程数:

  1. 小于的话代表当前线程是核心线程。那么就调用workQueue.take()方法。(在最上面我写了take方法是一个阻塞方法,他会阻塞当前的线程直到其他线程调用了put或者offer方法)。这也就实现了核心线程的永不死亡。
  2. 如果不是核心线程就调用workQueue.poll(timeOut,TIME_TYPE)。这个方法实现了延迟线程的死亡。timeOut的值是我们在创建线程池的时候自己设定的。所以非核心线程是会死亡的。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
    int c = ctl.get();
    if (runStateAtLeast(c, SHUTDOWN)
    && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
    decrementWorkerCount();
    return null;
    }

    int wc = workerCountOf(c);
    //1
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
    return null;
    continue;
    }

    //2 这里是关键。实现了核心线程不死,非核心线程可以死亡。
    try {
    Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();
    if (r != null)
    return r;
    timedOut = true;
    } catch (InterruptedException retry) {
    timedOut = false;
    }
    }
    }

至此已经分析完了,核心线程不会死亡的原因。

在看了整个任务添加流程(addWork)方法。我们可以看到这个方法返回的是一个boolean变量。用来判读任务添加失败和成功。那什么情况会失败呢?

线程池有五种状态:他们数值依次变大

int COUNT__BITS = Interger.SIZE - 3

  • RUNNING (int RUNNIT = -1 << COUNT_BITS)
  • SHUTDOWN (int SHUTDOWN = 0 << COUNT_BITS)
  • STOP (int STOP = 1 << COUNT_BITS)
  • TIDYING (int TIDYING = 2 << COUNT_BITS)
  • TERMINATED (int TERMINATED = 3 << COUNT_BITS)
  1. addWorker方法false的情况:
  • 线程池当前状态 大于等于SHUTDOWN状态 并且 线程池当前状态大于等于STOP状态 或者 是任务不等于null 或者 队列为空 就返回false。返回false就是拒绝的意思。
1
2
3
4
5
6
7
8
9
10
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;//拒绝新任务的添加
}
}
  1. 线程数超过设置的最大线程数,就拒绝。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;//拒绝新任务的添加
}
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
// 超过了设置的最大线程数量就返回false
return false;
}
}
  1. 当线程池处于SHUTDOWN状态并且队列里面已经存在之前已经添加的但还没执行的相同的任务。
1
2
3
4
5
6
7
8
if (! isRunning(recheck) && remove(command))
reject(command);

public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
  1. 事物存储的数据结构存不下新的事物的时候,然后非核心线程也到了上限值。
1
2
3
4
5
6
7
8
9
10
// offer:往队列里面添加数据,添加数据失败则返回false
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}// 队列里面添加事物失败才会走这个判断
else if (!addWorker(command, false))
reject(command);

这里讲到了拒绝,那就在分析下线程池的拒绝策略:

java官方实现的拒绝策略有四种:

classDiagram
RejectedExecutionHandler <|-- abortpolicy rejectedexecutionhandler <|-- callerrunspolicy discardoldestpolicy discardpolicy rejectedexecutionhandler: +regectedexecution()< pre>
  1. AbortPolicy终止策略:直接抛出异常RejectedxecutionException
  2. CallerRunsPolicy调用者运行策略:除非线程池现在处于shutDown状态,那么被拒绝的任务将由调用者线程自己处理。
  3. DiscardOldestPolicy如果线程池没shutDown,就丢弃队列里面最久没执行的事物,然后将当前任务添加进线程池里面。
  4. DiscardPolicy默默丢弃任务无任何感知。

当然我们可以继承自RejectedExecutionHandler实现recectedExecution方法。


shutDown和shutDownNow的区别:

  • shutDown中断空前的workers。没有任何返回。
  • showDownNow中断所有的workders,并且返回所有的任务队列里面的任务

java提供了几个默认的线程池:

  1. newCachedThreadPool:该线程池可以无限扩展,当需求增加时会自动添加新的线程,当需求降低时,会自动回收空闲线程。通常用来执行许多短期异步任务的程序性能,
    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
  2. newScheduledThreadPool创建一个以延迟或者定时来执行任务的线程池,工作队列为DelayedWorkQueue.适用于需要多个后台线程执行的周期性任务。
  3. newSingleThreadExecutor只有一个线程的线程池,用来执行需要要保证顺序的任务场景。

一篇挺好的分析线程池源码的文章:文章1


创建线程的几种方式:

  1. 继承thread
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class MyThread extends Thread{//继承Thread类
      public void run(){
      //重写run方法
      }
    }

    public class Main {
      public static void main(String[] args){
        new MyThread().start();//创建并启动线程
      }

    }
  2. 继承Runable
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class MyRunable implements Runnable {//实现Runnable接口
      public void run(){
      //重写run方法
      }
    }

    public class Main {
      public static void main(String[] args){
        //创建并启动线程
        MyRunable myRunable=new MyRunable();
        Thread thread=new Thread(myRunable);
        thread().start();
      }
  3. 使用Callable和FutureTask来创建
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    FutureTask futureTask = new FutureTask(new Callable() {
    @Override
    public Object call() throws Exception {
    Thread.sleep(1000);
    System.out.println("执行任务");
    return "线程里的返回值";
    }
    });
    Thread thread = new Thread(futureTask);
    thread.start();
    System.out.println("==:" + futureTask.get());
    System.out.println("结束了");
    futureTask.get方法会阻塞当前正在运行的线程.
  4. 使用线程池:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    ExecutorService mFixedPool = Executors.newFixedThreadPool(4);
    Future future = mFixedPool.submit(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
    System.out.println("在子线程执行任务");
    return "我是返回值";
    }
    });
    try {
    System.out.println("拿到返回值:"+future.get());
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    System.out.println("一切都结束了");
    mFixedPool.shutdown();

实现等待所有线程结束在继续运行的实现有如下几种方式:

  1. 使用CountDownLatch
  2. 使用join

比如main线程想等thread1线程结束了,main在结束,那么在main线程只需要调用thread1.join就行.

关于线程状态控制的方法的记忆要这么记忆:
第一组: wait / notify / notifyAll
第二组: join / sleep / yield

join / sleep / yield : 他们会自动到达某个条件就苏醒过来.而 wait 不会,他需要你触发 notify notifyAll 才可以. 那 sleep 和 wait 的区别是,sleep他会阻塞当前线程,并且不会释放对象锁,而wait让当前线程处于等待状态,并且释放对象锁.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SleepDemo {
private static final Object lock = new Object();

public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock) {
System.out.println("Thread 1 acquired lock");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread 1 released lock");
}
});

Thread t2 = new Thread(() -> {
synchronized (lock) {
System.out.println("Thread 2 acquired lock");
}
});

t1.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
}
}
//上面代码的运行效果是,t2一直运行不起来,因为t1拿到了对象锁,并且调用了sleep导致锁不能被释放,所以t2也没办法运行下去.