yunwu

yunwu

关于 Node.js 中的流 (stream)

什么是流#

流曾经是一个让我很困扰的概念 ———— 读写文件,网络请求,这些都可以是流,为什么我读写一个文件,不是一个简单的操作,而是一个 “流” 呢?

流是数据的集合 ,但这些数据 无需一次性获取完成 ,也就不需要一次性占用大量内存,从而可以更好的处理大容量的数据,如读写大文件,耗时较长的网络请求等。

示例#

以下是一个 WebServer,提供了一个大文件 big.file

const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
    fs.readFile('./big.file', (err, data) => {
        if(err) throw err;
        res.end(data);
    })
});
server.listen(8000);

这种写法导致的结果是请求时会占用大量内存,因为一次性将整个文件读取到内存中,然后再一次性返回给客户端。如果文件非常大,则会导致服务器崩溃或响应缓慢。这时,我们可以使用流来解决这个问题,将文件分成小块逐块发送,从而减少内存的占用。

Node 的 fs 模块可以使用 createReadStream 方法创建一个可读的流,我们可以将它导入(pipe)到响应对象里面。

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

这样,内存的使用量就可以保持在一个较低的水平,因为没有把整个文件一次性读入内存。

流的类型#

在 Node.js 中有四种基本的流类型:Readable(可读流),Writable(可写流),Duplex(双向流),Transform(转换流)。

  • 可读流是数据可以被消费的源的抽象。一个例子就是 fs.createReadStream 方法。
  • 可读流是数据可以被写入目标的抽象。一个例子就是 fs.createWriteStream 方法。
  • 双向流即是可读的也是可写的。一个例子是 TCP socket。
  • 转换流是基于双向流的,可以在读或者写的时候被用来更改或者转换数据。一个例子是 zlib.createGzip 使用 gzip 算法压缩数据。你可以将转换流想象成一个函数,它的输入是可写流,输出是可读流。你或许也听过将转换流成为 “通过流(through streams)”。

所有的流都是 EventEmitter 的实例。触发它们的事件可以读或者写入数据,然而,我们可以使用 pipe 方法消费流的数据。

pipe 方法#

用法:

readableSrc.pipe(writableDest)

pipe 方法返回目标流,可以让我们做链式调用 pipe

pipe 方法是消费流最简单的方法。通常建议使用 pipe 方法或者事件来消费流。

事件#

流可以被事件直接消费。首先看一个 pipe 方法等价的事件消费流的实现:

# readable.pipe(writable)
readable.on('data', (chunk) => {
  writable.write(chunk);
  // chunk是Buffer 如果要进行一些处理 要先使用 toString() 方法转换为字符串
  // 如: JSON.parse(chunk.toString())
});
readable.on('end', () => {
  writable.end();
});

这里用到了事件 dataend

在可读流上最重要的事件有:

  • data 事件,当流传递给消费者一个数据块的时候会触发。
  • end 事件,当在流中没有可以消费的数据的时候会触发。

在可写流上面最重要的事件有:

  • drain 事件,当可写流可以接受更多的数据时的一个标志。
  • finish 事件,当所有的数据都写入到系统中时会触发。

HTTP 流#

无论是 HTTP Request 还是 HTTP Response ,都可以以流式进行输入 / 输出。

HTTP Response 在客户端是可读流,在服务端是可写流; HTTP Request 则相反。

示例#

以下是一个示例,服务端请求一个流式输出的 API,同时将其流式输出给客户端 —— 这似乎没什么意义,但同时包含了 HTTP RequestHTTP Response 的处理,并且后续可以衍生出进一步的处理。

最简单的例子自然是直接使用 pipe 方法:

app.post('/stream', async (req, res) => {
	try {
    const response = await fetch('http://another.stream/');
    response.body.pipe(res);
  } catch(e) {
    // ...
  }
})

⚠️ 记得导入 node-fetch

直接使用 pipe 方法无法对流数据做进一步的处理,因此根据前面提到的 事件 ,将其先改写成等价的形式:

app.post('/stream', async (req, res) => {
	try {
    const response = await fetch('http://another.stream/');
    response.on('data', (chunk) => {
      res.write(chunk);
    });
    response.on('end', () => {
      res.end();
    });
  } catch(e) {
    // ...
  }
})

此时,就可以进行进一步的处理了。

实例:

Github

下列代码通过 fetch 请求了 Dify , 一个开源的 LLMOps 平台的 API(这里不是打广告),并处理流式输出的 API 返回内容,封装为与 OpenAI API 相同的格式再进行流式输出,以适配各种基于 OpenAI API 开发的应用。

// const stream = await fetch(...)
stream.on('data', (chunk) => {
    console.log(`Received chunk: ${chunk}`);
    if (!chunk.toString().startsWith('data:')) return;
    const chunkObj = JSON.parse(chunk.toString().split("data: ")[1]);
    if (chunkObj.event != 'message') {
        console.log('Not a message, skip.');
        return;
    }
    const chunkContent = chunkObj.answer;
    const chunkId = chunkObj.conversation_id;
    const chunkCreate = chunkObj.created_at;
    res.write("data: " + JSON.stringify({
        "id": chunkId,
        "object": "chat.completion.chunk",
        "created": chunkCreate,
        "model": data.model,
        "choices": [{
            "index": 0,
            "delta": {
                "content": chunkContent
            },
            "finish_reason": null
        }]
    }) + "\n\n");
})
stream.on('end', () => {
    console.log('end event detected...')
    res.write("data: [DONE]\n\n");
    res.end();
})
加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。