流是什么?
流的常见的应用场景有哪些?
流的实现机制是什么?
流是什么?
什么是流呢?我画了个污水处理的简化流程来表达我对流的理解(原谅我拙劣的想象力和画技)。
这也是我早期在使用gulp的时候,对它的工作机制的理解。污水通过进水口进入,通过污水处理器中一系列的处理,最终会在出水口流出清水。
回归到代码中来,使用gulp时,我输入一个less文件,gulp会使用less插件、自动补全插件、压缩插件等一系列的处理,最终输出我们想要的css文件。一条龙服务,有木有?
在学习流的过程中,我还了解到了一个有趣的模型——。理解了这个模型,那理解流就基本没什么压力了。
举个生活中的例子,帮助我们理解生产者/消费者模型。
生产者,比作生活中的工厂,不断生成泡面。
消费者,比作广大人民群众,需要不断的买泡面续命。
如果我直接去工厂里买泡面,虽然会很便宜,但人家零售就会亏本,所以只会批发给我,这足以刷爆我的小金库。
对于工厂,如果直接面对买家,那工厂生产一天的泡面,就要停产一个月,因为接下来一个月都需要去卖泡面。很快,工厂就会倒闭了。
既然工厂和消费者不能直接对接,那如果引入一个第三方——超市,作为工厂的代理商。那么,工厂就可以专注于生产泡面了,生产的泡面直接批发给超市。而消费者也不用去工厂批发了,虽然贵一点,但自由啊。想吃,就去超市买一包。
通过以上两个现实小栗子,宏观理解一下流。
三个角色:生产者、消费者、第三方中介
下方虚线的第三方中介,与上方的第三方中介,就是一个。之所以分开表示,是为了突出消费者与生产者通过第三方中介解耦了。
流的一些应用场景
我们从官网中可以找到,Node.js有四种基本的流类型:
- ——可读流
- ——可写流
- ——可读写的流,也叫双工流
- ——在读写过程中可以修改和变换数据的一种特殊的 Duplex 流
Node.js中很多内建/核心模块都是基于流实现的:
由此图可看出,流在Node.js中的地位。可以说理解了流,将会很大程度的帮助我们去理解和使用上图列举的内建模块。
流的实现机制
了解流内部机制的最佳方式除了看 ,还可以去看看 Node.js 的 源码:
从源码中我们可以看出,这四种基本类型的流都是流的一种抽象,提供给开发者去扩展使用的,所以源码看起来得有一定的使用基础。接下来我将另辟蹊径,不借助这四种基本类型的流,去实现 fs readable stream 的主要逻辑,管中窥豹,加深对流的理解
fs Readable Stream
Readable stream有两种模式:
- flowing:在该模式下,会尽快获取数据向外输出。因此如果没有事件监听,也没有pipe()来引导数据流向,数据可能会丢失。
- paused:默认模式。在该模式下,需要手动调用
stream.read(..)
来获取数据。
可以通过以下几种方法切换到flowing模式:
- 添加
'data'
事件监听器 - 调用
stream.resume(..)
方法 - 调用
stream.pipe()
方法将数据发送给消费者Writable
可以通过以下几种方法切换到paused模式:
- 如果没有调用
stream.pipe(..)
,则调用stream.pause(..)
即可 - 如果有调用
stream.pipe(..)
,那么需要通过stream.unpipe(..)
移除所有的pipe
需要特别注意以下几点:
- 只有提供消费者去消费数据,比如,添加
'data'
事件监听器,可读流才会去生产数据。 - 如果移除
'data'
事件监听器,将不会自动的停止流。 - 如果调用了
stream.pipe(..)
,再调用stream.pause()
,将不会停止这个流。 - 如果可读流被切换到了流的模式,但是却没有添加
'data'
事件监听器,那么数据将会丢失掉。比如调用了stream.resume()
,却没有添加'data'
事件监听器,或者'data'
事件监听器被移除了。 - 选择一种方式去消耗可读流生产的数据。比较推荐
stream.pipe(..)
。也可使用可控性比较强的事件机制,再配合readable.pause()/readable.resume()
APIs - 如果
readable
和data
被同时使用了,那么readable
事件的优先级会比data
事件高。此时,必须在readable
事件内显示调用stream.read(..)
才能读到数据。
更多API使用可参考官网
flowing模式
下面尝试实现文件可读流的流动模式,一观它的内部机制。
// 源码参见 https://github.com/nodejs/node/blob/master/lib/fs.jsconst fs = require('fs');const EventEmitter = require('events');const util = require('util');// 使用Node.js内部工具模块,让文件可读流继承事件的很多事件方法// 由此可看出,流就是基于事件机制实现的util.inherits(FsReadableStream, EventEmitter);// 声明一个文件可读流的构造函数,并初始化相关参数function FsReadableStream(path, options) { const self = this; // 防止this指针的指向混乱 // 这里为了主要说明实现流程,省略参数的边界限制。 self.path = path; // 打开文件时的参数 // 参见 https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback self.flags = options.flags || 'r'; self.mode = options.mode || 0o66; // 文件内容的读取起止位置 self.start = options.start || 0; self.end = options.end; // 每次读取内容的水位线,即一次读取,最大缓存 self.highWaterMark = options.highWaterMark || 64 * 1024; // 内容读取完毕之后,是否自动关闭文件 self.autoClose = options.autoClose === undefined ? true : options.autoClose; // 最后输出的数据将以何种编码方式解码 self.encoding = options.encoding || 'utf8'; // 文件有效的描述符 self.fd = null; // 文件读取的实时起始位置。第一次从0开始,第二次就是从 0 + 第一次读的内容长度 self.pos = self.start; // 申请水位线大小的空间作为buffer缓存 self.buffer = Buffer.alloc(self.highWaterMark); // new这个构造函数时,就打开文件,为后续做准备 self.open(); // 模式 初始为暂停模式 self.flowing = null; // 一旦有新的事件被监听,且,是'data'事件, // 就将模式切换至流动模式,并读取数据 self.on('newListener', function (eventName) { if (eventName === 'data') { self.flowing = true; self.read(); } });}// 在对文件操作之前,需要先打开文件,获取文件的有效描述符FsReadableStream.prototype.open = function () { const self = this; fs.open(self.path, self.flags, self.mode, function (err, fd) { if (err) { self.emit('error', err); if (self.autoClose) { self.destroy(); } return; } self.fd = fd; self.emit('open', fd); });};// 读取文件里的内容FsReadableStream.prototype.read = function () { const self = this; // self.open()是异步方法,此时,需判断文件是否被打开了 if (typeof self.fd !== 'number') { // 文件未打开,可以添加open事件监听器 self.once('open', self.read); return; } // 需计算每次需要读取多少数据。 const howMuchToRead = self.end ? Math.min(self.highWaterMark, self.end - self.pos + 1) : self.highWaterMark; fs.read(self.fd, self.buffer, 0, howMuchToRead, self.pos, function (err, bytesRead) { if (err) { self.emit('error', err); if (self.autoClose) { self.destroy(); } return; } if (bytesRead > 0) { // 更新读取位置 self.pos = self.pos + bytesRead; // 有可能读取的内容长度比buffer缓存长度小,就必须截取出来,防止乱码情况 const data = self.encoding ? self.buffer.slice(0, bytesRead).toString(self.encoding) : self.buffer.slice(0, bytesRead); self.emit('data', data); // 如果下一次读取的起始位置比结束位置要大,则表明已经读完了 if (self.pos > self.end) { self.emit('end'); if (self.autoClose) { self.destroy(); } } // 如果仍然处于流动模式,将会继续读取数据 if (self.flowing) { self.read(); } } else { // 文件内容已经读取完毕了 self.emit('end'); if (self.autoClose) { self.destroy(); } } });};// 将流动模式切换到暂停模式FsReadableStream.prototype.pause = function () { if (this.flowing !== false) { this.flowing = false; }};// 将暂停模式切换到流动模式FsReadableStream.prototype.resume = function () { // 如果直接调用resume,却没有添加data监听器 // 数据则会丢失 if (!this.flowing) { this.flowing = true; this.read(); }};// 关闭文件FsReadableStream.prototype.destroy = function () { const self = this; if (typeof self.fd === 'number') { fs.close(self.fd, function (err) { if (err) { self.emit('error', err); return; } self.fd = null; self.emit('close'); }); return; } this.emit('close');};复制代码
我们会发现流动模式,是会源源不断的生成数据的,直到数据源枯竭为止。当然,也可以通过stream.pause(..)/stream.resume
去精准控制。这种模式的控制权在于开发者,开发者必须熟悉这种模式的运行机制,谨慎运用,否则很容易出现,消费者被撑爆或者数据中途丢失的情况。
paused模式
由于流动模式和暂停模式是互斥的,所以采用分开实现可读流的两种模式。暂停模式下,我们需要监听另外一个事件——'readable'
,并显示调用stream.read(n)
才能读到数据。
// 源码参见 https://github.com/nodejs/node/blob/master/lib/fs.js// 文件可读流的暂停模式const fs = require('fs');const EventEmitter = require('events');const util = require('util');// 使用Node.js内部工具模块,让文件可读流继承事件的很多事件方法// 由此可看出,流就是基于事件机制实现的util.inherits(FsReadableStream, EventEmitter);// 声明一个文件可读流的构造函数,并初始化相关参数function FsReadableStream(path, options) { const self = this; // 防止this指针的指向混乱 // 这里为了主要说明实现流程,省略参数的边界限制。 self.path = path; // 打开文件时的参数 // 参见 https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback self.flags = options.flags || 'r'; self.mode = options.mode || 0o66; // 文件内容的读取起止位置 self.start = options.start || 0; self.end = options.end; // 每次读取内容的水位线,即一次读取,最大缓存 self.highWaterMark = options.highWaterMark || 64 * 1024; // 内容读取完毕之后,是否自动关闭文件 self.autoClose = options.autoClose === undefined ? true : options.autoClose; // 最后输出的数据将以何种编码方式解码 self.encoding = options.encoding || 'utf8'; // 文件有效的描述符 self.fd = null; // 文件读取的实时起始位置。第一次从0开始,第二次就是从 0 + 第一次读的内容长度 self.pos = self.start; // 作为缓存存放每次读到的数据 [Buffer, Buffer, Buffer...] self.buffers = []; // 当前缓存的长度 self.buffers.length 只能读到数组有多少个元素。 // 这里是缓存 buffers 每一项的长度之和 self.length = 0; // 因为读取文件是异步操作,所以这里需要一个标记 // 如果处于正在读状态,则将数据存放在缓存中 this.reading = false; // 是否达到 发送 'readable' 事件的条件 // 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 // 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null this.emittedReadable = false; // new这个构造函数时,就打开文件,为后续做准备 self.open(); // 一旦有新的事件被监听,且,是'readable'事件, // 开启 emittedReadable self.on('newListener', function (eventName) { if (eventName === 'readable') { self.read(); } });}FsReadableStream.prototype.read = function (n) { const self = this; let buffer = null; // 当索要的数据长度 大于 缓存区的长度 if (n > self.length) { // 此时要索要的数据,超过了缓存大小 // 就会提升水位线,来适应这种需求量 // computeNewHighWaterMark 是node源码中提高水位线的方法 self.highWaterMark = computeNewHighWaterMark(n); self.emitReadable = true; self._read(); } // 当索要的数据长度 大于 0 且 小于等于 缓存区的长度 if (n > 0 && n <= self.length) { // 先申请Buffer内存 buffer = Buffer.alloc(n); let index = 0; // 循环次数 let flag = true; // 控制while的标记 let b; while (flag && (b = self.buffers.shift())) { for (let i = 0; i < b.length; i++) { buffer[index++] = b[i]; // 赋值 if (n === index) { let arr = b.slice(index); if (arr.length) { // 不要的 再塞回缓存 self.buffers.unshift(arr); } self.length = self.length - n; flag = false; } } } } // 如果当期缓存区没有数据 if (self.length === 0) { self.emittedReadable = true; } // 当缓存区的数据长度 小于 水位线了 就去生成数据,继续放在缓存区 if (self.length < self.highWaterMark) { if (!self.reading) { self.reading = true; self._read(); } } // 返回读到的数据 return buffer;};FsReadableStream.prototype._read = function () { const self = this; if (typeof self.fd !== 'number') { return self.once('open', self._read); } const buffer = Buffer.alloc(self.highWaterMark); fs.read(self.fd, buffer, 0, self.highWaterMark, self.pos, function (err, bytesRead) { if (bytesRead > 0) { // 默认将读取的内容放到缓存区中 self.buffers.push(buffer.slice(0, bytesRead)); self.pos = self.pos + bytesRead; // 维护读取的索引 self.length = self.length + bytesRead; // 维护缓存区的大小 self.reading = false; // 读取完成 // 是否需要触发readable事件 if (self.emittedReadable) { self.emittedReadable = false; // 下次默认不触发 self.emit('readable'); } } else { self.emit('end'); if (self.autoClose) { self.destroy(); } } });};// 在对文件操作之前,需要先打开文件,获取文件的有效描述符FsReadableStream.prototype.open = function () { const self = this; fs.open(self.path, self.flags, self.mode, function (err, fd) { if (err) { self.emit('error', err); if (self.autoClose) { self.destroy(); } return; } self.fd = fd; self.emit('open', fd); });};// 关闭文件FsReadableStream.prototype.destroy = function () { const self = this; if (typeof self.fd === 'number') { fs.close(self.fd, function (err) { if (err) { self.emit('error', err); return; } self.fd = null; self.emit('close'); }); return; } this.emit('close');};复制代码
这种模式,我们会发现,'readable'
事件是告诉我们什么时候可以去索取数据了。如果直接去调stream.read(n)
的话,会因为fs.read(..)
异步操作,还没将数据读出并放至缓存区,导致结果将返回null。
只要缓存区内容被消耗至水位线以下,就会自动续杯,生成水位线大小的数据放到缓存中。
那什么时候会触发'readable'
事件呢?缓存区为空,然后生产了水位线大小的数据放在缓存区之后,便会触发。下一次触发的时机,仍然是缓存区被消耗干净了,再次续满杯之后。
stream.read(n)
想要多少数据,就传入多长。可以通过stream.length
查看当前缓存区数据的长度,再决定索取多少数据。实际场景运用中则需要使用stream._readableState.length
fs Writable Stream
fs writable stream
它的机制和可读流有些相似。话不多说,先上代码:
// 源码参见 https://github.com/nodejs/node/blob/master/lib/fs.jsconst fs = require('fs');const EventEmitter = require('events');const util = require('util');// 使用Node.js内部工具模块,让文件可写流继承事件的很多事件方法// 由此可看出,流就是基于事件机制实现的util.inherits(FsWritableStream, EventEmitter);// 声明一个文件可写流的构造函数,并初始化相关参数function FsWritableStream(path, options) { const self = this; // 防止this指针的指向混乱 // 这里为了主要说明实现流程,省略参数的边界限制。 self.path = path; // 打开文件时的参数 // 参见 https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback self.flags = options.flags || 'r'; self.mode = options.mode || 0o66; // 文件内容的写入开始位置 self.start = options.start || 0; // 每次写入内容的水位线,即最大缓存 self.highWaterMark = options.highWaterMark || 64 * 1024; // 内容写入完毕之后,是否自动关闭文件 self.autoClose = options.autoClose === undefined ? true : options.autoClose; // 告诉程序写入的数据将以何种编码方式解码 self.encoding = options.encoding || 'utf8'; // 文件有效的描述符 self.fd = null; // 文件写入的实时起始位置。第一次从0开始,第二次就是从 0 + 第一次写入的内容长度 self.pos = self.start; // 作为缓存,存放来不及写入文件的数据 [Buffer, Buffer, Buffer...] self.buffers = []; // 当前缓存的长度 self.buffers.length 只能读到数组有多少个元素。 // 这里是缓存 buffers 每一项的长度之和 self.length = 0; // 因为读取文件是异步操作,所以这里需要一个标记 // 如果处于正在读状态,则将数据存放在缓存中 this.writing = false; // 控制是否通知'drain'事件监听器,表示,缓存区从满状态,被消耗空了 self.needDrain = false; // new这个构造函数时,就打开文件,为后续做准备 self.open();}FsWriteStream.prototype.open = function () { const self = this; fs.open(self.path, self.flags, self.mode, function (err, fd) { if (err) { self.emit('error', err); if (self.autoClose) { self.destroy(); } return; } self.fd = fd; self.emit('open', fd); });};FsWriteStream.prototype.destroy = function () { const self = this; if (typeof self.fd === 'number') { fs.close(self.fd, function (err) { if (err) { self.emit('error', err); return; } self.emit('close'); }); } else { self.emit('close'); }};// 主动发起调用,这里默认三个参数都会传FsWriteStream.prototype.write = function (chunk, encoding, callback) { const self = this; const bufferChunk = Buffer.isBuffer(chunk) && chunk || Buffer.from(chunk, encoding); self.length = self.length + bufferChunk.length; // 如果当前缓存长度 小于 水位线,表示缓存区未满 const ret = self.length < self.highWaterMark; // 当缓存区满了,则打开 向drain事件 发通知的开关 self.needDrain = !ret; if (self.writing) { // 正在 写入/消费 数据,所以先存到缓存中 self.buffers.push({ chunk, encoding, callback, }); } else { // 相当于去发ajax,这里先来一个loading self.writing = true; self._write(chunk, encoding, function () { callback(); // 当写入完成时,清除缓存中的内容 self.clearBuffer(); }); } // 每次调用write时,到会返回当前缓存区是否满了 return ret;};FsWriteStream.prototype._write = function (chunk, encoding, callback) { const self = this; if (typeof self.fd !== 'number') { self.once('open', function () { self._write(chunk, encoding, callback); }); return; } fs.write(self.fd, chunk, 0, chunk.length, self.pos, function (err, writtenBytes) { if (err) { self.emit('error', err); self.writing = null; if (self.autoClose) { self.destroy(); } return false; } // 长度 减掉已经消费成功的数据长度 self.length = self.length - writtenBytes; // 更新下一次写入的开始位置 self.pos = self.pos + writtenBytes; callback(); });};FsWriteStream.prototype.clearBuffer = function () { const self = this; const buffer = self.buffers.shift(); if (buffer) { // 如果缓存区仍有数据,则继续消费数据 self._write(buffer.chunk, buffer.encoding, function () { buffer.callback(); self.clearBuffer(); }); } else { // 如果缓存区空了,则重置写入状态 self.writing = false; if (self.needDrain) { // 发送 drain 事件,告知缓存区已经消耗完了,可以进行下一波数据写入了 self.needDrain = false; self.emit('drain'); } }};复制代码
我们会发现,当数据流来的时候,可写流会直接去消费数据。当 消费/写入文件 速度过于缓慢的时候,数据流会被送入缓存区缓存起来。
当生产者传来的数据速度过快,把缓存塞满了之后,就会出现「背压」(fs.write(..)
返回的结果),这个时候是需要告诉生产者暂停生产的,当缓存区被消耗完之后,可写流会给生产者发送一个 drain
消息,这样就可以恢复生产了。
总结
以上 fs Readable Stream
和fs Writable Stream
分别是流的基本类型Readable Stream
和Writable Stream
的上层API。fs源码中,实际上是分别继承这两个基本流类型再加上一些fs的文件操作,最后扩展成一个文件流的。
所以流就是基于事件和状态机去实现'生产者/消费者'这样的一个模型。
更多关于流的更多使用,可参考