博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
窥探Node.js里的Stream
阅读量:6511 次
发布时间:2019-06-24

本文共 15671 字,大约阅读时间需要 52 分钟。

流是什么?

流的常见的应用场景有哪些?

流的实现机制是什么?

流是什么?

什么是流呢?我画了个污水处理的简化流程来表达我对流的理解(原谅我拙劣的想象力和画技)。

这也是我早期在使用gulp的时候,对它的工作机制的理解。污水通过进水口进入,通过污水处理器中一系列的处理,最终会在出水口流出清水。

回归到代码中来,使用gulp时,我输入一个less文件,gulp会使用less插件、自动补全插件、压缩插件等一系列的处理,最终输出我们想要的css文件。一条龙服务,有木有?

在学习流的过程中,我还了解到了一个有趣的模型——。理解了这个模型,那理解流就基本没什么压力了。

举个生活中的例子,帮助我们理解生产者/消费者模型。

生产者,比作生活中的工厂,不断生成泡面。

消费者,比作广大人民群众,需要不断的买泡面续命。

如果我直接去工厂里买泡面,虽然会很便宜,但人家零售就会亏本,所以只会批发给我,这足以刷爆我的小金库。

对于工厂,如果直接面对买家,那工厂生产一天的泡面,就要停产一个月,因为接下来一个月都需要去卖泡面。很快,工厂就会倒闭了。

既然工厂和消费者不能直接对接,那如果引入一个第三方——超市,作为工厂的代理商。那么,工厂就可以专注于生产泡面了,生产的泡面直接批发给超市。而消费者也不用去工厂批发了,虽然贵一点,但自由啊。想吃,就去超市买一包。

通过以上两个现实小栗子,宏观理解一下流。

三个角色:生产者、消费者、第三方中介

下方虚线的第三方中介,与上方的第三方中介,就是一个。之所以分开表示,是为了突出消费者与生产者通过第三方中介解耦了。

流的一些应用场景

我们从官网中可以找到,Node.js有四种基本的流类型:

  • ——可读流
  • ——可写流
  • ——可读写的流,也叫双工流
  • ——在读写过程中可以修改和变换数据的一种特殊的 Duplex 流

Node.js中很多内建/核心模块都是基于流实现的:

由此图可看出,流在Node.js中的地位。可以说理解了流,将会很大程度的帮助我们去理解和使用上图列举的内建模块。

流的实现机制

了解流内部机制的最佳方式除了看 ,还可以去看看 Node.js 的 源码:

从源码中我们可以看出,这四种基本类型的流都是流的一种抽象,提供给开发者去扩展使用的,所以源码看起来得有一定的使用基础。接下来我将另辟蹊径,不借助这四种基本类型的流,去实现 fs readable stream 的主要逻辑,管中窥豹,加深对流的理解

fs Readable Stream

Readable stream有两种模式:

  • flowing:在该模式下,会尽快获取数据向外输出。因此如果没有事件监听,也没有pipe()来引导数据流向,数据可能会丢失。
  • paused:默认模式。在该模式下,需要手动调用stream.read(..)来获取数据。

可以通过以下几种方法切换到flowing模式:

  • 添加'data'事件监听器
  • 调用stream.resume(..)方法
  • 调用stream.pipe()方法将数据发送给消费者Writable

可以通过以下几种方法切换到paused模式:

  • 如果没有调用stream.pipe(..),则调用stream.pause(..)即可
  • 如果有调用stream.pipe(..),那么需要通过stream.unpipe(..)移除所有的pipe

需要特别注意以下几点:

  1. 只有提供消费者去消费数据,比如,添加'data'事件监听器,可读流才会去生产数据。
  2. 如果移除'data'事件监听器,将不会自动的停止流。
  3. 如果调用了stream.pipe(..),再调用stream.pause(),将不会停止这个流。
  4. 如果可读流被切换到了流的模式,但是却没有添加'data'事件监听器,那么数据将会丢失掉。比如调用了stream.resume(),却没有添加'data'事件监听器,或者'data'事件监听器被移除了。
  5. 选择一种方式去消耗可读流生产的数据。比较推荐stream.pipe(..)。也可使用可控性比较强的事件机制,再配合readable.pause()/readable.resume()APIs
  6. 如果readabledata被同时使用了,那么readable事件的优先级会比data事件高。此时,必须在readable事件内显示调用stream.read(..)才能读到数据。

