Channel 是 Go 的一种,它与 GoRoutine 一起为 Go 提供并发技术,在开发中被广泛使用。 Go 鼓励人们通过通道在 goroutine 之间传递对数据的引用(就像将数据的所有者从一个 goroutine 传递到另一个 goroutine),而 Effective Go 总结了这句话:
do not communicate by sharing memory; instead, share memory by communicating.Go 内存模型指出了通道作为并发控制的一个特性
a send on a channel happens before the corresponding receive from that channel completes. (golang spec)除了goroutine之间共享数据的正常安全传输外,通道还可以发挥很多技巧(模式),本文列举了一些通道的应用模式。
促成本文诞生的主要因素包括:
Eapache 的 Channels Library、Go 中的并发、Francesc Campoy 的 JustForfun 系列、关于合并 Channel 实现、我在 Scala Collections Handbook 中对 Scala Collections 的启发,让我们以这个模式为例。
我们知道,GO的标准库sync
是的mutex
,但是可以用作锁mutex
但它没有实现trylock
方法。
我们是为了trylock
定义是当前的 goroutine 正在尝试获取一个锁,如果成功,则获取该锁,返回 true,否则返回 false。 我们可以使用这种方法来避免当前的 goroutine 在获取锁时被阻塞。
本来这是一个常用的功能,在一些其他编程语言中实现,那么为什么没有在 Go 中实现呢? 正如第 6123 期中详细讨论的那样,在我看来,go 核心组的成员本身并不热衷于这个功能,并认为同样的方式可以通过渠道实现。 实际上,对于标准库sync.mutex
添加此功能很容易,方法是通过hack
道路:mutex
实现trylock
特征。
const mutexlocked = 1 “在上面的**中还有一个额外的这主要是通过利用通道边界情况下的阻塞功能来实现的。islocked
但是,方法并不常用,因为查询和锁定方法不是原子操作,并且此方法可能可用于调试和日志记录。由于标准库尚未准备就绪
mutex
让我们看看如何使用频道,而不是使用频道。type mutex struct }func newmutex() mutex , 1)} mu.ch <-struct{}func (m *mutex) lock() func (m *mutex) unlock() default: panic("unlock of unlocked mutex") }func (m *mutex) trylock() bool return false}func (m *mutex) islocked() bool
您还可以将缓存的大小从 1 更改为 n 以处理 n 个锁(资源)。
有时候,当我们拿到一把锁时,由于竞争,当这个锁被另一个 goroutine 拥有时,当前的 goroutine 没有办法立即拿到锁,只能阻塞和等待。 标准库不提供等待超时的功能,我们尝试实现它。
type mutex struct }func newmutex() mutex , 1)} mu.ch <-struct{}func (m *mutex) lock() func (m *mutex) unlock() default: panic("unlock of unlocked mutex") }func (m *mutex) trylock(timeout time.duration) bool return false}func (m *mutex) islocked() bool你也可以使用它
context
转换,不是使用超时,而是使用context
若要取消获取锁的超时,此作业留给读取器来实现。
当您等待多个信号时,如果接收到任何一个信号,则执行业务逻辑,忽略尚未接收到的其他信号。
例如,如果我们向提供相同服务的 n 个节点发送请求,只要任何一个服务节点返回一个结果,我们就可以执行以下业务逻辑,其他 n-1 个节点的请求都可以被取消或忽略。 当 n=2 时,就是这样back request
模式。 这允许以增加延迟为代价来交换资源。
应该注意的是,当接收到任何一个信号时,所有其他信号都将被忽略。如果您使用一个通道,只要您从任何通道接收单个数据,就可以关闭所有通道(取决于您的实现,但输出通道肯定会关闭)。
有三种方法可以做到这一点:goroutine、reflect 和 recursion。
func or(chans ..chan interface{})chan interface{} go func() case <-out: }c) }return out}
or
该函数可以处理 n 个通道,它为每个通道启动一个 goroutine,一旦任何 goroutine 从通道读取数据,输出通道就会关闭。
为避免同时关闭输出通道的问题,关断操作仅执行一次。
Go 的反射库有专用数据(用于 select 语句reflect.selectcase
) 和函数 (reflect.select
)处理。
因此,我们可以使用反射来“随机”接收来自一组可选通道的数据并关闭输出通道。
这样看起来更简洁。
func or(channels ..chan interface{})chan interface{} ordone := make(chan interface{})go func() reflect.select(cases) }return ordone}递归方法一直都是开窍实现,接下来的方法就是分而治之的方法,逐步合并通道,最终返回一个通道。
func or(channels ..chan interface{})chan interface{} ordone := make(chan interface{})go func() default: m := len(channels) / 2 select }return ordone}在后面的扇入(merge)模式下,我们仍然会使用相同的递归模式来合并多个输入通道,这比goroutines更有效率,并根据JustForfun的测试结果进行反映。
这是我们经常使用的一种模式,使用信号通道(done)来控制(取消)输入通道的处理。
一旦从完成通道读取信号,或者完成通道关闭,输入通道的处理就会被取消。
此模式提供了一种将完成通道和输入通道合并为输出通道的简单方法。
func ordone(done <-chan struct{},c <-chan interface{})chan interface{} go func() select }return valstream}Fanin模式是将多个相同类型的输入通道合并为一个相同类型的输出通道,即通道的合并。
每个通道都有一个 goroutine。
func fanin(chans ..chan interface{})chan interface{} go func() wg.done() c) }wg.wait() close(out) }return out}利用反射库对 select 语句的处理来合并输入通道。
下面的实现其实还是有点问题,输入通道读取更均匀的时候效果更好,否则性能会降低。
func faninreflect(chans ..chan interface{})chan interface{} go func() for len(cases) >0 out <-v.interface() return out}虽然这种方法不直观,但性能还是不错的(递归电平不会高,在输入通道不是很大的时候也不会成为瓶颈)。
func faninrec(chans ..chan interface{})chan interface{} close(c) return c case 1: return chans[0] case 2: return mergetwo(chans[0], chans[1]) default: m := len(chans) / 2 return mergetwo( faninrec(chans[:m]..faninrec(chans[m:].func mergetwo(a, b <-chan interface{})chan interface{} go func() c <-v case v, ok := <-b: if !ok c <-v } return c}扇出模式是将输入通道扇出到多个通道中。
扇出行为至少可分为两种类型:
从输入通道读取一条数据并发送到每个输入通道,这种模式称为 T 型,从输入通道读取一条数据,从输出通道中选择一个通道发送 本节只介绍第一种情况,下一节介绍第二种情况。
读取值被发送到每个输出通道,异步模式会导致大量的 goroutine。
func fanout(ch <-chan interface{},out chan interface{},async bool) for v := range ch ()else }在此模式下,一旦输出通道被阻塞,可能会导致后续处理延迟。
func fanoutreflect(ch <-chan interface{},out chan interface{})cases := make(reflect.selectcase, len(out)) for i := range cases for v := range ch for _ = range cases }分配模式将从输入通道读取的值发送到其中一个输出通道。
Roundrobin 选择输出通道的方式。
func fanout(ch <-chan interface{},out chan interface{})// roundrobin var i = 0 var n = len(out) for v := range ch }利用随机选择的发射。
func fanoutreflect(ch <-chan interface{},out chan interface{})cases := make(reflect.selectcase, len(out)) for i := range cases for v := range ch _= reflect.select(cases) }eapache 通道提供了一些将模式应用于通道的方法,例如上面的扇入和扇出模式。 因为围棋本身的通道已经无法再延伸了
eapache/channels
该库定义了自己的通道接口,并提供方便的通道转换。
eapache/channels
提供了四种方法:
distribute:从输入通道读取值并将其发送到其中一个输出通道。 当输入通道关闭时,输出通道被关闭:从输入通道读取值并发送到所有输出通道。 当输入通道关闭时,输出通道在多路复用中关闭:输入通道合并为一个输出通道,当所有输入都关闭时输出关闭 offpipe:还为上述四个功能提供了两个通道的串weakxxx
,输入关闭,输出未关闭。
让我们看一个对应函数的示例。
func testdist() channels.distribute(a, outputs[0], outputs[1], outputs[2], outputs[3]) outputs[0], outputs[1], outputs[2], outputs[3]) go func() a.close() for i := 0; i < 6; i++ var j int select fmt.printf("channel#%d: %d", j, v) }
func testtee() channels.tee(a, outputs[0], outputs[1], outputs[2], outputs[3]) outputs[0], outputs[1], outputs[2], outputs[3]) go func() a.close() for i := 0; i < 20; i++ var j int select fmt.printf("channel#%d: %d", j, v) }
func testmulti() channels.multiplex(a, inputs[0], inputs[1], inputs[2], inputs[3]) inputs[0], inputs[1], inputs[2], inputs[3]) go func() for i := range inputs }for v := range a.out()
func testpipe() a.close() for v := range b.out()从通道行为的角度来看,它看起来很像数据流,因此我们可以实现类似 scala 集合的东西。
Scala 的集合类提供了广泛的操作(方法),但其他编程语言或框架也提供了类似的方法,例如 Apache Spark、J**a Stream、Reactivex 等。
下面列举了一些方法的一些实现,相信经过一些人的深入研究,相关的方法可以变成一个好的类库,但现在我们来看一些例子。
skip 函数是在频道开始读取之前跳过频道中的一些数据。
skipnskipn 跳过前 n 个数据。
func skipn(done <-chan struct{},valuestream <-chan interface{},num int)func skipfn(done <-chan struct{},valuestream <-chan interface{},fn func(interface{})bool)func skipwhile(done <-chan struct{}, valuestream <-chan interface{},fn func(interface{})bool) takentaken 读取前 n 个数据。Map 和 Reduce 是一组常见的操作。func taken(done <-chan struct{},valuestream <-chan interface{},num int)func takefn(done <-chan struct{},valuestream <-chan interface{},fn func(interface{})bool)func takewhile(done <-chan struct{}, valuestream <-chan interface{},fn func(interface{})bool) 如果输入是一个通道,并且通道中的数据仍然是同一类型的通道,那么 flat 将返回一个输出通道, 输出通道中的数据就是输入通道中的数据。
它与扇入不同,在扇入中,输入通道在调用时是固定的,并以阵列形式提供,而 flat 的输入是可以在运行时添加到通道中的通道。
func ordone(done <-chan struct{},c <-chan interface{})chan interface{} go func() select }return valstream}func flat(done <-chan struct{},chanstream <-chan <-chan interface{})chan interface{} go func() select stream = maybestream case <-done: return } for val := range ordone(done, stream) return valstream}
地图将一个通道映射到另一个通道,通道的类型可以不同。
func mapchan(in <-chan interface{},fn func(interface{})interface{})chan interface{} if in == nil go func() return out}因为
map
是 go 的关键字,因此我们不能将函数类型命名为map
,此处使用mapchan
鉴于。
例如,您可以处理公司中的员工工资渠道,并输出扣除税后的员工工资渠道。
func reduce(in <-chan interface{},fn func(r, v interface{})interface{})interface{} out := <-in for v := range in return out}您可以:
reduce
实现sum
max
min
和其他聚合操作。
本文列出了一些深入的通道应用模式,相信通过阅读本文,可以更深入地了解go的通道类型,并在开发中灵活应用通道。 也欢迎您在评论中为频道提出更多应用模式建议。
所有这些都可以在 GitHub 上找到:Smallnest Channels。