typescript 变 golang,教你怎么把 golang 的chan select 用 typescript 实

golang 的 chan select 实在太方便,其实任何提供了协程的语言都能很好且方便的支持 chan 和 select,因爲经常写 typescript 脚本,于是我把这两个组件实现到了一个 typescript,你可以直接使用我的库来得到 chan 和 select,本文后续是实现代码的分析,你也可以参照分析去任何支持协程的语言中把golang的特性发扬光大http://img2.58codes.com/2024/emoticon08.gif

chan 提供了什么

首先我们要分析 chan 提供了哪些功能

可写,如果当前不可写则阻塞直到写入成功可读,如果当前不可读则阻塞直到读取成功读写缓存

所以我们不妨来单独实现这些功能,最后将它们组合在一起就是完整的 chan 了

实现 Reader

首先来实现一个 Reader

class Reader {    // 这个函数用来将值传递给 reader 读取,意思是每当调用这个函数,正在阻塞读取的 chan 就能得到值    invoke(val: IteratorResult<any>);    // 这个用来 关闭 reader,关闭后才能通知读取的 协程 退出    close() ;}

有了Reader.invoke 就能通知阻塞读取的chan返回了,那 Reader 还需要提供一个接口来读取值爲此我专门定义了 class ReadValue 代表一个读取的任务,我们直接看代码说明

interface Connection {    disconet(): void}class ReadValue {    constructor(private readonly p: Reader, private readonly callback: ReadCallback) { }    // 这个函数通知读取成功,由 Reader 调用    invoke(val: IteratorResult<any>) {        // 这裏调用上层传来的回调,通知上层(select 或者 chan)读取到值了         this.callback(val)    }    // 这个函数从 reader 的读取任务中撤销注册    // 记得我们还要实现 select 吗?select 等待到一个 chan 成功后需要撤销对其它 case 的读写    disconet() {        this.p.disconet(this)    }}

ReadValue 代表了一个读取任务(有谁调用了 chan 的 读取),所以我们 Reader 实现就简单了,每当上层代码读取 chan 我们就创建一个 ReadValue 把它加入到 Reader 的一个队列中用于等待,当有值可读了(Reader.invoke 被调用) 就找一个随机的 ReadValue 去通知它读取成功了

下面是 Reader 完整代码

type ReadCallback = (val: IteratorResult<any>) => voidclass Reader {    // 记录 是否被关闭    private closed_ = false    // 保存读取的任务队列 (<-chan)    private vals = new Array<ReadValue>()    get isEmpty(): boolean {        return this.vals.length == 0    }    // 有值可读了就调用这个函数    invoke(val: IteratorResult<any>) {        const vals = this.vals        switch (vals.length) {            case 0: // 如果没有任务就抛出异常,实现正确的化永远不会执行到 case 0                throw errChannelReaderEmpty            case 1: // 只有一个等待读取的,就直接将 val 给它 并且,通知它成功                vals.pop()!.invoke(val)                return        }        // 如果多个读取任务,就随机选一个任务 通知它读取成功        const last = vals.length - 1        const i = Math.floor(Math.random() * vals.length)        if (i != last) { //swap to end // 把随机选取的任务交互到数组最后面,因爲数组删除最后一个元素比较快            [vals[i], vals[last]] = [vals[last], vals[i]]        }        vals.pop()!.invoke(val)    }    // 关闭读取    close() {        if (this.closed_) {            return        }        this.closed_ = true        const vals = this.vals        if (vals.length != 0) { // 通知所有等待中的 chan,已经关闭了快点返回,永远不会有新的值可以读到了            for (const val of vals) {                 val.invoke(noResult) // 使用了一个 {done:true} 的 js 习惯用法来通知 迭代器 结束了,是的 你后续可以使用 js 的 await for of 来迭代 读取这个 chan            }            vals.splice(0) // 将任务清空        }    }    // 这裏创建一个读取任务,读取成功就调用 回调函数    connect(callback: ReadCallback): ReadValue {        const val = new ReadValue(this, callback)        this.vals.push(val)        return val    }    // 这裏撤销注册的任务,主要用于 select 的实现    disconet(val: ReadValue) {        const vals = this.vals        for (let i = 0; i < vals.length; i++) {            if (vals[i] == val) {                vals.splice(i, 1)                break            }        }    }}

Writer

Writer 的逻辑和 Reader 类似先看下最核心的定义

class Writer {    // 调用这个就将一个写入任务完成,正在执行写入的 chan 就会返回写入成功    invoke() ;    // 调用这个关闭写入,golang 裏面会导致正在执行写入的 chan 抛出 panic    close();}

下面来定义一个 WirteValue 来表示一个写入任务

class WirteValue {    constructor(private readonly p: Writer,        private readonly callback: WriteCallback,        private readonly reject: RejectCallback,        public readonly value: any, // 这个属性记录了要写入的值    ) { }    // 调用这个通知写入成功    invoke() {        this.callback(true)    }    // Writer 关闭时就调用这个,通知上层写入任务失败    error() {        const reject = this.reject        if (reject) {            try {                reject(errChannelClosed)            } catch (_) {            }        } else {            this.callback(false)        }    }    // 调用这个就撤销写入,同样是爲了实现 select 準备的    disconet() {        this.p.disconet(this)    }}

WirteValue 代表了一个写入任务(有谁调用了 chan 的 写入),每当上层代码写入 chan 我们就创建一个 WirteValue 把它加入到 Writer 的一个队列中用于等待,当有值可写了(Writer.invoke 被调用) 就找一个随机的 WirteValue 去通知它写入成功了

下面是 Writer 完整代码

class Writer {    private closed_ = false    // 记录注册的写入任务    private vals = new Array<WirteValue>()    get isEmpty(): boolean {        return this.vals.length == 0    }    // 可写了就调用这个函数,它就会找一个写入任务来写入    invoke() {        const vals = this.vals        switch (vals.length) {            case 0: // 没有任务,如果实现正确,永远不会执行到 case 0                 throw errChannelWriterEmpty            case 1: // 只有一个任务就直接让它完成                const p = vals.pop()!                p.invoke()                return p.value // 把要写的值返回给调用者去写入        }        // 有多个任务,就随机选一个去完成        const last = vals.length - 1        const i = Math.floor(Math.random() * vals.length)        if (i != last) { //swap to end //将完成的任务交互到数组末尾以便快速删除            [vals[i], vals[last]] = [vals[last], vals[i]]        }        const p = vals.pop()!        p.invoke()        return p.value // 把要写的值返回给调用者去写入    }    // 关闭    close() {        if (this.closed_) {            return        }        this.closed_ = true        const vals = this.vals        if (vals.length != 0) {            // 通知所有未完成的任务写入失败            for (const val of vals) {                val.error()            }            vals.splice(0)        }    }    // 创建一个写入任务    connect(callback: WriteCallback, reject: RejectCallback, val: any): WirteValue {        const result = new WirteValue(this, callback, reject, val)        this.vals.push(result)        return result    }    // 撤销任务,爲实现 select 準备    disconet(val: WirteValue) {        const vals = this.vals        for (let i = 0; i < vals.length; i++) {            if (vals[i] == val) {                vals.splice(i, 1)                break            }        }    }}

实现 chan select

发现文章有点长,我将在下篇文章中分析如何先 chan 和 select 的


关于作者: 网站小编

码农网专注IT技术教程资源分享平台,学习资源下载网站,58码农网包含计算机技术、网站程序源码下载、编程技术论坛、互联网资源下载等产品服务,提供原创、优质、完整内容的专业码农交流分享平台。

热门文章