前言 这几天攻关了一下协程通信,也顺利完成了
我觉得每次学习都是一个新的经验
狗公司今年没有年终
我肯定是要走的,没必要继续停留
Golang协程基础 (已经完成!)
Golang协程调度 (已经完成!)
Golang协程通信 (已经完成)
Golang协程控制
Golang垃圾回收机制
CSP思想 要了解协程通信,首先就要了解CSP思想
CSP全称是 Communicating Sequential Processes,意思是通信顺序过程
这个的核心就是并发过程中进行交互,需要通过通道 传递信息
在CSP的设计思想当中,通过指定的通道发送信息或者接收信息完成通信
CSP思想是Go并发的设计思想,所以Go语言里面定义了通道 这种重要机制,这也是我们必须面对和掌握的
我们也可以这么理解:Go的通道是实现Go协程通信的重要媒介
Go语言的通道是什么 直接了当的说明吧
我们会在Go代码里面看到很多类似下面这种代码
1 2 3 var work chan Tchan <- float<-chan string
但凡带有chan的,全都是通道的东西,也就是说,当你们看到chan的时候
就要明白,通道它来了。
Go预言通道的使用方法 我在这里默认大家都懂Go,都写过Go,知道Go的基本语法。所以基础的语法,我这就不讲了。。大家不懂得回去再学一阵
声明通道 声明一个名叫work的chan,通道里存储的数据类型为int
声明一个chan 存储int,不带箭头表示可读可写
声明一个chan,只能写入int,不能读
声明一个chan,只能读int,不能写
初始化通道 我们声明完通道之后,不能马上使用,如果你用了,往里面写东西,就类似
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package mainimport "fmt" func main () { var message chan string go func () { message <- "work" }() msg := <-message fmt.Println(msg) }
你就会得到一个错误返回
1 2 3 4 5 6 fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send (nil chan )]: main.main() ..... exit status 2
这他喵就是Go的一个固定逻辑,当你不初始化的时候,你是没发往里写东西的
初始化的操作就是
1 message := make (chan string )
修改下上面的代码
1 2 3 4 5 6 7 8 9 10 ...... message := make (chan string ) go func () { message <- "work" }() msg := <-message fmt.Println(msg)
现在的返回完全正常了
通道写入数据 前面我已经举了个简单的代码例子了
其实再讲明白点,箭头代表你要干嘛
把信息写入到channel里面
这就是写入,简单吧
通道读取数据 读取,一般的用法,是把取出来的值赋值出去
类似
读取分为两种方式
阻塞式接收 这种就类似上面那种形式
这里的message如果为空,是不会退出的,会持续性阻塞在这里
直到获取到message之后,才会完全退出
非阻塞接收
这里的ok是一个bool类型,如果获取到bool为false,则通道完全关闭
关闭通道 很简单,直接close通道
如果你往一个close的通道里面持续写入
1 2 3 4 5 6 7 8 9 10 11 12 func main () { message := make (chan string ) go func () { message <- "work" close (message) }() msg, ok := <-message message <- "sda" fmt.Println(msg, ok) }
这里会报错
1 2 3 4 5 6 panic: send on closed channel goroutine 1 [running]: main.main() .... exit status 2
你不可以向一个已经关闭的通道写数据,但是!但是!但是!
你可以向一个已经关闭的通道读数据!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func main () { message := make (chan int ) go func () { message <- 1 close (message) }() msg := <-message fmt.Println(msg) time.Sleep(time.Second * 2 ) msg2 := <-message fmt.Println(msg2) }
这里返回
看到没,即使已经关闭,我们都可以读到一个0值
如果我们不去判断通道是否关闭,而是只获取值,那么函数将永远不会结束
所以这里就需要我们上面的判断了
改写一下代码
1 2 3 4 msg2, ok := <-message if !ok { fmt.Println("stop" ) }
这样写不够优雅,我们后面更新一个优雅的写法
通道只读/只写 这个前面咱们讲过,可以根据箭头指定写入还是读取
那么建立的时候,咱们也可以通过箭头给它规定死到底是读还是写
只写通道 1 var c = make (chan <- int )
这句的意思就是,这个通道只能写入不能读取整数
只读通道 1 var c = make ( <- chan int )
普通通道(可读可写)
通道缓冲(限制通道大小) 这里的通道缓冲,可以理解为控制消息数量,你将它理解为队列机制
假设我们的机器处理能力有限,需要限制接收的消息
限制为最大接收3个消息
1 work := make (chan int , 3 )
当超过这个消息的时候,则会发生阻塞
所以我们可以利用chan来控制消息数量
或者限制协程执行数量
或者实现一个简单的协程池
根据用例深入了解通道 上面我们讲了什么是通道,或者通道是怎么使用的,下面我们找一段代码来逐渐分析下
通道一般在代码里面是怎么使用的
写一段简单的生产者-消费者模型代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package mainimport "fmt" func work () { g := make (chan int ) quit := make (chan bool ) quitwork := make (chan bool ) go func () { for { select { case v := <-g: fmt.Println(v) case <-quit: fmt.Println("读取协程 end" ) quitwork <- true return } } }() go func () { for i := 0 ; i < 10 ; i++ { g <- i } fmt.Println("写入协程,读取协程退出" ) quit <- true }() <-quitwork fmt.Println("work end" ) } func main () { work() }
上面我完成了一个协程写入,一个协程监听的逻辑
所以说,如果,我们要进行chan的开发
我们需要两个协程
一个往里写
一个往外读
往里写我已经讲明白了,现在我讲一下往外读的逻辑
各位发现了没,协程监听逻辑里面有一段代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 go func () { for { select { case v := <-g: fmt.Println(v) case <-quit: fmt.Println("读取协程 end" ) quitwork <- true return } } }()
这段代码就是往外读 的核心机制
我把它改写一些,让它不要看着那么复杂
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func ReadChan (g chan int , quit chan bool , quitwork chan bool ) { for { select { case v := <-g: fmt.Println(v) case <-quit: fmt.Println("读取协程 end" ) quitwork <- true return } } }
好了,核心就是在那个for-select 部分,下面,我就来讲一下这里吧。
select 首先,当我们需要和多个通道进行通信,或者需要对通道进行判断的时候
我们一定会用到select,select使用的核心就是一个通道读写阻塞的时候,不会影响其他通道进行work
select的使用方法有些像switch
1 2 3 4 5 6 7 8 select { case <-ch1: ... case <-ch2: ... default : ... }
select的随机性 首先,select,意思是选择,如果我们有多个管道待执行,同时准备好操作
那么select将会随机选择管道执行,举个例子
1 2 3 4 5 6 7 8 c := make (chan int , 1 ) c <-1 select { case <-c: fmt.Println("work 1" ) case <-c: fmt.Println("work 2" ) }
这里的返回,有时候输出 work 1,有时候输出work 2,这就是它的随机机制。
select的阻塞 继续上面的代码,我把它改写一下
1 2 3 4 5 6 7 8 c := make (chan int , 1 ) select { case <-c: fmt.Println("work 1" ) case <-c: fmt.Println("work 2" ) }
如果select处没有任何通道能够符合要求,则会持续性阻塞下去,直到传入一个新的值为止
那么如何处理这种问题呢?
加一个default就行,这个意思就是,当都不满足要求,执行这个分支
1 2 3 4 5 6 7 8 9 10 c := make (chan int , 1 ) select { case <-c: fmt.Println("work 1" ) case <-c: fmt.Println("work 2" ) default : fmt.Println("work 3" ) }
select的控制 我们可以利用select控制管道,也可以自己指定规则进行退出,或者是下一步操作
例如,假设我们要实现一个,如果300s没有消息传入,那么我们就退出这个select
报告超时
超时机制实现
1 2 3 4 5 6 7 8 9 c := make (chan int , 1 ) select { case <-c: fmt.Println("work 1" ) case <-c: fmt.Println("work 2" ) case <-time.After(5 * time.Second): fmt.Println("timeout....." ) }
for-select循环 这也就是我上面写的那个逻辑
一般情况下,我们不希望它马上退出,而是希望它不断循环操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 for { select { case v := <-g: fmt.Println(v) case <-quit: fmt.Println("读取协程 end" ) quitwork <- true return } }
这种一般适用于生产者消费者模型,协程池等场景
还有一种定时发送 的场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 tick := time.Tick(time.Second * 2 ) for { select { case v := <-g: fmt.Println(v) case <-tick: fmt.Println("tick" ) case <-quit: fmt.Println("读取协程 end" ) quitwork <- true return } }
Go通道的原理 又到了喜闻乐见看源码的时间了,我每次看源码看完脑瓜仁都会痛。。。
那就开始呗
chan的源码,放置在/go/src/runtime/chan.go 下
chan的结构体 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 elemtype *_type sendx uint recvx uint recvq waitq sendq waitq lock mutex } type waitq struct { first *sudog last *sudog }
*sudog这个函数指的是某个g,这个g用于发送和接收
这部分代码在/go/src/runtime/runtime2.go里
我将chan结构体都做了解释翻译,看注释就行了
其实,将chan理解为一个环形队列
数组,序号recvx和revcq组成了一个环形队列
recvx代表元素在通道里面的位置(读取位置)
sendx代表写入通道时元素所在的位置(写入位置)
recvx到sendex的距离就是通道里面的消息个数总数(读取位置+写入位置 = 消息总数)
举个例子
一个chan一共有4个缓存
[0, 0, 0, 0]
现在写入数据
[1, 2, 3, 4]
那么sendx = 1,recvx = 3
所以count = sendx + recvx
这块非常复杂,我看了源码和书都不太能整明白,只有后面继续学习了。
初始化chan的源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func makechan (t *chantype, size int ) *hchan { ... mem, overflow := math.MulUintptr(elem.size, uintptr (size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic (plainError("makechan: size out of range" )) } var c *hchan switch { case mem == 0 : c = (*hchan)(mallocgc(hchanSize, nil , true )) c.buf = c.raceaddr() case elem.ptrdata == 0 : c = (*hchan)(mallocgc(hchanSize+mem, nil , true )) c.buf = add(unsafe.Pointer(c), hchanSize) default : c = new (hchan) c.buf = mallocgc(mem, elem, true ) } ... }
这里最核心的部分就是分配内存的地方
首先,如果我们指定分配长度为10,那么就会计算10长度需要多少mem,分配指定大内存大小,单独分配内存空间才可以进行gc回收
写入chan的源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 func chansend (c *hchan, ep unsafe.Pointer, block bool , callerpc uintptr ) bool { if c.closed != 0 { unlock(&c.lock) panic (plainError("send on closed channel" )) } if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func () { unlock(&c.lock) }, 3 ) return true } if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil ) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) }
写入协程的时候,有三种不同状态,将进行下面的分析
直接写入协程 如果我们不分配协程的缓存直接写入,会走到这个分支
首先,hchan的recvq维护了一个协程链表
recvq指的是等待的协程链表,每个协程就是一个*sudog,这个前面讲过,就是一个协程的表示方法
首先,recvq会获取协程的元素指针,默认获取链表中的第一个 协程
会直接将sudog这个玩意儿复制给对应协程,然后进行协程唤醒操作(执行协程)
缓冲区写入协程 我们在meke协程的时候,如果指定了缓冲区,那么就会走到这个分支
首先判断现有队列里面元素的总量:c.qcount
然后获取到缓冲区的总量: c.dataqsiz
如果小于总量,则认为缓冲区还有空间
这时候执行分配内存的操作(向缓冲区写入数据)
然后调整sendx,让它+1(理解链表)
阻塞协程(缓冲区无空余,阻塞操作) 这个操作就是咱们提到的阻塞
如果缓冲区满了,或者协程通道没准备好,就会造成阻塞
这里将sudog进行了赋值修正,然后调用c.sendq.enqueue(mysg) ,这里是将sudog放入到链表的末尾,然后协程就会进入休眠状态 gp.waiting
读取chan的源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 func chanrecv (c *hchan, ep unsafe.Pointer, block bool ) (selected, received bool ) { if c == nil { if !block { return } gopark(nil , nil , waitReasonChanReceiveNilChan, traceEvGoStop, 2 ) throw("unreachable" ) } if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func () { unlock(&c.lock) }, 3 ) return true , true } if c.qcount > 0 { qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil ) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true , true } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) }
其实读取的代码和和写入的有点像,直接分析
读取正在等待的协程 还是默认操作,从协程链表rescv里面获取第一个协程
然后复制,最后唤醒阻塞的写入协程
读取缓冲区里的协程 缓冲区里面有数据,直接读取,然后写入当前的读取协程中
阻塞协程(无法读取,缓冲区为空) 缓冲区没数据,把sudog放入链表末尾,然后休眠协程,等待写入并且重新执行
Go通道的几个使用场景 因为篇幅,下面讲一下几个重要的应用场景,我这边会给出大致代码,实现的话我会附上我的GITHUB链接,如果需要的话直接去GITHUB里DOWN下来就行了
控制协程数 channel可以控制协程的数量,换句话说,我们可以通过控制channel缓存,来控制有几个协程执行
所以我们可以根据这个特性弄个协程池出来
首先,协程池已经有相关的第三方开源实现
例如ants 就非常好用
未来我会详细的读一下ants,然后出个blog给大家看
我这里只是举个简单的例子,也就是通过chan去控制同时执行的协程
假设我们现在写个代码,去并发执行一些逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func Read (i int ) { fmt.Printf("go func: %d\n" , i) time.Sleep(time.Second) } func main () { userCount := math.MaxInt64 for i := 0 ; i < userCount; i++ { go func (i int ) { go Read(i) }(i) } }
这时候,你不要去跑这段代码,因为你一定会卡死。因为你开了太多的协程。。。
然后我们拿chan控制一下这个孙子
说下我的思想
首先make chan 造一个有缓存的通道
传输通道, wg.add 一个协程
通过通道控制并发
改写一下代码,如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 var wg = sync.WaitGroup{}func Read (ch chan bool , i int ) { defer wg.Done() ch <- true fmt.Printf("go func: %d, time: %d\n" , i, time.Now().Unix()) time.Sleep(time.Second) <-ch } func main () { userCount := 10 ch := make (chan bool , 2 ) for i := 0 ; i < userCount; i++ { wg.Add(1 ) go Read(ch, i) } wg.Wait() }
这就是大概的协程控制逻辑
超时操作 这个前面我记得我讲过
固定的time.After这个逻辑就行
这部分代码套用了
https://eddycjy.gitbook.io/golang/di-1-ke-za-tan/control-goroutine#chang-shi-chan-+-sync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func doWithTimeOut (timeout time.Duration) (int , error) { select { case ret := <-do(): return ret, nil case <-time.After(timeout): return 0 , errors.New("timeout" ) } } func do () <-chan int { outCh := make (chan int ) go func () { go work() }() return outCh }
结尾 其实这篇文章写挺久了。,。。
但是我还是没把它好好写完
我太懒了,不行,我要加倍努力,加油加油!!!!