yunwu

About Streams in Node.js

What is a Stream#

Streams used to confuse me - reading and writing files, making network requests, all of these can be streams. Why is reading or writing a file not one simple operation, but a "stream"?

A stream is a collection of data, but this data does not need to be obtained all at once, so it does not need to occupy a large amount of memory at once. This allows for better handling of large amounts of data, such as reading and writing large files, time-consuming network requests, etc.

Example#

Here is a web server that serves a large file, big.file:

const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
    fs.readFile('./big.file', (err, data) => {
        if(err) throw err;
        res.end(data);
    })
});
server.listen(8000);

This approach results in the request consuming a large amount of memory because the entire file is read into memory at once and then returned to the client all at once. If the file is very large, it can cause the server to crash or respond slowly. In this case, we can use streams to solve this problem by dividing the file into small chunks and sending them one by one, reducing the memory usage.

Node's fs module provides the createReadStream method, which creates a readable stream that we can pipe into the response object.

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

This way, the memory usage can be kept at a low level because the entire file is not read into memory at once.
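One caveat of the plain pipe version: an error on src (for example, a missing file) is not forwarded to res, and an 'error' event with no listener crashes the process. A minimal sketch of one way to handle this, assuming the built-in stream.pipeline function is acceptable here; the helper name sendFile is made up for illustration:

```javascript
const fs = require('fs');
const { pipeline } = require('stream');

// Hypothetical helper: stream a file into a response-like writable.
function sendFile(filePath, res) {
  pipeline(fs.createReadStream(filePath), res, (err) => {
    if (!err) return;
    // On failure, pipeline destroys both streams; here we only log.
    console.error('Streaming failed:', err.message);
  });
}

// Usage inside the request handler:
// server.on('request', (req, res) => sendFile('./big.file', res));
```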

Types of Streams#

In Node.js, there are four basic types of streams: Readable, Writable, Duplex, and Transform.

  • Readable streams are abstractions of sources from which data can be consumed. An example is the fs.createReadStream method.
  • Writable streams are abstractions of destinations to which data can be written. An example is the fs.createWriteStream method.
  • Duplex streams are both readable and writable. An example is a TCP socket.
  • Transform streams are based on duplex streams and can change or transform data as it is written and read. An example is zlib.createGzip, which compresses data using the gzip algorithm. You can think of a transform stream as a function whose input is its writable side and whose output is its readable side. You may also have heard transform streams referred to as "through streams".
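To make the transform idea concrete, here is a minimal sketch of a custom Transform stream that upper-cases text. The helper name createUpperCaseStream is made up for illustration, and the sketch assumes ASCII input:

```javascript
const { Transform } = require('stream');

// Hypothetical helper: returns a Transform stream that upper-cases
// every chunk written to it.
function createUpperCaseStream() {
  return new Transform({
    transform(chunk, encoding, callback) {
      // chunk arrives as a Buffer by default; convert it, transform it,
      // and hand the result to the readable side.
      // Note: a multi-byte character split across two chunks would be
      // mangled by toString(); this is fine for ASCII input.
      callback(null, chunk.toString().toUpperCase());
    },
  });
}

// Usage: echo stdin back in upper case.
// process.stdin.pipe(createUpperCaseStream()).pipe(process.stdout);
```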

All streams are instances of EventEmitter. They emit events that can be used to read and write data, but we can also consume a stream's data more simply with the pipe method.

The pipe Method#

Usage:

readableSrc.pipe(writableDest)

The pipe method returns the destination stream, allowing us to make chained calls to pipe.
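A short sketch of such a chain, which compresses a file with gzip. The helper name gzipFile and the file paths in the usage comment are illustrative:

```javascript
const fs = require('fs');
const zlib = require('zlib');

// Hypothetical helper: compress `srcPath` into `destPath` with gzip.
function gzipFile(srcPath, destPath) {
  return fs.createReadStream(srcPath)        // Readable
    .pipe(zlib.createGzip())                 // Transform; pipe() returns it
    .pipe(fs.createWriteStream(destPath));   // Writable; returned to the caller
}

// Usage: the returned write stream emits 'finish' once everything is written.
// gzipFile('./big.file', './big.file.gz').on('finish', () => console.log('done'));
```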

The pipe method is the simplest way to consume a stream. It is generally recommended to consume a stream either with the pipe method or with events, but not to mix the two.

Events#

Streams can be consumed directly through events. Let's first look at an event-based implementation that is roughly equivalent to the pipe method:

// Roughly equivalent to readable.pipe(writable), minus backpressure handling
readable.on('data', (chunk) => {
  writable.write(chunk);
  // chunk is a Buffer by default; call toString() to process it as text,
  // e.g. JSON.parse(chunk.toString()) if each chunk is a complete JSON document
});
readable.on('end', () => {
  writable.end();
});

Here, we use the data and end events.

The most important events on a readable stream are:

  • The data event, which is triggered when a data chunk is passed to the consumer.
  • The end event, which is triggered when there is no more data to be consumed in the stream.

The most important events on a writable stream are:

  • The drain event, which is a signal that the writable stream can accept more data.
  • The finish event, which is triggered when all the data has been flushed to the underlying system.
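The drain event matters when the destination is slower than the source. The sketch below approximates pipe a little more closely by pausing the source whenever write() returns false and resuming on drain. The helper name pipeWithBackpressure is made up, and error handling is left out:

```javascript
// Hypothetical helper approximating readable.pipe(writable) with
// backpressure handling. Error handling is omitted for brevity.
function pipeWithBackpressure(readable, writable) {
  readable.on('data', (chunk) => {
    // write() returns false once the writable's internal buffer is full.
    const canContinue = writable.write(chunk);
    if (!canContinue) {
      readable.pause();
      // Resume reading once the buffered data has been flushed.
      writable.once('drain', () => readable.resume());
    }
  });
  readable.on('end', () => writable.end());
  return writable;
}
```

Without the pause/resume step, a fast source keeps filling the writable's buffer, which defeats the memory savings that streams are meant to provide.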

HTTP Streams#

Both HTTP Request and HTTP Response can be streamed for input/output.

HTTP Response is a readable stream on the client side and a writable stream on the server side; HTTP Request is the opposite.
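As a sketch of the client side, the built-in http module hands the callback a readable response, while the request object it returns is writable. The helper name getText is made up for illustration:

```javascript
const http = require('http');

// Hypothetical helper: GET `url` and resolve with the body as a string.
function getText(url) {
  return new Promise((resolve, reject) => {
    // `req` is a Writable (the outgoing request);
    // `res` is a Readable (the incoming response).
    const req = http.get(url, (res) => {
      const chunks = [];
      res.on('data', (chunk) => chunks.push(chunk));
      res.on('end', () => resolve(Buffer.concat(chunks).toString()));
      res.on('error', reject);
    });
    req.on('error', reject);
  });
}

// Usage:
// getText('http://localhost:8000/').then((body) => console.log(body.length));
```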

Example#

Here is an example where the server requests a streaming API and streams it to the client - this may not seem meaningful, but it includes the handling of HTTP Request and HTTP Response and can be further extended for additional processing.

The simplest example is to use the pipe method directly:

app.post('/stream', async (req, res) => {
  try {
    const response = await fetch('http://another.stream/');
    response.body.pipe(res);
  } catch (e) {
    // ...
  }
});

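⚠️ Remember to import node-fetch. Its response.body is a Node.js readable stream, whereas the fetch built into Node.js 18+ returns a web ReadableStream, which has no pipe method.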

Using the pipe method directly does not allow for further processing of the stream data, so let's rewrite it into an equivalent form using the events described above:

app.post('/stream', async (req, res) => {
  try {
    const response = await fetch('http://another.stream/');
    // The events are emitted by the body stream, not by the Response object
    response.body.on('data', (chunk) => {
      res.write(chunk);
    });
    response.body.on('end', () => {
      res.end();
    });
  } catch (e) {
    // ...
  }
});

Now, further processing can be done.

Example:

Github

The following code uses fetch to call the API of Dify, an open-source LLMOps platform (not an advertisement), and processes the API's streamed output, repackaging it in the same format as the OpenAI API before streaming it out, so that it works with applications built on the OpenAI API.

// const stream = (await fetch(...)).body
stream.on('data', (chunk) => {
    console.log(`Received chunk: ${chunk}`);
    if (!chunk.toString().startsWith('data:')) return;
    const chunkObj = JSON.parse(chunk.toString().split("data: ")[1]);
    if (chunkObj.event !== 'message') {
        console.log('Not a message, skip.');
        return;
    }
    const chunkContent = chunkObj.answer;
    const chunkId = chunkObj.conversation_id;
    const chunkCreate = chunkObj.created_at;
    res.write("data: " + JSON.stringify({
        "id": chunkId,
        "object": "chat.completion.chunk",
        "created": chunkCreate,
        "model": data.model, // data is defined outside this snippet (assumed: the parsed incoming request body)
        "choices": [{
            "index": 0,
            "delta": {
                "content": chunkContent
            },
            "finish_reason": null
        }]
    }) + "\n\n");
})
stream.on('end', () => {
    console.log('end event detected...')
    res.write("data: [DONE]\n\n");
    res.end();
})
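The handler above assumes that every chunk holds exactly one complete "data:" line. Network chunks do not necessarily line up with event boundaries, so a more defensive variant could buffer the text and split it on blank lines. The following is only a sketch: createSseParser is a hypothetical helper, and it assumes events are separated by "\n\n", as in the output written above.

```javascript
const { StringDecoder } = require('string_decoder');

// Hypothetical helper: returns a function that accepts raw chunks and
// calls onData(payload) once per complete "data: ..." line.
function createSseParser(onData) {
  // StringDecoder keeps multi-byte characters intact across chunk boundaries.
  const decoder = new StringDecoder('utf8');
  let buffer = '';
  return function feed(chunk) {
    buffer += decoder.write(chunk);
    // Events are assumed to be separated by a blank line.
    const events = buffer.split('\n\n');
    // The last piece may be incomplete; keep it for the next chunk.
    buffer = events.pop();
    for (const event of events) {
      for (const line of event.split('\n')) {
        if (line.startsWith('data:')) {
          onData(line.slice('data:'.length).trim());
        }
      }
    }
  };
}

// Usage:
// stream.on('data', createSseParser((payload) => {
//   const chunkObj = JSON.parse(payload);
//   // ...same handling as above
// }));
```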