ストリームとは何ですか#
ストリームは、私にとって非常に困惑する概念でした - ファイルの読み書き、ネットワークリクエストなど、これらはすべてストリームになります。なぜファイルを読み書きするのが単純な操作ではなく、「ストリーム」と呼ばれるのでしょうか?
ストリームはデータのコレクションであり、これらのデータは一度に完全に取得する必要がないため、大量のメモリを一度に使用する必要がありません。そのため、大容量のデータ(大きなファイルの読み書き、長時間かかるネットワークリクエストなど)をより効果的に処理できます。
サンプル#
以下は、Web サーバーで提供される大きなファイル big.file
の例です。
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
fs.readFile('./big.file', (err, data) => {
if(err) throw err;
res.end(data);
})
});
server.listen(8000);
この方法では、メモリを大量に使用するため、リクエスト時に大量のメモリを占有します。ファイルが非常に大きい場合、サーバーがクラッシュしたり、応答が遅くなったりする可能性があります。この場合、ストリームを使用してこの問題を解決し、ファイルを小さなチャンクに分割して送信し、メモリの使用量を減らすことができます。
Node の fs モジュールでは、createReadStream メソッドを使用して読み込み可能なストリームを作成できます。これをレスポンスオブジェクトにパイプ(pipe)できます。
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);
これにより、メモリの使用量が低いレベルに保たれるため、ファイル全体を一度にメモリに読み込む必要はありません。
ストリームの種類#
Node.js では、次の 4 つの基本的なストリームの種類があります:Readable(読み込み可能なストリーム)、Writable(書き込み可能なストリーム)、Duplex(双方向ストリーム)、Transform(変換ストリーム)。
- Readable(読み込み可能なストリーム)は、データを消費できるソースの抽象化です。例えば、
fs.createReadStream
メソッドです。 - Writable(書き込み可能なストリーム)は、データを書き込むことができるターゲットの抽象化です。例えば、
fs.createWriteStream
メソッドです。 - Duplex(双方向ストリーム)は、読み込みと書き込みの両方が可能なストリームです。例えば、TCP ソケットです。
- Transform(変換ストリーム)は、双方向ストリームを基にしており、データの変更や変換を読み込みまたは書き込み時に行うために使用されます。例えば、
zlib.createGzip
は gzip アルゴリズムを使用してデータを圧縮するために使用されます。変換ストリームは、入力が書き込み可能なストリームであり、出力が読み込み可能なストリームである関数と考えることができます。変換ストリームはしばしば「スルーストリーム(through streams)」と呼ばれます。
すべてのストリームは EventEmitter のインスタンスです。データの読み書きをトリガーするためにイベントを使用できます。ただし、ストリームのデータを消費するためには、pipe
メソッドを使用することもできます。
pipe メソッド#
使用方法:
readableSrc.pipe(writableDest)
pipe
メソッドは、ターゲットストリームを返し、pipe
をチェーンできるようにします。
pipe
メソッドは、ストリームのデータを消費するための最も簡単な方法です。通常、ストリームのデータを消費するためには、pipe
メソッドまたはイベントを使用することが推奨されます。
イベント#
ストリームは直接イベントを消費することもできます。まず、pipe
メソッドと等価なイベントを使用してストリームのデータを消費する実装を見てみましょう:
# readable.pipe(writable)
readable.on('data', (chunk) => {
writable.write(chunk);
// chunkはBufferです。処理する場合は、まずtoString()メソッドを使用して文字列に変換する必要があります。
// 例: JSON.parse(chunk.toString())
});
readable.on('end', () => {
writable.end();
});
ここでは、data
イベントと end
イベントを使用しています。
読み込み可能なストリームで最も重要なイベントは次のとおりです:
data
イベント:ストリームがデータチャンクを消費者に渡すときに発生します。end
イベント:ストリーム内に消費できるデータがない場合に発生します。
書き込み可能なストリームで最も重要なイベントは次のとおりです:
drain
イベント:書き込み可能なストリームがさらにデータを受け入れることができることを示すフラグです。finish
イベント:すべてのデータがシステムに書き込まれたときに発生します。
HTTP ストリーム#
HTTPリクエスト
および HTTPレスポンス
の両方がストリーム形式で入出力できます。
HTTPレスポンス
はクライアント側では読み込み可能なストリームであり、サーバー側では書き込み可能なストリームです。一方、HTTPリクエスト
はその逆です。
サンプル#
以下は、サーバーがストリーム形式で提供される API をリクエストし、それをクライアントにストリーム形式で出力する例です - これはあまり意味がないように思えますが、HTTPリクエスト
および HTTPレスポンス
の処理を含んでおり、さらなる処理に発展させることができます。
最も簡単な例は、直接 pipe
メソッドを使用する方法です:
app.post('/stream', async (req, res) => {
try {
const response = await fetch('http://another.stream/');
response.body.pipe(res);
} catch(e) {
// ...
}
})
⚠️ node-fetch
をインポートすることを忘れないでください。
pipe
メソッドを直接使用すると、ストリームデータをさらに処理することはできません。したがって、前述の イベント に基づいて等価な形式に変更してみましょう:
app.post('/stream', async (req, res) => {
try {
const response = await fetch('http://another.stream/');
response.on('data', (chunk) => {
res.write(chunk);
});
response.on('end', () => {
res.end();
});
} catch(e) {
// ...
}
})
これで、さらなる処理が可能になります。
例:
以下のコードは、Dify というオープンソースの LLMOps プラットフォームの API(広告ではありません)をリクエストし、ストリーム形式で提供される API のレスポンスコンテンツを OpenAI API と同じ形式に変換してストリーム形式で出力し、OpenAI API をベースにしたさまざまなアプリケーションに適用できるようにします。
// const stream = await fetch(...)
stream.on('data', (chunk) => {
console.log(`Received chunk: ${chunk}`);
if (!chunk.toString().startsWith('data:')) return;
const chunkObj = JSON.parse(chunk.toString().split("data: ")[1]);
if (chunkObj.event != 'message') {
console.log('Not a message, skip.');
return;
}
const chunkContent = chunkObj.answer;
const chunkId = chunkObj.conversation_id;
const chunkCreate = chunkObj.created_at;
res.write("data: " + JSON.stringify({
"id": chunkId,
"object": "chat.completion.chunk",
"created": chunkCreate,
"model": data.model,
"choices": [{
"index": 0,
"delta": {
"content": chunkContent
},
"finish_reason": null
}]
}) + "\n\n");
})
stream.on('end', () => {
console.log('end event detected...')
res.write("data: [DONE]\n\n");
res.end();
})