什麼是流#
流曾經是一個讓我很困擾的概念 ———— 讀寫文件,網路請求,這些都可以是流,為什麼我讀寫一個文件,不是一個簡單的操作,而是一個「流」呢?
流是資料的集合 ,但這些資料 無需一次性獲取完成 ,也就不需要一次性佔用大量記憶體,從而可以更好的處理大容量的資料,如讀寫大文件,耗時較長的網路請求等。
示例#
以下是一個 WebServer,提供了一個大文件 big.file
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
fs.readFile('./big.file', (err, data) => {
if(err) throw err;
res.end(data);
})
});
server.listen(8000);
這種寫法導致的結果是請求時會佔用大量記憶體,因為一次性將整個文件讀取到記憶體中,然後再一次性返回給客戶端。如果文件非常大,則會導致伺服器崩潰或響應緩慢。這時,我們可以使用流來解決這個問題,將文件分成小塊逐塊發送,從而減少記憶體的佔用。
Node 的 fs 模組可以使用 createReadStream 方法創建一個可讀的流,我們可以將它導入(pipe)到響應對象裡面。
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);
這樣,記憶體的使用量就可以保持在一個較低的水平,因為沒有把整個文件一次性讀入記憶體。
流的類型#
在 Node.js 中有四種基本的流類型:Readable(可讀流),Writable(可寫流),Duplex(雙向流),Transform(轉換流)。
- 可讀流是資料可以被消費的源的抽象。一個例子就是
fs.createReadStream
方法。 - 可寫流是資料可以被寫入目標的抽象。一個例子就是
fs.createWriteStream
方法。 - 雙向流即是可讀的也是可寫的。一個例子是 TCP socket。
- 轉換流是基於雙向流的,可以在讀或者寫的時候被用來更改或者轉換資料。一個例子是
zlib.createGzip
使用 gzip 算法壓縮資料。你可以將轉換流想象成一個函數,它的輸入是可寫流,輸出是可讀流。你或許也聽過將轉換流成為「通過流(through streams)」。
所有的流都是 EventEmitter 的實例。觸發它們的事件可以讀或者寫入資料,然而,我們可以使用 pipe
方法消費流的資料。
pipe 方法#
用法:
readableSrc.pipe(writableDest)
pipe
方法返回目標流,可以讓我們做鏈式調用 pipe
。
pipe
方法是消費流最簡單的方法。通常建議使用 pipe 方法或者事件來消費流。
事件#
流可以被事件直接消費。首先看一個 pipe
方法等價的事件消費流的實現:
# readable.pipe(writable)
readable.on('data', (chunk) => {
writable.write(chunk);
// chunk是Buffer 如果要進行一些處理 要先使用 toString() 方法轉換為字串
// 如: JSON.parse(chunk.toString())
});
readable.on('end', () => {
writable.end();
});
這裡用到了事件 data
和 end
。
在可讀流上最重要的事件有:
data
事件,當流傳遞給消費者一個資料塊的時候會觸發。end
事件,當在流中沒有可以消費的資料的時候會觸發。
在可寫流上面最重要的事件有:
drain
事件,當可寫流可以接受更多的資料時的一個標誌。finish
事件,當所有的資料都寫入到系統中時會觸發。
HTTP 流#
無論是 HTTP Request
還是 HTTP Response
,都可以以流式進行輸入 / 輸出。
HTTP Response
在客戶端是可讀流,在服務端是可寫流; HTTP Request
則相反。
示例#
以下是一個示例,服務端請求一個流式輸出的 API,同時將其流式輸出給客戶端 —— 這似乎沒什麼意義,但同時包含了 HTTP Request
和 HTTP Response
的處理,並且後續可以衍生出進一步的處理。
最簡單的例子自然是直接使用 pipe
方法:
app.post('/stream', async (req, res) => {
try {
const response = await fetch('http://another.stream/');
response.body.pipe(res);
} catch(e) {
// ...
}
})
⚠️ 記得導入 node-fetch
。
直接使用 pipe
方法無法對流資料做進一步的處理,因此根據前面提到的 事件 ,將其先改寫成等價的形式:
app.post('/stream', async (req, res) => {
try {
const response = await fetch('http://another.stream/');
response.on('data', (chunk) => {
res.write(chunk);
});
response.on('end', () => {
res.end();
});
} catch(e) {
// ...
}
})
此時,就可以進行進一步的處理了。
實例:
下列代碼通過 fetch
請求了 Dify , 一個開源的 LLMOps 平台的 API(這裡不是打廣告),並處理流式輸出的 API 返回內容,封裝為與 OpenAI API 相同的格式再進行流式輸出,以適配各種基於 OpenAI API 開發的應用。
// const stream = await fetch(...)
stream.on('data', (chunk) => {
console.log(`Received chunk: ${chunk}`);
if (!chunk.toString().startsWith('data:')) return;
const chunkObj = JSON.parse(chunk.toString().split("data: ")[1]);
if (chunkObj.event != 'message') {
console.log('Not a message, skip.');
return;
}
const chunkContent = chunkObj.answer;
const chunkId = chunkObj.conversation_id;
const chunkCreate = chunkObj.created_at;
res.write("data: " + JSON.stringify({
"id": chunkId,
"object": "chat.completion.chunk",
"created": chunkCreate,
"model": data.model,
"choices": [{
"index": 0,
"delta": {
"content": chunkContent
},
"finish_reason": null
}]
}) + "\n\n");
})
stream.on('end', () => {
console.log('end event detected...')
res.write("data: [DONE]\n\n");
res.end();
})