一、虚拟线程

1、Thread.ofVirtual

// 创建一个名叫“我是虚拟线程” 但未启动的虚拟线程
Thread virtualThread = Thread.ofVirtual()
                .name("我是虚拟线程")
                .unstarted(() -> {
                    System.out.println(Thread.currentThread().getName());
                });
// 启动虚拟线程
virtualThread.start();

// 或者使用factory
ThreadFactory threadFactory = Thread.ofVirtual().factory();
        Thread virtualThread3 = threadFactory.newThread(() -> {
            System.out.println(Thread.currentThread().getName());
        });
virtualThread3.setName("我是虚拟线程3");
virtualThread3.start();

2、Thread.startVirtualThread

Thread virtualThread2 = Thread.startVirtualThread(() -> {
                    System.out.println(Thread.currentThread().getName());
                } );

3、Executors.newVirtualThreadPerTaskExecutor()

ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
Future<?> submit = executorService.submit(() -> {
    System.out.println(Thread.currentThread().getName());
});

二、结构化并发编程

Java 19新特性:Structured Concurrency (结构化并发编程)

StructuredTaskScope

在JDK21的官方文档中是这样介绍的:

structured concurrency 的基本 API。 StructuredTaskScope 支持任务拆分为多个并发子任务,在各自的线程中执行,以及子任务必须在主任务继续之前完成的情况。 StructuredTaskScope 可用于确保并发操作的生命周期受 syntax block 限制,就像结构化编程中的顺序操作一样。

内部有两个重要的子类:StructuredTaskScope.ShutdownOnFailure , StructuredTaskScope.ShutdownOnSuccess

StructuredTaskScope.ShutdownOnSuccess

一个 StructuredTaskScope 捕获第一个子任务成功完成的结果。捕获后,它会调用 shutdown 方法来中断未完成的线程并唤醒所有者。此类实现的策略适用于任何子任务的结果都可以(“调用任何”)以及不再需要其他未完成的子任务的结果的情况。

除非另有说明,否则将 null 参数传递给此类中的方法将导致抛出 NullPointerException

总结:并行执行任务,当有一个线程成功了,就会中断其他还未完成的的线程,并返回成功的结果。

适用场景:并发查询同一个数据,比如获取距离最近的服务器IP。

public class StructuredTaskScopeDemo {

