首页 文章 golang——channel

golang——channel

来源:https://blog.csdn.net/weixin_45627369/article/details/127193703 发布时间:2023-03-24 12:58:51 作者:Soul-Yang 阅读量:354

目录

1.分类

channel的三种状态

channel的两种类型——有缓冲,无缓冲

无缓冲

有缓冲

2.操作

1.创建

2.发送

3.接收

4.关闭

3.使用场景

4.channel底层

5.channel线程安全

为什么是线程安全的

如何实现线程安全

6.channel控制goroutine并发执行顺序

7.channel共享内存的优缺点

8.channel发送和接收什么时候会死锁


1.分类

groutine的通信机制,每个channel都有类型

  1. ch:=make(chan int)
  2. //ch为make创建的底层数据结构的引用,零值为nil
  • 2种类型:无缓冲、有缓冲
  • 3种模式:写操作模式(单向通道)、读操作模式(单向通道)、读写操作模式(双向通道)

写操作模式

读操作模式

读写操作模式

创建

make(chan<-int)

make(<-chan int)

make(chan int)

channel的三种状态

未初始化

关闭

正常

关闭

panic

panic

正常关闭

发送

永远阻塞导致死锁

panic

阻塞/成功发送

接收

永远阻塞导致死锁

缓冲区为空则为零值,否则可以继续读且返回true

阻塞/成功接收

channel的两种类型——有缓冲,无缓冲

无缓冲有缓冲
创建方式make(chan TYPE)make(chan TYPE)
发送阻塞数据接收前发送阻塞缓冲区满时发送阻塞
接收阻塞数据发送前接收阻塞缓冲区空时接收阻塞

无缓冲

时刻都是同步状态

无缓冲chan必须有发送G/接收G的同时有接收G/发送G,否则就会阻塞,

报错:fatal error: all goroutines are asleep - deadlock!      

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func loop(ch chan int) {
  7. for {
  8. select {
  9. case i := <-ch:
  10. fmt.Println("this is value of unbuffer channel", i)
  11. }
  12. }
  13. }
  14. func main() {
  15. ch := make(chan int)
  16. ch <- 1
  17. go loop(ch)
  18. time.Sleep(1 * time.Millisecond)
  19. }

但如果把ch <-放到go loop(ch)下面,程序就会正常运行

有缓冲

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func loop(ch chan int) {
  7. for {
  8. select {
  9. case i := <-ch:
  10. fmt.Println("this is value of unbuffer channel", i)
  11. }
  12. }
  13. }
  14. func main() {
  15. ch := make(chan int, 3)
  16. ch <- 1
  17. ch <- 2
  18. ch <- 3
  19. ch <- 4
  20. go loop(ch)
  21. time.Sleep(1 * time.Millisecond)
  22. }

也会报错:fatal error: all goroutines are asleep - deadlock!      

是因为channel的大小为3,而要往里面塞4个数据,所以会阻塞住

解决方法:

1.把channel的长度调大(不是很好的解决方案)

2.把发送者ch<-1等代码移动到go loop(ch)下面,让channel实时消费就不会导致阻塞了

注意点:

一个chan不能多次关闭,会导致panic

如果多个goroutine都监听同一个channel,那么channel上的数据都可能随即被某一个goroutine取走进行消费

如果多个goroutine都监听同一个channel,如果这个channel被关闭,则所有goroutine都能收到退出信号

2.操作

1.创建

  1. 带缓冲
  2. ch := make(chan int,3)
  3. 不带缓冲
  4. ch := make(chan int)

创建时会做一些检查:

  • 元素大小不能超过64K
  • 元素对齐大小不能超过maxAlign(8字节)
  • 计算出来的内存是否超过限制

创建时的策略:

  • 无缓冲的channel——会直接给hchan分配内存
  • 有缓冲的channel并且元素不包含指针(buf指针,指向底层数组)——会为hchan和底层数组分配一段连续的地址
  • 有缓冲的channel并且元素包含指针——会为hchan和底层数组分别分配地址

2.发送

包括检查数据发送两个步骤

数据发送步骤

1.如果channel的读等待队列存在接收者goroutine(有发送者goroutine阻塞)

        将数据直接发送给第一个等待的goroutine,唤醒接收的goroutine

2.如果channel的读等待队列不存在接收者goroutine(无有发送者goroutine阻塞)

        如果buf指向的循环数组未满,会把数据发送到循环数组的队尾

        如果buf指向的循环数组已满,就会阻塞,将当前goroutine加入写等待队列,并挂起等待唤醒

func chansend(c *hchan,ep unsafe.Pointer,block bool,callerpc uintptr)bool

阻塞式:

调用chansend函数,且block=true

ch <- 10

非阻塞式:

调用chansend函数,且block=false

通过select让其在将阻塞时直接返回

  1. select {
  2. case ch <- 10:
  3. ...
  4. default
  5. }

3.接收

包括检查数据接收两个步骤

数据接收步骤

