虚拟线程是一种比传统 Java 线程(OS线程)更轻量的线程实现。与传统线程由操作系统管理不同,虚拟线程由 JVM 管理,多个虚拟线程可以在少量的操作系统线程(平台线程)上运行。虚拟线程的创建、切换和销毁的开销远小于传统线程,同时虚拟线程可以和普通线程一样的使用。
1、虚拟线程的创建 1.1 构建 Thread.Builder 1 Thread.Builder builder = Thread.ofVirtual().name("worker-" , 0 );
name
有2个不同参数列表的实现
name(String *name*)
指定虚拟线程的名称,后期通过builder
启动的虚拟线程名称都是相同的
name(String *prefix*, long *start*)
指定虚拟线程的prefix
和开始线程的索引
1.2 使用Builder启动虚拟线程 1 Thread thread = builder.start();
由于虚拟线程也是通过Thread
创建产生,当Thread
实例创建以后,后续的使用和普通线程是相同的
1.3 未捕获异常处理 1 2 3 builder.uncaughtExceptionHandler((t, e) -> { System.out.println(t.getName() + "抛出了异常" + e.getMessage()); });
2、虚拟线程栈查看 1 2 jcmd <PID> Thread.dump_to_file -format=text <file> jcmd <PID> Thread.dump_to_file -format=json <file>
输出示例
1 2 3 4 5 6 7 8 9 10 11 12 { "tid" : "20" , "name" : "vt-worker-0" , "stack" : [ "java.base\/java.lang.VirtualThread.parkNanos(VirtualThread.java:635)" , "java.base\/java.lang.VirtualThread.sleepNanos(VirtualThread.java:807)" , "java.base\/java.lang.Thread.sleep(Thread.java:556)" , "java.base\/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)" , "cn.probiecoder.concurrency.VirtualThread.lambda$0(VirtualThread.java:15)" , "java.base\/java.lang.VirtualThread.run(VirtualThread.java:329)" ] }
使用jcmd <PID> Thread.print
直接输出线程栈信息时无详细的虚拟线程信息
3、线程数的设置 可以通过如下 System.property
设置ForkJoinPool
的参数
1 2 3 jdk.virtualThreadScheduler.parallelism jdk.virtualThreadScheduler.maxPoolSize jdk.virtualThreadScheduler.minRunnable
4、底层实现 在执行start()
方法时会创建正式的虚拟线程,并指定虚拟线程名称,所依托的普通线程调度器scheduler
ThreadBuilder.VirtualThreadBuilder
1 2 3 4 5 6 7 8 public Thread unstarted (Runnable task) { Objects.requireNonNull(task); var thread = newVirtualThread(scheduler, nextThreadName(), characteristics(), task); UncaughtExceptionHandler uhe = uncaughtExceptionHandler(); if (uhe != null ) thread.uncaughtExceptionHandler(uhe); return thread; }
VirtualThread
继承sealed
类BaseVirtualThread
,继承Thread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { super (name, characteristics, false ); Objects.requireNonNull(task); if (scheduler == null ) { Thread parent = Thread.currentThread(); if (parent instanceof VirtualThread vparent) { scheduler = vparent.scheduler; } else { scheduler = DEFAULT_SCHEDULER; } } this .scheduler = scheduler; this .cont = new VThreadContinuation (this , task); this .runContinuation = this ::runContinuation; }
默认的调度器为ForkJoinPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();private static ForkJoinPool createDefaultScheduler () { ForkJoinWorkerThreadFactory factory = pool -> { PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread (pool); return AccessController.doPrivileged(pa); }; PrivilegedAction<ForkJoinPool> pa = () -> { int parallelism, maxPoolSize, minRunnable; String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism" ); String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize" ); String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable" ); if (parallelismValue != null ) { parallelism = Integer.parseInt(parallelismValue); } else { parallelism = Runtime.getRuntime().availableProcessors(); } if (maxPoolSizeValue != null ) { maxPoolSize = Integer.parseInt(maxPoolSizeValue); parallelism = Integer.min(parallelism, maxPoolSize); } else { maxPoolSize = Integer.max(parallelism, 256 ); } if (minRunnableValue != null ) { minRunnable = Integer.parseInt(minRunnableValue); } else { minRunnable = Integer.max(parallelism / 2 , 1 ); } Thread.UncaughtExceptionHandler handler = (t, e) -> { }; boolean asyncMode = true ; return new ForkJoinPool (parallelism, factory, handler, asyncMode, 0 , maxPoolSize, minRunnable, pool -> true , 30 , SECONDS); }; return AccessController.doPrivileged(pa); }
将虚拟线程绑定到PlatformThread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @ChangesCurrentThread @ReservedStackAccess private void mount () { notifyJvmtiMount(true ); Thread carrier = Thread.currentCarrierThread(); setCarrierThread(carrier); if (interrupted) { carrier.setInterrupt(); } else if (carrier.isInterrupted()) { synchronized (interruptLock) { if (!interrupted) { carrier.clearInterrupt(); } } } carrier.setCurrentThread(this ); }