Node.js 面试必问的 stream,你能答出多少?

stream 在 Node.js 面试中出现频率非常高。

那什么是 stream(流)呢?

其实我们写的 Node.js 代码经常会用到流。

直接来看代码吧:

mkdir stream-test
cd stream-test
npm init -y

image.png

创建 src/test.mjs

import http from 'node:http';
import fs from 'node:fs';

const server = http.createServer(async function (req, res) {
    const data = fs.readFileSync(import.meta.dirname + '/index.html', 'utf-8');
    res.end(data);
});

server.listen(8000);

可以安装下 node 的类型,这样写的时候会有类型提示:

npm install --save-dev @types/node

我们跑了个 http 服务。

用 fs.readFileSync 读取 data.txt 的内容返回。

跑一下:

node ./src/test.mjs

创建 src/data.txt

神说要有光

然后用 curl 访问下:

curl -i http://localhost:8000

image.png

因为是全部读完返回的,所以可以知道 Content-Length,也就是响应体的长度。

当文件比较小的时候,这样读取、返回没啥问题。

那如果文件非常大呢?

比如有好几百 M,这时候全部读取完再返回是不是就合适了?

因为要等好久才能读取完文件,之后才有响应。

这就需要用到流了:

image.png

const readStream = fs.createReadStream(import.meta.dirname + '/data.txt', 'utf-8');
readStream.pipe(res);

我们用 fs.createReadStream 创建文件读取流,然后 pipe 到响应的流。

跑一下:

node ./src/test.mjs

image.png

结果一样,但是因为现在是流式返回的,并不知道响应体的 Content-Length。

所以是用 Transfer-Encoding: chunked 的方式返回流式内容。

这个是面试常考题。

从服务器下载一个文件的时候,如何知道文件下载完了呢?

有两种方式:

一种是 header 里带上 Content-Length,浏览器下载到这个长度就结束。

另一种是设置 transfer-encoding:chunked,它是不固定长度的,服务器不断返回内容,直到返回一个空的内容代表结束。

比如这样:

5
Hello
1
,
5
World
1
!
0

这里分了 “Hello” “,” “World”“!” 这 4 个块,长度分别为 5、1、5、1

最后以一个长度为 0 的块代表传输结束。

这样,不管内容多少都可以分块返回,就不用指定 Content-Length 了。

这就是大文件的流式传输的原理,就是 transfer-encoding:chunked。

当然,这是 http 传输时的流,在用 shell 命令的时候,也经常会用到流:

比如

ls | grep pack

image.png

ls 命令的输出流,作为 grep 命令的输入流。

当然,我们也可以把 grep 命令的输出流,作为 node 脚本的输入流。

创建 src/read.mjs

process.stdin.on('readable', function () {
    const buf = process.stdin.read();
    console.log(buf?.toString('utf-8'));
});

process.stdin 就是输入流,监听 readable 事件,用 read 读取数据。

跑一下:

ls | grep pack | node src/read.mjs

image.png

可以看到,我们的 node 脚本接收到了 grep 的输出流作为输入流。

这就是管道 pipe 的含义。

image.png

上面我们写的从文件读取流 pipe 到响应流,就是这个意思。

综上,可以小结下我们对流的认识:

流就是分段的传输内容,比如从服务端像浏览器返回响应数据的流,读取文件的流等。

流和流之间可以通过管道 pipe 连接,上个流的输出作为下个流的输入。

在 node 里,流一共有 4 种:可读流 Readable、可写流 Writable、双工流 Duplex、转换流 Transform

import stream from 'node:stream';

// 可读流
const Readable = stream.Readable;
// 可写流
const Writable = stream.Writable;
// 双工流
const Duplex = stream.Duplex;
// 转换流
const Transform = stream.Transform;

其余的流都是基于这 4 种流封装出来的。

我们分别来试一下:

Readable

Readable 要实现 _read 方法,通过 push 返回具体的数据。

import { Readable } from 'node:stream';

