虚拟线程Virtual threads

虚拟线程是一种比传统 Java 线程(OS线程)更轻量的线程实现。与传统线程由操作系统管理不同,虚拟线程由 JVM 管理,多个虚拟线程可以在少量的操作系统线程(平台线程)上运行。虚拟线程的创建、切换和销毁的开销远小于传统线程,同时虚拟线程可以和普通线程一样的使用。

1、虚拟线程的创建

1.1 构建 Thread.Builder
1
Thread.Builder builder = Thread.ofVirtual().name("worker-", 0);    

name有2个不同参数列表的实现

  • name(String *name*)指定虚拟线程的名称,后期通过builder启动的虚拟线程名称都是相同的
  • name(String *prefix*, long *start*)指定虚拟线程的prefix和开始线程的索引
1.2 使用Builder启动虚拟线程
1
Thread thread = builder.start();

由于虚拟线程也是通过Thread创建产生,当Thread实例创建以后,后续的使用和普通线程是相同的

1.3 未捕获异常处理
1
2
3
builder.uncaughtExceptionHandler((t, e) -> {
System.out.println(t.getName() + "抛出了异常" + e.getMessage());
});

2、虚拟线程栈查看

1
2
jcmd <PID> Thread.dump_to_file -format=text <file>
jcmd <PID> Thread.dump_to_file -format=json <file>

输出示例

1
2
3
4
5
6
7
8
9
10
11
12
{
"tid": "20",
"name": "vt-worker-0",
"stack": [
"java.base\/java.lang.VirtualThread.parkNanos(VirtualThread.java:635)",
"java.base\/java.lang.VirtualThread.sleepNanos(VirtualThread.java:807)",
"java.base\/java.lang.Thread.sleep(Thread.java:556)",
"java.base\/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)",
"cn.probiecoder.concurrency.VirtualThread.lambda$0(VirtualThread.java:15)",
"java.base\/java.lang.VirtualThread.run(VirtualThread.java:329)"
]
}

使用jcmd <PID> Thread.print直接输出线程栈信息时无详细的虚拟线程信息

3、线程数的设置

可以通过如下 System.property设置ForkJoinPool的参数

1
2
3
jdk.virtualThreadScheduler.parallelism
jdk.virtualThreadScheduler.maxPoolSize
jdk.virtualThreadScheduler.minRunnable

4、底层实现

在执行start()方法时会创建正式的虚拟线程,并指定虚拟线程名称,所依托的普通线程调度器scheduler

ThreadBuilder.VirtualThreadBuilder

1
2
3
4
5
6
7
8
public Thread unstarted(Runnable task) {
Objects.requireNonNull(task);
var thread = newVirtualThread(scheduler, nextThreadName(), characteristics(), task);
UncaughtExceptionHandler uhe = uncaughtExceptionHandler();
if (uhe != null)
thread.uncaughtExceptionHandler(uhe);
return thread;
}

VirtualThread继承sealedBaseVirtualThread,继承Thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
super(name, characteristics, /*bound*/ false);
Objects.requireNonNull(task);

// choose scheduler if not specified
if (scheduler == null) {
Thread parent = Thread.currentThread();
if (parent instanceof VirtualThread vparent) {
scheduler = vparent.scheduler;
} else {
scheduler = DEFAULT_SCHEDULER;
}
}

this.scheduler = scheduler;
this.cont = new VThreadContinuation(this, task);
this.runContinuation = this::runContinuation;
}

默认的调度器为ForkJoinPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();

private static ForkJoinPool createDefaultScheduler() {
ForkJoinWorkerThreadFactory factory = pool -> {
PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
return AccessController.doPrivileged(pa);
};
PrivilegedAction<ForkJoinPool> pa = () -> {
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
if (parallelismValue != null) {
parallelism = Integer.parseInt(parallelismValue);
} else {
parallelism = Runtime.getRuntime().availableProcessors();
}
if (maxPoolSizeValue != null) {
maxPoolSize = Integer.parseInt(maxPoolSizeValue);
parallelism = Integer.min(parallelism, maxPoolSize);
} else {
maxPoolSize = Integer.max(parallelism, 256);
}
if (minRunnableValue != null) {
minRunnable = Integer.parseInt(minRunnableValue);
} else {
minRunnable = Integer.max(parallelism / 2, 1);
}
Thread.UncaughtExceptionHandler handler = (t, e) -> { };
boolean asyncMode = true; // FIFO
return new ForkJoinPool(parallelism, factory, handler, asyncMode,
0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
};
return AccessController.doPrivileged(pa);
}

将虚拟线程绑定到PlatformThread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@ChangesCurrentThread
@ReservedStackAccess
private void mount() {
// notify JVMTI before mount
notifyJvmtiMount(/*hide*/true);

// sets the carrier thread
Thread carrier = Thread.currentCarrierThread();
setCarrierThread(carrier);

// sync up carrier thread interrupt status if needed
if (interrupted) {
carrier.setInterrupt();
} else if (carrier.isInterrupted()) {
synchronized (interruptLock) {
// need to recheck interrupt status
if (!interrupted) {
carrier.clearInterrupt();
}
}
}

// set Thread.currentThread() to return this virtual thread
carrier.setCurrentThread(this);
}

虚拟线程Virtual threads
https://probiecoder.cn/java/virtual_thread.html
作者
duwei
发布于
2025年4月22日
许可协议