Building a Mental Model of Node.js Streams
Have you ever worked with Node.js streams? What was your experience like?
When I first tried to work with streams, I was confused, to say the least. The concept was completely new to me. I thought I could just ignore them, but it turns out they're everywhere in Node.js. Even core modules like fs and http use streams under the hood. So, I had to learn them and understand how they work.
What helped me was building a strong mental model that consists of multiple concepts. In this article, we'll explore these concepts and build a mental model of Node.js streams together.
What are Node.js Streams?
The main idea behind streams is that they take pieces of data from one place and transfer them to another. There are 4 important parts that I want to highlight based on this definition:
Streams transfer data in pieces, not as a whole
Streams transfer pieces of data in a specific size
Streams aren't interested in the transferred data
Streams simply provide a mechanism for data transfer
A common analogy used to describe streams is a pipe. However, this analogy often misses 2 crucial parts: the producer and the consumer. Let's use the same analogy but make it more complete.
Imagine a huge reservoir of water, and you have a house nearby. To supply water to your house, you need to build a pipe from the reservoir to your home.
P.S. I'm not a plumber, so don't take this drawing too seriously.
This analogy illustrates the three key parts of a stream:
The water reservoir is a producer of water
The pipe is a stream that transfers water from the reservoir to your home
Your home is a consumer of water
Coming back to Node.js streams, let's compare the pipe analogy to how they behave:
The pipe doesn't transfer the entire reservoir of water all at once
The pipe transfers water in pieces, each of a specific size that it can handle
The pipe is not interested in the water itself; it's just a way to transfer it
The pipe is just a mechanism to transfer water from one place to another
Looks pretty similar to Node.js streams, right?
When are Node.js streams used?
Before going into the specific details of what streams are and how they work, let's first understand when they’re used.
Real-time data processing
Streams are highly effective for processing data that is generated incrementally or received in parts over time.
An ideal example of this is the WebSocket protocol. In short, it's a protocol that allows you to establish a two-way communication channel between the client and the server.
We'll get into more details on this protocol in upcoming articles. For now, let's take the WS library as an example. It uses streams heavily. Here is an example where the abstraction called Sender implements a backpressure mechanism.
We'll talk about backpressure in an upcoming section. And this is just one example; you can explore the library further and see other use cases.
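To give a rough sense of the pattern (this is a minimal sketch of the general idea, not the Sender internals), here is how a producer might respect backpressure using the ws package's documented bufferedAmount property; the URL and the threshold below are made up for illustration:

import WebSocket from 'ws';

const ws = new WebSocket('ws://localhost:8080'); // hypothetical server
const MAX_BUFFERED = 1024 * 1024; // 1MB: an arbitrary limit for this sketch

ws.on('open', () => {
  // bufferedAmount reports how many bytes are queued
  // but not yet sent over the underlying socket
  if (ws.bufferedAmount > MAX_BUFFERED) {
    // Too much unsent data: pause the producer instead of sending more
    return;
  }
  ws.send('some payload');
});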
Network interactions
Every time you create a server using the Node.js API, you're creating a duplex stream. The HTTP module in Node.js uses an abstraction called Socket to create a connection with a network socket. This Socket abstraction extends from the Duplex stream.
ObjectSetPrototypeOf(Socket.prototype, stream.Duplex.prototype);
ObjectSetPrototypeOf(Socket, stream.Duplex);
Whenever you see a construction like the following:
import { createServer } from 'http';
const server = createServer();
Know that under the hood, you're creating a duplex stream.
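You can check this yourself. Here is a minimal sketch (the port is arbitrary) showing that every incoming connection hands you a socket that is an instance of Duplex:

import { createServer } from 'node:http';
import { Duplex } from 'node:stream';

const server = createServer();

server.on('connection', (socket) => {
  // The connection is backed by a net.Socket, which is
  // readable (incoming bytes) and writable (outgoing bytes) at once
  console.log(socket instanceof Duplex); // true
});

server.listen(3000);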
Working with large datasets
Imagine that you have a file that is 100GB in size. You need to parse it and process some data. How would you do it?
If you try to read the file using an API like readFileSync or readFile, you'll crash your program.
import { readFileSync } from 'node:fs';
import { readFile } from 'node:fs/promises';

const largeFilePath = 'path/to/large/file.txt';

// Both of these will crash your program:
// they try to load the entire file into memory at once
const data = readFileSync(largeFilePath);
const asyncData = await readFile(largeFilePath);
The problem is that these read APIs try to load the whole file content into memory. That doesn't sound efficient at all. What we can do instead is process the file's content in chunks.
import { createReadStream } from 'node:fs';
const largeFilePath = 'path/to/large/file.txt';
const stream = createReadStream(largeFilePath);
stream.on('data', (chunk) => {
// Process the chunk here
});
With this approach, we're not waiting for the whole file to be loaded into memory. Whenever a chunk of data is ready, we're processing it.
Data transformation
All previous examples were about the cases where we either read data from somewhere or write data to somewhere. But we can also use streams to transform data that we already have in memory.
A good example of this is data compression/decompression. Here is an example adapted from the zlib module section of the Node.js documentation.
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { pipeline } from 'node:stream';
import { promisify } from 'node:util';

const pipe = promisify(pipeline);

async function do_gzip(input, output) {
  const gzip = createGzip();
  // Create a read stream to read data from the input
  const source = createReadStream(input);
  // Create a write stream to write data to the output
  const destination = createWriteStream(output);
  // Pipe the source stream to the gzip stream,
  // then to the destination stream
  await pipe(source, gzip, destination);
}
In this code snippet, we're creating a read stream, and whenever data comes from this read stream, we pass it down to the gzip stream. When the gzip stream compresses the data, we pass it down to the write stream.
You don't have to understand how this code works just yet. Just understand that streams can be used to transform data.
Don't use streams in this case
You don't want to use streams when the data you're working with is already in memory. There is little to no benefit to gain from using streams in that case.
So please, try to avoid using streams when all pieces of data that you need are already in memory. Don't make your life harder.
Core concepts of Node.js streams
You understand what streams are, when to use them, and when not to. Now, you're ready to dive deeper into some of the core concepts of streams in Node.js.
Event-driven architecture
You know that streams are like pipes. But what exactly makes them work this way? It is all thanks to the event-driven concepts that streams are built upon. In particular, all streams in Node.js extend the EventEmitter class.
The way EventEmitter works is very simple. It has some internal state where it stores all events and the listeners of those events.
class EventEmitter {
  // Map of events and their listeners.
  // Each event can have multiple listeners.
  #events = new Map<string, (() => void)[]>();

  // Register a new listener for the event
  on(eventName: string, callback: () => void) {
    if (!this.#events.has(eventName)) {
      this.#events.set(eventName, []);
    }
    this.#events.get(eventName).push(callback);
  }

  // Trigger all listeners registered for the event
  emit(eventName: string) {
    const listeners = this.#events.get(eventName);
    if (!listeners) {
      return;
    }
    listeners.forEach((listener) => listener());
  }
}
It is a very simplified version, but it gives you an idea of how EventEmitter works. You can read the full implementation in the Node.js source code.
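To make this concrete, here is a small usage sketch with the real node:events module; the event name is made up for illustration:

import { EventEmitter } from 'node:events';

const emitter = new EventEmitter();

// Register two listeners for the same event
emitter.on('greet', () => console.log('Hello!'));
emitter.on('greet', () => console.log('Hi there!'));

// emit() invokes every registered listener, in registration order
emitter.emit('greet');
// Hello!
// Hi there!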
When you work with streams, you can add a listener to some predefined set of events.
stream.on('data', () => {});
In this example, we add a listener to the data event. Whenever a chunk of data is ready, the stream calls emit with the data event name, and all registered listeners are called.
This is the exact mechanism that makes streams work like pipes, where we get data from one end and pass it through to the other end.
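To see this flow end to end, here is a minimal sketch built with Readable.from, which is a standard Node.js API; the in-memory chunks are made up for illustration:

import { Readable } from 'node:stream';

// An in-memory iterable stands in for a real source (a file, a socket, etc.)
const stream = Readable.from(['chunk-1', 'chunk-2', 'chunk-3']);

// Each chunk arrives through a 'data' event,
// dispatched via the EventEmitter mechanism described above
stream.on('data', (chunk) => {
  console.log('received:', chunk);
});

stream.on('end', () => {
  console.log('no more data');
});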
Backpressure
Streams can be used to process large datasets efficiently. But there is a catch: what if the rate of data production is so high that, at some point, there is more data in our program than the allocated memory can handle? Right, the program will crash.
This means that the stream abstraction alone is not enough to prevent such cases from happening. For cases like this, streams have a backpressure mechanism in place.
Backpressure might sound like a fancy term, but in reality, it is quite simple. The main idea of backpressure is that we have some limit on how much data we can process at a time.
Let's get back to the example with reading a large file. There are 2 parts of this process that we're interested in: the producer of data and the consumer of data. The producer of data is the underlying OS mechanism that reads the file and produces the data.
If the producer tries to push too much data, a stream can signal to the producer that it needs to slow down because it can't take any more data at the moment. But how does the stream know when it's full?
Each stream has an internal buffer, and whenever new data comes in and old data goes out, the "buffering" mechanism comes into play.
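Here is a minimal sketch of what honoring that signal looks like on the writable side; write() returning false and the 'drain' event are standard Node.js stream APIs, while the file path is made up:

import { createWriteStream } from 'node:fs';

const destination = createWriteStream('path/to/output.txt');

function writeChunk(chunk) {
  // write() returns false once the internal buffer
  // reaches its limit - a signal to slow down
  const canContinue = destination.write(chunk);

  if (!canContinue) {
    // Stop producing and wait for the buffer to drain
    destination.once('drain', () => {
      console.log('buffer drained, safe to write again');
    });
  }
}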
Buffering
Each stream has an internal buffer. If we work with an API that enables the backpressure mechanism, this buffer is used to store data that comes into the stream.
If data comes into the stream but doesn't come out of it, the buffer steadily fills up until it reaches the cap. The cap, in this case, is the highWaterMark property set for each individual stream.
Here is an example of how we can set the highWaterMark property when reading a file.
import { createReadStream } from 'node:fs';

const filePath = 'path/to/file.txt';
const readStream = createReadStream(filePath, { highWaterMark: 1024 });
The highWaterMark is set to 64KB for the createReadStream function by default. When the internal buffer frees up some space, the stream can start reading more data from the source.
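You can inspect the configured value through the standard readableHighWaterMark property; a minimal sketch:

import { createReadStream } from 'node:fs';

const stream = createReadStream('path/to/file.txt');

// Prints 65536 (64KB), the default for fs read streams
console.log(stream.readableHighWaterMark);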
Piping and chaining
In any moderately complex Node.js application, you'll need to transform data that comes from a stream or send this data to some other destination. In cases like this, a concept called "piping" comes in handy.
You can create a chain of streams where one stream is connected to another, and whenever data enters the first stream in the chain, it goes through the whole chain. If you're familiar with reactive programming and things like RxJS, this concept should feel familiar.
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { pipeline } from 'node:stream/promises';

const source = createReadStream('path/to/file.txt');
const destination = createWriteStream('path/to/file.txt.gz');
const gzip = createGzip();

await pipeline(source, gzip, destination);
In this example, the source stream triggers the whole pipeline. It goes like this:
The source stream reads data from the file
The source stream passes this data to the gzip stream
The gzip stream compresses the data
The gzip stream passes the compressed data to the destination stream
The destination stream writes the compressed data to the file
The whole pipeline is finished
Every stage of the pipeline has its own internal buffer and backpressure mechanism. This means that if the gzip stream can't handle the data coming from the source stream, it can signal the source stream to slow down. The same goes for the destination stream.
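For comparison, the same chain can be expressed with the older .pipe() method, which returns its destination stream and therefore chains. Unlike pipeline, .pipe() doesn't propagate errors or clean up the streams automatically, so pipeline is usually the safer choice:

import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

// .pipe() returns its destination, so the calls chain left to right
createReadStream('path/to/file.txt')
  .pipe(createGzip())
  .pipe(createWriteStream('path/to/file.txt.gz'));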
Conclusion
Streams are at the heart of any Node.js application, whether you use them explicitly or not. It is also one of the most powerful features existing in Node.js. Streams are used in many different places in Node.js, from network interactions to file processing.
They are especially useful when you need to process large datasets or work with real-time data. The core mental model of streams is built around the following concepts:
Data over time
Event-driven architecture
Backpressure
Buffering
Piping and chaining
By understanding these concepts and having a clear picture of how streams operate at a conceptual level, you can build more efficient Node.js apps.