Golang Channel原理剖析
本文最后更新于 181 天前,其中的信息可能已经过时,如有错误请留言

Go 的设计思想:以通信的方式来共享内存,而不是通过共享内存的方式来实现通信,channel 就是这一思想的体现。

一、Channel 的基本用法

channel 用于通信的最基本用法,一端接受数据,一端写入数据

package main
import (
"fmt"
"sync"
)
func main() {
ch := make(chan int) // 不带缓冲区的channel
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
ch <- 1 // 向channel中写入数据
}()
go func() {
defer wg.Done()
a := <-ch // 从channel中读取数据
fmt.Println(a)
}()
wg.Wait()
close(ch) // 关闭channel
}

二、channel 使用中的注意事项

1、让 channel 产生 panic 的操作

1、往关闭的 channel 中写入数据会 panic

2、关闭未初始化的 channel 会 panic

3、重复关闭 channel 会 panic

2、不同状态下 Channel 的读取和写入

Channel 有三种状态:

1、nil:channel 尚未在堆上分配内存的状态

2、打开:channel 初始化之后,未被关闭的可用状态

3、关闭:channel 被关闭之后,不会立刻回收在堆上的内存,这也是关闭之后仍然可读的原因,直到被 gc 回收内存

操作 channel 状态结果
writenil 阻塞
write 有缓冲区,缓冲区未满写入成功
write 无缓冲区,或者缓冲区已满阻塞
write 关闭 panic
向 nil 状态下的 channel 写入数据,阻塞而不是 panic 的原因是,对于 select + default 情况下,程序可以走 default 分支,没必要 panic
操作 channel 状态结果
readnil 阻塞
read 打开,有元素读取成功
read 打开,没有元素阻塞
read 关闭读取默认值(空值或零值)
如果 channel 关闭的时候,channel 里面还有数据,也是可以正常读完的,之后才会读 0 值

3、如果从 channel 中读取的数据是 0,如何知道是对端写入的 0 还是 channel 当前已被关闭?

解决方案一:判定读取 a, ok := <-ch
package main
import (
"fmt"
"sync"
)
func main() {
ch := make(chan int) // 不带缓冲区的channel
close(ch) // 关闭channel
var wg sync.WaitGroup
wg.Add(1)
//go func() {
// defer wg.Done()
// ch <- 1 // 向channel中写入数据
//}()
go func() {
defer wg.Done()
a, ok := <-ch // 从channel中读取数据
if ok {
fmt.Println(a)
} else {
fmt.Println("channel closed")
}
}()
wg.Wait()
}
解决方案二:for v := range ch {}

采用 for range 的方式读取数据,for range 会遍历 channel 中的所有数据,一一读取,没有数据可读时,阻塞,直到 channel 关闭,channel 如果不关闭,for range 将一直阻塞,可能发生 deadlock

package main
import (
"fmt"
"sync"
"time"
)
func main() {
ch := make(chan int) // 不带缓冲区的channel
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
ch <- 1 // 向channel中写入数据
time.Sleep(1 * time.Second)
close(ch) // 关闭channel,如果不关闭channel,程序将报错:deadlock
}()
go func() {
defer wg.Done()
for v := range ch {
fmt.Println(v)
}
fmt.Println("Done")
}()
wg.Wait()
}

4、向不带缓冲区的 channel 中写入数据,必须有对端接受数据,否则将 deadlock

带有缓冲区的 channel 不会出现这个问题,因为可以把数据写入缓冲区,就不用阻塞,而没有缓冲区的 channel,写入数据时将阻塞,直到数据对对端读取

5、可以定义只读和只写的单向 channel

package main

import (
"fmt"
"sync"
"time"
)

type SChannel = chan<- int
type RChannel = <-chan int

func main() {
ch := make(chan int) // 不带缓冲区的channel
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
var send SChannel = ch
send <- 1 // 向channel中写入数据
send <- 2
time.Sleep(1 * time.Second)
close(ch) // 关闭channel
}()
go func() {
defer wg.Done()
var rcv RChannel = ch
for v := range rcv {
fmt.Println(v)
}
fmt.Println("Done")
}()
wg.Wait()
}

6、channel 可以同时有多个读端、写端 goroutine 操作,在一端关闭 channel 的时候,所有读端都将收到 channel 已关闭的消息

7、channel 是并发安全的,多个 goroutine 同时读取 channel 中的数据,不会产生并发安全问题

Channel 中数据的读取、写入,以及当 channel 阻塞后放入等待 goroutine 队列的过程都会加锁,所以 Channel 是并发安全的。