更多API使用可参考官网

flowing模式

下面尝试实现文件可读流的流动模式,一观它的内部机制。

// 源码参见 https://github.com/nodejs/node/blob/master/lib/fs.jsconst fs = require('fs');const EventEmitter = require('events');const util = require('util');// 使用Node.js内部工具模块,让文件可读流继承事件的很多事件方法// 由此可看出,流就是基于事件机制实现的util.inherits(FsReadableStream, EventEmitter);// 声明一个文件可读流的构造函数,并初始化相关参数function FsReadableStream(path, options) {    const self = this; // 防止this指针的指向混乱    // 这里为了主要说明实现流程,省略参数的边界限制。    self.path = path;    // 打开文件时的参数    // 参见 https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback    self.flags = options.flags || 'r';    self.mode = options.mode || 0o66;    // 文件内容的读取起止位置    self.start = options.start || 0;    self.end = options.end;    // 每次读取内容的水位线,即一次读取,最大缓存    self.highWaterMark = options.highWaterMark || 64 * 1024;    // 内容读取完毕之后,是否自动关闭文件    self.autoClose = options.autoClose === undefined ? true : options.autoClose;    // 最后输出的数据将以何种编码方式解码    self.encoding = options.encoding || 'utf8';    // 文件有效的描述符    self.fd = null;    // 文件读取的实时起始位置。第一次从0开始,第二次就是从 0 + 第一次读的内容长度    self.pos = self.start;    // 申请水位线大小的空间作为buffer缓存    self.buffer = Buffer.alloc(self.highWaterMark);    // new这个构造函数时,就打开文件,为后续做准备    self.open();    // 模式  初始为暂停模式    self.flowing = null;    // 一旦有新的事件被监听,且,是'data'事件,    // 就将模式切换至流动模式,并读取数据    self.on('newListener', function (eventName) {        if (eventName === 'data') {            self.flowing = true;            self.read();        }    });}// 在对文件操作之前,需要先打开文件,获取文件的有效描述符FsReadableStream.prototype.open = function () {    const self = this;    fs.open(self.path, self.flags, self.mode, function (err, fd) {        if (err) {            self.emit('error', err);            if (self.autoClose) {                self.destroy();            }            return;        }        self.fd = fd;        self.emit('open', fd);    });};// 读取文件里的内容FsReadableStream.prototype.read = function () {    const self = this;    // self.open()是异步方法,此时,需判断文件是否被打开了    if (typeof self.fd !== 'number') {        // 文件未打开,可以添加open事件监听器        self.once('open', self.read);        return;    }    // 需计算每次需要读取多少数据。    const howMuchToRead = self.end ? Math.min(self.highWaterMark, self.end - self.pos + 1) : self.highWaterMark;    fs.read(self.fd, self.buffer, 0, howMuchToRead, self.pos, function (err, bytesRead) {        if (err) {            self.emit('error', err);            if (self.autoClose) {                self.destroy();            }            return;        }        if (bytesRead > 0) {            // 更新读取位置            self.pos = self.pos + bytesRead;            // 有可能读取的内容长度比buffer缓存长度小,就必须截取出来,防止乱码情况            const data = self.encoding ? self.buffer.slice(0, bytesRead).toString(self.encoding) : self.buffer.slice(0, bytesRead);            self.emit('data', data);            // 如果下一次读取的起始位置比结束位置要大,则表明已经读完了            if (self.pos > self.end) {                self.emit('end');                if (self.autoClose) {                    self.destroy();                }            }            // 如果仍然处于流动模式,将会继续读取数据            if (self.flowing) {                self.read();            }        } else {            // 文件内容已经读取完毕了            self.emit('end');            if (self.autoClose) {                self.destroy();            }        }    });};// 将流动模式切换到暂停模式FsReadableStream.prototype.pause = function () {    if (this.flowing !== false) {        this.flowing = false;    }};// 将暂停模式切换到流动模式FsReadableStream.prototype.resume = function () {    // 如果直接调用resume,却没有添加data监听器    // 数据则会丢失    if (!this.flowing) {        this.flowing = true;        this.read();    }};// 关闭文件FsReadableStream.prototype.destroy = function () {    const self = this;    if (typeof self.fd === 'number') {        fs.close(self.fd, function (err) {            if (err) {                self.emit('error', err);                return;            }            self.fd = null;            self.emit('close');        });        return;    }    this.emit('close');};复制代码

