golang 的 chan select 实在太方便,其实任何提供了协程的语言都能很好且方便的支持 chan 和 select,因爲经常写 typescript 脚本,于是我把这两个组件实现到了一个 typescript,你可以直接使用我的库来得到 chan 和 select,本文后续是实现代码的分析,你也可以参照分析去任何支持协程的语言中把golang的特性发扬光大
chan 提供了什么
首先我们要分析 chan 提供了哪些功能
可写,如果当前不可写则阻塞直到写入成功可读,如果当前不可读则阻塞直到读取成功读写缓存所以我们不妨来单独实现这些功能,最后将它们组合在一起就是完整的 chan 了
实现 Reader
首先来实现一个 Reader
class Reader { // 这个函数用来将值传递给 reader 读取,意思是每当调用这个函数,正在阻塞读取的 chan 就能得到值 invoke(val: IteratorResult<any>); // 这个用来 关闭 reader,关闭后才能通知读取的 协程 退出 close() ;}
有了Reader.invoke 就能通知阻塞读取的chan返回了,那 Reader 还需要提供一个接口来读取值爲此我专门定义了 class ReadValue 代表一个读取的任务,我们直接看代码说明
interface Connection { disconet(): void}class ReadValue { constructor(private readonly p: Reader, private readonly callback: ReadCallback) { } // 这个函数通知读取成功,由 Reader 调用 invoke(val: IteratorResult<any>) { // 这裏调用上层传来的回调,通知上层(select 或者 chan)读取到值了 this.callback(val) } // 这个函数从 reader 的读取任务中撤销注册 // 记得我们还要实现 select 吗?select 等待到一个 chan 成功后需要撤销对其它 case 的读写 disconet() { this.p.disconet(this) }}
ReadValue 代表了一个读取任务(有谁调用了 chan 的 读取),所以我们 Reader 实现就简单了,每当上层代码读取 chan 我们就创建一个 ReadValue 把它加入到 Reader 的一个队列中用于等待,当有值可读了(Reader.invoke 被调用) 就找一个随机的 ReadValue 去通知它读取成功了
下面是 Reader 完整代码
type ReadCallback = (val: IteratorResult<any>) => voidclass Reader { // 记录 是否被关闭 private closed_ = false // 保存读取的任务队列 (<-chan) private vals = new Array<ReadValue>() get isEmpty(): boolean { return this.vals.length == 0 } // 有值可读了就调用这个函数 invoke(val: IteratorResult<any>) { const vals = this.vals switch (vals.length) { case 0: // 如果没有任务就抛出异常,实现正确的化永远不会执行到 case 0 throw errChannelReaderEmpty case 1: // 只有一个等待读取的,就直接将 val 给它 并且,通知它成功 vals.pop()!.invoke(val) return } // 如果多个读取任务,就随机选一个任务 通知它读取成功 const last = vals.length - 1 const i = Math.floor(Math.random() * vals.length) if (i != last) { //swap to end // 把随机选取的任务交互到数组最后面,因爲数组删除最后一个元素比较快 [vals[i], vals[last]] = [vals[last], vals[i]] } vals.pop()!.invoke(val) } // 关闭读取 close() { if (this.closed_) { return } this.closed_ = true const vals = this.vals if (vals.length != 0) { // 通知所有等待中的 chan,已经关闭了快点返回,永远不会有新的值可以读到了 for (const val of vals) { val.invoke(noResult) // 使用了一个 {done:true} 的 js 习惯用法来通知 迭代器 结束了,是的 你后续可以使用 js 的 await for of 来迭代 读取这个 chan } vals.splice(0) // 将任务清空 } } // 这裏创建一个读取任务,读取成功就调用 回调函数 connect(callback: ReadCallback): ReadValue { const val = new ReadValue(this, callback) this.vals.push(val) return val } // 这裏撤销注册的任务,主要用于 select 的实现 disconet(val: ReadValue) { const vals = this.vals for (let i = 0; i < vals.length; i++) { if (vals[i] == val) { vals.splice(i, 1) break } } }}
Writer
Writer 的逻辑和 Reader 类似先看下最核心的定义
class Writer { // 调用这个就将一个写入任务完成,正在执行写入的 chan 就会返回写入成功 invoke() ; // 调用这个关闭写入,golang 裏面会导致正在执行写入的 chan 抛出 panic close();}
下面来定义一个 WirteValue 来表示一个写入任务
class WirteValue { constructor(private readonly p: Writer, private readonly callback: WriteCallback, private readonly reject: RejectCallback, public readonly value: any, // 这个属性记录了要写入的值 ) { } // 调用这个通知写入成功 invoke() { this.callback(true) } // Writer 关闭时就调用这个,通知上层写入任务失败 error() { const reject = this.reject if (reject) { try { reject(errChannelClosed) } catch (_) { } } else { this.callback(false) } } // 调用这个就撤销写入,同样是爲了实现 select 準备的 disconet() { this.p.disconet(this) }}
WirteValue 代表了一个写入任务(有谁调用了 chan 的 写入),每当上层代码写入 chan 我们就创建一个 WirteValue 把它加入到 Writer 的一个队列中用于等待,当有值可写了(Writer.invoke 被调用) 就找一个随机的 WirteValue 去通知它写入成功了
下面是 Writer 完整代码
class Writer { private closed_ = false // 记录注册的写入任务 private vals = new Array<WirteValue>() get isEmpty(): boolean { return this.vals.length == 0 } // 可写了就调用这个函数,它就会找一个写入任务来写入 invoke() { const vals = this.vals switch (vals.length) { case 0: // 没有任务,如果实现正确,永远不会执行到 case 0 throw errChannelWriterEmpty case 1: // 只有一个任务就直接让它完成 const p = vals.pop()! p.invoke() return p.value // 把要写的值返回给调用者去写入 } // 有多个任务,就随机选一个去完成 const last = vals.length - 1 const i = Math.floor(Math.random() * vals.length) if (i != last) { //swap to end //将完成的任务交互到数组末尾以便快速删除 [vals[i], vals[last]] = [vals[last], vals[i]] } const p = vals.pop()! p.invoke() return p.value // 把要写的值返回给调用者去写入 } // 关闭 close() { if (this.closed_) { return } this.closed_ = true const vals = this.vals if (vals.length != 0) { // 通知所有未完成的任务写入失败 for (const val of vals) { val.error() } vals.splice(0) } } // 创建一个写入任务 connect(callback: WriteCallback, reject: RejectCallback, val: any): WirteValue { const result = new WirteValue(this, callback, reject, val) this.vals.push(result) return result } // 撤销任务,爲实现 select 準备 disconet(val: WirteValue) { const vals = this.vals for (let i = 0; i < vals.length; i++) { if (vals[i] == val) { vals.splice(i, 1) break } } }}
实现 chan select
发现文章有点长,我将在下篇文章中分析如何先 chan 和 select 的