Go的设计思想:以通信的方式来共享内存,而不是通过共享内存的方式来实现通信,channel就是这一思想的体现。
一、Channel的基本用法
channel用于通信的最基本用法,一端接受数据,一端写入数据
package main
import (
"fmt"
"sync"
)
func main() {
ch := make(chan int) // 不带缓冲区的channel
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
ch <- 1 // 向channel中写入数据
}()
go func() {
defer wg.Done()
a := <-ch // 从channel中读取数据
fmt.Println(a)
}()
wg.Wait()
close(ch) // 关闭channel
}
二、channel使用中的注意事项
1、让channel产生panic的操作
1、往关闭的channel中写入数据会panic
2、关闭未初始化的channel会panic
3、重复关闭channel会panic
2、不同状态下Channel的读取和写入
Channel有三种状态:
1、nil:channel尚未在堆上分配内存的状态
2、打开:channel初始化之后,未被关闭的可用状态
3、关闭:channel被关闭之后,不会立刻回收在堆上的内存,这也是关闭之后仍然可读的原因,直到被gc回收内存
操作 | channel状态 | 结果 |
write | nil | 阻塞 |
write | 有缓冲区,缓冲区未满 | 写入成功 |
write | 无缓冲区,或者缓冲区已满 | 阻塞 |
write | 关闭 | panic |
操作 | channel状态 | 结果 |
read | nil | 阻塞 |
read | 打开,有元素 | 读取成功 |
read | 打开,没有元素 | 阻塞 |
read | 关闭 | 读取默认值(空值或零值) |
3、如果从channel中读取的数据是0,如何知道是对端写入的0还是channel当前已被关闭?
解决方案一:判定读取 a, ok := <-ch
package main
import (
"fmt"
"sync"
)
func main() {
ch := make(chan int) // 不带缓冲区的channel
close(ch) // 关闭channel
var wg sync.WaitGroup
wg.Add(1)
//go func() {
// defer wg.Done()
// ch <- 1 // 向channel中写入数据
//}()
go func() {
defer wg.Done()
a, ok := <-ch // 从channel中读取数据
if ok {
fmt.Println(a)
} else {
fmt.Println("channel closed")
}
}()
wg.Wait()
}
解决方案二:for v := range ch {}
采用for range的方式读取数据,for range会遍历channel中的所有数据,一一读取,没有数据可读时,阻塞,直到channel关闭,channel如果不关闭,for range将一直阻塞,可能发生deadlock
package main
import (
"fmt"
"sync"
"time"
)
func main() {
ch := make(chan int) // 不带缓冲区的channel
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
ch <- 1 // 向channel中写入数据
time.Sleep(1 * time.Second)
close(ch) // 关闭channel,如果不关闭channel,程序将报错:deadlock
}()
go func() {
defer wg.Done()
for v := range ch {
fmt.Println(v)
}
fmt.Println("Done")
}()
wg.Wait()
}
4、向不带缓冲区的channel中写入数据,必须有对端接受数据,否则将deadlock
带有缓冲区的channel不会出现这个问题,因为可以把数据写入缓冲区,就不用阻塞,而没有缓冲区的channel,写入数据时将阻塞,直到数据对对端读取
5、可以定义只读和只写的单向channel
package main
import (
"fmt"
"sync"
"time"
)
type SChannel = chan<- int
type RChannel = <-chan int
func main() {
ch := make(chan int) // 不带缓冲区的channel
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
var send SChannel = ch
send <- 1 // 向channel中写入数据
send <- 2
time.Sleep(1 * time.Second)
close(ch) // 关闭channel
}()
go func() {
defer wg.Done()
var rcv RChannel = ch
for v := range rcv {
fmt.Println(v)
}
fmt.Println("Done")
}()
wg.Wait()
}
6、channel可以同时有多个读端、写端goroutine操作,在一端关闭channel的时候,所有读端都将收到channel已关闭的消息
7、channel是并发安全的,多个goroutine同时读取channel中的数据,不会产生并发安全问题
Channel中数据的读取、写入,以及当channel阻塞后放入等待goroutine队列的过程都会加锁,所以Channel是并发安全的。
8、channel的内存分配在堆上,channel变量本身是存储在栈上
package main
import (
"fmt"
"time"
)
func createChannel() chan int {
ch := make(chan int, 1) // 创建一个缓冲区大小为1的通道
fmt.Printf("Channel address in createChannel: %p\n", &ch)
return ch // 返回通道
}
func main() {
ch := createChannel() // 调用createChannel函数,获取通道
fmt.Printf("Channel address in main: %p\n", &ch)
// 启动一个goroutine向通道发送数据
go func() {
time.Sleep(1 * time.Second) // 等待1秒,确保createChannel已经返回
ch <- 42
fmt.Println("Data sent to channel")
}()
// 从通道接收数据
value := <-ch
fmt.Printf("Received value: %d\n", value)
}
输出如下
Channel address in createChannel: 0xc000070058
Channel address in main: 0xc000070050
Data sent to channel
Received value: 42
Process finished with the exit code 0
main函数中的channel地址与createChannel中的不同,说明channel变量本身存储在栈上,但实际上的通道内存是在堆上分配的。简单解释一下,channel本身就是一个指针hchan结构体的指针,其大小只有8字节,但hchan结构体分配在堆上。
9、Channel发送和接受元素的本质是值传递
示例一:带缓冲区
package main
import (
"fmt"
"time"
)
type people struct {
name string
}
var u = people{name: "A"}
func printPeople(u <-chan *people) {
time.Sleep(2 * time.Second)
fmt.Println(<-u)
}
func main() {
c := make(chan *people, 5)
var a = &u
c <- a
fmt.Println(a)
a = &people{name: "B"}
go printPeople(c)
time.Sleep(5 * time.Second)
fmt.Println(a)
}
输出结果如下:
&{A}
&{A}
&{B}
解释:a是一个指针,一开始指向A,然后通过channel传递给另外一个协程,channel是有缓冲区的,所以在指针还指向A的时候,就将指针的值写入了缓冲区,所以另外一个协程读到的也是指向A的指针,虽然a写入channel缓冲区之后,立刻就被改变成了指向B的指针。
示例二:不带缓冲区
package main
import (
"fmt"
"time"
)
type people struct {
name string
}
var u = people{name: "A"}
func printPeople(u <-chan *people) {
fmt.Println(<-u)
}
func main() {
c := make(chan *people)
var a = &u
go func() {
fmt.Println(a)
c <- a
}()
time.Sleep(1 * time.Second)
a = &people{name: "B"}
go printPeople(c)
time.Sleep(3 * time.Second)
fmt.Println(a)
}
输出结果如下:
&{A}
&{B}
&{B}
可以看到当不带缓冲区的时候,不会把值拷贝到缓冲区中,而是等接收方接收时,直接将值拷贝给接收方,此时拷贝的是最新的值。
三、使用channel实现的简单功能
1、实现锁
原理就是往缓冲区已满的channel中写入数据,将发生阻塞。那么定义一个channel,数据类型任意。获取锁对应往channel中写入数据,释放锁对应从channel中取出数据。如果当前获取锁成功,那么其他协程在获取锁时,将因为无法写入数据而发生阻塞,直到当前协程取出数据,释放缓冲区。
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func add(ch chan bool, value *int) {
defer wg.Done()
ch <- true // 加锁
*value++
<-ch // 解锁
}
func main() {
ch := make(chan bool, 1)
value := 0
wg.Add(1000)
for i := 0; i < 1000; i++ {
go add(ch, &value)
}
wg.Wait()
fmt.Println(value)
}
在上述的案例代码中,如果没有加锁和解锁操作,最终value的值将小于1000
2、实现线程池
go实现线程池的方法请参考:手搓线程池(C++&Go)——生产消费者模型
3、4个goroutine间隔1s分别打印1、2、3、4
实现方法:goroutine1打印出来1后,然后sleep 1s,然后通过channel唤醒协程2,协程2打印出来2后,sleep 1s,然后唤醒协程3,以此类推
代码
package main
import (
"fmt"
"time"
)
func main() {
chs := make([]chan int, 4)
for i := 0; i < 4; i++ {
chs[i] = make(chan int)
go func(i int) {
for {
<-chs[i]
fmt.Println(i + 1)
time.Sleep(1 * time.Second)
chs[(i+1)%4] <- 1
}
}(i)
}
chs[0] <- 1
select {}
}
4、用channel实现一个限流器
原理:限流就是限定处理的协程数量,原理就是往channel中写入数据,缓冲区满就会阻塞,所以可以设计一个缓冲区容量为3的channel,同时启动20个协程,往channel中写入数据,只有3个协程能写入成功,进入处理,处理完成后,把数据读出来,这样另一个线程就可以进入处理,从而实现了限流功能。
四、Channel的底层实现原理
1、Channel的数据结构
channel的功能通过hchan结构体实现,其定义如下
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
timer *timer // timer feeding this chan
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
字段解释如下,重要字段用加粗表示:
sendx、recvx:分别指向下一次写的位置和下一次读的位置
recvq:等待从channel接受消息的sudog队列
sendq:等待从channel写入消息的sudog队列
lock:互斥锁
qcount、dataqsiz:循环队列的大小和容量
2、向Channel发送数据和从Channel中读取数据的流程
channel初始化
(下面这些内容我还没有完全明白,hchan和buffer是否一起分配似乎涉及go的内存回收机制)
1、channel没有缓冲区或者元素大小为0,只需要分配hchan结构体本身的大小
2、有缓冲区buffer,但元素类型不包含指针,hchan和buffer一起分配
3、有缓冲区buffer,且元素类型包含指针类型:hchan和buffer分开分配
向Channel发送数据
1、Channel中有读等待goroutine
先加锁,从recvq队列取出头部的sudog,进入send流程,send的时候,不管channel有没有缓冲区,不会把数据写入缓冲区,直接写入这个sudog对应的elem数据容器上,释放锁,唤醒这个sudog对应的goroutine
2、channel中没有读等待goroutine,并且环形缓冲数组中有剩余空间
先加锁,将数据写入缓冲区,解锁
3、channel中没有读等待goroutine,并且无剩余空间存放数据
我理解是先加锁,然后发现缓冲区满了,无法继续写入了,这时候会把锁释放掉。然后去构造一个sudog结构,sudog结构需要绑定channel(sudog将进入这个channel的sendq队列),goroutine(sudog对应着一个goroutine,后面要唤醒的),还有ep指针(这是指向原始数据的指针,这时候不会有数据拷贝的动作,直到该goroutine被唤醒,才会从原始数据那里拷贝数据),然后sudog进入channel的sendq队列,阻塞等待。
4、channel为nil(阻塞)
5、channel已经关闭(panic)
从Channel读取数据
1、Channel中有写等待goroutine
先加锁,从sendq队列中弹出头部的sudog,进入recv流程。如果channel没有缓冲区,那么直接读取sendq里面ep指针对应的原始数据,并且唤醒sudog对应的goroutine。如果channel有缓冲区,那么读取缓冲区头部的元素,然后把sudog的ep指针指向的元素写入缓冲区,唤醒goroutine。释放锁。
2、Channel中没有写等待goroutine,并且环形数据里面有剩余元素
先加锁,读取缓冲区里面的数据,然后解锁
3、channel中没有写等待goroutine,并且环形数组里面无剩余元素
我理解是先加锁,读取缓冲区,发现没有数据,然后解锁。同样的,构造一个sudog,绑定channel、goroutine、还有ep指针,将sudog放入recvq队列,开始阻塞等待。
4、读取的channel为nil(阻塞)
5、channel已经关闭,并且buf里面没有元素(读取零值)
channel关闭
channel关闭时,sendq或者recvq中可能存在等待goroutine(但不会同时存在等待goroutine),关闭时会将所有的goroutine加入glist中,并且唤醒全部的goroutine,如果sendq中有goroutine写等待,那么将会发生panic,如果有读等待goroutine,那么将读取零值。
写等待goroutine的panic示例
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int, 1)
go func() {
ch <- 3
ch <- 4
}()
go func() {
time.Sleep(2 * time.Second)
fmt.Println(<-ch)
}()
time.Sleep(1 * time.Second)
close(ch)
fmt.Println("没有panic")
}
输出结果如下:
没有panic
panic: send on closed channel