[TOC]

JUC之手写一个简单的线程池

​ 在之前介绍了线程池的快捷创建方式以及其中的隐患,紧接着介绍了线程池的标准创建方式,了解了线程池的基本构造后,我们可以自己尝试手写一个简单的线程池来加深我们对线程池的理解。

阻塞队列

​ 首先我们需要一个阻塞队列,用来存放任务,可以向其中添加任务,获取其中的任务,还要实现当队列为空时,不能获取任务,当队列已满时,不能向其中添加任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/**
* 自定义阻塞队列类
* @param <T>
*/
class BlockingQueueTest<T> {
java.util.logging.Logger logger= getLogger("BlockingQueueTest");
// 1. 任务队列
private final Deque<T> deque = new ArrayDeque<>();
// 2. 锁
private final ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件
private final Condition addCon = lock.newCondition();
// 4. 消费者条件
private final Condition delCon = lock.newCondition();
// 5. 队列容量
private final int len;

/**
* 构造方法,指定队列的最大容量
* @param len
*/
public BlockingQueueTest(int len) {
this.len = len;
}

/**
* 判断阻塞队列是否已满
* @return
*/
public boolean isTrue(){
return size()==len;
}

/**
* 阻塞获取队列中的任务
*/
public T get(){
lock.lock();
try {
while(deque.isEmpty()){
try {
delCon.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = deque.getFirst();
deque.removeFirst();
addCon.signal();
return t;
}finally {
lock.unlock();
}
}

/**
* 有超时时间的阻塞获取队列中的任务
* @param time
* @param timeUnit
*/
public T poll(Long time, TimeUnit timeUnit){
lock.lock();
try {
//将等待时间统一转换为纳秒
long nanos = timeUnit.toNanos(time);
while(deque.isEmpty()){
try {
if(nanos <= 0){
return null;
}
//返回的值为等待一段时间后剩余的时间
nanos = delCon.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = deque.getFirst();
deque.removeFirst();
addCon.signal();
return t;
}finally {
lock.unlock();
}
}

/**
* 阻塞添加任务
* @param task
*/
public void put(T task){
lock.lock();
try{
while(deque.size()==len){
try {
logger.log(Level.INFO,task+"等待加入任务队列---------");
addCon.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.log(Level.INFO,task+"加入任务队列========");
deque.addLast(task);
delCon.signal();
}finally {
lock.unlock();
}
}

/**
* 带超时时间的添加任务
* @param time
* @param unit
* @return
*/
public boolean offer(T task,Long time,TimeUnit unit){
lock.lock();
try{
long nanos = unit.toNanos(time);
while(deque.size()==len){
try {
logger.log(Level.INFO,task+"等待加入任务队列---------");
if(nanos<=0){
return false;
}
nanos = addCon.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.log(Level.INFO,task+"加入任务队列========");
deque.addLast(task);
delCon.signal();
return true;
}finally {
lock.unlock();
}
}

/**
* 获取队列的容量
* @return
*/
public int size(){
lock.lock();
try{
return deque.size();
}finally {
lock.unlock();
}
}

/**
* 带拒绝策略的添加任务
* @param rejectPoilicy
* @param task
*/
public void tryPut(RejectPoilicy<T> rejectPoilicy, T task) {
lock.lock();
try{
//判断队列是否已满
if(deque.size()==len){
rejectPoilicy.reject(this,task);
}else {
logger.log(Level.INFO,task+"加入任务队列========");
deque.addLast(task);
delCon.signal();
}
}finally {
lock.unlock();
}
}
}

拒绝策略

​ 由于拒绝策略有好几种实现方案,这里我们使用定义一个拒绝策略的接口,让调用者使用函数式编程或Lambda表达式来自己实现其中的方法。定义的接口如下:

1
2
3
4
5
6
7
8
/**
* 拒绝策略接口
* @param <T>
*/
@FunctionalInterface
interface RejectPoilicy<T>{
void reject(BlockingQueueTest<T> queue,T task);
}

线程工厂

​ 这里我没有单独写一个线程工厂,而是在线程池的里面定义了一个内部类,来执行提交的任务,同时在里面通过名字来核心线程与普通线程,这决定了他们在执行完任务后会不会结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
//原子类
AtomicInteger threadNo = new AtomicInteger(1);
/**
* 工作线程类
*/
class Worker extends Thread{

private Runnable task;
private String threadName;

private Worker(Runnable task) {
this.task=task;
setThreadName();
this.setName(threadName);
logger.log(Level.INFO,threadName+"已创建..........");
}

private void setThreadName(){
threadName = "simpleThread-" + threadNo.get();
threadNo.incrementAndGet();
}

@Override
public void run() {
char c = threadName.charAt(13);
if((c-'0')<=coreSize){//对比线程编号,小于核心线程数的线程调用阻塞执行任务,执行结束后不移出线程队列
gorun();
}else{//大于核心线程数的采用限时等待执行,执行结束后移除出线程队列
go();
}
}
/**
* 普通线程执行完任务后限时等待任务队列,移除
*/
private void go(){
while(task != null || (task = blockingQueueTest.poll(outTime,unit)) != null){
try {
logger.log(Level.INFO,"普通线程"+threadName+"正在执行"+task+"任务....................");
task.run();
}catch (Exception e){
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers){
logger.log(Level.INFO,"*******worker被移除"+this);
workers.remove(this);
}
}

/**
* 核心线程执行完任务后阻塞等待任务队列,不移除
*/
private void gorun(){
while(task != null || (task = blockingQueueTest.get()) != null){
try {
logger.log(Level.INFO,"核心线程"+threadName+"正在执行"+task+"任务....................");
task.run();
}catch (Exception e){
e.printStackTrace();
}finally {
task = null;
}
}
}
}

线程池类

​ 关于线程池要使用的基本都已经编写完,接下来编写线程池类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* 自定义线程池类
*/
class ThreadPool{
java.util.logging.Logger logger= getLogger("ThreadPool");
//阻塞任务队列
private final BlockingQueueTest<Runnable> blockingQueueTest;
//线程集合
private final HashSet<Worker> workers = new HashSet<>();
//核心线程数
private final int coreSize;
//最大线程数
private final int maxThreadSize;
//超时时间
private final long outTime;
//时间单位
private final TimeUnit unit;
//原子类
AtomicInteger threadNo = new AtomicInteger(1);
//线程工厂
// private final ThreadFactory threadFactory;
//拒绝策略
private final RejectPoilicy<Runnable> rejectPoilicy;

public ThreadPool(int coreSize,int maxThreadSize, long outTime, TimeUnit unit, int queueSize, RejectPoilicy<Runnable> rejectPoilicy) {
this.coreSize = coreSize;
this.maxThreadSize=maxThreadSize;
this.outTime = outTime;
this.unit = unit;
this.blockingQueueTest = new BlockingQueueTest<>(queueSize);
// this.threadFactory=threadFactory;
this.rejectPoilicy = rejectPoilicy;
logger.log(Level.INFO,"====================ThreadPool初始化成功=================");
}

public void execute(Runnable task){
synchronized (workers){
//如果任务数没有超过核心线程数,直接交给worker执行
if(workers.size()<coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
logger.log(Level.INFO, "=============================新增worker:" + worker);
logger.log(Level.INFO, "=============新增task:" + task);
worker.start();
}else if(!blockingQueueTest.isTrue()) {
//如果线程数达到核心线程数,阻塞队列不满,则将任务加入任务队列
blockingQueueTest.put(task);
}else if(workers.size()<maxThreadSize){
//如果阻塞队列已满,单线程数小于最大线程数,则立即创建线程执行该任务
Worker worker = new Worker(task);
workers.add(worker);
logger.log(Level.INFO, "=============================新增worker:" + worker);
logger.log(Level.INFO, "=============新增task:" + task);
worker.start();
}else{
//线程数大于最大线程数,为其执行拒绝策略
blockingQueueTest.tryPut(rejectPoilicy,task);
}
}
}
/**
* 工作线程类
*/
//.....
}

测试

接下来编写一个测试类来实验一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import static java.util.logging.Logger.getLogger;
public class TestPool {
public static void main(String[] args) {
java.util.logging.Logger logger= getLogger("TestPool");
ThreadPool pool = new ThreadPool(2,4, 1, TimeUnit.SECONDS,
3, (queue,task)->{
/**
* 1.阻塞等待加入等待队列
* queue.put(task);
* 一直等待加入队列
* 2.带等待时间的等待加入
* queue.offer(task, 1500L, TimeUnit.MILLISECONDS);
* 在预期时间内等待,超时则退出等待
* 3.让调用者放弃任务执行
* logger.log(Level.INFO,"^^^^^^^^^^^^放弃执行"+task+"了^^^^^^^");
* 放弃无法加入等待队列的任务
* 4.让调用者抛出异常
* throw new RuntimeException("任务执行失败"+task);
* 之后的代码不会执行
* 5.让调用者自己执行任务
* task.run();
*/
queue.put(task);
});
for (int i = 0; i < 10; i++) {
int j=i;
pool.execute(()-> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.log(Level.INFO, String.valueOf(j));
});
}
}
}

五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool
信息: ====================ThreadPool初始化成功=================
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker
信息: simpleThread-1已创建……….
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============================新增worker:Thread[simpleThread-1,5,main]
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============新增task:suan.JUC.TestPool$$Lambda$2/1072408673@b4c966a
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker
信息: simpleThread-2已创建……….
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker gorun
信息: 核心线程simpleThread-1正在执行suan.JUC.TestPool$$Lambda$2/1072408673@b4c966a任务………………..
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============================新增worker:Thread[simpleThread-2,5,main]
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============新增task:suan.JUC.TestPool$$Lambda$2/1072408673@4e50df2e
五月 20, 2023 10:00:22 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@1d81eb93加入任务队列========
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker gorun
信息: 核心线程simpleThread-2正在执行suan.JUC.TestPool$$Lambda$2/1072408673@4e50df2e任务………………..
五月 20, 2023 10:00:22 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@7291c18f加入任务队列========
五月 20, 2023 10:00:22 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@34a245ab加入任务队列========
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker
信息: simpleThread-3已创建……….
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============================新增worker:Thread[simpleThread-3,5,main]
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============新增task:suan.JUC.TestPool$$Lambda$2/1072408673@6e8cf4c6
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker
信息: simpleThread-4已创建……….
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker go
信息: 普通线程simpleThread-3正在执行suan.JUC.TestPool$$Lambda$2/1072408673@6e8cf4c6任务………………..
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============================新增worker:Thread[simpleThread-4,5,main]
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============新增task:suan.JUC.TestPool$$Lambda$2/1072408673@34c45dca
五月 20, 2023 10:00:22 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@52cc8049等待加入任务队列———
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker go
信息: 普通线程simpleThread-4正在执行suan.JUC.TestPool$$Lambda$2/1072408673@34c45dca任务………………..
五月 20, 2023 10:00:23 下午 suan.JUC.TestPool lambda$main$1
信息: 5
五月 20, 2023 10:00:23 下午 suan.JUC.TestPool lambda$main$1
信息: 0
五月 20, 2023 10:00:23 下午 suan.JUC.TestPool lambda$main$1
信息: 6
五月 20, 2023 10:00:23 下午 suan.JUC.TestPool lambda$main$1
信息: 1
五月 20, 2023 10:00:23 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@52cc8049加入任务队列========
五月 20, 2023 10:00:23 下午 suan.JUC.ThreadPool$Worker go
信息: 普通线程simpleThread-3正在执行suan.JUC.TestPool$$Lambda$2/1072408673@1d81eb93任务………………..
五月 20, 2023 10:00:23 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@27973e9b等待加入任务队列———
五月 20, 2023 10:00:23 下午 suan.JUC.ThreadPool$Worker gorun
信息: 核心线程simpleThread-1正在执行suan.JUC.TestPool$$Lambda$2/1072408673@7291c18f任务………………..
五月 20, 2023 10:00:23 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@27973e9b加入任务队列========
五月 20, 2023 10:00:23 下午 suan.JUC.ThreadPool$Worker gorun
信息: 核心线程simpleThread-2正在执行suan.JUC.TestPool$$Lambda$2/1072408673@52cc8049任务………………..
五月 20, 2023 10:00:23 下午 suan.JUC.ThreadPool$Worker go
信息: 普通线程simpleThread-4正在执行suan.JUC.TestPool$$Lambda$2/1072408673@34a245ab任务………………..
五月 20, 2023 10:00:23 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@312b1dae加入任务队列========
五月 20, 2023 10:00:24 下午 suan.JUC.TestPool lambda$main$1
信息: 2
五月 20, 2023 10:00:24 下午 suan.JUC.ThreadPool$Worker go
信息: 普通线程simpleThread-3正在执行suan.JUC.TestPool$$Lambda$2/1072408673@27973e9b任务………………..
五月 20, 2023 10:00:24 下午 suan.JUC.TestPool lambda$main$1
信息: 3
五月 20, 2023 10:00:24 下午 suan.JUC.TestPool lambda$main$1
信息: 4
五月 20, 2023 10:00:24 下午 suan.JUC.TestPool lambda$main$1
信息: 7
五月 20, 2023 10:00:24 下午 suan.JUC.ThreadPool$Worker gorun
信息: 核心线程simpleThread-1正在执行suan.JUC.TestPool$$Lambda$2/1072408673@312b1dae任务………………..
五月 20, 2023 10:00:25 下午 suan.JUC.TestPool lambda$main$1
信息: 8
五月 20, 2023 10:00:25 下午 suan.JUC.TestPool lambda$main$1
信息: 9
五月 20, 2023 10:00:25 下午 suan.JUC.ThreadPool$Worker go
信息: *******worker被移除Thread[simpleThread-4,5,main]
五月 20, 2023 10:00:26 下午 suan.JUC.ThreadPool$Worker go
信息: *******worker被移除Thread[simpleThread-3,5,main]

成功,且由于核心线程并没有结束,所以我们的进程并没有结束。

以上就是手写一个线程类的思路代码,另外这只是一个简单的实现,很多功能向关闭线程池的功能就没有编写,仅仅作为加深线程池的理解的一种方式,如有错误或可改进的地方,欢迎指正!