Stream 流

「流」是一组有序的、有起点终点的字节数据传输手段, 它不关心文件的整体内容, 只关心是否从文件中读到数据, 以及读到数据之后的处理

「流」是一个抽象接口, 被 Node 中很多对象所实现, 如 HTTP 服务器 request 和 response 对象都是流

NodeJS 中有四种基本流类型

  1. Readable: 可读流(如: fs.createReadStream)
  2. Writeable: 可写流(如: fs.createWriteStream)
  3. Duplex: 可读写流(如: net.Socket)
  4. Transform: 在读写过程中可以修改和变换数据的 Duplex 流(如: zlib.createDeflate())

流中的数据有两种模式

  1. 二进制模式: 每个分块都是 buffer 或者 string 对象
  2. 对象模式: 内部处理的是一系列普通对象

所有使用 NodeJS API 创建的流对象都只能操作 strings 和 Buffer 对象; 但是, 通过一些第三方流的实现, 依然能处理其他类型的 JS 值(除了 null, 在流处理中有特殊意义); 这些流被认为是在工作在「对象模式」(object mode); 在创建流的实例时, 可通过 objectMode 选项使流的实例切换到对象模式
注: 试图将已经存在的流切换到对象模式是不安全的

可读流的两种模式

  • 可读流实际工作在 flowing 和 paused 两种模式之一
    • flowing 模式下: 可读流自动从系统底层读取数据, 并通过 EventEmitter 接口事件尽快将数据提供给应用; 流动模式不缓存, 直接发射, 然后读取下次的数据; 如果使用流动模式, 但没有消费, 那么数据就丢失了
    • paused 模式下: 不会主动派发, 必须显式调用 stream.read() 方法来从流中读取数据片段
  • 所有初始工作模式为 paused 的 Readable 流, 可以通过下面三种方式切换到 flowing 模式
    • 监听「data」事件
    • 调用「stream.resume()」方法
    • 调用「stream.pipe()」方法将数据发送到 Writable「为什么 pipe 不会走缓存? pipe 是靠 resume 来实现, 而 resume 是靠 data 来实现的」
  • 可读流可以通过以下途径切换到 paused 模式
    • 如果不存在管道目标(pipe destination), 可以通过调用 stream.pause() 方法实现
    • 如果存在管道目标, 可以通过取消「data」事件监听, 并调用 stream.unpipe() 方法移除所有管道目标实现

如果 Readable 切换到 flowing 模式, 且没有消费者处理流中的数据, 这些数据将会丢失; 如调用了 readable.resume() 方法却没有监听 data 事件, 或取消了 data 事件监听, 就有可能出现这种情况

缓存区

  • Writable 和 Readable 流都会将数据存储到内部的缓存器(buffer)中

Readable 可读流

fs.createReadStream 创建一个可读流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
const fs = require('fs');
// 通过 createReadStream 创建了一个可读流
// 语法: fs.createReadStream(path, [options]);
const rs = fs.createReadStream('1.txt', {
flags: 'r', // 表明对文件进行某种操作
mode: 0o666, // 权限类
encoding: 'utf8', // 设置编码, 同 setEncoding; 默认 buffer
start: 3, // 从索引 n 开始读取
end: 8, // 到索引 m 结束读取(包括结束索引)
highWaterMark: 3 // 指定缓冲区大小(字节数)
});

// 设置编码
// 注: 设置 utf8, highWaterMark 至少为 3
rs.setEncoding('utf8');

// (如果是文件流) 文件被打开
rs.on('open', () => { console.log('file open') })
// 监听它的 data 事件, 一旦开始监听 data 事件时, 流就可以读文件内容并发射 data
// 默认情况下, 当监听 data 事件后, 会不停地读数据, 整个过程不会中断
// 但在有些情况下需要中断, 此时就需要暂定和恢复的机制
rs.on('data', (data) => {
console.log(data);
rs.pause(); // 暂定读取和发射 data 事件
setTimeout(() => {
rs.resume(); // 恢复读取并触发 data 事件
}, 1000)
})
// 读取文件出错, 触发 error 事件
rs.on('error', () => { console.log('error') })
// 文件读取完毕, 触发 end 事件
rs.on('end', () => { console.log('end') })
// (如果是文件流) 文件被关闭
rs.on('close', () => { console.log('file closed') })

linked-list

