[TOC]

JUC之Executors的4种快捷创建线程池的方法

Java通过Executors工厂类提供了4种快捷创建线程池的方法,具体如下:

1
2
3
4
newSingleThreadExecutor()//创建只有一个线程的线程池
newFixedThreadPool(int nThreads)//创建固定大小的线程池
newCachedThreadPool()//创建一个不限制线程数量的线程池,任何提交的任务都将立即执行,但是空闲线程会得到及时回收
newScheduledThreadPool()//创建一个可定期或者延时执行任务的线程池

newSingleThreadExecutor创建“单线程化线程池”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class CreateThreadPoolDemo1 {
public static final int SLEEP_GAP = 500;
static class TargetTask implements Runnable{
static AtomicInteger taskNo = new AtomicInteger(1);
private String taskName;
public TargetTask(){
taskName = "task-" + taskNo.get();
taskNo.incrementAndGet();
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"任务:"+taskName+"doing");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"运行结束。");
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newSingleThreadExecutor();
for(int i=0;i<5;i++){
pool.execute(new TargetTask());
pool.submit(new TargetTask());
}
Thread.sleep(1000);
pool.shutdown();
}
}

pool-1-thread-1任务:task-1doing
pool-1-thread-1运行结束。
pool-1-thread-1任务:task-2doing
pool-1-thread-1运行结束。
pool-1-thread-1任务:task-3doing
pool-1-thread-1运行结束。
pool-1-thread-1任务:task-4doing
pool-1-thread-1运行结束。
pool-1-thread-1任务:task-5doing
pool-1-thread-1运行结束。
pool-1-thread-1任务:task-6doing
pool-1-thread-1运行结束。

  1. 单线程化的线程池中的任务是按照提交的次序顺序执行的。
  2. 池中的唯一线程的存活时间是无限的。
  3. 当池中的唯一线程正繁忙时,新提交的任务实例会进入内部 的阻塞队列中,并且其阻塞队列是无界的。
  4. 适用于任务按照提交次序,一个任务一个任务饿逐个执行的场景。

newFixedThreadPool创建“固定数量的线程池”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CreateThreadPoolDemo2 {
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(3);
for(int i=0;i<5;i++){
pool.execute(new TargetTask());
pool.submit(new TargetTask());
}
Thread.sleep(1000);
pool.shutdown();
}
public static final int SLEEP_GAP = 500;
static class TargetTask implements Runnable{
static AtomicInteger taskNo = new AtomicInteger(1);
private String taskName;
public TargetTask(){
taskName = "task-" + taskNo.get();
taskNo.incrementAndGet();
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"任务:"+taskName+"doing");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"运行结束。");
}
}
}

pool-1-thread-1任务:task-1doing
pool-1-thread-2任务:task-2doing
pool-1-thread-3任务:task-3doing
pool-1-thread-1运行结束。
pool-1-thread-3运行结束。
pool-1-thread-2运行结束。
pool-1-thread-1任务:task-4doing
pool-1-thread-2任务:task-5doing
pool-1-thread-3任务:task-6doing
pool-1-thread-3运行结束。
pool-1-thread-1运行结束。
pool-1-thread-2运行结束。
pool-1-thread-1任务:task-8doing
pool-1-thread-3任务:task-7doing
pool-1-thread-2任务:task-9doing
pool-1-thread-2运行结束。
pool-1-thread-1运行结束。
pool-1-thread-3运行结束。
pool-1-thread-2任务:task-10doing
pool-1-thread-2运行结束。

  1. 如果线程数没有达到“固定数量”,每次提交一个任务线程池内就创建一个新线程,直到线程达到线程池固定的数量。
  2. 线程池的大小一旦达到“固定数量”就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
  3. 在接收异步任务的执行目标实例时,如果池中的所有线程均在繁忙状态,新任务会进入阻塞队列中(无界的阻塞队列)。
  4. 适用于需要任务长期执行的场景。
  5. 弊端:内部使用无界队列来存放排队任务,当大量任务超过线程池最大容量需要处理时,队列无限增大,使服务器资源迅速耗尽。