我们会发现流动模式,是会源源不断的生成数据的,直到数据源枯竭为止。当然,也可以通过stream.pause(..)/stream.resume去精准控制。这种模式的控制权在于开发者,开发者必须熟悉这种模式的运行机制,谨慎运用,否则很容易出现,消费者被撑爆或者数据中途丢失的情况。

paused模式

由于流动模式和暂停模式是互斥的,所以采用分开实现可读流的两种模式。暂停模式下,我们需要监听另外一个事件——'readable',并显示调用stream.read(n)才能读到数据。

// 源码参见 https://github.com/nodejs/node/blob/master/lib/fs.js// 文件可读流的暂停模式const fs = require('fs');const EventEmitter = require('events');const util = require('util');// 使用Node.js内部工具模块,让文件可读流继承事件的很多事件方法// 由此可看出,流就是基于事件机制实现的util.inherits(FsReadableStream, EventEmitter);// 声明一个文件可读流的构造函数,并初始化相关参数function FsReadableStream(path, options) {    const self = this; // 防止this指针的指向混乱    // 这里为了主要说明实现流程,省略参数的边界限制。    self.path = path;    // 打开文件时的参数    // 参见 https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback    self.flags = options.flags || 'r';    self.mode = options.mode || 0o66;    // 文件内容的读取起止位置    self.start = options.start || 0;    self.end = options.end;    // 每次读取内容的水位线,即一次读取,最大缓存    self.highWaterMark = options.highWaterMark || 64 * 1024;    // 内容读取完毕之后,是否自动关闭文件    self.autoClose = options.autoClose === undefined ? true : options.autoClose;    // 最后输出的数据将以何种编码方式解码    self.encoding = options.encoding || 'utf8';    // 文件有效的描述符    self.fd = null;    // 文件读取的实时起始位置。第一次从0开始,第二次就是从 0 + 第一次读的内容长度    self.pos = self.start;    // 作为缓存存放每次读到的数据  [Buffer, Buffer, Buffer...]    self.buffers = [];    // 当前缓存的长度   self.buffers.length 只能读到数组有多少个元素。    // 这里是缓存 buffers 每一项的长度之和    self.length = 0;    // 因为读取文件是异步操作,所以这里需要一个标记    // 如果处于正在读状态,则将数据存放在缓存中    this.reading = false;    // 是否达到 发送 'readable' 事件的条件    // 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。    // 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null    this.emittedReadable = false;    // new这个构造函数时,就打开文件,为后续做准备    self.open();    // 一旦有新的事件被监听,且,是'readable'事件,    // 开启 emittedReadable    self.on('newListener', function (eventName) {        if (eventName === 'readable') {            self.read();        }    });}FsReadableStream.prototype.read = function (n) {    const self = this;    let buffer = null;    // 当索要的数据长度  大于  缓存区的长度    if (n > self.length) {        // 此时要索要的数据,超过了缓存大小        // 就会提升水位线,来适应这种需求量        // computeNewHighWaterMark 是node源码中提高水位线的方法        self.highWaterMark = computeNewHighWaterMark(n);        self.emitReadable = true;        self._read();    }    // 当索要的数据长度  大于  0  且  小于等于  缓存区的长度    if (n > 0 && n <= self.length) {        // 先申请Buffer内存        buffer = Buffer.alloc(n);        let index = 0; // 循环次数        let flag = true; // 控制while的标记        let b;        while (flag && (b = self.buffers.shift())) {            for (let i = 0; i < b.length; i++) {                buffer[index++] = b[i]; // 赋值                if (n === index) {                    let arr = b.slice(index);                    if (arr.length) {                        // 不要的 再塞回缓存                        self.buffers.unshift(arr);                    }                    self.length = self.length - n;                    flag = false;                }            }        }    }    // 如果当期缓存区没有数据    if (self.length === 0) {        self.emittedReadable = true;    }    // 当缓存区的数据长度  小于  水位线了   就去生成数据,继续放在缓存区    if (self.length < self.highWaterMark) {        if (!self.reading) {            self.reading = true;            self._read();        }    }    // 返回读到的数据    return buffer;};FsReadableStream.prototype._read = function () {    const self = this;    if (typeof self.fd !== 'number') {        return self.once('open', self._read);    }    const buffer = Buffer.alloc(self.highWaterMark);    fs.read(self.fd, buffer, 0, self.highWaterMark, self.pos, function (err, bytesRead) {        if (bytesRead > 0) {            // 默认将读取的内容放到缓存区中            self.buffers.push(buffer.slice(0, bytesRead));            self.pos = self.pos + bytesRead; // 维护读取的索引            self.length = self.length + bytesRead; // 维护缓存区的大小            self.reading = false; // 读取完成            // 是否需要触发readable事件            if (self.emittedReadable) {                self.emittedReadable = false; // 下次默认不触发                self.emit('readable');            }        } else {            self.emit('end');            if (self.autoClose) {                self.destroy();            }        }    });};// 在对文件操作之前,需要先打开文件,获取文件的有效描述符FsReadableStream.prototype.open = function () {    const self = this;    fs.open(self.path, self.flags, self.mode, function (err, fd) {        if (err) {            self.emit('error', err);            if (self.autoClose) {                self.destroy();            }            return;        }        self.fd = fd;        self.emit('open', fd);    });};// 关闭文件FsReadableStream.prototype.destroy = function () {    const self = this;    if (typeof self.fd === 'number') {        fs.close(self.fd, function (err) {            if (err) {                self.emit('error', err);                return;            }            self.fd = null;            self.emit('close');        });        return;    }    this.emit('close');};复制代码

