前言
这几天攻关了一下协程通信,也顺利完成了
我觉得每次学习都是一个新的经验
狗公司今年没有年终
我肯定是要走的,没必要继续停留
- Golang协程基础 (已经完成!)
- Golang协程调度 (已经完成!)
- Golang协程通信 (已经完成)
- Golang协程控制
- Golang垃圾回收机制
CSP思想
要了解协程通信,首先就要了解CSP思想
CSP全称是 Communicating Sequential Processes,意思是通信顺序过程
这个的核心就是并发过程中进行交互,需要通过通道传递信息
在CSP的设计思想当中,通过指定的通道发送信息或者接收信息完成通信
CSP思想是Go并发的设计思想,所以Go语言里面定义了通道这种重要机制,这也是我们必须面对和掌握的
我们也可以这么理解:Go的通道是实现Go协程通信的重要媒介
Go语言的通道是什么
直接了当的说明吧
我们会在Go代码里面看到很多类似下面这种代码
1 | var work chan T |
但凡带有chan的,全都是通道的东西,也就是说,当你们看到chan的时候
就要明白,通道它来了。
Go预言通道的使用方法
我在这里默认大家都懂Go,都写过Go,知道Go的基本语法。所以基础的语法,我这就不讲了。。大家不懂得回去再学一阵
声明通道
声明一个名叫work的chan,通道里存储的数据类型为int
1 | var work chan int |
声明一个chan 存储int,不带箭头表示可读可写
1 | chan int |
声明一个chan,只能写入int,不能读
1 | chan <- int |
声明一个chan,只能读int,不能写
1 | <- chan int |
初始化通道
我们声明完通道之后,不能马上使用,如果你用了,往里面写东西,就类似
1 | package main |
你就会得到一个错误返回
1 | fatal error: all goroutines are asleep - deadlock! |
这他喵就是Go的一个固定逻辑,当你不初始化的时候,你是没发往里写东西的
初始化的操作就是
1 | message := make(chan string) |
修改下上面的代码
1 | ...... |
现在的返回完全正常了
通道写入数据
前面我已经举了个简单的代码例子了
其实再讲明白点,箭头代表你要干嘛
1 | 通道变量 <- 值 |
把信息写入到channel里面
1 | messages <- "ping" |
这就是写入,简单吧
通道读取数据
读取,一般的用法,是把取出来的值赋值出去
类似
1 | msg := <-message |
读取分为两种方式
阻塞式接收
这种就类似上面那种形式
1 | msg := <-message |
这里的message如果为空,是不会退出的,会持续性阻塞在这里
直到获取到message之后,才会完全退出
非阻塞接收
1 | msg, ok := <-message |
这里的ok是一个bool类型,如果获取到bool为false,则通道完全关闭
关闭通道
很简单,直接close通道
1 | close(message) |
如果你往一个close的通道里面持续写入
1 | func main() { |
这里会报错
1 | panic: send on closed channel |
你不可以向一个已经关闭的通道写数据,但是!但是!但是!
你可以向一个已经关闭的通道读数据!
1 | func main() { |
这里返回
1 | 1 true |
看到没,即使已经关闭,我们都可以读到一个0值
如果我们不去判断通道是否关闭,而是只获取值,那么函数将永远不会结束
所以这里就需要我们上面的判断了
改写一下代码
1 | msg2, ok := <-message |
这样写不够优雅,我们后面更新一个优雅的写法
通道只读/只写
这个前面咱们讲过,可以根据箭头指定写入还是读取
那么建立的时候,咱们也可以通过箭头给它规定死到底是读还是写
只写通道
1 | var c = make(chan <- int) |
这句的意思就是,这个通道只能写入不能读取整数
只读通道
1 | var c = make( <- chan int) |
普通通道(可读可写)
1 | var c = make(chan int) |
通道缓冲(限制通道大小)
这里的通道缓冲,可以理解为控制消息数量,你将它理解为队列机制
假设我们的机器处理能力有限,需要限制接收的消息
限制为最大接收3个消息
1 | work := make(chan int, 3) |
当超过这个消息的时候,则会发生阻塞
所以我们可以利用chan来控制消息数量
或者限制协程执行数量
或者实现一个简单的协程池
根据用例深入了解通道
上面我们讲了什么是通道,或者通道是怎么使用的,下面我们找一段代码来逐渐分析下
通道一般在代码里面是怎么使用的
写一段简单的生产者-消费者模型代码
1 | package main |
上面我完成了一个协程写入,一个协程监听的逻辑
所以说,如果,我们要进行chan的开发
我们需要两个协程
一个往里写
一个往外读
往里写我已经讲明白了,现在我讲一下往外读的逻辑
各位发现了没,协程监听逻辑里面有一段代码
1 | // 启动一个线程开始监听 |
这段代码就是往外读的核心机制
我把它改写一些,让它不要看着那么复杂
1 | func ReadChan(g chan int, quit chan bool, quitwork chan bool) { |
好了,核心就是在那个for-select部分,下面,我就来讲一下这里吧。
select
首先,当我们需要和多个通道进行通信,或者需要对通道进行判断的时候
我们一定会用到select,select使用的核心就是一个通道读写阻塞的时候,不会影响其他通道进行work
select的使用方法有些像switch
1 | select { |
select的随机性
首先,select,意思是选择,如果我们有多个管道待执行,同时准备好操作
那么select将会随机选择管道执行,举个例子
1 | c := make(chan int, 1) |
这里的返回,有时候输出 work 1,有时候输出work 2,这就是它的随机机制。
select的阻塞
继续上面的代码,我把它改写一下
1 | c := make(chan int, 1) |
如果select处没有任何通道能够符合要求,则会持续性阻塞下去,直到传入一个新的值为止
那么如何处理这种问题呢?
加一个default就行,这个意思就是,当都不满足要求,执行这个分支
1 | c := make(chan int, 1) |
select的控制
我们可以利用select控制管道,也可以自己指定规则进行退出,或者是下一步操作
例如,假设我们要实现一个,如果300s没有消息传入,那么我们就退出这个select
报告超时
超时机制实现
1 | c := make(chan int, 1) |
for-select循环
这也就是我上面写的那个逻辑
一般情况下,我们不希望它马上退出,而是希望它不断循环操作
1 | for { |
这种一般适用于生产者消费者模型,协程池等场景
还有一种定时发送的场景
1 | // 每隔2s发送一个信息,写入通道数据 |
Go通道的原理
又到了喜闻乐见看源码的时间了,我每次看源码看完脑瓜仁都会痛。。。
那就开始呗
chan的源码,放置在/go/src/runtime/chan.go 下
chan的结构体
1 | type hchan struct { |
*sudog这个函数指的是某个g,这个g用于发送和接收
这部分代码在/go/src/runtime/runtime2.go里
我将chan结构体都做了解释翻译,看注释就行了
其实,将chan理解为一个环形队列
数组,序号recvx和revcq组成了一个环形队列
recvx代表元素在通道里面的位置(读取位置)
sendx代表写入通道时元素所在的位置(写入位置)
recvx到sendex的距离就是通道里面的消息个数总数(读取位置+写入位置 = 消息总数)
举个例子
一个chan一共有4个缓存
[0, 0, 0, 0]
现在写入数据
[1, 2, 3, 4]
那么sendx = 1,recvx = 3
所以count = sendx + recvx
这块非常复杂,我看了源码和书都不太能整明白,只有后面继续学习了。
初始化chan的源码
1 | func makechan(t *chantype, size int) *hchan { |
这里最核心的部分就是分配内存的地方
首先,如果我们指定分配长度为10,那么就会计算10长度需要多少mem,分配指定大内存大小,单独分配内存空间才可以进行gc回收
写入chan的源码
1 |
|
写入协程的时候,有三种不同状态,将进行下面的分析
直接写入协程
如果我们不分配协程的缓存直接写入,会走到这个分支
首先,hchan的recvq维护了一个协程链表
recvq指的是等待的协程链表,每个协程就是一个*sudog,这个前面讲过,就是一个协程的表示方法
首先,recvq会获取协程的元素指针,默认获取链表中的第一个协程
会直接将sudog这个玩意儿复制给对应协程,然后进行协程唤醒操作(执行协程)
缓冲区写入协程
我们在meke协程的时候,如果指定了缓冲区,那么就会走到这个分支
首先判断现有队列里面元素的总量:c.qcount
然后获取到缓冲区的总量: c.dataqsiz
如果小于总量,则认为缓冲区还有空间
这时候执行分配内存的操作(向缓冲区写入数据)
然后调整sendx,让它+1(理解链表)
阻塞协程(缓冲区无空余,阻塞操作)
这个操作就是咱们提到的阻塞
如果缓冲区满了,或者协程通道没准备好,就会造成阻塞
这里将sudog进行了赋值修正,然后调用c.sendq.enqueue(mysg),这里是将sudog放入到链表的末尾,然后协程就会进入休眠状态 gp.waiting
读取chan的源码
1 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { |
其实读取的代码和和写入的有点像,直接分析
读取正在等待的协程
还是默认操作,从协程链表rescv里面获取第一个协程
然后复制,最后唤醒阻塞的写入协程
读取缓冲区里的协程
缓冲区里面有数据,直接读取,然后写入当前的读取协程中
阻塞协程(无法读取,缓冲区为空)
缓冲区没数据,把sudog放入链表末尾,然后休眠协程,等待写入并且重新执行
Go通道的几个使用场景
因为篇幅,下面讲一下几个重要的应用场景,我这边会给出大致代码,实现的话我会附上我的GITHUB链接,如果需要的话直接去GITHUB里DOWN下来就行了
控制协程数
channel可以控制协程的数量,换句话说,我们可以通过控制channel缓存,来控制有几个协程执行
所以我们可以根据这个特性弄个协程池出来
首先,协程池已经有相关的第三方开源实现
例如ants就非常好用
未来我会详细的读一下ants,然后出个blog给大家看
我这里只是举个简单的例子,也就是通过chan去控制同时执行的协程
假设我们现在写个代码,去并发执行一些逻辑
1 |
|
这时候,你不要去跑这段代码,因为你一定会卡死。因为你开了太多的协程。。。
然后我们拿chan控制一下这个孙子
说下我的思想
- 首先make chan 造一个有缓存的通道
- 传输通道, wg.add 一个协程
- 通过通道控制并发
改写一下代码,如下
1 | var wg = sync.WaitGroup{} |
这就是大概的协程控制逻辑
超时操作
这个前面我记得我讲过
固定的time.After这个逻辑就行
这部分代码套用了
https://eddycjy.gitbook.io/golang/di-1-ke-za-tan/control-goroutine#chang-shi-chan-+-sync
1 | func doWithTimeOut(timeout time.Duration) (int, error) { |
结尾
其实这篇文章写挺久了。,。。
但是我还是没把它好好写完
我太懒了,不行,我要加倍努力,加油加油!!!!