暂停(paused)模式: 当监听 readable 事件时, 可读流会马上向底层读取文件, 然后把读取到的文件放在缓存区中「const state = this._readableState」, 进入暂停模式

缓存区没有最大(小)值
self.read(0) 只填充缓存, 不会发射 data 事件, 但会发射 stream.emit(‘readable’) 事件
this._read(state.highWaterMark) 每次调用底层的方法读取时取 highWaterMark 设定的字节数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rs.on('readable', () => {
// length 是指缓存区数据大小
// state.length += state.objectMode ? 1 : chunk.length;
console.log(rs._readableState.length);
// read 如果不传参数表示读取整个缓存区数据, number 表示读取 n 个字节
// 如果可读流发现要读的字节小于缓存字节大小, 则直接返回
// 当读完指定的字节后, 如果可读流发现剩下的字节已经比最高水位线小了, 则会立刻读取, 填满最高水位线
const ch = rs.read(1);
console.log(ch);
console.log(rs._readableState.length);
setTimeout(() => {
console.log(rs._readableState.length);
}, 200);
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 1234567890
const fs = require('fs');
const rs = fs.createReadStream('./1.txt', { highWaterMark: 3 });
// 立刻从文件中读取 highWaterMark 数据, 读完后填充缓存区, 然后触发发射 readable
rs.on('readable', () => {
const ch = rs.read(1);
console.log(ch);
// 当读了一个字节后, 只剩下两个字节, 不够 highWaterMark, 会再次读取 highWaterMark 字节并填到缓存区中
setTimeout(() => {
console.log(rs._readableState.length);
const ch = rs.read(1);
console.log(ch);
setTimeout(() => {
console.log(rs._readableState.length);
}, 200)
}, 200)
})

手写可读流(Readable)

可读流有两种模式: 流动模式(flowing)、暂停模式(paused)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 使用, 读取 1234567890
// 1. 流动模式: 不走缓存
const fs = require('fs');
// const rs = fs.createReadStream('./1.txt', {
const rs = new ReadStream('./1.txt', {
flags: 'r',
mode: 0o666,
start: 3,
end: 7,
autoClose: true,
highWaterMark: 3, // 最高水位线是三个字节
encoding: 'utf8'
})
rs.on('open', () => {
console.log('open');
})
rs.on('data', (data) => {
console.log(data);
})
rs.on('end', () => {
console.log('end');
})
rs.on('close', () => {
console.log('close');
})
rs.on('error', () => {
console.log('error');
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// 源码
const fs = require('fs');
const EventEmitter = require('events');

class ReadStream extends EventEmitter {
constructor(path, options) {
super(path, options);
this.path = path;
this.flags = options.flags || 'r';
this.mode = options.mode || 0o666;
this.pos = this.start = options.start || 0;
this.end = options.end;
this.autoClose = options.autoClose;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.encoding = options.encoding;
this.flowing = null;
this.buffer = Buffer.alloc(this.highWaterMark);
this.open(); // 准备打开文件读取
this.on('newListener', (type, listener) => {
// 当给这个实例添加了任意的监听函数时, 会触发 newListener
if (type === 'data') {
// 如果监听了 data 事件, 流会自动切换到流动模式
this.flowing = true;
this.read();
}
})
}

read() {
if (typeof this.fd !== 'number') {
return this.once('open', () => this.read());
}

const howMuchToRead = this.end
? Math.min(this.end - this.pos + 1, this.highWaterMark)
: this.highWaterMark;

// this.buffer 并不是缓存区
fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytes) => {
// bytes 是实际读到的字节数
if (err && this.autoClose) {
this.destroy();
return this.emit('error', err);
}
if (bytes) {
let data = this.buffer.slice(0, bytes);
this.pos += bytes;
data = this.encoding
? data.toString(this.encoding)
: data;
this.emit('data', data);
if (this.end && this.pos > this.end) {
return this.endFn();
} else {
this.read();
}
} else {
return this.endFn();
}
});
}

endFn() {
this.emit('end');
this.destroy();
}

open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err && this.autoClose) {
this.destroy();
return this.emit('error', err);
}
this.fd = fd; // 保存文件描述符
this.emit('open');
})
}

destroy() {
fs.close(this.fd, () => {
this.emit('close');
})
}
}

module.exports = ReadStream;

Writable 可写流

fs.createWriteStream 创建一个可写流

