[TOC]
JUC之手写一个简单的线程池
在之前介绍了线程池的快捷创建方式以及其中的隐患,紧接着介绍了线程池的标准创建方式,了解了线程池的基本构造后,我们可以自己尝试手写一个简单的线程池来加深我们对线程池的理解。
阻塞队列
首先我们需要一个阻塞队列,用来存放任务,可以向其中添加任务,获取其中的任务,还要实现当队列为空时,不能获取任务,当队列已满时,不能向其中添加任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
|
class BlockingQueueTest<T> { java.util.logging.Logger logger= getLogger("BlockingQueueTest"); private final Deque<T> deque = new ArrayDeque<>(); private final ReentrantLock lock = new ReentrantLock(); private final Condition addCon = lock.newCondition(); private final Condition delCon = lock.newCondition(); private final int len;
public BlockingQueueTest(int len) { this.len = len; }
public boolean isTrue(){ return size()==len; }
public T get(){ lock.lock(); try { while(deque.isEmpty()){ try { delCon.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = deque.getFirst(); deque.removeFirst(); addCon.signal(); return t; }finally { lock.unlock(); } }
public T poll(Long time, TimeUnit timeUnit){ lock.lock(); try { long nanos = timeUnit.toNanos(time); while(deque.isEmpty()){ try { if(nanos <= 0){ return null; } nanos = delCon.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = deque.getFirst(); deque.removeFirst(); addCon.signal(); return t; }finally { lock.unlock(); } }
public void put(T task){ lock.lock(); try{ while(deque.size()==len){ try { logger.log(Level.INFO,task+"等待加入任务队列---------"); addCon.await(); } catch (InterruptedException e) { e.printStackTrace(); } } logger.log(Level.INFO,task+"加入任务队列========"); deque.addLast(task); delCon.signal(); }finally { lock.unlock(); } }
public boolean offer(T task,Long time,TimeUnit unit){ lock.lock(); try{ long nanos = unit.toNanos(time); while(deque.size()==len){ try { logger.log(Level.INFO,task+"等待加入任务队列---------"); if(nanos<=0){ return false; } nanos = addCon.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } logger.log(Level.INFO,task+"加入任务队列========"); deque.addLast(task); delCon.signal(); return true; }finally { lock.unlock(); } }
public int size(){ lock.lock(); try{ return deque.size(); }finally { lock.unlock(); } }
public void tryPut(RejectPoilicy<T> rejectPoilicy, T task) { lock.lock(); try{ if(deque.size()==len){ rejectPoilicy.reject(this,task); }else { logger.log(Level.INFO,task+"加入任务队列========"); deque.addLast(task); delCon.signal(); } }finally { lock.unlock(); } } }
|
拒绝策略
由于拒绝策略有好几种实现方案,这里我们使用定义一个拒绝策略的接口,让调用者使用函数式编程或Lambda表达式来自己实现其中的方法。定义的接口如下:
1 2 3 4 5 6 7 8
|
@FunctionalInterface interface RejectPoilicy<T>{ void reject(BlockingQueueTest<T> queue,T task); }
|
线程工厂
这里我没有单独写一个线程工厂,而是在线程池的里面定义了一个内部类,来执行提交的任务,同时在里面通过名字来核心线程与普通线程,这决定了他们在执行完任务后会不会结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| AtomicInteger threadNo = new AtomicInteger(1);
class Worker extends Thread{
private Runnable task; private String threadName;
private Worker(Runnable task) { this.task=task; setThreadName(); this.setName(threadName); logger.log(Level.INFO,threadName+"已创建.........."); }
private void setThreadName(){ threadName = "simpleThread-" + threadNo.get(); threadNo.incrementAndGet(); }
@Override public void run() { char c = threadName.charAt(13); if((c-'0')<=coreSize){ gorun(); }else{ go(); } }
private void go(){ while(task != null || (task = blockingQueueTest.poll(outTime,unit)) != null){ try { logger.log(Level.INFO,"普通线程"+threadName+"正在执行"+task+"任务...................."); task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } } synchronized (workers){ logger.log(Level.INFO,"*******worker被移除"+this); workers.remove(this); } }
private void gorun(){ while(task != null || (task = blockingQueueTest.get()) != null){ try { logger.log(Level.INFO,"核心线程"+threadName+"正在执行"+task+"任务...................."); task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } } } }
|
线程池类
关于线程池要使用的基本都已经编写完,接下来编写线程池类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
class ThreadPool{ java.util.logging.Logger logger= getLogger("ThreadPool"); private final BlockingQueueTest<Runnable> blockingQueueTest; private final HashSet<Worker> workers = new HashSet<>(); private final int coreSize; private final int maxThreadSize; private final long outTime; private final TimeUnit unit; AtomicInteger threadNo = new AtomicInteger(1);
private final RejectPoilicy<Runnable> rejectPoilicy;
public ThreadPool(int coreSize,int maxThreadSize, long outTime, TimeUnit unit, int queueSize, RejectPoilicy<Runnable> rejectPoilicy) { this.coreSize = coreSize; this.maxThreadSize=maxThreadSize; this.outTime = outTime; this.unit = unit; this.blockingQueueTest = new BlockingQueueTest<>(queueSize);
this.rejectPoilicy = rejectPoilicy; logger.log(Level.INFO,"====================ThreadPool初始化成功================="); }
public void execute(Runnable task){ synchronized (workers){ if(workers.size()<coreSize) { Worker worker = new Worker(task); workers.add(worker); logger.log(Level.INFO, "=============================新增worker:" + worker); logger.log(Level.INFO, "=============新增task:" + task); worker.start(); }else if(!blockingQueueTest.isTrue()) { blockingQueueTest.put(task); }else if(workers.size()<maxThreadSize){ Worker worker = new Worker(task); workers.add(worker); logger.log(Level.INFO, "=============================新增worker:" + worker); logger.log(Level.INFO, "=============新增task:" + task); worker.start(); }else{ blockingQueueTest.tryPut(rejectPoilicy,task); } } }
}
|
测试
接下来编写一个测试类来实验一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import java.util.ArrayDeque; import java.util.Deque; import java.util.HashSet; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import static java.util.logging.Logger.getLogger; public class TestPool { public static void main(String[] args) { java.util.logging.Logger logger= getLogger("TestPool"); ThreadPool pool = new ThreadPool(2,4, 1, TimeUnit.SECONDS, 3, (queue,task)->{
queue.put(task); }); for (int i = 0; i < 10; i++) { int j=i; pool.execute(()-> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } logger.log(Level.INFO, String.valueOf(j)); }); } } }
|
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool
信息: ====================ThreadPool初始化成功=================
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker
信息: simpleThread-1已创建……….
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============================新增worker:Thread[simpleThread-1,5,main]
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============新增task:suan.JUC.TestPool$$Lambda$2/1072408673@b4c966a
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker
信息: simpleThread-2已创建……….
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker gorun
信息: 核心线程simpleThread-1正在执行suan.JUC.TestPool$$Lambda$2/1072408673@b4c966a任务………………..
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============================新增worker:Thread[simpleThread-2,5,main]
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============新增task:suan.JUC.TestPool$$Lambda$2/1072408673@4e50df2e
五月 20, 2023 10:00:22 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@1d81eb93加入任务队列========
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker gorun
信息: 核心线程simpleThread-2正在执行suan.JUC.TestPool$$Lambda$2/1072408673@4e50df2e任务………………..
五月 20, 2023 10:00:22 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@7291c18f加入任务队列========
五月 20, 2023 10:00:22 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@34a245ab加入任务队列========
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker
信息: simpleThread-3已创建……….
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============================新增worker:Thread[simpleThread-3,5,main]
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============新增task:suan.JUC.TestPool$$Lambda$2/1072408673@6e8cf4c6
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker
信息: simpleThread-4已创建……….
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker go
信息: 普通线程simpleThread-3正在执行suan.JUC.TestPool$$Lambda$2/1072408673@6e8cf4c6任务………………..
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============================新增worker:Thread[simpleThread-4,5,main]
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool execute
信息: =============新增task:suan.JUC.TestPool$$Lambda$2/1072408673@34c45dca
五月 20, 2023 10:00:22 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@52cc8049等待加入任务队列———
五月 20, 2023 10:00:22 下午 suan.JUC.ThreadPool$Worker go
信息: 普通线程simpleThread-4正在执行suan.JUC.TestPool$$Lambda$2/1072408673@34c45dca任务………………..
五月 20, 2023 10:00:23 下午 suan.JUC.TestPool lambda$main$1
信息: 5
五月 20, 2023 10:00:23 下午 suan.JUC.TestPool lambda$main$1
信息: 0
五月 20, 2023 10:00:23 下午 suan.JUC.TestPool lambda$main$1
信息: 6
五月 20, 2023 10:00:23 下午 suan.JUC.TestPool lambda$main$1
信息: 1
五月 20, 2023 10:00:23 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@52cc8049加入任务队列========
五月 20, 2023 10:00:23 下午 suan.JUC.ThreadPool$Worker go
信息: 普通线程simpleThread-3正在执行suan.JUC.TestPool$$Lambda$2/1072408673@1d81eb93任务………………..
五月 20, 2023 10:00:23 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@27973e9b等待加入任务队列———
五月 20, 2023 10:00:23 下午 suan.JUC.ThreadPool$Worker gorun
信息: 核心线程simpleThread-1正在执行suan.JUC.TestPool$$Lambda$2/1072408673@7291c18f任务………………..
五月 20, 2023 10:00:23 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@27973e9b加入任务队列========
五月 20, 2023 10:00:23 下午 suan.JUC.ThreadPool$Worker gorun
信息: 核心线程simpleThread-2正在执行suan.JUC.TestPool$$Lambda$2/1072408673@52cc8049任务………………..
五月 20, 2023 10:00:23 下午 suan.JUC.ThreadPool$Worker go
信息: 普通线程simpleThread-4正在执行suan.JUC.TestPool$$Lambda$2/1072408673@34a245ab任务………………..
五月 20, 2023 10:00:23 下午 suan.JUC.BlockingQueueTest put
信息: suan.JUC.TestPool$$Lambda$2/1072408673@312b1dae加入任务队列========
五月 20, 2023 10:00:24 下午 suan.JUC.TestPool lambda$main$1
信息: 2
五月 20, 2023 10:00:24 下午 suan.JUC.ThreadPool$Worker go
信息: 普通线程simpleThread-3正在执行suan.JUC.TestPool$$Lambda$2/1072408673@27973e9b任务………………..
五月 20, 2023 10:00:24 下午 suan.JUC.TestPool lambda$main$1
信息: 3
五月 20, 2023 10:00:24 下午 suan.JUC.TestPool lambda$main$1
信息: 4
五月 20, 2023 10:00:24 下午 suan.JUC.TestPool lambda$main$1
信息: 7
五月 20, 2023 10:00:24 下午 suan.JUC.ThreadPool$Worker gorun
信息: 核心线程simpleThread-1正在执行suan.JUC.TestPool$$Lambda$2/1072408673@312b1dae任务………………..
五月 20, 2023 10:00:25 下午 suan.JUC.TestPool lambda$main$1
信息: 8
五月 20, 2023 10:00:25 下午 suan.JUC.TestPool lambda$main$1
信息: 9
五月 20, 2023 10:00:25 下午 suan.JUC.ThreadPool$Worker go
信息: *******worker被移除Thread[simpleThread-4,5,main]
五月 20, 2023 10:00:26 下午 suan.JUC.ThreadPool$Worker go
信息: *******worker被移除Thread[simpleThread-3,5,main]
成功,且由于核心线程并没有结束,所以我们的进程并没有结束。
以上就是手写一个线程类的思路代码,另外这只是一个简单的实现,很多功能向关闭线程池的功能就没有编写,仅仅作为加深线程池的理解的一种方式,如有错误或可改进的地方,欢迎指正!