yunwu

yunwu

Node.js のストリームについて

ストリームとは何ですか#

ストリームは、私にとって非常に困惑する概念でした - ファイルの読み書き、ネットワークリクエストなど、これらはすべてストリームになります。なぜファイルを読み書きするのが単純な操作ではなく、「ストリーム」と呼ばれるのでしょうか?

ストリームはデータのコレクションであり、これらのデータは一度に完全に取得する必要がないため、大量のメモリを一度に使用する必要がありません。そのため、大容量のデータ(大きなファイルの読み書き、長時間かかるネットワークリクエストなど)をより効果的に処理できます。

サンプル#

以下は、Web サーバーで提供される大きなファイル big.file の例です。

const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
    fs.readFile('./big.file', (err, data) => {
        if(err) throw err;
        res.end(data);
    })
});
server.listen(8000);

この方法では、メモリを大量に使用するため、リクエスト時に大量のメモリを占有します。ファイルが非常に大きい場合、サーバーがクラッシュしたり、応答が遅くなったりする可能性があります。この場合、ストリームを使用してこの問題を解決し、ファイルを小さなチャンクに分割して送信し、メモリの使用量を減らすことができます。

Node の fs モジュールでは、createReadStream メソッドを使用して読み込み可能なストリームを作成できます。これをレスポンスオブジェクトにパイプ(pipe)できます。

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

これにより、メモリの使用量が低いレベルに保たれるため、ファイル全体を一度にメモリに読み込む必要はありません。

ストリームの種類#

Node.js では、次の 4 つの基本的なストリームの種類があります:Readable(読み込み可能なストリーム)Writable(書き込み可能なストリーム)、Duplex(双方向ストリーム)、Transform(変換ストリーム)。

  • Readable(読み込み可能なストリーム)は、データを消費できるソースの抽象化です。例えば、fs.createReadStream メソッドです。
  • Writable(書き込み可能なストリーム)は、データを書き込むことができるターゲットの抽象化です。例えば、fs.createWriteStream メソッドです。
  • Duplex(双方向ストリーム)は、読み込みと書き込みの両方が可能なストリームです。例えば、TCP ソケットです。
  • Transform(変換ストリーム)は、双方向ストリームを基にしており、データの変更や変換を読み込みまたは書き込み時に行うために使用されます。例えば、zlib.createGzip は gzip アルゴリズムを使用してデータを圧縮するために使用されます。変換ストリームは、入力が書き込み可能なストリームであり、出力が読み込み可能なストリームである関数と考えることができます。変換ストリームはしばしば「スルーストリーム(through streams)」と呼ばれます。

すべてのストリームは EventEmitter のインスタンスです。データの読み書きをトリガーするためにイベントを使用できます。ただし、ストリームのデータを消費するためには、pipe メソッドを使用することもできます。

pipe メソッド#

使用方法:

readableSrc.pipe(writableDest)

pipe メソッドは、ターゲットストリームを返し、pipe をチェーンできるようにします。

pipe メソッドは、ストリームのデータを消費するための最も簡単な方法です。通常、ストリームのデータを消費するためには、pipe メソッドまたはイベントを使用することが推奨されます。

イベント#

ストリームは直接イベントを消費することもできます。まず、pipe メソッドと等価なイベントを使用してストリームのデータを消費する実装を見てみましょう:

# readable.pipe(writable)
readable.on('data', (chunk) => {
  writable.write(chunk);
  // chunkはBufferです。処理する場合は、まずtoString()メソッドを使用して文字列に変換する必要があります。
  // 例: JSON.parse(chunk.toString())
});
readable.on('end', () => {
  writable.end();
});

ここでは、data イベントと end イベントを使用しています。

読み込み可能なストリームで最も重要なイベントは次のとおりです:

  • data イベント:ストリームがデータチャンクを消費者に渡すときに発生します。
  • end イベント:ストリーム内に消費できるデータがない場合に発生します。

書き込み可能なストリームで最も重要なイベントは次のとおりです:

  • drain イベント:書き込み可能なストリームがさらにデータを受け入れることができることを示すフラグです。
  • finish イベント:すべてのデータがシステムに書き込まれたときに発生します。

HTTP ストリーム#

HTTPリクエスト および HTTPレスポンス の両方がストリーム形式で入出力できます。

HTTPレスポンス はクライアント側では読み込み可能なストリームであり、サーバー側では書き込み可能なストリームです。一方、HTTPリクエスト はその逆です。

サンプル#

以下は、サーバーがストリーム形式で提供される API をリクエストし、それをクライアントにストリーム形式で出力する例です - これはあまり意味がないように思えますが、HTTPリクエスト および HTTPレスポンス の処理を含んでおり、さらなる処理に発展させることができます。

最も簡単な例は、直接 pipe メソッドを使用する方法です:

app.post('/stream', async (req, res) => {
	try {
    const response = await fetch('http://another.stream/');
    response.body.pipe(res);
  } catch(e) {
    // ...
  }
})

⚠️ node-fetch をインポートすることを忘れないでください。

pipe メソッドを直接使用すると、ストリームデータをさらに処理することはできません。したがって、前述の イベント に基づいて等価な形式に変更してみましょう:

app.post('/stream', async (req, res) => {
	try {
    const response = await fetch('http://another.stream/');
    response.on('data', (chunk) => {
      res.write(chunk);
    });
    response.on('end', () => {
      res.end();
    });
  } catch(e) {
    // ...
  }
})

これで、さらなる処理が可能になります。

例:

Github

以下のコードは、Dify というオープンソースの LLMOps プラットフォームの API(広告ではありません)をリクエストし、ストリーム形式で提供される API のレスポンスコンテンツを OpenAI API と同じ形式に変換してストリーム形式で出力し、OpenAI API をベースにしたさまざまなアプリケーションに適用できるようにします。

// const stream = await fetch(...)
stream.on('data', (chunk) => {
    console.log(`Received chunk: ${chunk}`);
    if (!chunk.toString().startsWith('data:')) return;
    const chunkObj = JSON.parse(chunk.toString().split("data: ")[1]);
    if (chunkObj.event != 'message') {
        console.log('Not a message, skip.');
        return;
    }
    const chunkContent = chunkObj.answer;
    const chunkId = chunkObj.conversation_id;
    const chunkCreate = chunkObj.created_at;
    res.write("data: " + JSON.stringify({
        "id": chunkId,
        "object": "chat.completion.chunk",
        "created": chunkCreate,
        "model": data.model,
        "choices": [{
            "index": 0,
            "delta": {
                "content": chunkContent
            },
            "finish_reason": null
        }]
    }) + "\n\n");
})
stream.on('end', () => {
    console.log('end event detected...')
    res.write("data: [DONE]\n\n");
    res.end();
})
読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。