NodeJS 中的 stream 是一个非常重要的概念,当进行 IO 操作的时候,我们可以将 IO 读取抽象成流的概念(有读取 IO 流一说)。本文我们分析 Node 中 stream 的概念,并分析如何创建出满足接口的自定义流。

简单地来说,stream 就是实现了特殊方法的 EventEmitter,一定的阶段会触发相应事件,从而引起用户自定义回调函数的调用。根据方法的不同,stream 可以分为可读,可写或读写(Duplex)。最常见的流是标准输入流 process.stdin 和标准输出流 process.stdout。另外一些常用的包括:

  • 可读流:http.requesthttp.response、TCP 中的 socketsfs 中的读文件流等
  • 可写流:http.requesthttp.responsefs 中的写文件流等

流继承自 EventEmitter,当流中有数据到来的时候(因为不一定能一次性读取所有的数据,所以有可能是一点一点地读取),就会自动触发 data 事件,而当没有数据的时候即结束任务时候,就会触发 end 事件。所以读文件用下面的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
var fs = require('fs')
var data = ''
var rstream = fs.createReadStream('file.txt')
// 默认情况下下面的 on 事件中 chunk 是 Buffer 类型的数据,可以更改其编码方式,不过网络传输下推荐使用 Buffer
rstream.setEncoding('utf8')
rstream.on('data', function (chunk) {
data += chunk
}).on('end', function (chunk) {
console.log(data)
});
```
当然,可读流还会触发一个叫 `readable` 的事件,即缓冲区中有数据可读,一般配合 `read` 方法使用:
```javascript
var fs = require('fs')
var rstream = fs.createReadStream('file.txt')
rstream.on('readable', function () {
var chunk
while ((chunk = rstream.read()) != null) {
console.log(chunk.length)
}
})
```
从前面的描述中,不难看出,stream 的数据是源源不断地过来,即会持续的触发 `data` 事件。但我们是可以通过可读流的 pause 和 resume 方法来暂停或恢复 `data` 事件的触发。
UNIX 中有管道(pipe)这个概念,Node 的 stream 也使用了这一思想,使得 IO 流的传输变得更加容易。举个例子来说,读文件并将内容写到另一个文件这一过程,使用 pipe 的思想即从一个创建的文件输入流传输到输出流中。
```javascript
fs.createReadStream('read.txt').pipe(fs.createWriteStream('write.txt'));
process.stdin.pipe(process.stdout) // 将输入的信息打印出来

甚至于,我们可以使用多个 pipe 将多个流连接起来(常用于我们需要对可读流中的内容作一定的处理以后再传输到输出流中),比如,我们希望把标准输入流中的所有字母全部轮换成大写并写入标准输出流,through 包就提供了一个这样的功能:

1
2
3
4
5
6
7
8
var through = require("through");
var tr = through(function (chunk) {
// write
this.queue(chunk.toString().toUpperCase());
}, function () {
// end
this.queue(null);
});

有 pipe 管道连接两个流,自然也有 unpipe 来解除二者间的连接。其使用与 pipe 类似,只不过如果流之间本身就没有管道连接的话,其实就不会做任何事情;而如果调用的时候不指定写入流,就会断开所有与当前可读流连接的写入流。

前面已经提到了可写流,它有一个 write 方法用于写数据(接口为 write(chunk, [encoding],[callback])),end 方法通知可写流已经完成写入操作同时触发 finish 事件,所以前面的 pipe 代码可以写成:

1
2
3
4
5
6
raadStream.on('data', function (chunk) {
writeStream.write(chunk); // return true if writes successfully
});
raadStream.on('end', function () {
writeStream.end('\nEOF\n'); // return true if writes successfully
});

下面分析如何创建自定义的流,为了便于描述,我们考虑如下需求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var util = require('util');
util.inherits(Counter, stream.Readable);
function Counter(opt) {
Readable.call(this, opt);
this._max = 1000;
this._index = 1;
}
Counter.prototype._read = function () {
var i = this._index++;
if (i> this._max)
this.push(null);
else {
var str = '' + i;
var buf = new Buffer(str,'ascii');
this.push(buf);
}
}

参考链接

  1. https://github.com/substack/stream-handbook