const readableStream = new Readable();

readableStream._read = function() {
    this.push('阿门阿前一棵葡萄树,');
    this.push('阿东阿东绿的刚发芽,');
    this.push('阿东背着那重重的的壳呀,');
    this.push('一步一步地往上爬。')
    this.push(null);
}

readableStream.on('data', (data)=> {
    console.log(data.toString())
});

readableStream.on('end', () => {
    console.log('done');
});

当 push 一个 null 时,就代表结束流。

跑一下:

node ./src/readable.mjs

image.png

创建 Readable 流也可以通过继承的方式:

新建 src/readable2.mjs

import { Readable } from 'node:stream';

class ReadableDong extends Readable {

    _read() {
        this.push('阿门阿前一棵葡萄树,');
        this.push('阿东阿东绿的刚发芽,');
        this.push('阿东背着那重重的的壳呀,');
        this.push('一步一步地往上爬。')
        this.push(null);
    }

}

const readableStream = new ReadableDong();

readableStream.on('data', (data)=> {
    console.log(data.toString())
});

readableStream.on('end', () => {
    console.log('done');
});

跑一下:

node ./src/readable2.mjs

image.png

可读流是生成内容的,那么很自然可以和生成器结合:

创建 src/readable3.mjs

import { Readable } from 'node:stream';

class ReadableDong extends Readable {

    constructor(iterator) {
        super();
        this.iterator = iterator;
    }

    _read() {
        const next = this.iterator.next();
        if(next.done) {
            return this.push(null);
        } else {
            this.push(next.value)
        }
    }

}

function *songGenerator() {
    yield '阿门阿前一棵葡萄树,';
    yield '阿东阿东绿的刚发芽,';
    yield '阿东背着那重重的的壳呀,';
    yield '一步一步地往上爬。';
}

const songIterator = songGenerator();

const readableStream = new ReadableDong(songIterator);

readableStream.on('data', (data)=> {
    console.log(data.toString())
});

readableStream.on('end', () => {
    console.log('done');
});

* 和 yield 是 js 的 generator 的语法,它是异步返回 yield 后的内容,通过 iterator 的 next 来取下一个。

跑一下:

node ./src/readable3.mjs

image.png

我们封装个工厂方法:

image.png

function createReadStream(interator) {
    return new ReadableDong(interator);
}

const readableStream = createReadStream(songIterator)

readableStream.on('data', (data)=> {
    console.log(data.toString())
});

readableStream.on('end', () => {
    console.log('done');
});

是不是就和 fs.createReadStream 很像了?

试一下:

创建 src/fsReadStream.mjs

import fs from 'node:fs';

const readStream = fs.createReadStream(import.meta.dirname + '/data.txt', 'utf-8');

readStream.on('data', (data)=> {
    console.log(data.toString())
});

readStream.on('end', () => {
    console.log('done');
});

跑一下:

node ./src/fsReadStream.mjs

image.png

其实文件的 ReadStream 就是基于 stream 的 Readable 封装出来的。

这就是可读流。

http 服务的 request 就是 Readable 的实例:

image.png

所以我们可以这样写:

创建 src/test2.mjs

import http from 'node:http';
import fs from 'node:fs';

const server = http.createServer(async function (req, res) {
    const writeStream = fs.createWriteStream('aaa.txt', 'utf-8');
    req.pipe(writeStream);
    res.end('done');
});

server.listen(8000);

跑起来:

node ./src/test2.mjs

访问下:

curl -X POST -d "a=1&b=2" http://localhost:8000

image.png

image.png

可以看到,从 request 的流中读出的内容写入了文件的 WriteStream

Writable

Readable 是实现 _read 方法,通过 this.push 返回内容

Writable 则要实现 _write 方法,接收写入的内容。

创建 src/writable.mjs

import { Writable } from 'node:stream';

class WritableDong extends Writable {

    constructor(iterator) {
        super();
        this.iterator = iterator;
    }