这种模式,我们会发现,'readable'事件是告诉我们什么时候可以去索取数据了。如果直接去调stream.read(n)的话,会因为fs.read(..)异步操作,还没将数据读出并放至缓存区,导致结果将返回null。

只要缓存区内容被消耗至水位线以下,就会自动续杯,生成水位线大小的数据放到缓存中。

那什么时候会触发'readable'事件呢?缓存区为空,然后生产了水位线大小的数据放在缓存区之后,便会触发。下一次触发的时机,仍然是缓存区被消耗干净了,再次续满杯之后。

stream.read(n)想要多少数据,就传入多长。可以通过stream.length查看当前缓存区数据的长度,再决定索取多少数据。实际场景运用中则需要使用stream._readableState.length

fs Writable Stream

fs writable stream它的机制和可读流有些相似。话不多说,先上代码:

// 源码参见 https://github.com/nodejs/node/blob/master/lib/fs.jsconst fs = require('fs');const EventEmitter = require('events');const util = require('util');// 使用Node.js内部工具模块,让文件可写流继承事件的很多事件方法// 由此可看出,流就是基于事件机制实现的util.inherits(FsWritableStream, EventEmitter);// 声明一个文件可写流的构造函数,并初始化相关参数function FsWritableStream(path, options) {    const self = this; // 防止this指针的指向混乱    // 这里为了主要说明实现流程,省略参数的边界限制。    self.path = path;    // 打开文件时的参数    // 参见 https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback    self.flags = options.flags || 'r';    self.mode = options.mode || 0o66;    // 文件内容的写入开始位置    self.start = options.start || 0;    // 每次写入内容的水位线,即最大缓存    self.highWaterMark = options.highWaterMark || 64 * 1024;    // 内容写入完毕之后,是否自动关闭文件    self.autoClose = options.autoClose === undefined ? true : options.autoClose;    // 告诉程序写入的数据将以何种编码方式解码    self.encoding = options.encoding || 'utf8';    // 文件有效的描述符    self.fd = null;    // 文件写入的实时起始位置。第一次从0开始,第二次就是从 0 + 第一次写入的内容长度    self.pos = self.start;    // 作为缓存,存放来不及写入文件的数据  [Buffer, Buffer, Buffer...]    self.buffers = [];    // 当前缓存的长度   self.buffers.length 只能读到数组有多少个元素。    // 这里是缓存 buffers 每一项的长度之和    self.length = 0;    // 因为读取文件是异步操作,所以这里需要一个标记    // 如果处于正在读状态,则将数据存放在缓存中    this.writing = false;    // 控制是否通知'drain'事件监听器,表示,缓存区从满状态,被消耗空了    self.needDrain = false;    // new这个构造函数时,就打开文件,为后续做准备    self.open();}FsWriteStream.prototype.open = function () {    const self = this;    fs.open(self.path, self.flags, self.mode, function (err, fd) {        if (err) {            self.emit('error', err);            if (self.autoClose) {                self.destroy();            }            return;        }        self.fd = fd;        self.emit('open', fd);    });};FsWriteStream.prototype.destroy = function () {    const self = this;    if (typeof self.fd === 'number') {        fs.close(self.fd, function (err) {            if (err) {                self.emit('error', err);                return;            }            self.emit('close');        });    } else {        self.emit('close');    }};// 主动发起调用,这里默认三个参数都会传FsWriteStream.prototype.write = function (chunk, encoding, callback) {    const self = this;    const bufferChunk = Buffer.isBuffer(chunk) && chunk || Buffer.from(chunk, encoding);    self.length = self.length + bufferChunk.length;    // 如果当前缓存长度 小于 水位线,表示缓存区未满    const ret = self.length < self.highWaterMark;    // 当缓存区满了,则打开 向drain事件 发通知的开关    self.needDrain = !ret;    if (self.writing) {        // 正在 写入/消费 数据,所以先存到缓存中        self.buffers.push({            chunk,            encoding,            callback,        });    } else {        // 相当于去发ajax,这里先来一个loading        self.writing = true;        self._write(chunk, encoding, function () {            callback();            // 当写入完成时,清除缓存中的内容            self.clearBuffer();        });    }    // 每次调用write时,到会返回当前缓存区是否满了    return ret;};FsWriteStream.prototype._write = function (chunk, encoding, callback) {    const self = this;    if (typeof self.fd !== 'number') {        self.once('open', function () {            self._write(chunk, encoding, callback);        });        return;    }    fs.write(self.fd, chunk, 0, chunk.length, self.pos, function (err, writtenBytes) {        if (err) {            self.emit('error', err);            self.writing = null;            if (self.autoClose) {                self.destroy();            }            return false;        }        // 长度 减掉已经消费成功的数据长度        self.length = self.length - writtenBytes;        // 更新下一次写入的开始位置        self.pos = self.pos + writtenBytes;        callback();    });};FsWriteStream.prototype.clearBuffer = function () {    const self = this;    const buffer = self.buffers.shift();    if (buffer) {        // 如果缓存区仍有数据,则继续消费数据        self._write(buffer.chunk, buffer.encoding, function () {            buffer.callback();            self.clearBuffer();        });    } else {        // 如果缓存区空了,则重置写入状态        self.writing = false;        if (self.needDrain) {            // 发送 drain 事件,告知缓存区已经消耗完了,可以进行下一波数据写入了            self.needDrain = false;            self.emit('drain');        }    }};复制代码

我们会发现,当数据流来的时候,可写流会直接去消费数据。当 消费/写入文件 速度过于缓慢的时候,数据流会被送入缓存区缓存起来。

当生产者传来的数据速度过快,把缓存塞满了之后,就会出现「背压」(fs.write(..)返回的结果),这个时候是需要告诉生产者暂停生产的,当缓存区被消耗完之后,可写流会给生产者发送一个 drain 消息,这样就可以恢复生产了。

总结

以上 fs Readable Streamfs Writable Stream分别是流的基本类型Readable StreamWritable Stream的上层API。fs源码中,实际上是分别继承这两个基本流类型再加上一些fs的文件操作,最后扩展成一个文件流的。

所以流就是基于事件和状态机去实现'生产者/消费者'这样的一个模型。

更多关于流的更多使用,可参考

参考

转载于:https://juejin.im/post/5ac8b961f265da2397071de9

你可能感兴趣的文章