1.如果channel的写等待队列存在发送者goroutine(有发送者goroutine阻塞)

       如果是无缓冲channel,直接从第一个发送者goroutine那里把数据拷贝给接收变量,唤醒发送的goroutine

       如果是有缓冲channel(已满),将循环数组buf的队首元素拷贝给接收变量,将第一个发送者goroutine的数据拷贝到buf指向的循环数组队尾,唤醒发送的goroutine

2.如果channel的写等待队列不存在发送者goroutine(没有发送者goroutine阻塞)

        如果buf指向的循环数组非空,将循环数组的队首元素拷贝给接收变量

        如果buf指向的循环数组为空,这个时候就会阻塞,将当前goroutine加入读等待队列,并挂起等待唤醒

func chanrecv(c *hchan,ep unsafe.Pointer,block bool)(selected,received bool)

阻塞式:

调用chanrecv函数,且block=true

有4种方式

  1. <-ch
  2. v:= <-ch
  3. v,ok := <-ch
  4. //当channel关闭时,for循环会自动退出,无需主动监测channel是否关闭,可以防止读取已经关闭的channel,造成读到数据为通道所存储的数据类型的零值
  5. for i := range ch {
  6. fmt.Println(i)
  7. }

非阻塞式:

调用chanrecv函数,且block=false

  1. select {
  2. case <- ch:
  3. ...
  4. default
  5. }

4.关闭

调用closechan函数

func closechan(c *hchan)
close(ch)
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. "unsafe"
  6. )
  7. //G1是发送者
  8. //当G1向ch发送数据时,首先会对buf加锁,然后将task存储的数据copy到buf中,然后sendx++,然后释放对buf的锁
  9. func sendTask(ch chan string) {
  10. taskList := []string{"this", "is", "a", "demo"}
  11. for _, task := range taskList {
  12. ch <- task
  13. }
  14. }
  15. //G2是接收者
  16. //当G2消费数据时,会首先对buf加锁,然后将buf中的数据copy到task变量对应的内存里,然后recvx++,并释放锁
  17. func recvTask(ch chan string) {
  18. for {
  19. task := <-ch //接收任务
  20. fmt.Println("received", task) //处理任务
  21. }
  22. }
  23. func main() {
  24. //chan是带缓冲,缓冲大小为4的channel
  25. //初始hchan结构体的buf为空,sendx和recvx均为0
  26. ch := make(chan string, 4)
  27. fmt.Println(ch, unsafe.Sizeof(ch))
  28. go sendTask(ch)
  29. go recvTask(ch)
  30. time.Sleep(1 * time.Second)
  31. }
  1. 0xc000058060 8
  2. received this
  3. received is
  4. received a
  5. received demo

3.使用场景

  1. 停止信号监听
  2. 定时任务
  3. 生产方和消费方解耦
  4. 控制并发数

4.channel底层

Go channel是一个队列,遵循FIFO的原则,负责协程之间的通信(Go语言不提倡通过共享内存而通信,而是通过通信来实现内存共享),CSP(Communicating Sequential Process)并发模型,就是通过goroutine和channel来实现的

通过var声明/make函数创建的channel变量是一个存储在函数栈上的指针,占用8个字节,指向堆上的hchan结构体

 buf循环数组好处:消费数据后不需要移动,只用移动sendx,recvx下标

有锁,保证线程安全

  1. type hchan struct {
  2. closed uint32 //channel是否关闭的标志
  3. elemtype *_type //channel中的元素类型
  4. //channel中分为有缓冲和无缓冲两种
  5. //对于有缓冲的channel存储数据,使用了ring buffer(环形缓冲区)来缓存写入的数据
  6. //本质是循环数组
  7. //普通数组容量固定更适合指定的空间,弹出元素时,普通数组需要全部前移
  8. buf unsafe.Pointer //指向底层循环数组的指针(环形缓冲区)
  9. qcount uint //循环数组中的元素数量
  10. dataqsiz uint //循环数组的长度
  11. elemsize uint16 //元素的大小
  12. sendx uint //下一次写下标的位置
  13. recvx uint //下一次读下标的位置
  14. //尝试读取channel或向channel写入数据时被阻塞的goroutine
  15. recvq waitq //下一次写下标的位置
  16. sendq waitq //下一次读下标的位置
  17. lock mutex //互斥锁,保证读写channel时不存在并发竞争问题
  18. }

hchan结构体的主要组成部分:

  • 用来保存goroutine之间传递数据的循环数组:buf
  • 用来记录此循环数组当前发送或接收数据的下标值:sendx和recvx
  • 用于保存向该chan发送和从该chan接收数据被阻塞的goroutine队列:sendq和recvq
  • 保证channel写入和读取数据时线程安全的锁:lock

等待队列:

双向链表,包含一个头结点和一个尾结点

每个节点是一个sudog结构体变量,记录哪个协程在等待,等待的是哪个channel,等待发送/接收的数据在哪里

  1. type waitq struct{
  2. first *sudog
  3. last *sudog
  4. }
  5. type sudog struct{
  6. g *g
  7. next *sudog
  8. prev *sudog
  9. elem unsafe.Pointer
  10. c * hchan //等待的是哪个channel
  11. ...
  12. }