    _write(data, enc, next) {
        console.log(data.toString());
        setTimeout(() => {
            next();
        }, 1000);
    }
}

function createWriteStream() {
    return new WritableDong();
}

const writeStream = createWriteStream();

writeStream.on('finish', () => console.log('done'));

writeStream.write('阿门阿前一棵葡萄树,');
writeStream.write('阿东阿东绿的刚发芽,');
writeStream.write('阿东背着那重重的的壳呀,');
writeStream.write('一步一步地往上爬。');
writeStream.end();

Writable 的特点是可以自己控制消费数据的频率,只有调用 next 方法的时候,才会处理下一部分数据。

我们每 1s 处理一次写入。

跑一下:

2024-12-17 09.25.23.gif

其实我们常用的 fs.createWriteStream 就是这样封装出来的。

用一下试试:

创建 src/fsWriteStream.mjs

import fs from 'node:fs';

const writeStream = fs.createWriteStream('tmp.txt', 'utf-8');

writeStream.on('finish', () => console.log('done'));

writeStream.write('阿门阿前一棵葡萄树,');
writeStream.write('阿东阿东绿的刚发芽,');
writeStream.write('阿东背着那重重的的壳呀,');
writeStream.write('一步一步地往上爬。');
writeStream.end();

跑一下:

node ./src/fsWriteStream.mjs

image.png

image.png

这就是可写流。

http 服务的 response 就是 Writable 的实例:

image.png

image.png

Duplex

Duplex 是可读可写,同时实现 _read 和 _write 就可以了,也就是双工流

创建 src/duplex.mjs

import { Duplex } from 'node:stream';

class DuplexStream extends Duplex {

    _read() {
        this.push('阿门阿前一棵葡萄树,');
        this.push('阿东阿东绿的刚发芽,');
        this.push('阿东背着那重重的的壳呀,');
        this.push('一步一步地往上爬。')
        this.push(null);
    }

    _write(data, enc, next) {
        console.log(data.toString());
        setTimeout(() => {
            next();
        }, 1000);
    }
}

const duplexStream = new DuplexStream();

duplexStream.on('data', data => {
    console.log(data.toString())
});
duplexStream.on('end', data => {
    console.log('read done')
});

duplexStream.write('阿门阿前一棵葡萄树,');
duplexStream.write('阿东阿东绿的刚发芽,');
duplexStream.write('阿东背着那重重的的壳呀,');
duplexStream.write('一步一步地往上爬。');
duplexStream.end();

duplexStream.on('finish', data => {
    console.log('write done')
});

跑一下:

node ./src/duplex.mjs

2024-12-17 11.41.14.gif

整合了 Readable 流和 Writable 流的功能,这就是双工流 Duplex。

UDP 协议会用 socket 来做双向通信,它就是 Duplex 的实现:

image.png

试一下:

创建 src/socket-server.mjs

import net from 'node:net';

const server = net.createServer(function(clientSocket){
    console.log('新的客户端 socket 连接');

    clientSocket.on('data', function(data){
        console.log(data.toString());

        clientSocket.write('hello');
    });

    clientSocket.on('end', function(){
        console.log('连接中断');
    });
});

server.listen(6666, 'localhost', function(){
    const address = server.address();

    console.log('被监听的地址为:%j', address);
});

跑一下:

node ./src/socket-server.mjs

image.png

我们跑了一个 udp 的服务。

然后再创建个 udp 的客户端;

src/socket-client.mjs

import net from 'node:net';

const socket = net.createConnection({ 
    host: 'localhost',
    port: 6666 
}, () => {
  console.log('连接到了服务端!');

  socket.write('world!\n');

  setTimeout(()=> {
    socket.end();
  }, 2000);
});

socket.on('data', (data) => {
  console.log(data.toString());
});

socket.on('end', () => {
  console.log('断开连接');
});

连上 udp 服务端,发送条消息,然后 2s 后断开连接。

跑一下:

2024-12-17 12.10.59.gif

这种双向通信就是基于 Duplex 做的。

image.png

看下这些方法:

image.png

image.png

write 是可写流的, data、end 事件是可读流的。

Socket 就是 Duplex 的实现。

Transform

Transform 也是 Duplex 双工流,只不过它会对写入的内容做一些转换之后提供给消费者来读

image.png

Transform 流要实现 _transform 的 api。

试一下:

创建 src/transform.mjs

import { Transform } from 'node:stream';

class ReverseStream extends Transform {

  _transform(buf, enc, next) {
    const res = buf.toString().split('').reverse().join('');
    this.push(res);

    next()
  }
}

var transformStream = new ReverseStream();

transformStream.on('data', data => console.log(data.toString()))
transformStream.on('end', data => console.log('read done'));

transformStream.write('阿门阿前一棵葡萄树');
transformStream.write('阿东阿东绿的刚发芽');
transformStream.write('阿东背着那重重的的壳呀');
transformStream.write('一步一步地往上爬');
transformStream.end()

transformStream.on('finish', data => console.log('write done'));

在 _transform 方法里用 push 来产生可读流数据,然后 next 是消费下一个写入的数据。

push 和 next 方法在前面的 Readable、Writable 里都用过。

跑一下:

image.png

这就是转换流,它也是一种双工流。

那在 node.js 的内置模块里,有哪些 api 是转换流呢?

zlib

试一下:

创建 src/zlib.mjs

import {
    createReadStream,
    createWriteStream,
} from 'node:fs';
import { createGzip } from 'node:zlib';
  
const gzip = createGzip();
const source = createReadStream(import.meta.dirname + '/data.txt');
const destination = createWriteStream('data.txt.gz');

source.pipe(gzip).pipe(destination);

从文件的 ReadStream,pipe 到 Gzip 转换流,然后 pipe 到文件的 WriteStream。

这感觉是不是和我们写 shell 命令时一样?

image.png

跑一下:

node ./src/zlib.mjs

2024-12-17 12.34.03.gif

压缩成功了。

点击解压,就可以看到其中的文件。

这个 gzip 是用于 http 的压缩传输的:

image.png

很常用,所以 Node.js 内置了这个模块。

这里的多次 pipe 也可以用 stream 的 pipeline 的 api 简化:

image.png

这条 pipeline 的特点是 Readable 产生数据,然后经过任意多个 Duplex(包括 Transform),最后传入 Writable

image.png

至此,四种 stream 我们就都用了一遍。

代码上传了 github github.com/QuarkGluonP…

总结

stream 是 Node.js 的非常常用 API,也是面试必问的点。

我们每天敲的 shell 命令,就是基于流的概念,上个进程的输出可以做为下个进程的输入。

写 Node.js 代码的时候,文件读写、网络通信等都是基于流。

虽然各种流有很多,但底层的 stream 只有 4 种:

  • Readable:实现 _read 方法,通过 push 传入内容
  • Writable:实现 _write 方法,通过 next 消费内容
  • Duplex:实现 _read、_write,可读可写
  • Transform:实现 _transform,对写入的内容做转换再传出去,继承自 Duplex

面试问的话,除了说出这 4 种 stream 外,最好举一个具体的 api 来说明。

比如 fs.createReadStream、http 的 request 是 Readable 的实现,fs.createWriteStream、http 的 response 是 Writable 的实现,net 的 Socket 是 Duplex 的实现,zlib.createGzip 是 Transform 的实现。

理解这 4 种 stream,能自己实现,也能知道哪些 api 是哪种流,就算掌握的差不多了。

更多内容可以看我的小册《Node.js CLI 通关秘籍》

注册登录 后评论
    // 作者
    zxg_神说要有光 发布于 掘金
    • 0
    // 本帖子
    分类
    // 相关帖子
    Coming soon...
    • 0
    Node.js 面试必问的 stream,你能答出多少?zxg_神说要有光 发布于 掘金