“关注转发,随笔感悟不间断分享”
虚拟线程,期待已久,让人兴奋,它终于出现了,目前已经合并到 Java 19 中,后面我们将能够在 JDK 19 使用它。
关于 Java 并发模型的几句话
Thread 类是 Java 访问 OS Thread API 的方式。此类中执行的大多数操作都会进行系统调用。在生产中我们很少直接使用线程,大多使用 Java 并发包和线程池、锁以及其他,Java 具有出色的多线程内置工具类。
并发和并行
并发性和并行性是两个经常混为一谈并且大部分人困惑的概念。
并行意味着同时执行两个或多个任务,只有在 CPU 支持的情况下才有可能,需要多个内核来实现并行性。然而,现代 CPU 几乎都是多核的,而单核 CPU 已经过时,不再被广泛使用。这是因为现代应用程序需要充分利用多个内核,去高效的完成一些程序执行。
并发是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行。例如,JavaScript 是一种单线程语言,一个线程管理代码产生的所有任务。JS 使用 async/await 来做到这一点,我们稍后会讨论其他实现并发的方法。
从操作系统的角度来看,CPU 必须处理多个进程的线程。线程数总是高于内核数, CPU 必须执行上下文切换。简要说明每个线程都有一个优先级,可以是空闲、工作或等待 CPU 周期。CPU 必须遍历所有非空闲的线程,并根据优先级分配其有限的资源。此外,它必须确保具有相同优先级的所有线程获得相同数量的 CPU 时间,否则某些应用程序可能会暂停。每次将内核分配给不同的线程时,都必须暂停当前运行的线程并在寄存器保留运行状态。 除此之外,它还必须跟踪是否有一些空闲线程没有被唤醒可以看到,整个过程相当复杂和高成本,我们作为开发人员应该合理使用适量的使用线程。
在理想情况下,线程数应该接近 CPU 核心数,这样我们就可以最大限度地减少 CPU 上下文切换。
现阶段 Java 服务器并发问题
平台线程:JVM平台下的线程概念
大多数公司的服务端(负载最大的服务器)都是用 Java 编写的,使用 Java 来解决负载问题,从它仍然是最流行的服务器语言来看,这是事实?,但Java也不是完美的。
我们处理请求的通常方式是专门为它们分配一个平台线程,这就是“每个请求的线程模型”。客户端请求获取数据或进行处理时,该线程被占用并且不能被其他任何人使用。服务器启动并分配预定义数量的线程(例如 200 用于 Tomcat)。它们被放置在线程池中并等待请求。它们的初始状态称为“Parked”,在这种状态下它们不会占用 CPU 资源。
这很容易编写、理解和调试,但是如果客户端请求执行阻塞调用的东西怎么办?阻塞调用是等待第三方调用完成的操作,例如 SQL 查询、对不同服务的请求或对操作系统的简单 IO 操作。当阻塞调用发生时,线程必须等待。当它在等待时,线程不可用,CPU 必须管理它,因为它不是空闲的。这增加上下文切换的成本。可能会有朋友问,“为什么不生成 10k 个线程,同时处理 10k 个请求”,这个操作本身操作系统不会阻止,甚至可以通过适当的配置生成 100 万个线程,但是有基准测试显示 80% 的 CPU 利用率用于在启动大量线程后进行上下文切换的管理,与此同时,操作系统还需要 CPU 来运行和管理其他进程。
为了解决可扩展性问题,常规做法是扩展服务器的多个节点。这种方式可行的,但会产生更多的成本。
并发模型
谈谈其他语言采用的几个并发模型
Callbacks
回调是一个简单而强大的概念。它们是作为参数传递给其他函数或过程的对象。父函数将回调传递给子函数,子函数然后可以使用回调通知父函数某些事件,例如“我已经完成了我的任务”。这是一种在单线程上实现并发的方法。回调形成一个堆栈跟踪,可以使调试更容易。当嵌套是一层或两层时它们很好,但当您需要构建更复杂的回调链时很快就会失控。
Async/Await and Promisses
Promise 代表最终的计算(或失败)。一个函数可以返回一个 promise,例如一个 http 请求的结果,然后调用函数可以将它们的逻辑链接到它。这就是在大多数流行语言中实现并发的方式。Java 也有 Promise,但叫法不同,被称为Futures,但是只有CompletableFuture具有 Promise 的完整特性支持。Java 中的大多数操作都是阻塞的,Futures 无论如何都会占用线程。
Async/await 是 Promises 的语法糖。它简化链接、订阅和管理 Promise 的操作。通常你可以将一个函数标记为async并且它的结果在内部被封装在 Promise 中。
在函数上使用 async 本质上使它成为非阻塞的,但是阻塞的函数(没有 async 前缀)不能调用它们,除非它们使用 await。async/await改进了生成器的缺点,提供了在不阻塞主线程的情况下使用同步代码实现异步访问资源的能力。其实 async/await 技术背后的秘密就是 Promise 和生成器应用,往底层说,就是微任务和协程应用。
Coroutines (Continuation + routine)
协程有几个关键属性:
- 它们可以随时暂停和恢复
- 它们是一种可以记住其状态和堆栈跟踪的数据结构
- 他们可以让出/将控制权交给其他协程(子程序)
- 它们必须具有 isDone()、yield() 和 run() 函数
这是 JS 中的协程示例
JavaScript 有一个 yield 机制。有了它,您可以创建所谓的生成器。
function *getNumbersGen() {
let temp = 5;
console.log("1");
yield 1;
console.log("2");
yield 2;
console.log("3");
yield 3;
console.log("Temp " + temp);
}
这是我们的简单生成器。'*' 将函数标记为生成器,然后该函数可以使用 yield。该yield关键字用于暂停和恢复生成器功能(与挂起/运行相同)。
现在,如果我们执行以下代码,它会从生成器中获取数字,直到它停止生成它们。
for (let n of getNumbersGen()) {
console.log("Num " + n);
}
我们得到
"1"
"Num 1"
"2"
"Num 2"
"3"
"Num 3"
"Temp 5"
虚拟线程
Thread 类保持不变并使用相同的 API,这样可以无缝迁移。
为了切换到虚拟线程,不需要学习新东西,只需要简化一些东西。
- 永远不要合并虚拟线程,它们很廉价而且没有意义
- 停止使用线程局部变量。如果你产生数百万个线程,你就会遇到内存问题。根据 Ron Pressler 的说法:“线程局部变量不应该暴露给最终用户,并且应该作为内部实现细节保留”。
虚拟线程相对于平台线程的优势分析
- 上下文切换实际上变得无成本。它们由 JVM 管理,这意味着 JVM 将在线程之间执行上下文切换。
- 尾递归调用优化。他们在 JEP 中提到尾递归调用优化是在线程上完成的。这样可以为堆栈节省大量内存。
- 廉价的启动/停止。当我们停止操作系统线程时,我们必须进行系统调用来终止线程,然后释放它占用的内存。当启动操作系统线程时,我们再次进行系统调用。启动和杀死一个虚拟线程只是删除对象然后让 GC 删除它的问题。
- 突破硬件限制。如前所述,操作系统可以处理多线程,但仍然是有限制的,无法同时启动大量的平台线程,然后当前可以生成数千万个虚拟线程。
- 可调整大小的堆栈。虚拟线程存在于 RAM 中。他们的堆栈和元数据也在那里。平台线程必须分配一个固定的堆栈大小(Java 为 1MB),并且该堆栈不能调整大小,这意味着如果超过它就会得到stackoverflow。此外,建议虚拟线程所需的最小内存约为 200-300 字节。
使用虚拟线程
如下:
for (int i = 0; i < 1_000_000; i++) {
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}}).start();
}
这里我们尝试创建 100 万个常规线程,线程所做的只是休眠 1 秒然后死掉。很明显,这段代码会出现 OutOfMemory 错误。
现在创建一个新的虚拟线程,我们使用Thread.startVirtualThread(runnable)。
for (int i = 0; i < 1_000_000; i++) {
Thread.startVirtualThread(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}});
}
这段代码运行很好,甚至可以在我的电脑上生成超过 2000 万个这样的线程。一切源于用户模式线程只不过是 JVM 管理的内存中的一个对象。
深入研究一下,通常当我们使用线程时,我们将它们与线程池一起使用。下面定义一段需要执行的阻塞代码
static void someWork(String taskName) {
try {
System.out.println(Thread.currentThread() + " executing " + taskName);
new URL("https://site.test/200?sleep=2000").getContent();
System.out.println(Thread.currentThread() + " completed " + taskName);
} catch (Exception e) {
e.printStackTrace();
}
}
打印运行代码的平台线程,然后进行 2 秒的 http 调用,然后再次打印平台线程。
try (ExecutorService executor = Executors.newFixedThreadPool(5)) {
for (int i = 1; i <= 10; i++) {
String taskName = "Task" + i;
executor.execute(() -> someWork(taskName));
}
}
我们用 5 个线程创建固定大小的线程池,然后提交 10 个任务,这些任务将只执行前面描述的someWork方法。输出如下:
Thread[#25,pool-1-thread-5,5,main] executing Task5
Thread[#24,pool-1-thread-4,5,main] executing Task4
Thread[#21,pool-1-thread-1,5,main] executing Task1
Thread[#22,pool-1-thread-2,5,main] executing Task2
Thread[#23,pool-1-thread-3,5,main] executing Task3
Thread[#21,pool-1-thread-1,5,main] completed Task1
Thread[#22,pool-1-thread-2,5,main] completed Task2
Thread[#21,pool-1-thread-1,5,main] executing Task6
Thread[#22,pool-1-thread-2,5,main] executing Task7
Thread[#25,pool-1-thread-5,5,main] completed Task5
Thread[#23,pool-1-thread-3,5,main] completed Task3
Thread[#24,pool-1-thread-4,5,main] completed Task4
Thread[#25,pool-1-thread-5,5,main] executing Task8
Thread[#23,pool-1-thread-3,5,main] executing Task9
Thread[#24,pool-1-thread-4,5,main] executing Task10
Thread[#22,pool-1-thread-2,5,main] completed Task7
Thread[#21,pool-1-thread-1,5,main] completed Task6
Thread[#25,pool-1-thread-5,5,main] completed Task8
Thread[#24,pool-1-thread-4,5,main] completed Task10
Thread[#23,pool-1-thread-3,5,main] completed Task9
注意每个任务是如何由同一个线程执行的。(例如Task4由Thread[#24,pool-1-thread-4,5,main]执行完成),当进行阻塞调用时,线程正在等待并在 2 秒后恢复。
现在让我们将其转换为用户模式线程。代码是一样的,我们只需要使用Executors.newVirtualThreadPerTaskExecutor()每次提交任务时都会创建一个新的虚拟线程。
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 1; i <= 10; i++) {
String taskName = "Task" + i;
executor.execute(() -> someWork(taskName));
}
}
这次我们得到以下输出
VirtualThread[#25]/runnable@ForkJoinPool-1-worker-4 executing Task4
VirtualThread[#26]/runnable@ForkJoinPool-1-worker-5 executing Task5
VirtualThread[#28]/runnable@ForkJoinPool-1-worker-7 executing Task7
VirtualThread[#23]/runnable@ForkJoinPool-1-worker-2 executing Task2
VirtualThread[#21]/runnable@ForkJoinPool-1-worker-1 executing Task1
VirtualThread[#24]/runnable@ForkJoinPool-1-worker-3 executing Task3
VirtualThread[#27]/runnable@ForkJoinPool-1-worker-6 executing Task6
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-8 executing Task8
VirtualThread[#32]/runnable@ForkJoinPool-1-worker-3 executing Task10
VirtualThread[#31]/runnable@ForkJoinPool-1-worker-8 executing Task9
VirtualThread[#26]/runnable@ForkJoinPool-1-worker-1 completed Task5
VirtualThread[#21]/runnable@ForkJoinPool-1-worker-1 completed Task1
VirtualThread[#25]/runnable@ForkJoinPool-1-worker-9 completed Task4
VirtualThread[#24]/runnable@ForkJoinPool-1-worker-2 completed Task3
VirtualThread[#27]/runnable@ForkJoinPool-1-worker-2 completed Task6
VirtualThread[#32]/runnable@ForkJoinPool-1-worker-9 completed Task10
VirtualThread[#28]/runnable@ForkJoinPool-1-worker-1 completed Task7
VirtualThread[#23]/runnable@ForkJoinPool-1-worker-7 completed Task2
VirtualThread[#31]/runnable@ForkJoinPool-1-worker-6 completed Task9
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1 completed Task8
注意现在任务是如何由两个线程执行的,第一个在阻塞调用之前执行代码,第二个在之后执行。例如Task5首先由ForkJoinPool-1-worker-5执行,然后由ForkJoinPool-1-worker-1 执行。这表明我们没有阻塞执行线程。另请注意,这里使用的是fork join pool。这个池的大小是核心数,由 JVM 管理。
这与之前给出的 JavaScript 示例非常相似。线程互相让出控制,状态被保留然后恢复,一个真正的协同程序,最好是它与常规阻塞代码相同。
服务器发送事件
接下来介绍用户模式线程的一个很酷的例子。本质上,我们打开一个 http 连接并且从不关闭它,然后服务器就能够不断地向客户端推送数据。它非常轻量级且比 WebSockets 廉价。问题是,为了实现它,我们需要一个线程不断运行并向流发送事件,如果线程死亡,我们会断开连接。您已经可以看出在这里使用平台线程不是最佳实践。我们可以在 spring boot 中使用虚拟线程做到这一点
@GetMapping("/sse")
public SseEmitter streamSSE() {
SseEmitter emitter = new SseEmitter();
ExecutorService sseMvcExecutor = Executors.newVirtualThreadPerTaskExecutor();
sseMvcExecutor.execute(() -> {
try {
while (true) {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.data("SSE time -> " + LocalTime.now().toString())
.id(UUID.randomUUID().toString())
.name("Custom SSE");
emitter.send(event);
Thread.sleep(500);
}
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
SseEmitter 是 Spring MVC 的一个特殊类,它实现了 SSE 协议。这里所做的是创建一个无限循环并每 500 毫秒向客户端发送新数据。然后任意数量的客户都可以订阅它
收到的消息事件如下
data:SSE time -> 13:41:59.878294
id:f2059d56-d27e-461b-8f7c-1aa75b3aab64
event:Custom SSE
data:SSE time -> 13:42:00.383703
id:fdaac3bb-47aa-45b0-9382-549ff7549f08
event:Custom SSE
何时不使用虚拟线程
管理虚拟线程会降低性能。对于负载较小的应用程序,平台线程优于虚拟线程。此外,如果你的应用程序是 CPU 密集型的,例如执行大量数学计算,使用虚拟线程是没有意义的,因为无论如何在计算时它们都必须占用 OS 线程。一般来说,虚拟线程不会让你的应用程序更快,它只会提高吞吐量,如果吞吐量不是问题,那么坚持使用平台线程。