深入node4 可写流的实现
2021/12/8 9:17:04
本文主要是介绍深入node4 可写流的实现,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
可写流的使用
可写流的highWaterMark表示期望这个文件接受多少个值。
end不仅会写入,而且会触发close事件。
一个true,一个false,是因为我们的highWaterMark设置了3,希望只用3个内存来写,但是返回的值与我们是否写入无关,返回false也会写入。
-
但是有个问题,我们写多个wirte的时候,是并发异步操作,所以不能确定哪个快哪个慢。
-
可以将并发异步操作变为串行异步。
-
除了第一次的write,接下来的write排队,第一个完成后,将队列中的每一个write拿出来执行。有点像eventsloop,直到清空队列,但是队列缓存可能会过大,所以需要一个预期,也就是highWaterMark来控制,达到预期后,就不要调用write方法。虽然再调用也会写入进去。
-
结合fs.createStream。
当我们第一次写入的时候,当文件吃不下了,也就是到highWaterMark的值了,就应该停止写入了,等文件吃完了,触发了drain方法,这时候再去恢复rs.resume(),此时后面的写入就是往真实的文件中去写了,而不是一直在排队了。读取默认的highWaterMark是64k,而写入的默认highWaterMark是16k。 -
这跟第一种并发异步写入有很大区别。
可写流的实现
可写流是基于链表实现的,因为可写流涉及队列排序的问题,比较多的使用了头部的增删。而链表在头部和尾部的增删的时候效率比较高。
链表的实现可以观看:单链表,双链表
- 可读流内部是基于steam和stream的Readable(跟我们实现的MyReadSteream类相似,内部也是调用fs.read)以及events来实现的。fs自己实现了ReadStream继承stream的Readable,并且实现自己的_read方法。
- 可写流内部是基于stream和Writable以及events来实现的,fs实现的WrtieStream继承了stream的Writable。自己实现的_wirte方法供父类的wirte方法调用。
实现十个数,希望使用三个内存来处理
原生的。
实现自己的WriteStream
思路:跟可读流一样,将open和wirte操作分离,通过事件发布的模式。然后通过变量判断当前write是否是第一次调用,保证每次只有一个write在执行,其他的全部扔进缓存,当write执行完毕之后,再从缓存中一个一个拿出来执行write。直到缓存清空完毕,触发drain事件,通知用户缓存清完了,变量置为初始化。
这样就能使并发异步操作变成串行异步操作。
初始化变量,这里跟ReadStream有点区别,比如没有end,encoding默认写入是utf8等等。
len用来判断当前缓存的值的长度,needDrain用来判断缓存是否过多是否达到期望值。cahce是缓存队列。writing用来标识是否第一次写入。
- open方法
- 执行write方法
因为数据可能有中文,有英文,所以第一步默认转为buffer。然后判断当前write写入的buffer的长度+this.len(缓存队列的长度)与期望值highWaterMark相比较,是否超过期望值。
再重写cb函数,这样每次写入成功调用cb的时候,就会执行clearBuffer方法。
然后通过writing判断是不是第一次写入,第一次写入就调用_write方法,真正的执行fs.write,否则就放入缓存之中。 - _write方法
write方法可能用户执行的时候open还没打开,因为是异步,所以需要做处理,然后执行write方法,写入内容,每次写完之后都要维护偏移量,并且减少len。执行cb()。cb是重写过的,会调用clearBuffer方法。 - clearBuffer方法
这个方法主要将缓存中的write任务一个一个拿出并执行,注意这里调用的是_write方法,是要真正写入文件的。然后清空缓存之后,需要出发drain事件,告诉用户已经清空,当前可以继续write。并且重置一些变量。
触发四次drain事件,成功写入四个。
总结:
通过缓存以及全局变量,和事件系统,只有第一次write才会真正执行fs.write,其他的write方法会放入缓存,只有当fs.write执行完毕之后,才会继续从缓存取出进行fs.wrtie。直到缓存为空,再触发drain事件,重置变量,让用户可以继续调用write方法写入。就好比人吃饭,喂一口大的,它总要先吃一点小的,剩下部分在口腔里嚼,并且返回false告诉你,先别喂,等全部吃完。当它慢慢嚼完并吞下当前口腔有的事物之后,再告诉你,可以继续喂了,依次类推,反复循环,直到这碗饭吃饭(全部写入)
优化 使用链表代替数组cache
链表的实现:在单链表这里。
实验:
成功删除了第一个3,链表此时是6=>10。
改造完成,
期望是3,所以第一次会调用三次write,两个write存入queue中,如
首先queue中有两个,然后取出49之后,有剩一个50,再取出来之后就为空。完成。
全部代码:
// WriteStream const LinkedList = require("./linklist"); //基于链表实现队列 class Queue { constructor() { this.link = new LinkedList(); } //在最后一个中加入 offer(element) { this.link.append(element); } //移除链表第一位并返回 shift() { return this.link.removeAt(0); } } class WriteStream extends EventMitter { constructor(path, options) { super(); this.path = path; this.flags = options.flags || "w"; this.encoding = options.encoding || "utf8"; this.autoClose = options.autoClose || true; this.mode = options.mode || 0o666; this.start = options.start || 0; this.highWaterMark = options.highWaterMark || 16 * 1024; this.emitClose = options.emitClose || true; this.offset = this.start; // 每次写入文件的位移数 this.fd = undefined; this.len = 0; //判断的缓存 this.needDrain = false; //是否需要触发drain this.cache = []; //第二次开始的写入缓存 this.writing = false; //标识是否正在写入 this.queue = new Queue(); this.open(); } open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { this.fd = fd; this.emit("open", fd); }); } _write(chunk, encoding, cb) { if (typeof this.fd !== "number") { this.once("open", () => { this._write(chunk, encoding, cb); }); return; } fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, written) => { this.offset += written; //维护偏移量 this.len -= written; //把缓存的个数减少 cb(); //回调 }); } //依次将缓存队列等待任务拿出来一个个写入。 clearBuffer() { const a = JSON.stringify(this.queue); console.log(a); const data = this.queue.shift(); if (data) { this._write(data.chunk, data.encoding, data.cb); } else { //缓存读取完了,需要触发drain事件 this.writing = false; //当前写入已经完成了。 if (this.needDrain) { //如果存入过多导致期望值过大 this.needDrain = false; this.emit("drain"); } } } } //模拟Stream的Writable的实例上的write方法 EventMitter.prototype.write = function ( chunk, encoding = this.encoding, cb = () => {} ) { // 异步方法 // 注意写入的中文跟英文比较。 将数据全部转换为Buffer chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); this.len += chunk.length; //判断到达期望了吗 const returnValue = this.len < this.highWaterMark ? true : false; //当数据写入后,需要触发drain并且将Len减减 this.needDrain = !returnValue; // 清空缓存队列的逻辑 const userCb = cb; cb = () => { userCb(); this.clearBuffer(); }; //判断是否第一次给入的数据,因为后面的write都是直接放入缓存中 if (!this.writing) { //第一次写入,真正执行写入操作 this._write(chunk, encoding, cb); this.writing = true; } else { //第二次写入了,存入缓存期 this.queue.offer({ chunk, encoding, cb, }); } return returnValue; }; const ws = new WriteStream("b.txt", { flags: "w", encoding: null, autoClose: true, start: 0, //没有end属性,只有start highWaterMark: 3, //与可读流不一样,可写流的highWaterMark表示期望这个文件只接受3个内存 }); ws.on("open", () => { console.log("文件打开"); }); let i = 0; function write() { let flag = true; while (i < 4 && flag) { flag = ws.write(i++ + ""); console.log('flag', flag); } } ws.on("drain", () => { //只有当吸入的数据达到了预期,并且数据已经被写入文件之后才会触发drain事件。 console.log("写完了"); write(); }); write();
// 链表 class Node { constructor(data) { this.data = data; this.next = null; } } module.exports = class LinkedList { constructor() { //头指针 this.head = null; //链表的长度 this.length = 0; } append(data) { const element = new Node(data); if (!this.head) { this.head = element; } else { let current = this.head; //遍历找到最后的节点 while (current.next) { current = current.next; } //插入 current.next = element; } this.length++; } // //特定位置插入 insert(position, data) { if ( typeof position !== "number" || position < 0 || position > this.length ) { return false; } const element = new Node(data); //插入首位 if (position === 0) { element.next = this.head; this.head = element; } else { //插入中间的一位 let current = this.head; let index = 1; //采用插入位置前一位进行操作,比如插入到第四个,就将element.next指向第三个的下一个,再将第三个的下一个重新向element while (index++ < position) { // 如position = 4, index = 4的时候, current指向第三个,因为执行了两遍 current = current.next; } // 让element变成第四个 element.next = current.next; current.next = element; // for (let i = 1; i < position; i++) { // if (i === position - 1) { // element.next = current.next // current.next = element // break; // } // current = current.next // } } this.length++; } // //获取对应位置的元素 get(position) { if ( typeof position !== "number" || position < 0 || position >= this.length ) { return undefined; } let current = this.head; let index = 0; while (index++ < position) { current = current.next; } return current.data; } // //返回元素在列表中的索引 indexOf(data) { let current = this.head; let index = 0; while (current) { if (current.data === data) { return index; } current = current.next; index++; } return -1; } // //修改某个位置的元素 update(position, data) { if ( typeof position !== "number" || position < 0 || position >= this.length ) { return false; } let current = this.head; let index = 0; while (index++ < position) { current = current.next; } current.data = data; } // //从列表的特定位置移除一项 removeAt(position) { if ( typeof position !== "number" || position < 0 || position >= this.length ) { return undefined; } let current = this.head; if (position === 0) { const headElData = this.head.data; this.head = this.head.next; this.length--; return headElData; } let index = 0; // 找到应该删除的节点的前一个,比如删除2,此时的current 为1所指向的节点。 while (index++ < position - 1) { current = current.next; } const currentElement = current.next; let element = current.next.next; //保存第三个节点 current.next.next = null; //让第二个节点与第三个节点断掉关系 current.next = element; // 第一个节点指向第三个节点 this.length--; return currentElement.data; } // 从列表移除一项 remove(element) { let current = this.head; let nextElement; let isSuccess; //如果第一个就是 if (current.data === element) { nextElement = this.head.next; this.head = element; element.next = nextElement; return true; } // 找到应该删除的节点的前一个,比如删除2,此时的current 为1所指向的节点。 while (current) { if (current.next.data === element) { nextElement = current.next.next; //存粗下下个节点 current.next.next = null; //断开下个节点与下下个节点的联系 current.next = nextElement; //连接下下个节点 isSuccess = true; break; } else { current = current.next; } } //是否删除成功 if (isSuccess) { this.length--; return true; } return false; } isEmpty() { return !!this.length; } size() { return this.length; } toString() { let current = this.head; let str = ""; while (current) { str += `,${current.data.toString()}`; current = current.next; } return str.slice(1); } };
这篇关于深入node4 可写流的实现的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-082024年常用的情绪识别API
- 2025-01-07如何利用看板工具优化品牌内容创作与审批,确保按时发布?
- 2025-01-07百万架构师第十一课:源码分析:Spring 源码分析:Spring源码分析前篇|JavaGuide
- 2025-01-07质量检测标准严苛,这 6 款办公软件达标了吗?
- 2025-01-07提升品牌活动管理的效率:看板工具助力品牌活动日历的可视化管理
- 2025-01-07宠物商场的精准营销秘籍:揭秘看板软件的力量
- 2025-01-07“30了,资深骑手” | 程序员能有什么好出路?
- 2025-01-07宠物公园的营销秘籍:看板软件如何帮你精准触达目标客户?
- 2025-01-07从任务分解到资源优化:甘特图工具全解析
- 2025-01-07企业升级必备指南:从传统办公软件到SaaS工具的转型攻略