8、channel 的内存分配在堆上,channel 变量本身是存储在栈上

package main
import (
"fmt"
"time"
)
func createChannel() chan int {
ch := make(chan int, 1) // 创建一个缓冲区大小为1的通道
fmt.Printf("Channel address in createChannel: %p\n", &ch)
return ch // 返回通道
}
func main() {
ch := createChannel() // 调用createChannel函数,获取通道
fmt.Printf("Channel address in main: %p\n", &ch)
// 启动一个goroutine向通道发送数据
go func() {
time.Sleep(1 * time.Second) // 等待1秒,确保createChannel已经返回
ch <- 42
fmt.Println("Data sent to channel")
}()
// 从通道接收数据
value := <-ch
fmt.Printf("Received value: %d\n", value)
}

输出如下

Channel address in createChannel: 0xc000070058
Channel address in main: 0xc000070050
Data sent to channel
Received value: 42
Process finished with the exit code 0

main 函数中的 channel 地址与 createChannel 中的不同,说明 channel 变量本身存储在栈上,但实际上的通道内存是在堆上分配的。简单解释一下,channel 本身就是一个指针 hchan 结构体的指针,其大小只有 8 字节,但 hchan 结构体分配在堆上。

9、Channel 发送和接受元素的本质是值传递

示例一:带缓冲区
package main
import (
"fmt"
"time"
)
type people struct {
name string
}
var u = people{name: "A"}
func printPeople(u <-chan *people) {
time.Sleep(2 * time.Second)
fmt.Println(<-u)
}
func main() {
c := make(chan *people, 5)
var a = &u
c <- a
fmt.Println(a)
a = &people{name: "B"}
go printPeople(c)
time.Sleep(5 * time.Second)
fmt.Println(a)
}

输出结果如下:

&{A}
&{A}
&{B}

解释:a 是一个指针,一开始指向 A,然后通过 channel 传递给另外一个协程,channel 是有缓冲区的,所以在指针还指向 A 的时候,就将指针的值写入了缓冲区,所以另外一个协程读到的也是指向 A 的指针,虽然 a 写入 channel 缓冲区之后,立刻就被改变成了指向 B 的指针。

示例二:不带缓冲区
package main
import (
"fmt"
"time"
)
type people struct {
name string
}
var u = people{name: "A"}
func printPeople(u <-chan *people) {
fmt.Println(<-u)
}
func main() {
c := make(chan *people)
var a = &u
go func() {
fmt.Println(a)
c <- a
}()
time.Sleep(1 * time.Second)
a = &people{name: "B"}
go printPeople(c)
time.Sleep(3 * time.Second)
fmt.Println(a)
}

输出结果如下:

&{A}
&{B}
&{B}

可以看到当不带缓冲区的时候,不会把值拷贝到缓冲区中,而是等接收方接收时,直接将值拷贝给接收方,此时拷贝的是最新的值。

三、使用 channel 实现的简单功能

1、实现锁

原理就是往缓冲区已满的 channel 中写入数据,将发生阻塞。那么定义一个 channel,数据类型任意。获取锁对应往 channel 中写入数据,释放锁对应从 channel 中取出数据。如果当前获取锁成功,那么其他协程在获取锁时,将因为无法写入数据而发生阻塞,直到当前协程取出数据,释放缓冲区。

package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func add(ch chan bool, value *int) {
defer wg.Done()
ch <- true // 加锁
*value++
<-ch // 解锁
}
func main() {
ch := make(chan bool, 1)
value := 0
wg.Add(1000)
for i := 0; i < 1000; i++ {
go add(ch, &value)
}
wg.Wait()
fmt.Println(value)
}

在上述的案例代码中,如果没有加锁和解锁操作,最终 value 的值将小于 1000

2、实现线程池

go 实现线程池的方法请参考:手搓线程池 (C++&Go)—— 生产消费者模型

3、4 个 goroutine 间隔 1s 分别打印 1、2、3、4

实现方法:goroutine1 打印出来 1 后,然后 sleep 1s,然后通过 channel 唤醒协程 2,协程 2 打印出来 2 后,sleep 1s,然后唤醒协程 3,以此类推

代码
package main
import (
"fmt"
"time"
)
func main() {
chs := make([]chan int, 4)
for i := 0; i < 4; i++ {
chs[i] = make(chan int)
go func(i int) {
for {
<-chs[i]
fmt.Println(i + 1)
time.Sleep(1 * time.Second)
chs[(i+1)%4] <- 1
}
}(i)
}
chs[0] <- 1
select {}
}

