golang 的 chan select 实在太方便,其实任何提供了协程的语言都能很好且方便的支持 chan 和 select,因爲经常写 typescript 脚本,于是我把这两个组件实现到了一个 typescript,你可以直接使用我的库来得到 chan 和 select,本文后续是实现代码的分析,你也可以参照分析去任何支持协程的语言中把golang的特性发扬光大
实现可读写的 class RW
紧跟上篇文章 我们已经实现了 Reader 和 Writer 可以用来创建和通知读写任务了
但 Reader 只能读 Writer 只能写, chan 是可以读写的,所以我们现在来创建一个新的 class RW ,它可读写并带有一个可选的读写缓冲。
RW 要实现可读写直接包含一个 Reader 和 Writer 的实例即可,要实现缓冲直接使用一个数组或链表即可,只要能满足数组先进先出即可
class RW<T>{ // 读写缓存 private list: Array<T> | undefined // 可取器 private r_ = new Reader() // 写入器 private w_ = new Writer()}
Ring
缓存需要先进先出,数组删除第一个元素会很慢,链表则快很多,但是我们缓存长度是固定的数组可以一次分配足够内存,链表则需要频繁申请和释放内存,爲此我使用原始数组实现了一个先进先出的 Ring 队列来充当缓存,其定义如下
class Ring<T> { private offset_ = 0 private size_ = 0 constructor(private readonly arrs: Array<T>) { } get length(): number { return this.size_ } get capacity(): number { return this.arrs.length } // 在队列某位压入一个数据,如果队列已慢就返回 false push(val: T): boolean { const arrs = this.arrs const size = this.size_ if (size == arrs.length) { return false } arrs[(this.offset_ + size) % arrs.length] = val this.size_++ return true } // 如果队列不爲空 就将其第一个元素 弹出,否则返回迭代器 {done:true} pop(): IteratorResult<T> { const size = this.size_ if (size == 0) { return noResult } const val = this.arrs[this.offset_++] if (this.offset_ == this.arrs.length) { this.offset_ = 0 } this.size_-- return { value: val, } }}
所以我的 RW 定义如下
class RW<T>{ // 读写缓存 private list: Ring<T> | undefined // 可取器 private r_ = new Reader() // 写入器 private w_ = new Writer() constructor(buf: number) { if (buf > 0) { // 如果构造传入了缓存大小就创建一个缓存 this.list = new Ring<T>(new Array<T>(buf)) } }}
read
我们的 class RW 包含了 Reader Writer 属性和一个缓存,我们先看下要如何读取,
首先我们应该判断缓存中是否有数据,有就应该立刻将缓存读出来之后我们应该判断是否已经关闭,已经关闭,就应该立刻返回一个代表关闭的值没有关闭就应该查看有没有写入任务(Writer 中记录),有就应该通知写入成功并把写入的值作爲这裏读取到的值返回给读取者如果也没有写入任何就向 Reader 注册一个读取任务,用于等待有值可读时的通知爲此定义了两个函数 tryRead 和 read, tryRead 完成了步骤 1,2,3 如果成功了就不必执行 read 了,read 则单独完成了步骤 4,下面是实现代码
class RW<T>{ tryRead(): IteratorResult<any> | undefined { const list = this.list if (list) { // 缓存存在 const result = list.pop() if (!result.done) { // 缓存中有值,将缓存的值读出来传给调用者 return result } // 缓存爲空 执行后续读取代码 } // 已经关闭 返回一个 undefined 作爲 关闭标记 if (this.isClosed) { return } // 从 writer 获取值 const w = this.w_ if (w.isEmpty) { // writer 爲空没有写入任务 return noResult //返回读取失败 } return { value: w.invoke(), // writer 有写入任务,则让 writer 完成一个任务并把写入的值传递给 读取者 } } read(callback: ReadCallback): ReadValue { // 直接注册一个读取任务,所以 read 之前必须要先调用 tryRead 以确定没有值可读 return this.r_.connect(callback) }}
write
我们还是先看下要如何写入,
首先我们应该判断是否已经关闭了,因爲 chan 一旦关闭就不可写了之后我们应该判断是否有读取任务,因爲如果有读取任务则缓存一定爲空(都被读完了才会有待读取的任务),有则应该把值写入给读取任务没有读取任务则将值写入缓存最后如果缓存已满写不进去,则注册一个写入任务等待写入完成爲此定义了两个函数 tryWrite 和 write, tryWrite 完成了步骤 1,2,3 如果成功了就不必执行 write 了,write 则单独完成了步骤 4,下面是实现代码
class RW<T>{ tryWrite(val: T): boolean | undefined { // 如果已经关闭 就返回一个 undefined 作爲关闭标记 if (this.isClosed) { return } const r = this.r_ if (r.isEmpty) { // 没有读取任务 // 将数据写入缓存 return this.list?.push(val) ?? false // 缓存满了 push 会返回失败 } // 存在读取任务,则用值通知 读取任务完成 r.invoke({ value: val, }) return true // 返回写入成功 } write(callback: WriteCallback, reject: RejectCallback, val: T): WirteValue { // 直接注册一个写入任务,所以 write 之前必须要先调用 tryWrite 确定没有值可写 return this.w_.connect(callback, reject, val) } }
完整 RW
下面是 class RW 的完整代码
class RW<T>{ private list: Ring<T> | undefined constructor(buf: number) { if (buf > 0) { this.list = new Ring<T>(new Array<T>(buf)) } } private r_ = new Reader() private w_ = new Writer() tryRead(): IteratorResult<any> | undefined { // 读取缓存 const list = this.list if (list) { const result = list.pop() if (!result.done) { return result } } // 是否关闭 if (this.isClosed) { return } // 读取 writer const w = this.w_ if (w.isEmpty) { // 没有写入者 return noResult } return { value: w.invoke(), } } read(callback: ReadCallback): ReadValue { // 设置待读 return this.r_.connect(callback) } tryWrite(val: T): boolean | undefined { if (this.isClosed) { return } const r = this.r_ if (r.isEmpty) { // 没有读取者 // 写入缓存 return this.list?.push(val) ?? false } r.invoke({ value: val, }) return true } write(callback: WriteCallback, reject: RejectCallback, val: T): WirteValue { // 设置待写 return this.w_.connect(callback, reject, val) } close(): boolean { if (this.isClosed) { return false } this.isClosed = true this.w_.close() this.r_.close() const closed = this.closed_ if (closed) { this.closed_ = undefined closed.resolve() } return true } wait(): undefined | Promise<void> { if (this.isClosed) { return } let closed = this.closed_ if (closed) { return closed.promise } closed = new Completer<void>() this.closed_ = closed return closed.promise } private closed_: Completer<void> | undefined isClosed = false get length(): number { return this.list?.length ?? 0 } get capacity(): number { return this.list?.capacity ?? 0 }}
实现 chan
到此我们其实就已经实现好了 chan 的功能,只是现在是 RW 用回调函数来通知 写入和读取完成,对于支持协程的语言我们只需要 使用一个 class Chan 来包装下然后以 协程 的形式来等待而非回调函数来等待通知即可,因爲很简单基本上是对 RW 的调用所以直接贴关键代码了
export class Chan<T> implements ReadChannel<T>, WriteChannel<T> { // 存储了底层的读写实现 private rw_: RW<T> get rw(): RW<T> { return this.rw_ } constructor(buf = 0) { // 依据参数创建是否带缓存的底层读写器 this.rw_ = new RW<T>(Math.floor(buf)) } // golang <-chan 的 chan 读取实现 read(): IteratorResult<T> | Promise<IteratorResult<T>> { const rw = this.rw_ const val = rw.tryRead() if (val === undefined) { // chan 已经关闭 return noResult } else if (!val.done) { // 返回读取到的值 return val } // 使用 js 的 Promise 来等待 return new Promise((resolve) => { rw.read(resolve) // 通知 Promise 完成 }) } // golang 没有直接提供这个函数但可以使用 select default 来实现,这裏直接提供了 tryRead(): IteratorResult<T> | undefined { const rw = this.rw_ const val = rw.tryRead() if (val === undefined) { // chan 已经关闭 return noResult } else if (!val.done) { // 返回读取到的值 return val } return undefined } // golang chan<- 的 chan 写入实现 write(val: T, exception?: boolean): boolean | Promise<boolean> { const rw = this.rw_ const result = rw.tryWrite(val) if (result === undefined) { // chan 已经关闭 if (exception) { // 依据调用参数(使用者自己决定) 是要抛出异常还是返回 false throw errChannelClosed } return false } else if (result) { // 写入 chan 成功 return true } // 使用 js 的 Promise 来等待 return new Promise((resolve, reject) => { rw.write(resolve, exception ? reject : undefined, val) // 通知 Promise 成功或失败 }) } // golang 没有直接提供这个函数但可以使用 select default 来实现,这裏直接提供了 tryWrite(val: T, exception?: boolean): boolean { const rw = this.rw_ const result = rw.tryWrite(val) if (result === undefined) { // chan 已经关闭 if (exception) { throw errChannelClosed } return false } else if (result) { // 写入 chan 成功 return true } // 目前不可写 return false } // 这个返回已经缓存了多少个数据 get length(): number { return this.rw.length } // 这裏返回缓冲区容量 get capacity(): number { return this.rw.capacity } // 这裏实现 js 的迭代器,就可以使用 原生的 await for of 来读取 chan 了 async *[Symbol.asyncIterator](): AsyncGenerator<T> { while (true) { const val = await this.read() if (val.done) { break } yield val.value } }}
实现 select
所有工具都其实都已经完备了,下篇文章将分析如何实现 select 来等待多个 chan