golang 的 chan select 实在太方便,其实任何提供了协程的语言都能很好且方便的支持 chan 和 select,因爲经常写 typescript 脚本,于是我把这两个组件实现到了一个 typescript,你可以直接使用我的库来得到 chan 和 select,本文后续是实现代码的分析,你也可以参照分析去任何支持协程的语言中把golang的特性发扬光大
case 与 select
在上篇文章 我们已经有了一个 chan,现在来实现 case 和 select
golang 的 select 实际上接收了一个 case 的数组只是golang提供了语法糖让它来更好书写,我们作爲第三方库就只能定义一个接收 case 数组的函数了
export function selectChan(...cases: Array<CaseLike | undefined>): Promise<CaseLike> | CaseLike | undefined ;
爲了让 typescript 的提示更智能于是还添加了下面几个签名,(不叫 select 而叫 selectChan 是因爲 select 名字太特殊了我想使用一个儘量不会与其它库重名的函数名 这样大量使用时才比较方便)
export function selectChan(def: undefined, ...cases: Array<CaseLike>): CaseLike | undefined;export function selectChan(...cases: Array<CaseLike>): Promise<CaseLike> | CaseLike;export function selectChan(): Promise<any>;
CaseLike 是一个包含了 chan 读写上下文信息的 class,它可以实现对 chan 的读写以及注册读写任务还有就是撤销读写,
selectChan 实现就很简单了
如果 cases 长度爲 0,则返回一个永远等待的Promise, 实现 golangselect{}
功能如果 cases 数组大于 1,则把它打乱,后续步骤解释将 case 标记重置代表没有完成,后续case细节解释标记作用,golang 没有这个标记因爲它语法上直接避免了一些问题作爲库只能通过标记才可解决按顺序查找一个可读/可写的任务,找到则执行读写并将此case 返回,(这就是步骤2要打乱 case 的原因,避免一个 select 中对于多个同时完成时始终返回传入数组中位于前方的 case)没有立刻可读写的就判断下case 数组中有没有传入 undefined,有undefined 则返回 undefined 让select 的调用者可以写 default 逻辑没有传入 undefined,则遍历 cases 数组爲每个 读写操作都注册读写任务和完成时的回调函数,当回调函数被调用则表示某个case 已经完成了,此时把所有其它注册的读写都撤销掉,然后把这个case 返回就好了这样和 switch 配合使用就可以在 js 中重现 golang 的 select 了,僞代码大概像这样
switch(await selectChan(c0,c1,c2)){ case c0: break case c1: break case c2: break}switch(await selectChan(undefined,c0,c1)){ case c0: break case c1: break case undefined: // 执行 default 逻辑 break}
下面是 selectChan 的实现代码
export function selectChan(...cases: Array<CaseLike | undefined>): Promise<CaseLike> | CaseLike | undefined { if (cases.length == 0) { // 没有传入 case 所以 select 永远阻塞 return neverPromise } else if (cases.length > 1) { shuffle(cases) // 打乱数组 } // 重置 case 状态 for (const c of cases) { c?.reset() } // 检查就绪的 case let def = false for (const c of cases) { if (c === undefined) { def = true } else { // 读写 完成 if (c.tryInvoke()) { return c } } } // 没有就绪 case 但设置了 default,返回 default case if (def) { return } else if (cases.length == 1) { // 只有一个 case const c = cases[0] as CaseLike return c.invoke().then(() => { return c }) } // 存在多个 case return new Promise((resolve, reject) => { const arrs = cases as Array<CaseLike> const conns = new Array<Connection>(arrs.length) for (let i = 0; i < arrs.length; i++) { conns[i] = arrs[i].do((c) => { for (let i = 0; i < conns.length; i++) { conns[i].disconet() } resolve(c) }, () => { for (let i = 0; i < conns.length; i++) { conns[i].disconet() } reject(errChannelClosed) }) } })}
实现 case
首先看下 CaseLike 这个接口,它定义了 Case 要提供的功能
export interface CaseLike { // 重置完成状态,将 isReady 设置爲 fasle reset(): void // 类似 Writer/Reader 的 try 函数,尝试是否可以立刻完成当前的 chan 操作 tryInvoke(): boolean // 当不可以立刻完成时,此函数去注册一个任务并使用 js 的 Promise 等待任务完成,用于 cases 数组长度爲 1 时 invoke(): Promise<void> // 当不可以立刻完成时,注册任务使用回调通知结果,返回值 Connection 可用于撤销注册的 case 任务 do(resolve: (c: CaseLike) => void, reject: (c: CaseLike) => void): Connection // 对于读取 case 这个函数返回了读取到的值 read(): IteratorResult<any> // 对于写入 case 这个返回写入是否成功了 write(): boolean // 这个属性返回 是否就绪,即select 等待完成的case 是否是这个 // 如果此值爲 false 则 CaseLike 的 read/write 函数会抛出异常,这是爲了强制把每次的 select 和 case 都关联起来避免,用户错误的把上次的 select 结果使用到后续 select 中 readonly isReady: boolean}
下面是 case 的具体实现
/** * * @sealed */export class Case<T>{ // 这个在 ts 中被定义爲内部,用于 class Chan 创建 Case 实例 static make<T>(ch: Chan<T>, r: boolean, val?: any, exception?: boolean): Case<T> { return new Case<T>(ch, r, val, exception) } // 把构造函数定义爲私有避免 用户擅自创建错误的 Case 实例 private constructor(private readonly ch: Chan<T>, // 关联的 chan private readonly r: boolean, // 这是一个 读取/写入 cahn private readonly val?: any, // 写入 chan 要写入的值 private readonly exception?: boolean, // 写入 chan 在 chan 关闭时 是要 返回 boolean 还是 throw 异常 ) { } toString(): string { if (this.r) { return JSON.stringify({ case: 'read', ready: this.isReady, val: this.read_, }, undefined, "\t") } else { return JSON.stringify({ case: 'write', ready: this.isReady, val: this.write_, }, undefined, "\t") } } // 重置 isReady 实现爲 false, 由 selectChan 函数调用 reset() { if (this.r) { this.read_ = undefined } else { this.write_ = undefined } } // 这裏直接调用 chan 提供的 tryRead/tryWrite 来判断是否可以立刻读写 tryInvoke(): boolean { if (this.r) { return this._tryRead() } else { return this._tryWrite() } } // 爲读写注册任务 do(resolve: (c: CaseLike) => void, reject: (c: CaseLike) => void): Connection { const rw = this.ch.rw if (this.r) { return rw.read((val) => { this.read_ = val resolve(this) }) } else { return rw.write((ok) => { if (ok) { this.write_ = true } else { this.write_ = false if (this.exception) { reject(this) return } } resolve(this) }, undefined, this.val) } } // 执行读写并等待完成 invoke(): Promise<void> { const rw = this.ch.rw if (this.r) { return new Promise((resolve) => { rw.read((val) => { this.read_ = val resolve() }) }) } else { return new Promise((resolve, reject) => { rw.write((ok) => { if (ok) { this.write_ = true } else { this.write_ = false if (this.exception) { reject(errChannelClosed) return } } resolve() }, undefined, this.val) }) } } // 调用 chan 的写入,同时修改好自己的 ready 状态 private _tryWrite(): boolean { const ch = this.ch const val = ch.tryWrite(this.val, this.exception) if (val) { this.write_ = true return true } else if (ch.isClosed) { this.write_ = false return true } return false } // 调用 chan 的读取,同时修改好自己的 ready 状态 private _tryRead(): boolean { const val = this.ch.tryRead() if (val == undefined) { return false } this.read_ = val return true } // 记录读取 case 读取到的值 private read_?: IteratorResult<T> // 读取 case 才能调用用于返回读取到的 值 read(): IteratorResult<T> { const val = this.read_ if (val === undefined) { throw errChanneReadCase } return val } // 记录写入 case 写入是否成功 private write_?: boolean // 写入 case 才能调用用于记录是否写入成功 write(): boolean { const val = this.write_ if (val === undefined) { throw errChanneWriteCase } return val } // 返回 case 是否 就绪 get isReady(): boolean { return this.r ? this.read_ !== undefined : this.write_ !== undefined }}
chan 创建 case
现在我们只需要爲 class Chan 提供一个 readCase 和 writeCase 函数用于创建 Case 实例即可使用 select 来等待了
class Chan{ // 爲当前 chan 创建一个 读取 Case readCase(): Case<T> { return Case.make(this, true) } // 爲当前 chan 创建一个 写入 Case 用来写入 val writeCase(val: T, exception?: boolean): Case<T> { return Case.make(this, false, val, exception) }}