4、用 channel 实现一个限流器

原理:限流就是限定处理的协程数量,原理就是往 channel 中写入数据,缓冲区满就会阻塞,所以可以设计一个缓冲区容量为 3 的 channel,同时启动 20 个协程,往 channel 中写入数据,只有 3 个协程能写入成功,进入处理,处理完成后,把数据读出来,这样另一个线程就可以进入处理,从而实现了限流功能。

四、Channel 的底层实现原理

1、Channel 的数据结构

channel 的功能通过 hchan 结构体实现,其定义如下

type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
timer *timer // timer feeding this chan
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}

字段解释如下,重要字段用加粗表示:

sendx、recvx:分别指向下一次写的位置和下一次读的位置

recvq:等待从 channel 接受消息的 sudog 队列

sendq:等待从 channel 写入消息的 sudog 队列

lock:互斥锁

qcount、dataqsiz:循环队列的大小和容量

2、向 Channel 发送数据和从 Channel 中读取数据的流程

channel 初始化

(下面这些内容我还没有完全明白,hchan 和 buffer 是否一起分配似乎涉及 go 的内存回收机制)

1、channel 没有缓冲区或者元素大小为 0,只需要分配 hchan 结构体本身的大小

2、有缓冲区 buffer,但元素类型不包含指针,hchan 和 buffer 一起分配

3、有缓冲区 buffer,且元素类型包含指针类型:hchan 和 buffer 分开分配

向 Channel 发送数据

1、Channel 中有读等待 goroutine

先加锁,从 recvq 队列取出头部的 sudog,进入 send 流程,send 的时候,不管 channel 有没有缓冲区,不会把数据写入缓冲区,直接写入这个 sudog 对应的 elem 数据容器上,释放锁,唤醒这个 sudog 对应的 goroutine

2、channel 中没有读等待 goroutine,并且环形缓冲数组中有剩余空间

先加锁,将数据写入缓冲区,解锁

3、channel 中没有读等待 goroutine,并且无剩余空间存放数据

我理解是先加锁,然后发现缓冲区满了,无法继续写入了,这时候会把锁释放掉。然后去构造一个 sudog 结构,sudog 结构需要绑定 channel(sudog 将进入这个 channel 的 sendq 队列),goroutine(sudog 对应着一个 goroutine,后面要唤醒的),还有 ep 指针(这是指向原始数据的指针,这时候不会有数据拷贝的动作,直到该 goroutine 被唤醒,才会从原始数据那里拷贝数据),然后 sudog 进入 channel 的 sendq 队列,阻塞等待。

4、channel 为 nil(阻塞)

5、channel 已经关闭(panic)

从 Channel 读取数据

1、Channel 中有写等待 goroutine

先加锁,从 sendq 队列中弹出头部的 sudog,进入 recv 流程。如果 channel 没有缓冲区,那么直接读取 sendq 里面 ep 指针对应的原始数据,并且唤醒 sudog 对应的 goroutine。如果 channel 有缓冲区,那么读取缓冲区头部的元素,然后把 sudog 的 ep 指针指向的元素写入缓冲区,唤醒 goroutine。释放锁。

2、Channel 中没有写等待 goroutine,并且环形数据里面有剩余元素

先加锁,读取缓冲区里面的数据,然后解锁

3、channel 中没有写等待 goroutine,并且环形数组里面无剩余元素

我理解是先加锁,读取缓冲区,发现没有数据,然后解锁。同样的,构造一个 sudog,绑定 channel、goroutine、还有 ep 指针,将 sudog 放入 recvq 队列,开始阻塞等待。

4、读取的 channel 为 nil(阻塞)

5、channel 已经关闭,并且 buf 里面没有元素(读取零值)

channel 关闭

channel 关闭时,sendq 或者 recvq 中可能存在等待 goroutine(但不会同时存在等待 goroutine),关闭时会将所有的 goroutine 加入 glist 中,并且唤醒全部的 goroutine,如果 sendq 中有 goroutine 写等待,那么将会发生 panic,如果有读等待 goroutine,那么将读取零值。

写等待 goroutine 的 panic 示例
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int, 1)
go func() {
ch <- 3
ch <- 4
}()
go func() {
time.Sleep(2 * time.Second)
fmt.Println(<-ch)
}()
time.Sleep(1 * time.Second)
close(ch)
fmt.Println("没有panic")
}

输出结果如下:

没有 panic
panic: send on closed channel

感谢阅读!如有疑问请留言
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