一、虚拟线程
1、Thread.ofVirtual
// 创建一个名叫“我是虚拟线程” 但未启动的虚拟线程
Thread virtualThread = Thread.ofVirtual()
.name("我是虚拟线程")
.unstarted(() -> {
System.out.println(Thread.currentThread().getName());
});
// 启动虚拟线程
virtualThread.start();
// 或者使用factory
ThreadFactory threadFactory = Thread.ofVirtual().factory();
Thread virtualThread3 = threadFactory.newThread(() -> {
System.out.println(Thread.currentThread().getName());
});
virtualThread3.setName("我是虚拟线程3");
virtualThread3.start();
2、Thread.startVirtualThread
Thread virtualThread2 = Thread.startVirtualThread(() -> {
System.out.println(Thread.currentThread().getName());
} );
3、Executors.newVirtualThreadPerTaskExecutor()
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
Future<?> submit = executorService.submit(() -> {
System.out.println(Thread.currentThread().getName());
});
二、结构化并发编程
Java 19新特性:Structured Concurrency (结构化并发编程)
StructuredTaskScope
在JDK21的官方文档中是这样介绍的:
structured concurrency 的基本 API。
StructuredTaskScope
支持任务拆分为多个并发子任务,在各自的线程中执行,以及子任务必须在主任务继续之前完成的情况。StructuredTaskScope
可用于确保并发操作的生命周期受 syntax block 限制,就像结构化编程中的顺序操作一样。
内部有两个重要的子类:StructuredTaskScope.ShutdownOnFailure
, StructuredTaskScope.ShutdownOnSuccess
StructuredTaskScope.ShutdownOnSuccess
一个
StructuredTaskScope
捕获第一个子任务成功完成的结果。捕获后,它会调用 shutdown 方法来中断未完成的线程并唤醒所有者。此类实现的策略适用于任何子任务的结果都可以(“调用任何”)以及不再需要其他未完成的子任务的结果的情况。除非另有说明,否则将
null
参数传递给此类中的方法将导致抛出NullPointerException
。
总结:并行执行任务,当有一个线程成功了,就会中断其他还未完成的的线程,并返回成功的结果。
适用场景:并发查询同一个数据,比如获取距离最近的服务器IP。
public class StructuredTaskScopeDemo {
public static void main(String[] args) {
try (var structuredTaskScope = new StructuredTaskScope.ShutdownOnSuccess<>()) {
// 并发五个线程
structuredTaskScope.fork(() -> threadTask(1));
structuredTaskScope.fork(() -> threadTask(2));
structuredTaskScope.fork(() -> threadTask(3));
structuredTaskScope.fork(() -> threadTask(4));
structuredTaskScope.fork(() -> threadTask(5));
// 加入主线程
structuredTaskScope.join();
// 获取并发执行结果,如果有异常抛出异常
Object result = structuredTaskScope.result(Exception::new);
// 打印并发执行结果
System.out.printf("并发执行返回的结果:%s", result.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
static String threadTask(int i) throws InterruptedException {
Thread.sleep(1000);
String result = String.format("CurrentThread:%s ==> 我是第%s个线程\n", Thread.currentThread(), i);
System.out.printf(result);
return result;
}
}
执行结果:可以看到ShutdownOnSuccess返回就是第一个执行成功的线程的返回。
// 第一次执行
/*
CurrentThread:VirtualThread[#28]/runnable@ForkJoinPool-1-worker-4 ==> 我是第4个线程
CurrentThread:VirtualThread[#30]/runnable@ForkJoinPool-1-worker-1 ==> 我是第5个线程
CurrentThread:VirtualThread[#26]/runnable@ForkJoinPool-1-worker-3 ==> 我是第3个线程
CurrentThread:VirtualThread[#24]/runnable@ForkJoinPool-1-worker-2 ==> 我是第2个线程
CurrentThread:VirtualThread[#22]/runnable@ForkJoinPool-1-worker-5 ==> 我是第1个线程
并发执行返回的结果:CurrentThread:VirtualThread[#28]/runnable@ForkJoinPool-1-worker-4 ==> 我是第4个线程
*/
// 第二次执行
/*
CurrentThread:VirtualThread[#22]/runnable@ForkJoinPool-1-worker-5 ==> 我是第1个线程
CurrentThread:VirtualThread[#30]/runnable@ForkJoinPool-1-worker-2 ==> 我是第5个线程
CurrentThread:VirtualThread[#24]/runnable@ForkJoinPool-1-worker-4 ==> 我是第2个线程
CurrentThread:VirtualThread[#26]/runnable@ForkJoinPool-1-worker-3 ==> 我是第3个线程
CurrentThread:VirtualThread[#28]/runnable@ForkJoinPool-1-worker-1 ==> 我是第4个线程
并发执行返回的结果:CurrentThread:VirtualThread[#22]/runnable@ForkJoinPool-1-worker-5 ==> 我是第1个线程
*/
// 第三次执行
/*
CurrentThread:VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1 ==> 我是第1个线程
CurrentThread:VirtualThread[#28]/runnable@ForkJoinPool-1-worker-3 ==> 我是第4个线程
CurrentThread:VirtualThread[#24]/runnable@ForkJoinPool-1-worker-4 ==> 我是第2个线程
CurrentThread:VirtualThread[#31]/runnable@ForkJoinPool-1-worker-5 ==> 我是第5个线程
CurrentThread:VirtualThread[#26]/runnable@ForkJoinPool-1-worker-2 ==> 我是第3个线程
并发执行返回的结果:CurrentThread:VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1 ==> 我是第1个线程
*/
StructuredTaskScope.ShutdownOnFailure
一个
StructuredTaskScope
,捕获第一个子任务异常完成的异常。捕获后,它会调用 shutdown 方法来中断未完成的线程并唤醒所有者。此类实现的策略适用于需要所有子任务的结果(“全部调用”)的情况;如果任何子任务失败,则不再需要其他未完成子任务的结果。除非另有说明,否则将
null
参数传递给此类中的方法将导致抛出NullPointerException
。
总结:执行多个任务,只要有一个失败(出现异常或其他主动抛出异常情况),就停止其他未执行完的任务,使用scope.throwIfFailed捕捉并抛出异常。 如果所有任务均正常,则使用 StructuredTaskScope.Subtask.get() 获取结果。
public class StructuredTaskScopeDemo {
public static void main(String[] args) {
try (var structuredTaskScope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并发五个线程
StructuredTaskScope.Subtask<String> subtask1 = structuredTaskScope.fork(() -> threadTask(1));
StructuredTaskScope.Subtask<String> subtask2 = structuredTaskScope.fork(() -> threadTask(2));
StructuredTaskScope.Subtask<String> subtask3 = structuredTaskScope.fork(() -> threadTask(3));
StructuredTaskScope.Subtask<String> subtask4 = structuredTaskScope.fork(() -> threadTask(4));
StructuredTaskScope.Subtask<String> subtask5 = structuredTaskScope.fork(() -> threadTask(5));
// 加入主线程
structuredTaskScope.join();
// 如果有异常抛出异常
structuredTaskScope.throwIfFailed(Exception::new);
// 打印并发执行结果
System.out.printf("res1并发执行返回的结果:%s",subtask1.get());
System.out.printf("res2并发执行返回的结果:%s",subtask2.get());
System.out.printf("res3并发执行返回的结果:%s",subtask3.get());
System.out.printf("res4并发执行返回的结果:%s",subtask4.get());
System.out.printf("res5并发执行返回的结果:%s",subtask5.get());
} catch (Exception e) {
e.printStackTrace();
}
}
static String threadTask(int i) throws InterruptedException {
Thread.sleep(1000);
if (i == 4) {
int a = 1 / 0;
}
String result = String.format("CurrentThread:%s ==> 我是第%s个线程\n", Thread.currentThread(), i);
System.out.printf(result);
return result;
}
}
控制打印:
/**
CurrentThread:VirtualThread[#26]/runnable@ForkJoinPool-1-worker-4 ==> 我是第3个线程
CurrentThread:VirtualThread[#30]/runnable@ForkJoinPool-1-worker-1 ==> 我是第5个线程
CurrentThread:VirtualThread[#22]/runnable@ForkJoinPool-1-worker-5 ==> 我是第1个线程
CurrentThread:VirtualThread[#24]/runnable@ForkJoinPool-1-worker-2 ==> 我是第2个线程
java.lang.Exception: java.lang.ArithmeticException: / by zero
at java.base/java.util.concurrent.StructuredTaskScope$ShutdownOnFailure.throwIfFailed(StructuredTaskScope.java:1318)
at com.likegakki.boot.study.thread.StructuredTaskScopeDemo.main(StructuredTaskScopeDemo.java:61)
Caused by: java.lang.ArithmeticException: / by zero
at com.likegakki.boot.study.thread.StructuredTaskScopeDemo.threadTask(StructuredTaskScopeDemo.java:79)
at com.likegakki.boot.study.thread.StructuredTaskScopeDemo.lambda$main$3(StructuredTaskScopeDemo.java:54)
at java.base/java.util.concurrent.StructuredTaskScope$SubtaskImpl.run(StructuredTaskScope.java:889)
at java.base/java.lang.VirtualThread.run(VirtualThread.java:311)
*/
// 所以当有一个子线程出现异常时,structuredTaskScope没有返回结果,所有任务全部失败处理。
通过上面的控制打印可以看到虚拟线程其实是依赖 runnable@ForkJoinPool 调度执行的。
ForkJoinPool是一个线程池,内部维护多个平台线程,平台线程去调度虚拟线程。
一个平台线程可以调度多个虚拟线程,虚拟线程数量是否有上限?
上代码:
public class VirtualThreadDemo02 {
public static void main(String[] args) throws InterruptedException {
Pattern forkJoinPoolName = Pattern.compile("ForkJoinPool-[\\d?]");
Pattern platformThreadName = Pattern.compile("worker-[\\d?]");
// 创建一个线程池的集合
Set<String> forkJoinPoolNames = new ConcurrentHashSet<>();
// 创建一个平台线程的集合
Set<String> platformThreadNames = new ConcurrentHashSet<>();
// 生成虚拟线程集合
List<Thread> virtualThreadList = IntStream
.range(0, 100)
.mapToObj(index -> Thread
.ofVirtual()
.name(String.format("virtual-thread-%s", index))
.unstarted(() -> {
String threadName = Thread.currentThread().toString();
updateSet(threadName, platformThreadName, platformThreadNames);
updateSet(threadName, forkJoinPoolName, forkJoinPoolNames);
})).toList();
// 获取启动时间
LocalDateTime startTime = LocalDateTime.now();
virtualThreadList.forEach(Thread::start);
for (Thread thread : virtualThreadList) {
thread.join();
}
LocalDateTime endTime = LocalDateTime.now();
System.out.printf("消耗时间: %s毫秒\n", Duration.between(startTime,endTime).toMillis());
System.out.printf("线程池数量: %s\n", forkJoinPoolNames.size());
System.out.printf("平台线程数量: %s\n", platformThreadNames.size());
System.out.printf("虚拟线程数量: %s\n", virtualThreadList.size());
}
private static void updateSet(String threadName, Pattern pattern, Set<String> set) {
Matcher matcher = pattern.matcher(threadName);
if (matcher.find()) {
String group = matcher.group();
set.add(group);
}
}
}
执行结果:
线程池数量 | CPU线程数 | 平台线程数 | 虚拟线程数 | 消耗时间(毫秒) |
---|---|---|---|---|
1 | 12 | 9 | 100 | 4 |
1 | 12 | 9 | 1000 | 15 |
1 | 12 | 9 | 10000 | 72 |
1 | 12 | 9 | 100000 | 199 |
1 | 12 | 9 | 1000000 | 1228 |
1 | 12 | 9 | 10000000 | 8191 |
三、其他
ForkJoinPool 和 ExecutorService 的区别
ExecutorService:一般普通的线程池,用于执行一些异步操作。所有任务都会提交一个任务队列中,当前有线程空闲时会去任务队列中领取一个任务处理,当所有线程都在工作时,队列中的任务就会阻塞住。
ForkJoinPool:主打一个分而治之的思想,将一个大任务切分成多个小任务,多个小任务并行执行。ForkJoinPool在设计上采用了“工作窃取算法”,空闲的线程会从其他忙碌的线程的任务队列中窃取任务来执行,最大化利用核心。
虚拟线程依赖 ForkJoinPool 调度实现,但是stream流中的ForkJoinPool与虚拟线程依赖的的ForkJoinPool不是同一类。
所以一个虚拟线程的生命周期不一定固定在一个平台线程中,当虚拟线程A出现阻塞,平台线程会继续执行其他虚拟线程,当A阻塞完毕,ForkJoinPool 内部的工作窃取算法会让其他空闲平台线程调度虚拟线程A继续执行。