当向可写流写数据时, 不会立刻写入文件, 而是会先写入缓存区, 缓存区的大小就是 highWaterMark, 默认值是 16K; 等缓存区满后在真正写入文件中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const fs = require('fs');
const ws = fs.createWriteStream('2.txt', {
flags: 'w',
mode: 0o666,
start: 0,
highWaterMark: 3
});
// 如果缓存区已满, 返回 false; 如果缓存区未满, 返回 true
// 一般情况下, 如果返回了 false, 就不能再往里面写了,
// 但是如果真写了, 数据也不会丢失, 会放在内存中, 等缓存区清空后再从内存中读出来
let flag = ws.write('1'); // true
flag = ws.write('2'); // true
flag = ws.write('3'); // false
flag = ws.write('4'); // false

// ws.end(chunk, [encoding], [callback])
// 表明接下来没有数据要被写入 Writable 通过传入可选的 chunk 和 encoding 参数,
// 可以在关闭流之前再写入一段数据, 如果传入了可选的 callback, 它将作为 “finish” 事件的回调函数
ws.end('finish');
ws.write('5'); // 不会再写入

// finish 方法, 一般使用 end 即可
ws.on('finish', () => {
console.log('finish');
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
const fs = require('fs');
const { Writable } = require('stream');
let ws = new Writable('./a.txt', {
flags: 'w',
encoding: 'utf8',
mode: 0o666,
autoClose: true,
start: 0,
highWaterMark: 2
})
let i = 9;
function write() {
let flag = true;
while (i >= 0 && flag) {
flag = ws.write(i-- + '');
console.log(flag);
}
}
write();
// 当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false
// 一旦当前所有缓存的数据块都被排空(被操作系统接收来进行输出), 那么 “drain” 事件就会被触发
// tip: 一旦 write() 返回 false, 在 “drain” 事件触发前, 不能写入任何数据块
ws.on('drain', () => {
console.log('drain');
write();
})

手写可写流(Writeable)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 使用
const fs = require('fs');
const WriteStream = require('./WriteStream');
// const ws = fs.createWriteStream('./1.txt', {
const ws = new WriteStream('./1.txt', {
flags: 'w',
mode: 0o666,
start: 0,
encoding: 'utf8',
autoClose: true, // 当流写完后自动关闭文件
highWaterMark: 3
});
let n = 9;

function write() {
let flag = true;
while (flag && n > 0) {
ws.write(n + "", 'utf8', () => {});
n--;
console.log('flag=', flag);
}
}
ws.once('drain', () => {
console.log('drain');
write();
});
write();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// 源码
const fs = require('fs');
const EventEmitter = require('events');

class WriteStream extends EventEmitter {
constructor(path, options) {
super(path, options);
this.path = path;
this.flags = options.flags || 'w';
this.mode = options.mode || 0o666;
this.start = options.start || 0;
this.pos = this.start; // 文件的写入索引
this.encoding = options.encoding || 'utf8';
this.autoClose = options.autoClose || true;
this.highWaterMark = options.highWaterMark || 16 * 1024; // 64K

this.buffers = []; // 表示缓存区, 在 node 中用的是链表(linked-list)
this.writing = false; // 表示内部正在写入数据
this.length = 0; // 表示缓存区字节的长度
this.open();
}

open() {
// fd 表示文件描述符
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', err);
}
return this.fd = fd;
this.emit('open');
})
}

// 如果底层已经在写入数据的话, 则必须将当前要写入的数据放在缓冲区中
// chunk 要写入的数据; encoding 编码格式, cb 回调函数
write(chunk, encoding, cb) {
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, this.encoding);
const len = chunk.length;
// 缓存区的长度加上当前长度
this.length += len;
// 判断当前最新的缓存区是否小于最高水位线
const ret = this.length < this.highWaterMark;
// 表示正在向底层写数据, 则当前数据必须放在缓存区中
if (this.writing) {
this.buffers.push({ chunk, encoding, cb });
// 判断当前最新的缓存区是否小于最高水位线
} else {
// 直接调用底层的写入方法写入
// 注意: 在底层写完当前数据后要清空缓存区
this.writing = true;
this._write(chunk, encoding, () => this.clearBuffer());
}
return ret;
}