newCachedThreadPool创建“可缓存线程池”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CreateThreadPoolDemo3 {
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
pool.execute(new TargetTask());
pool.submit(new TargetTask());
}
Thread.sleep(1000);
pool.shutdown();
}
public static final int SLEEP_GAP = 500;
static class TargetTask implements Runnable{
static AtomicInteger taskNo = new AtomicInteger(1);
private String taskName;
public TargetTask(){
taskName = "task-" + taskNo.get();
taskNo.incrementAndGet();
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"任务:"+taskName+"doing");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"运行结束。");
}
}
}

pool-1-thread-1任务:task-1doing
pool-1-thread-2任务:task-2doing
pool-1-thread-3任务:task-3doing
pool-1-thread-4任务:task-4doing
pool-1-thread-5任务:task-5doing
pool-1-thread-6任务:task-6doing
pool-1-thread-7任务:task-7doing
pool-1-thread-8任务:task-8doing
pool-1-thread-9任务:task-9doing
pool-1-thread-10任务:task-10doing
pool-1-thread-9运行结束。
pool-1-thread-8运行结束。
pool-1-thread-4运行结束。
pool-1-thread-1运行结束。
pool-1-thread-7运行结束。
pool-1-thread-5运行结束。
pool-1-thread-10运行结束。
pool-1-thread-2运行结束。
pool-1-thread-3运行结束。
pool-1-thread-6运行结束。

  1. 在接收新的异步任务target执行目标实例时,如果池内所有线程繁忙,此线程池就会添加新线程来处理任务。
  2. 此线程池不会对线程池大小进行限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
  3. 如果部分线程空闲,也就是存量线程的数量超过了处理任务数量,就会回收空闲(60秒不执行任务)线程。
  4. 适用场景:需要快速处理突发性强、耗时较短的任务场景,如Netty的NIO处理场景、REST API接口的瞬时削峰场景。
  5. 弊端:没有最大线程数量限制,如果大量的异步任务执行目标实例同时提交,可能回因为创建过多线程而导致资源耗尽。

newScheduledThreadPool创建“可调度线程池”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CreateThreadPoolDemo4 {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
for (int i = 0; i < 4; i++) {
service.scheduleAtFixedRate
(new CreateThreadPoolDemo1.TargetTask(),0,1000, TimeUnit.MILLISECONDS);
// service.scheduleWithFixedDelay
// (new CreateThreadPoolDemo1.TargetTask(),0,1000,TimeUnit.MILLISECONDS);
}
Thread.sleep(1000);
service.shutdown();
}
public static final int SLEEP_GAP = 500;
static class TargetTask implements Runnable{
static AtomicInteger taskNo = new AtomicInteger(1);
private String taskName;
public TargetTask(){
taskName = "task-" + taskNo.get();
taskNo.incrementAndGet();
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"任务:"+taskName+"doing");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"运行结束。");
}
}
}

​ newScheduledThreadPool工厂方法可以创建一个执行“延时”和 “周期性”任务的可调度线程池,所创建的线程池为 ScheduleExecutorService类型的实例。ScheduleExecutorService接 口中有多个重要的接收被调目标任务的方法,其中 scheduleAtFixedRate和scheduleWithFixedDelay使用得比较多。

scheduleAtFixedRate:

1
2
3
4
5
6
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, //异步任务target执行目标实例
long initialDelay, //首次执行延时
long period, //两次开始执行最小间隔时间
TimeUnit unit //所设置的时间的计时单位,如TimeUnit.SECONDS常量
);

scheduleWithFixedDelay:

1
2
3
4
5
6
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, //异步任务target执行目标实例
long initialDelay, //首次执行延时
long delay, //前一次执行结束到下一次执行开始的间隔时间(间隔执行延迟时间)
TimeUnit unit //所设置的时间的计时单位,如TimeUnit.SECONDS常量
);
  1. 当被调任务的执行时间大于指定的间隔时间时, ScheduleExecutorService并不会创建一个新的线程去并发执行这个任 务,而是等待前一次调度执行完毕。
  2. 适用场景:周期性地执行任务的场景。

Executors快捷创建线程池的潜在问题

FixedThreadPool和SingleThreadPool

​ 这两个工厂方法所创建的线程池,工作队列(任务排队的队列)的长度都为Integer.MAX_VALUE,可能会堆积大量的任务,从而导致OOM(即耗尽内存资源)。

CachedThreadPool和ScheduledThreadPool

​ 这两个工厂方法所创建的线程池允许创建的线程数量为Integer.MAX_VALUE,可能会导致创建大量的线程,从而导致OOM。