    public static void main(String[] args) {

        try (var structuredTaskScope = new StructuredTaskScope.ShutdownOnSuccess<>()) {

            // 并发五个线程
            structuredTaskScope.fork(() -> threadTask(1));
            structuredTaskScope.fork(() -> threadTask(2));
            structuredTaskScope.fork(() -> threadTask(3));
            structuredTaskScope.fork(() -> threadTask(4));
            structuredTaskScope.fork(() -> threadTask(5));

            // 加入主线程
            structuredTaskScope.join();

            // 获取并发执行结果,如果有异常抛出异常
            Object result = structuredTaskScope.result(Exception::new);

            // 打印并发执行结果
            System.out.printf("并发执行返回的结果:%s", result.toString());

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static String threadTask(int i) throws InterruptedException {
        Thread.sleep(1000);
        String result = String.format("CurrentThread:%s ==> 我是第%s个线程\n", Thread.currentThread(), i);
        System.out.printf(result);
        return result;
    }
}

执行结果:可以看到ShutdownOnSuccess返回就是第一个执行成功的线程的返回。

// 第一次执行
/*
CurrentThread:VirtualThread[#28]/runnable@ForkJoinPool-1-worker-4 ==> 我是第4个线程
CurrentThread:VirtualThread[#30]/runnable@ForkJoinPool-1-worker-1 ==> 我是第5个线程
CurrentThread:VirtualThread[#26]/runnable@ForkJoinPool-1-worker-3 ==> 我是第3个线程
CurrentThread:VirtualThread[#24]/runnable@ForkJoinPool-1-worker-2 ==> 我是第2个线程
CurrentThread:VirtualThread[#22]/runnable@ForkJoinPool-1-worker-5 ==> 我是第1个线程
并发执行返回的结果:CurrentThread:VirtualThread[#28]/runnable@ForkJoinPool-1-worker-4 ==> 我是第4个线程
*/
    
// 第二次执行
/*
CurrentThread:VirtualThread[#22]/runnable@ForkJoinPool-1-worker-5 ==> 我是第1个线程
CurrentThread:VirtualThread[#30]/runnable@ForkJoinPool-1-worker-2 ==> 我是第5个线程
CurrentThread:VirtualThread[#24]/runnable@ForkJoinPool-1-worker-4 ==> 我是第2个线程
CurrentThread:VirtualThread[#26]/runnable@ForkJoinPool-1-worker-3 ==> 我是第3个线程
CurrentThread:VirtualThread[#28]/runnable@ForkJoinPool-1-worker-1 ==> 我是第4个线程
并发执行返回的结果:CurrentThread:VirtualThread[#22]/runnable@ForkJoinPool-1-worker-5 ==> 我是第1个线程
*/

// 第三次执行
/*
CurrentThread:VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1 ==> 我是第1个线程
CurrentThread:VirtualThread[#28]/runnable@ForkJoinPool-1-worker-3 ==> 我是第4个线程
CurrentThread:VirtualThread[#24]/runnable@ForkJoinPool-1-worker-4 ==> 我是第2个线程
CurrentThread:VirtualThread[#31]/runnable@ForkJoinPool-1-worker-5 ==> 我是第5个线程
CurrentThread:VirtualThread[#26]/runnable@ForkJoinPool-1-worker-2 ==> 我是第3个线程
并发执行返回的结果:CurrentThread:VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1 ==> 我是第1个线程
*/

StructuredTaskScope.ShutdownOnFailure

一个StructuredTaskScope,捕获第一个子任务异常完成的异常。捕获后,它会调用 shutdown 方法来中断未完成的线程并唤醒所有者。此类实现的策略适用于需要所有子任务的结果(“全部调用”)的情况;如果任何子任务失败,则不再需要其他未完成子任务的结果。

除非另有说明,否则将 null 参数传递给此类中的方法将导致抛出 NullPointerException

总结:执行多个任务,只要有一个失败(出现异常或其他主动抛出异常情况),就停止其他未执行完的任务,使用scope.throwIfFailed捕捉并抛出异常。 如果所有任务均正常,则使用 StructuredTaskScope.Subtask.get() 获取结果。

public class StructuredTaskScopeDemo {

    public static void main(String[] args) {

        try (var structuredTaskScope = new StructuredTaskScope.ShutdownOnFailure()) {

            // 并发五个线程
            StructuredTaskScope.Subtask<String> subtask1 = structuredTaskScope.fork(() -> threadTask(1));
            StructuredTaskScope.Subtask<String> subtask2 = structuredTaskScope.fork(() -> threadTask(2));
            StructuredTaskScope.Subtask<String> subtask3 = structuredTaskScope.fork(() -> threadTask(3));
            StructuredTaskScope.Subtask<String> subtask4 = structuredTaskScope.fork(() -> threadTask(4));
            StructuredTaskScope.Subtask<String> subtask5 = structuredTaskScope.fork(() -> threadTask(5));

            // 加入主线程
            structuredTaskScope.join();

            // 如果有异常抛出异常
            structuredTaskScope.throwIfFailed(Exception::new);

            // 打印并发执行结果
            System.out.printf("res1并发执行返回的结果:%s",subtask1.get());
            System.out.printf("res2并发执行返回的结果:%s",subtask2.get());
            System.out.printf("res3并发执行返回的结果:%s",subtask3.get());
            System.out.printf("res4并发执行返回的结果:%s",subtask4.get());
            System.out.printf("res5并发执行返回的结果:%s",subtask5.get());


        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static String threadTask(int i) throws InterruptedException {
        Thread.sleep(1000);
        if (i == 4) {
            int a = 1 / 0;
        }
        String result = String.format("CurrentThread:%s ==> 我是第%s个线程\n", Thread.currentThread(), i);
        System.out.printf(result);
        return result;
    }
}

控制打印:

/**
CurrentThread:VirtualThread[#26]/runnable@ForkJoinPool-1-worker-4 ==> 我是第3个线程
CurrentThread:VirtualThread[#30]/runnable@ForkJoinPool-1-worker-1 ==> 我是第5个线程
CurrentThread:VirtualThread[#22]/runnable@ForkJoinPool-1-worker-5 ==> 我是第1个线程
CurrentThread:VirtualThread[#24]/runnable@ForkJoinPool-1-worker-2 ==> 我是第2个线程
java.lang.Exception: java.lang.ArithmeticException: / by zero
	at java.base/java.util.concurrent.StructuredTaskScope$ShutdownOnFailure.throwIfFailed(StructuredTaskScope.java:1318)
	at com.likegakki.boot.study.thread.StructuredTaskScopeDemo.main(StructuredTaskScopeDemo.java:61)
Caused by: java.lang.ArithmeticException: / by zero
	at com.likegakki.boot.study.thread.StructuredTaskScopeDemo.threadTask(StructuredTaskScopeDemo.java:79)
	at com.likegakki.boot.study.thread.StructuredTaskScopeDemo.lambda$main$3(StructuredTaskScopeDemo.java:54)
	at java.base/java.util.concurrent.StructuredTaskScope$SubtaskImpl.run(StructuredTaskScope.java:889)
	at java.base/java.lang.VirtualThread.run(VirtualThread.java:311)
*/
// 所以当有一个子线程出现异常时,structuredTaskScope没有返回结果,所有任务全部失败处理。

通过上面的控制打印可以看到虚拟线程其实是依赖 runnable@ForkJoinPool 调度执行的。

ForkJoinPool是一个线程池,内部维护多个平台线程,平台线程去调度虚拟线程。

一个平台线程可以调度多个虚拟线程,虚拟线程数量是否有上限?

上代码:

public class VirtualThreadDemo02 {

    public static void main(String[] args) throws InterruptedException {

        Pattern forkJoinPoolName = Pattern.compile("ForkJoinPool-[\\d?]");
        Pattern platformThreadName = Pattern.compile("worker-[\\d?]");

        // 创建一个线程池的集合
        Set<String> forkJoinPoolNames = new ConcurrentHashSet<>();
        // 创建一个平台线程的集合
        Set<String> platformThreadNames = new ConcurrentHashSet<>();

        // 生成虚拟线程集合
        List<Thread> virtualThreadList = IntStream
                .range(0, 100)
                .mapToObj(index -> Thread
                        .ofVirtual()
                        .name(String.format("virtual-thread-%s", index))
                        .unstarted(() -> {
                            String threadName = Thread.currentThread().toString();
                            updateSet(threadName, platformThreadName, platformThreadNames);
                            updateSet(threadName, forkJoinPoolName, forkJoinPoolNames);
                        })).toList();

        // 获取启动时间
        LocalDateTime startTime = LocalDateTime.now();
        virtualThreadList.forEach(Thread::start);
        for (Thread thread : virtualThreadList) {
            thread.join();
        }
        LocalDateTime endTime = LocalDateTime.now();
        System.out.printf("消耗时间: %s毫秒\n",  Duration.between(startTime,endTime).toMillis());
        System.out.printf("线程池数量: %s\n", forkJoinPoolNames.size());
        System.out.printf("平台线程数量: %s\n", platformThreadNames.size());
        System.out.printf("虚拟线程数量: %s\n", virtualThreadList.size());
    }

    private static void updateSet(String threadName, Pattern pattern, Set<String> set) {
        Matcher matcher = pattern.matcher(threadName);
        if (matcher.find()) {
            String group = matcher.group();
            set.add(group);
        }
    }
}

执行结果:

线程池数量 CPU线程数 平台线程数 虚拟线程数 消耗时间(毫秒)
1 12 9 100 4
1 12 9 1000 15
1 12 9 10000 72
1 12 9 100000 199
1 12 9 1000000 1228
1 12 9 10000000 8191

三、其他

ForkJoinPoolExecutorService 的区别

ExecutorService:一般普通的线程池,用于执行一些异步操作。所有任务都会提交一个任务队列中,当前有线程空闲时会去任务队列中领取一个任务处理,当所有线程都在工作时,队列中的任务就会阻塞住。

ForkJoinPool:主打一个分而治之的思想,将一个大任务切分成多个小任务,多个小任务并行执行。ForkJoinPool在设计上采用了“工作窃取算法”,空闲的线程会从其他忙碌的线程的任务队列中窃取任务来执行,最大化利用核心。

虚拟线程依赖 ForkJoinPool 调度实现,但是stream流中的ForkJoinPool与虚拟线程依赖的的ForkJoinPool不是同一类。

所以一个虚拟线程的生命周期不一定固定在一个平台线程中,当虚拟线程A出现阻塞,平台线程会继续执行其他虚拟线程,当A阻塞完毕,ForkJoinPool 内部的工作窃取算法会让其他空闲平台线程调度虚拟线程A继续执行。