typescript 变 golang,教你怎么把 golang 的chan select 用 typescript 实

golang 的 chan select 实在太方便,其实任何提供了协程的语言都能很好且方便的支持 chan 和 select,因爲经常写 typescript 脚本,于是我把这两个组件实现到了一个 typescript,你可以直接使用我的库来得到 chan 和 select,本文后续是实现代码的分析,你也可以参照分析去任何支持协程的语言中把golang的特性发扬光大http://img2.58codes.com/2024/emoticon08.gif

实现可读写的 class RW

紧跟上篇文章 我们已经实现了 Reader 和 Writer 可以用来创建和通知读写任务了

但 Reader 只能读 Writer 只能写, chan 是可以读写的,所以我们现在来创建一个新的 class RW ,它可读写并带有一个可选的读写缓冲。

RW 要实现可读写直接包含一个 Reader 和 Writer 的实例即可,要实现缓冲直接使用一个数组或链表即可,只要能满足数组先进先出即可

class RW<T>{    // 读写缓存    private list: Array<T> | undefined    // 可取器    private r_ = new Reader()    // 写入器    private w_ = new Writer()}

Ring

缓存需要先进先出,数组删除第一个元素会很慢,链表则快很多,但是我们缓存长度是固定的数组可以一次分配足够内存,链表则需要频繁申请和释放内存,爲此我使用原始数组实现了一个先进先出的 Ring 队列来充当缓存,其定义如下

class Ring<T> {    private offset_ = 0    private size_ = 0    constructor(private readonly arrs: Array<T>) {    }    get length(): number {        return this.size_    }    get capacity(): number {        return this.arrs.length    }    // 在队列某位压入一个数据,如果队列已慢就返回 false    push(val: T): boolean {        const arrs = this.arrs        const size = this.size_        if (size == arrs.length) {            return false        }        arrs[(this.offset_ + size) % arrs.length] = val        this.size_++        return true    }    // 如果队列不爲空 就将其第一个元素 弹出,否则返回迭代器 {done:true}    pop(): IteratorResult<T> {        const size = this.size_        if (size == 0) {            return noResult        }        const val = this.arrs[this.offset_++]        if (this.offset_ == this.arrs.length) {            this.offset_ = 0        }        this.size_--        return {            value: val,        }    }}

所以我的 RW 定义如下

class RW<T>{    // 读写缓存    private list: Ring<T> | undefined    // 可取器    private r_ = new Reader()    // 写入器    private w_ = new Writer()    constructor(buf: number) {        if (buf > 0) { // 如果构造传入了缓存大小就创建一个缓存            this.list = new Ring<T>(new Array<T>(buf))        }    }}

read

我们的 class RW 包含了 Reader Writer 属性和一个缓存,我们先看下要如何读取,

首先我们应该判断缓存中是否有数据,有就应该立刻将缓存读出来之后我们应该判断是否已经关闭,已经关闭,就应该立刻返回一个代表关闭的值没有关闭就应该查看有没有写入任务(Writer 中记录),有就应该通知写入成功并把写入的值作爲这裏读取到的值返回给读取者如果也没有写入任何就向 Reader 注册一个读取任务,用于等待有值可读时的通知

爲此定义了两个函数 tryRead 和 read, tryRead 完成了步骤 1,2,3 如果成功了就不必执行 read 了,read 则单独完成了步骤 4,下面是实现代码

class RW<T>{    tryRead(): IteratorResult<any> | undefined {        const list = this.list        if (list) { // 缓存存在            const result = list.pop()            if (!result.done) { // 缓存中有值,将缓存的值读出来传给调用者                return result            }            // 缓存爲空 执行后续读取代码        }        // 已经关闭 返回一个 undefined 作爲 关闭标记        if (this.isClosed) {            return        }        // 从 writer 获取值        const w = this.w_        if (w.isEmpty) { // writer 爲空没有写入任务            return noResult //返回读取失败        }        return {            value: w.invoke(), // writer 有写入任务,则让 writer 完成一个任务并把写入的值传递给 读取者        }    }    read(callback: ReadCallback): ReadValue {        // 直接注册一个读取任务,所以 read 之前必须要先调用 tryRead 以确定没有值可读        return this.r_.connect(callback)    }}

write

我们还是先看下要如何写入,

首先我们应该判断是否已经关闭了,因爲 chan 一旦关闭就不可写了之后我们应该判断是否有读取任务,因爲如果有读取任务则缓存一定爲空(都被读完了才会有待读取的任务),有则应该把值写入给读取任务没有读取任务则将值写入缓存最后如果缓存已满写不进去,则注册一个写入任务等待写入完成

爲此定义了两个函数 tryWrite 和 write, tryWrite 完成了步骤 1,2,3 如果成功了就不必执行 write 了,write 则单独完成了步骤 4,下面是实现代码

class RW<T>{    tryWrite(val: T): boolean | undefined {        // 如果已经关闭 就返回一个 undefined 作爲关闭标记        if (this.isClosed) {            return        }        const r = this.r_        if (r.isEmpty) { // 没有读取任务            // 将数据写入缓存            return this.list?.push(val) ?? false // 缓存满了 push 会返回失败        }        // 存在读取任务,则用值通知 读取任务完成        r.invoke({            value: val,        })        return true // 返回写入成功    }    write(callback: WriteCallback, reject: RejectCallback, val: T): WirteValue {        // 直接注册一个写入任务,所以 write 之前必须要先调用 tryWrite 确定没有值可写        return this.w_.connect(callback, reject, val)    } }

完整 RW

下面是 class RW 的完整代码

class RW<T>{    private list: Ring<T> | undefined    constructor(buf: number) {        if (buf > 0) {            this.list = new Ring<T>(new Array<T>(buf))        }    }    private r_ = new Reader()    private w_ = new Writer()    tryRead(): IteratorResult<any> | undefined {        // 读取缓存        const list = this.list        if (list) {            const result = list.pop()            if (!result.done) {                return result            }        }        // 是否关闭        if (this.isClosed) {            return        }        // 读取 writer        const w = this.w_        if (w.isEmpty) { // 没有写入者            return noResult        }        return {            value: w.invoke(),        }    }    read(callback: ReadCallback): ReadValue {        // 设置待读        return this.r_.connect(callback)    }    tryWrite(val: T): boolean | undefined {        if (this.isClosed) {            return        }        const r = this.r_        if (r.isEmpty) { // 没有读取者            // 写入缓存            return this.list?.push(val) ?? false        }        r.invoke({            value: val,        })        return true    }    write(callback: WriteCallback, reject: RejectCallback, val: T): WirteValue {        // 设置待写        return this.w_.connect(callback, reject, val)    }    close(): boolean {        if (this.isClosed) {            return false        }        this.isClosed = true        this.w_.close()        this.r_.close()        const closed = this.closed_        if (closed) {            this.closed_ = undefined            closed.resolve()        }        return true    }    wait(): undefined | Promise<void> {        if (this.isClosed) {            return        }        let closed = this.closed_        if (closed) {            return closed.promise        }        closed = new Completer<void>()        this.closed_ = closed        return closed.promise    }    private closed_: Completer<void> | undefined    isClosed = false    get length(): number {        return this.list?.length ?? 0    }    get capacity(): number {        return this.list?.capacity ?? 0    }}

实现 chan

到此我们其实就已经实现好了 chan 的功能,只是现在是 RW 用回调函数来通知 写入和读取完成,对于支持协程的语言我们只需要 使用一个 class Chan 来包装下然后以 协程 的形式来等待而非回调函数来等待通知即可,因爲很简单基本上是对 RW 的调用所以直接贴关键代码了

export class Chan<T> implements ReadChannel<T>, WriteChannel<T> {    // 存储了底层的读写实现    private rw_: RW<T>    get rw(): RW<T> {        return this.rw_    }    constructor(buf = 0) {        // 依据参数创建是否带缓存的底层读写器        this.rw_ = new RW<T>(Math.floor(buf))    }    // golang <-chan 的 chan 读取实现    read(): IteratorResult<T> | Promise<IteratorResult<T>> {        const rw = this.rw_        const val = rw.tryRead()        if (val === undefined) {            // chan 已经关闭            return noResult        } else if (!val.done) {            // 返回读取到的值            return val        }        // 使用 js 的 Promise 来等待        return new Promise((resolve) => {            rw.read(resolve) // 通知 Promise 完成        })    }    // golang 没有直接提供这个函数但可以使用 select default 来实现,这裏直接提供了    tryRead(): IteratorResult<T> | undefined {        const rw = this.rw_        const val = rw.tryRead()        if (val === undefined) {            // chan 已经关闭            return noResult        } else if (!val.done) {            // 返回读取到的值            return val        }        return undefined    }    // golang chan<- 的 chan 写入实现    write(val: T, exception?: boolean): boolean | Promise<boolean> {        const rw = this.rw_        const result = rw.tryWrite(val)        if (result === undefined) {            // chan 已经关闭            if (exception) { // 依据调用参数(使用者自己决定) 是要抛出异常还是返回 false                throw errChannelClosed            }            return false        } else if (result) {            // 写入 chan 成功            return true        }        // 使用 js 的 Promise 来等待        return new Promise((resolve, reject) => {            rw.write(resolve, exception ? reject : undefined, val) // 通知 Promise 成功或失败        })    }    //  golang 没有直接提供这个函数但可以使用 select default 来实现,这裏直接提供了    tryWrite(val: T, exception?: boolean): boolean {        const rw = this.rw_        const result = rw.tryWrite(val)        if (result === undefined) {            // chan 已经关闭            if (exception) {                throw errChannelClosed            }            return false        } else if (result) {            // 写入 chan 成功            return true        }        // 目前不可写        return false    }    // 这个返回已经缓存了多少个数据    get length(): number {        return this.rw.length    }    // 这裏返回缓冲区容量    get capacity(): number {        return this.rw.capacity    }    // 这裏实现 js 的迭代器,就可以使用 原生的 await for of 来读取 chan 了    async *[Symbol.asyncIterator](): AsyncGenerator<T> {        while (true) {            const val = await this.read()            if (val.done) {                break            }            yield val.value        }    }}

实现 select

所有工具都其实都已经完备了,下篇文章将分析如何实现 select 来等待多个 chan


关于作者: 网站小编

码农网专注IT技术教程资源分享平台,学习资源下载网站,58码农网包含计算机技术、网站程序源码下载、编程技术论坛、互联网资源下载等产品服务,提供原创、优质、完整内容的专业码农交流分享平台。

热门文章