clearBuffer() {
// 取出缓存区中的第一个 buffer
const data = this.buffers.shift();
if (data) {
this._write(data.chunk, data.encoding, () => this.clearBuffer());
} else {
this.writing = false;
// 清空缓存区
this.emit('drain');
}
}

_write(chunk, encoding, cb) {
if (typeof this.fd !== 'number') {
return this.once('open', () => this._write(chunk, encoding, cb));
}
fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
if (err && this.autoClose) {
this.destroy();
this.emit('error', err);
}
this.pos += bytesWritten;
// 写入多少字节, 缓存区要减少多少字节
this.length -= bytesWritten;
cb && cb();
})
}

destroy() {
fs.close(this.fd, () => { this.emit('close') })
}
}

module.exports = WriteStream;

pipe 方法

Linux 经典管道概念: 前者的输出是后者的输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/*
* pipe 方法使用
*/
rs.pipe(ws);

/*
* 取消 pipe(移除目标可写流)
*/
rs.unpipe(ws);

/*
* pipe 方法的原理
*/
const fs = require('fs');
const rs = fs.createReadStream('./1.txt', { highWaterMark: 3 })
const ws = fs.createWriteStream('./2.txt', { highWaterMark: 3 })
// 当监听可读流 data 事件时会触发回调函数的执行,
// 可以实现数据的产生者和消费者速度的均衡
rs.on('data', (data) => {
const flag = ws.write(data);
if (!flag) {
rs.pause();
}
})
// 监听可写流缓存区清空事件, 当所有要写入的数据写入完成后, 接着从可读流中读取并触发 data 事件
ws.on('drain', () => {
rs.resume();
})
rs.on('end', () => {
ws.end();
})

流的经典应用

1. 行读取器

Unix 系统中, 每行结尾只有换行(line feed), 即 “\n”
Windows 系统中, 每行结尾是“<回车><换行>”, 即 “\r\n”
Mac 系统中, 每行结尾是“回车(carriage return)”, 即 “\r”
ASCII 码中, 换行(\n 10 0A), 回车(\r 13 0D)

1
2
3
123
456
789
1
2
3
4
5
6
7
8
9
const fs = require('fs');
fs.readFile('1.txt', (err, data) => {
console.log(data);
/**
* Mac: <Buffer 31 32 33 0a 34 35 36 0a 37 38 39>
* Unix: <Buffer 31 32 33 0d 34 35 36 0d 37 38 39>
* Windows: <Buffer 31 32 33 0d 0a 34 35 36 0d 0a 37 38 39>
*/
})

目的: 写一个类, 根据传入的文件路径得到类的实例, 当行读取器每次读到一行的时候, 就向外发射 newLine 事件, 当读到结束的时候, 就向外发射 end 事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
const EventEmitter = require('events');
const util = require('util');
const fs = require('fs');
const NEW_LINE = 0x0A; // /n 换行
const RETURN = 0x0D; // /r 回车

function LineReader(path, encoding) {
EventEmitter.call(this);
this.encoding = encoding || 'utf8';
this._reader = fs.createReadStream(path);
// 当给一个对象添加新的监听函数时会触发该事件
this.on('newListener', (type, listener) => {
// 如果添加了 newLine 的监听, 那么就开始读取文件并按行发射数据
if (type === 'newLine') {
// 当监听了一个可读流的 readable 事件, 流会调用底层读取文件的 API 方法填充缓存区, 填充完后向外发射 readable 事件
let buffers = [];
this._reader.on('readable', () => {
let char;
while(null !== (char = this._reader.read(1))) {
switch (char[0]) {
case NEW_LINE:
this.emit('newLine', Buffer.from(buffers).toString(this.encoding));
buffers.length = 0;
break;
case RETURN:
this.emit('newLine', Buffer.from(buffers).toString(this.encoding));
buffers.length = 0;
const newChar = this._reader.read(1);
if (newChar[0] !== NEW_LINE) {
buffers.push(newChar[0])
}
break;
default:
buffers.push(char[0]);
break;
}
}
});
this._reader.on('end', () => {
this.emit('newLine', Buffer.from(buffers).toString(this.encoding));
this.emit('end');
});
}
});
}
util.inherits(LineReader, EventEmitter);
module.exports = LineReader;

// 调用
const LineReader = require('./LineReader');
const reader = new LineReader('./1.txt', 'utf8');
reader.on('newLine', data => {
console.log(data);
});
reader.on('end', () => {
console.log('over');
})