什么是流#
流曾经是一个让我很困扰的概念 ———— 读写文件,网络请求,这些都可以是流,为什么我读写一个文件,不是一个简单的操作,而是一个 “流” 呢?
流是数据的集合 ,但这些数据 无需一次性获取完成 ,也就不需要一次性占用大量内存,从而可以更好的处理大容量的数据,如读写大文件,耗时较长的网络请求等。
示例#
以下是一个 WebServer,提供了一个大文件 big.file
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
fs.readFile('./big.file', (err, data) => {
if(err) throw err;
res.end(data);
})
});
server.listen(8000);
这种写法导致的结果是请求时会占用大量内存,因为一次性将整个文件读取到内存中,然后再一次性返回给客户端。如果文件非常大,则会导致服务器崩溃或响应缓慢。这时,我们可以使用流来解决这个问题,将文件分成小块逐块发送,从而减少内存的占用。
Node 的 fs 模块可以使用 createReadStream 方法创建一个可读的流,我们可以将它导入(pipe)到响应对象里面。
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);
这样,内存的使用量就可以保持在一个较低的水平,因为没有把整个文件一次性读入内存。
流的类型#
在 Node.js 中有四种基本的流类型:Readable(可读流),Writable(可写流),Duplex(双向流),Transform(转换流)。
- 可读流是数据可以被消费的源的抽象。一个例子就是
fs.createReadStream
方法。 - 可读流是数据可以被写入目标的抽象。一个例子就是
fs.createWriteStream
方法。 - 双向流即是可读的也是可写的。一个例子是 TCP socket。
- 转换流是基于双向流的,可以在读或者写的时候被用来更改或者转换数据。一个例子是
zlib.createGzip
使用 gzip 算法压缩数据。你可以将转换流想象成一个函数,它的输入是可写流,输出是可读流。你或许也听过将转换流成为 “通过流(through streams)”。
所有的流都是 EventEmitter 的实例。触发它们的事件可以读或者写入数据,然而,我们可以使用 pipe
方法消费流的数据。
pipe 方法#
用法:
readableSrc.pipe(writableDest)
pipe
方法返回目标流,可以让我们做链式调用 pipe
。
pipe
方法是消费流最简单的方法。通常建议使用 pipe 方法或者事件来消费流。
事件#
流可以被事件直接消费。首先看一个 pipe
方法等价的事件消费流的实现:
# readable.pipe(writable)
readable.on('data', (chunk) => {
writable.write(chunk);
// chunk是Buffer 如果要进行一些处理 要先使用 toString() 方法转换为字符串
// 如: JSON.parse(chunk.toString())
});
readable.on('end', () => {
writable.end();
});
这里用到了事件 data
和 end
。
在可读流上最重要的事件有:
data
事件,当流传递给消费者一个数据块的时候会触发。end
事件,当在流中没有可以消费的数据的时候会触发。
在可写流上面最重要的事件有:
drain
事件,当可写流可以接受更多的数据时的一个标志。finish
事件,当所有的数据都写入到系统中时会触发。
HTTP 流#
无论是 HTTP Request
还是 HTTP Response
,都可以以流式进行输入 / 输出。
HTTP Response
在客户端是可读流,在服务端是可写流; HTTP Request
则相反。
示例#
以下是一个示例,服务端请求一个流式输出的 API,同时将其流式输出给客户端 —— 这似乎没什么意义,但同时包含了 HTTP Request
和 HTTP Response
的处理,并且后续可以衍生出进一步的处理。
最简单的例子自然是直接使用 pipe
方法:
app.post('/stream', async (req, res) => {
try {
const response = await fetch('http://another.stream/');
response.body.pipe(res);
} catch(e) {
// ...
}
})
⚠️ 记得导入 node-fetch
。
直接使用 pipe
方法无法对流数据做进一步的处理,因此根据前面提到的 事件 ,将其先改写成等价的形式:
app.post('/stream', async (req, res) => {
try {
const response = await fetch('http://another.stream/');
response.on('data', (chunk) => {
res.write(chunk);
});
response.on('end', () => {
res.end();
});
} catch(e) {
// ...
}
})
此时,就可以进行进一步的处理了。
实例:
下列代码通过 fetch
请求了 Dify , 一个开源的 LLMOps 平台的 API(这里不是打广告),并处理流式输出的 API 返回内容,封装为与 OpenAI API 相同的格式再进行流式输出,以适配各种基于 OpenAI API 开发的应用。
// const stream = await fetch(...)
stream.on('data', (chunk) => {
console.log(`Received chunk: ${chunk}`);
if (!chunk.toString().startsWith('data:')) return;
const chunkObj = JSON.parse(chunk.toString().split("data: ")[1]);
if (chunkObj.event != 'message') {
console.log('Not a message, skip.');
return;
}
const chunkContent = chunkObj.answer;
const chunkId = chunkObj.conversation_id;
const chunkCreate = chunkObj.created_at;
res.write("data: " + JSON.stringify({
"id": chunkId,
"object": "chat.completion.chunk",
"created": chunkCreate,
"model": data.model,
"choices": [{
"index": 0,
"delta": {
"content": chunkContent
},
"finish_reason": null
}]
}) + "\n\n");
})
stream.on('end', () => {
console.log('end event detected...')
res.write("data: [DONE]\n\n");
res.end();
})