What is a Stream#
Streams used to be a concept that troubled me: reading and writing files, making network requests, all of these can be streams. Why is it that when I read or write a file, it isn't a simple one-shot operation, but a "stream"?
A stream is a collection of data, but the data does not need to be available all at once, so it does not need to occupy a large amount of memory at once. This makes streams well suited to handling large amounts of data: reading and writing large files, long-running network requests, and so on.
Example#
Here is a web server that serves a large file, big.file:
const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
    res.end(data);
  });
});

server.listen(8000);
With this approach, each request consumes a large amount of memory, because the entire file is read into memory and then returned to the client in one go. If the file is very large, the server may respond slowly or even crash. Streams solve this problem: the file is split into small chunks that are sent one by one, keeping memory usage low.
Node's fs module can create a readable stream with the createReadStream method, which we can pipe into the response object.
const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);
This way, memory usage stays low, because the entire file is never read into memory at once.
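By the way, big.file itself can be generated with a writable stream. A minimal sketch, assuming roughly 60 MB of dummy text is enough (it ignores backpressure for brevity):

const fs = require('fs');

const file = fs.createWriteStream('./big.file');

// write a million short lines of dummy data
for (let i = 0; i < 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipiscing elit.\n');
}
file.end();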
Types of Streams#
In Node.js, there are four basic types of streams: Readable, Writable, Duplex, and Transform.
- Readable streams are abstractions of sources from which data can be consumed. An example is the `fs.createReadStream` method.
- Writable streams are abstractions of destinations to which data can be written. An example is the `fs.createWriteStream` method.
- Duplex streams are both readable and writable. An example is a TCP socket.
- Transform streams are based on duplex streams and can be used to change or transform data as it is written and read. An example is `zlib.createGzip`, which compresses data using the gzip algorithm. You can think of a transform stream as a function whose input is a writable stream and whose output is a readable stream. You may also have heard transform streams referred to as "through streams" (see the sketch after this list).
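To make the transform type concrete, here is a minimal sketch of a custom transform stream that upper-cases everything passing through it (the upperCase name is mine):

const { Transform } = require('stream');

// a transform stream: writable on one end, readable on the other,
// with transform() applied to every chunk in between
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});

// echo "hello stream" | node upper.js  ->  HELLO STREAM
process.stdin.pipe(upperCase).pipe(process.stdout);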
All streams are instances of EventEmitter. They emit events that we can use to read and write data, but we can also use the `pipe` method to consume a stream's data.
The pipe Method#
Usage:

readableSrc.pipe(writableDest)

The `pipe` method returns the destination stream, allowing us to make chained calls to `pipe`. It is the simplest way to consume a stream, and it is generally recommended to use either the `pipe` method or events to consume streams.
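For example, here is a sketch that chains `pipe` calls to gzip a file, assuming big.file exists (compare with zlib.createGzip mentioned above):

const fs = require('fs');
const zlib = require('zlib');

// pipe() returns its destination, so the calls chain:
// readable -> transform (gzip) -> writable
fs.createReadStream('./big.file')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('./big.file.gz'));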
Events#
Streams can also be consumed directly through events. Let's first look at an event-based implementation that is equivalent to consuming a stream with the `pipe` method:
// equivalent to: readable.pipe(writable)
readable.on('data', (chunk) => {
  writable.write(chunk);
  // chunk is a Buffer; to process it, convert it with toString(),
  // e.g. JSON.parse(chunk.toString())
});

readable.on('end', () => {
  writable.end();
});
Here, we use the `data` and `end` events.
The most important events on a readable stream are:
- The `data` event, which is triggered when a chunk of data is passed to the consumer.
- The `end` event, which is triggered when there is no more data to be consumed from the stream.
The most important events on a writable stream are:
- The `drain` event, which is a signal that the writable stream can accept more data (see the sketch after this list).
- The `finish` event, which is triggered when all the data has been written to the underlying system.
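Here is a sketch of how `drain` is typically used (the output file name is arbitrary): stop calling write() when it returns false, and resume once the stream has drained:

const fs = require('fs');

const out = fs.createWriteStream('./out.file');

let i = 0;
function writeMore() {
  while (i < 1e6) {
    i++;
    // write() returns false once the internal buffer is full
    if (!out.write(`line ${i}\n`)) {
      // wait for 'drain' before writing more, instead of buffering without bound
      out.once('drain', writeMore);
      return;
    }
  }
  out.end(); // 'finish' fires once everything has been flushed
}

out.on('finish', () => console.log('all data written'));
writeMore();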
HTTP Streams#
Both the HTTP request and the HTTP response can be streamed for input/output. An HTTP response is a readable stream on the client side and a writable stream on the server side; an HTTP request is the opposite.
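A minimal sketch of both sides in one script (the port and payload are arbitrary):

const http = require('http');

// server side: req is readable, res is writable
const server = http.createServer((req, res) => {
  req.pipe(res); // echo the request body back
});

server.listen(8000, () => {
  // client side: the request is writable, the response is readable
  const clientReq = http.request({ port: 8000, method: 'POST' }, (res) => {
    res.pipe(process.stdout); // prints "hello stream"
  });
  clientReq.end('hello stream\n');
});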
Example#
Here is an example where the server requests a streaming API and streams the result on to the client. This may not seem very useful on its own, but it covers the handling of both the HTTP request and the HTTP response, and it can be extended with further processing.

The simplest version uses the `pipe` method directly:
const express = require('express');
const fetch = require('node-fetch'); // with node-fetch, response.body is a Node.js readable stream

const app = express();

app.post('/stream', async (req, res) => {
  try {
    const response = await fetch('http://another.stream/');
    response.body.pipe(res); // pipe the upstream response straight to the client
  } catch (e) {
    // ...
  }
});
⚠️ Remember to import `node-fetch`.
Using the `pipe` method directly leaves no room for further processing of the stream data, so, using the events mentioned above, let's rewrite it into an equivalent form:
app.post('/stream', async (req, res) => {
  try {
    const response = await fetch('http://another.stream/');
    // the readable stream is response.body, not the response object itself
    response.body.on('data', (chunk) => {
      res.write(chunk);
    });
    response.body.on('end', () => {
      res.end();
    });
  } catch (e) {
    // ...
  }
});
Now further processing can be done on each chunk before it is forwarded.

Example:

The following code uses `fetch` to call the API of Dify, an open-source LLMOps platform (not advertising here), processes the API's streamed output, repackages it into the same format as the OpenAI API, and streams it back out, so that it can serve the many applications built on top of the OpenAI API.
// const stream = await fetch(...)
// (as above, the readable stream to listen on is response.body)
stream.on('data', (chunk) => {
  console.log(`Received chunk: ${chunk}`);
  // Dify streams Server-Sent Events; only lines starting with "data:" carry payloads
  // (this assumes each chunk holds one complete event)
  if (!chunk.toString().startsWith('data:')) return;
  const chunkObj = JSON.parse(chunk.toString().split('data: ')[1]);
  if (chunkObj.event !== 'message') {
    console.log('Not a message, skip.');
    return;
  }
  const chunkContent = chunkObj.answer;
  const chunkId = chunkObj.conversation_id;
  const chunkCreate = chunkObj.created_at;
  // repackage as an OpenAI-style chat.completion.chunk;
  // data.model comes from the surrounding request context (elided here)
  res.write('data: ' + JSON.stringify({
    id: chunkId,
    object: 'chat.completion.chunk',
    created: chunkCreate,
    model: data.model,
    choices: [{
      index: 0,
      delta: {
        content: chunkContent,
      },
      finish_reason: null,
    }],
  }) + '\n\n');
});
stream.on('end', () => {
  console.log('end event detected...');
  // OpenAI-style streams terminate with a [DONE] sentinel
  res.write('data: [DONE]\n\n');
  res.end();
});