yunwu

yunwu

關於 Node.js 中的流 (stream)

什麼是流#

流曾經是一個讓我很困擾的概念 ———— 讀寫文件,網路請求,這些都可以是流,為什麼我讀寫一個文件,不是一個簡單的操作,而是一個「流」呢?

流是資料的集合 ,但這些資料 無需一次性獲取完成 ,也就不需要一次性佔用大量記憶體,從而可以更好的處理大容量的資料,如讀寫大文件,耗時較長的網路請求等。

示例#

以下是一個 WebServer,提供了一個大文件 big.file

const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
    fs.readFile('./big.file', (err, data) => {
        if(err) throw err;
        res.end(data);
    })
});
server.listen(8000);

這種寫法導致的結果是請求時會佔用大量記憶體,因為一次性將整個文件讀取到記憶體中,然後再一次性返回給客戶端。如果文件非常大,則會導致伺服器崩潰或響應緩慢。這時,我們可以使用流來解決這個問題,將文件分成小塊逐塊發送,從而減少記憶體的佔用。

Node 的 fs 模組可以使用 createReadStream 方法創建一個可讀的流,我們可以將它導入(pipe)到響應對象裡面。

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

這樣,記憶體的使用量就可以保持在一個較低的水平,因為沒有把整個文件一次性讀入記憶體。

流的類型#

在 Node.js 中有四種基本的流類型:Readable(可讀流),Writable(可寫流),Duplex(雙向流),Transform(轉換流)。

  • 可讀流是資料可以被消費的源的抽象。一個例子就是 fs.createReadStream 方法。
  • 可寫流是資料可以被寫入目標的抽象。一個例子就是 fs.createWriteStream 方法。
  • 雙向流即是可讀的也是可寫的。一個例子是 TCP socket。
  • 轉換流是基於雙向流的,可以在讀或者寫的時候被用來更改或者轉換資料。一個例子是 zlib.createGzip 使用 gzip 算法壓縮資料。你可以將轉換流想象成一個函數,它的輸入是可寫流,輸出是可讀流。你或許也聽過將轉換流成為「通過流(through streams)」。

所有的流都是 EventEmitter 的實例。觸發它們的事件可以讀或者寫入資料,然而,我們可以使用 pipe 方法消費流的資料。

pipe 方法#

用法:

readableSrc.pipe(writableDest)

pipe 方法返回目標流,可以讓我們做鏈式調用 pipe

pipe 方法是消費流最簡單的方法。通常建議使用 pipe 方法或者事件來消費流。

事件#

流可以被事件直接消費。首先看一個 pipe 方法等價的事件消費流的實現:

# readable.pipe(writable)
readable.on('data', (chunk) => {
  writable.write(chunk);
  // chunk是Buffer 如果要進行一些處理 要先使用 toString() 方法轉換為字串
  // 如: JSON.parse(chunk.toString())
});
readable.on('end', () => {
  writable.end();
});

這裡用到了事件 dataend

在可讀流上最重要的事件有:

  • data 事件,當流傳遞給消費者一個資料塊的時候會觸發。
  • end 事件,當在流中沒有可以消費的資料的時候會觸發。

在可寫流上面最重要的事件有:

  • drain 事件,當可寫流可以接受更多的資料時的一個標誌。
  • finish 事件,當所有的資料都寫入到系統中時會觸發。

HTTP 流#

無論是 HTTP Request 還是 HTTP Response ,都可以以流式進行輸入 / 輸出。

HTTP Response 在客戶端是可讀流,在服務端是可寫流; HTTP Request 則相反。

示例#

以下是一個示例,服務端請求一個流式輸出的 API,同時將其流式輸出給客戶端 —— 這似乎沒什麼意義,但同時包含了 HTTP RequestHTTP Response 的處理,並且後續可以衍生出進一步的處理。

最簡單的例子自然是直接使用 pipe 方法:

app.post('/stream', async (req, res) => {
	try {
    const response = await fetch('http://another.stream/');
    response.body.pipe(res);
  } catch(e) {
    // ...
  }
})

⚠️ 記得導入 node-fetch

直接使用 pipe 方法無法對流資料做進一步的處理,因此根據前面提到的 事件 ,將其先改寫成等價的形式:

app.post('/stream', async (req, res) => {
	try {
    const response = await fetch('http://another.stream/');
    response.on('data', (chunk) => {
      res.write(chunk);
    });
    response.on('end', () => {
      res.end();
    });
  } catch(e) {
    // ...
  }
})

此時,就可以進行進一步的處理了。

實例:

Github

下列代碼通過 fetch 請求了 Dify , 一個開源的 LLMOps 平台的 API(這裡不是打廣告),並處理流式輸出的 API 返回內容,封裝為與 OpenAI API 相同的格式再進行流式輸出,以適配各種基於 OpenAI API 開發的應用。

// const stream = await fetch(...)
stream.on('data', (chunk) => {
    console.log(`Received chunk: ${chunk}`);
    if (!chunk.toString().startsWith('data:')) return;
    const chunkObj = JSON.parse(chunk.toString().split("data: ")[1]);
    if (chunkObj.event != 'message') {
        console.log('Not a message, skip.');
        return;
    }
    const chunkContent = chunkObj.answer;
    const chunkId = chunkObj.conversation_id;
    const chunkCreate = chunkObj.created_at;
    res.write("data: " + JSON.stringify({
        "id": chunkId,
        "object": "chat.completion.chunk",
        "created": chunkCreate,
        "model": data.model,
        "choices": [{
            "index": 0,
            "delta": {
                "content": chunkContent
            },
            "finish_reason": null
        }]
    }) + "\n\n");
})
stream.on('end', () => {
    console.log('end event detected...')
    res.write("data: [DONE]\n\n");
    res.end();
})
載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。