5.channel线程安全

为什么是线程安全的

不同协程通过channel进行通信,本身的使用场景就是多线程,为了保证数据的一致性,必须实现线程安全

如何实现线程安全

channel的底层实现中,hchan结构体中采用Mutex锁保证读写安全

在对循环数组buf中的数据进行入队和出队操作时,必须优先获取互斥锁,才能操作channel数据

6.channel控制goroutine并发执行顺序

多个goroutine并发执行时,每个goroutine抢到处理器的时间点不一致,goroutine的执行本身不能保证顺序——代码中先写的goroutine并不能保证先执行

思路:使用channel进行通知,用channel去传递信息,从而控制并发执行顺序

从第x个协程中拿数据,通知第x+1个协程

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. var wg sync.WaitGroup
  8. func print(goroutine string, inchan chan struct{}, outchan chan struct{}) {
  9. time.Sleep(1 * time.Second) //模仿内部操作耗时
  10. select {
  11. case <-inchan://从第x个协程中拿数据
  12. fmt.Printf("%s\n", goroutine)//通知第x+1个协程
  13. outchan <- struct{}{}
  14. }
  15. wg.Done()
  16. }
  17. func main() {
  18. ch1 := make(chan struct{}, 1)
  19. ch2 := make(chan struct{}, 1)
  20. ch3 := make(chan struct{}, 1)
  21. ch1 <- struct{}{}
  22. wg.Add(3)
  23. var start = time.Now().Unix()
  24. go print("goroutine1", ch1, ch2)
  25. go print("goroutine2", ch2, ch3)
  26. go print("goroutine3", ch3, ch1)
  27. wg.Wait()
  28. end := time.Now().Unix()
  29. fmt.Printf("during:%d", end-start)
  30. }
  1. goroutine1
  2. goroutine2
  3. goroutine3
  4. during:1

串行耗时3s,而这里并行耗时1s

7.channel共享内存的优缺点

go设计思想:不要通过共享内存来通信,要通过通信来共享内存

通过发送消息进行同步常见例子:goCSP模型(Communication Sequential Process)

                               ->process1

process->channel  ->process2

                               ->process3

大部分语言采用的都是通过共享内存进行通信,然后通过互斥锁,CAS等操作保证并发安全

go引入了channel和goroutine实现CSP模型,将生产者和消费者进行了解耦,channel和消息队列很相似

优点:

        将生产者和消费者进行了解耦,降低并发当中的耦合

缺点:

        容易出现死锁

8.channel发送和接收什么时候会死锁

死锁:

1.单个协程永久阻塞

2.两个或两个以上协程的执行过程中,由于竞争资源或由于彼此通信而造成的一种阻塞的现象

channel死锁场景:

1.非缓存channel读写不能同时

2.缓存channel缓存满时写/缓存空时读

3.多个channel相互等待

1.非缓存channel读写不能同时

  1. package main
  2. import "fmt"
  3. func main() {
  4. ch := make(chan int)
  5. ans := <-ch
  6. //ch <- 1
  7. fmt.Println(ans)
  8. }
  1. package main
  2. func main() {
  3. ch := make(chan int)
  4. //ans := <-ch
  5. ch <- 1
  6. //fmt.Println(ans)
  7. }
  1. package main
  2. import "fmt"
  3. func main() {
  4. ch := make(chan int)
  5. ans := <-ch
  6. ch <- 1
  7. fmt.Println(ans)
  8. }
  1. package main
  2. import "fmt"
  3. func main() {
  4. ch := make(chan int)
  5. ch <- 1
  6. ans := <-ch
  7. fmt.Println(ans)
  8. }

2.缓存channel缓存满时写/缓存空时读

  1. package main
  2. func main() {
  3. ch := make(chan int, 1)
  4. ch <- 1
  5. ch <- 2
  6. }
  1. package main
  2. import "fmt"
  3. func main() {
  4. ch := make(chan int, 1)
  5. fmt.Println(<-ch)
  6. }

3.多个channel相互等待

主协程和子协程相互等待导致死锁

  1. package main
  2. import "fmt"
  3. func main() {
  4. ch1 := make(chan int)
  5. ch2 := make(chan int)
  6. //互相等对方造成死锁
  7. go func() {
  8. for {
  9. select {
  10. case num := <-ch1:
  11. fmt.Println("num=", num)
  12. ch2 <- 100
  13. }
  14. }
  15. }()
  16. for {
  17. select {
  18. case num := <-ch2:
  19. fmt.Println("num=", num)
  20. ch1 <- 200
  21. }
  22. }
  23. }

文章知识点与官方知识档案匹配,可进一步学习相关知识
Go技能树任督二脉channel2871 人正在系统学习中
  
留言
https://blog.key9.cn/
用户登录
您还没有写任何评论内容!
您已经评论过了!
只能赞一次哦!
您已经收藏啦!