Golang Channel原理剖析
本文最后更新于20 天前,其中的信息可能已经过时,如有错误请留言

Go的设计思想:以通信的方式来共享内存,而不是通过共享内存的方式来实现通信,channel就是这一思想的体现。

一、Channel的基本用法

channel用于通信的最基本用法,一端接受数据,一端写入数据

package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan int) // 不带缓冲区的channel
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		ch <- 1 // 向channel中写入数据
	}()
	go func() {
		defer wg.Done()
		a := <-ch // 从channel中读取数据
		fmt.Println(a)
	}()
	wg.Wait()
	close(ch) // 关闭channel
}

二、channel使用中的注意事项

1、让channel产生panic的操作

1、往关闭的channel中写入数据会panic

2、关闭未初始化的channel会panic

3、重复关闭channel会panic

2、不同状态下Channel的读取和写入

Channel有三种状态:

1、nil:channel尚未在堆上分配内存的状态

2、打开:channel初始化之后,未被关闭的可用状态

3、关闭:channel被关闭之后,不会立刻回收在堆上的内存,这也是关闭之后仍然可读的原因,直到被gc回收内存

操作channel状态结果
writenil阻塞
write有缓冲区,缓冲区未满写入成功
write无缓冲区,或者缓冲区已满阻塞
write关闭panic
向nil状态下的channel写入数据,阻塞而不是panic的原因是,对于select + default情况下,程序可以走default分支,没必要panic
操作channel状态结果
readnil阻塞
read打开,有元素读取成功
read打开,没有元素阻塞
read关闭读取默认值(空值或零值)
如果channel关闭的时候,channel里面还有数据,也是可以正常读完的,之后才会读0值

3、如果从channel中读取的数据是0,如何知道是对端写入的0还是channel当前已被关闭?

解决方案一:判定读取 a, ok := <-ch
package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan int) // 不带缓冲区的channel
	close(ch)            // 关闭channel
	var wg sync.WaitGroup
	wg.Add(1)
	//go func() {
	//	defer wg.Done()
	//	ch <- 1 // 向channel中写入数据
	//}()
	go func() {
		defer wg.Done()
		a, ok := <-ch // 从channel中读取数据
		if ok {
			fmt.Println(a)
		} else {
			fmt.Println("channel closed")
		}
	}()
	wg.Wait()
}
解决方案二:for v := range ch {}

采用for range的方式读取数据,for range会遍历channel中的所有数据,一一读取,没有数据可读时,阻塞,直到channel关闭,channel如果不关闭,for range将一直阻塞,可能发生deadlock

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    ch := make(chan int) // 不带缓冲区的channel
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
       defer wg.Done()
       ch <- 1 // 向channel中写入数据
       time.Sleep(1 * time.Second)
       close(ch) // 关闭channel,如果不关闭channel,程序将报错:deadlock
    }()
    go func() {
       defer wg.Done()
       for v := range ch {
          fmt.Println(v)
       }
       fmt.Println("Done")
    }()
    wg.Wait()
}

4、向不带缓冲区的channel中写入数据,必须有对端接受数据,否则将deadlock

带有缓冲区的channel不会出现这个问题,因为可以把数据写入缓冲区,就不用阻塞,而没有缓冲区的channel,写入数据时将阻塞,直到数据对对端读取

5、可以定义只读和只写的单向channel

package main

import (
"fmt"
"sync"
"time"
)

type SChannel = chan<- int
type RChannel = <-chan int

func main() {
ch := make(chan int) // 不带缓冲区的channel
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
var send SChannel = ch
send <- 1 // 向channel中写入数据
send <- 2
time.Sleep(1 * time.Second)
close(ch) // 关闭channel
}()
go func() {
defer wg.Done()
var rcv RChannel = ch
for v := range rcv {
fmt.Println(v)
}
fmt.Println("Done")
}()
wg.Wait()
}

6、channel可以同时有多个读端、写端goroutine操作,在一端关闭channel的时候,所有读端都将收到channel已关闭的消息

7、channel是并发安全的,多个goroutine同时读取channel中的数据,不会产生并发安全问题

Channel中数据的读取、写入,以及当channel阻塞后放入等待goroutine队列的过程都会加锁,所以Channel是并发安全的。

8、channel的内存分配在堆上,channel变量本身是存储在栈上

package main

import (
	"fmt"
	"time"
)

func createChannel() chan int {
	ch := make(chan int, 1) // 创建一个缓冲区大小为1的通道
	fmt.Printf("Channel address in createChannel: %p\n", &ch)
	return ch // 返回通道
}

func main() {
	ch := createChannel() // 调用createChannel函数,获取通道
	fmt.Printf("Channel address in main: %p\n", &ch)

	// 启动一个goroutine向通道发送数据
	go func() {
		time.Sleep(1 * time.Second) // 等待1秒,确保createChannel已经返回
		ch <- 42
		fmt.Println("Data sent to channel")
	}()

	// 从通道接收数据
	value := <-ch
	fmt.Printf("Received value: %d\n", value)
}

输出如下

Channel address in createChannel: 0xc000070058
Channel address in main: 0xc000070050
Data sent to channel
Received value: 42

Process finished with the exit code 0

main函数中的channel地址与createChannel中的不同,说明channel变量本身存储在栈上,但实际上的通道内存是在堆上分配的。简单解释一下,channel本身就是一个指针hchan结构体的指针,其大小只有8字节,但hchan结构体分配在堆上。

9、Channel发送和接受元素的本质是值传递

示例一:带缓冲区
package main

import (
	"fmt"
	"time"
)

type people struct {
	name string
}

var u = people{name: "A"}

func printPeople(u <-chan *people) {
	time.Sleep(2 * time.Second)
	fmt.Println(<-u)
}

func main() {
	c := make(chan *people, 5)
	var a = &u
	c <- a
	fmt.Println(a)
	a = &people{name: "B"}

	go printPeople(c)
	time.Sleep(5 * time.Second)
	fmt.Println(a)
}

输出结果如下:

&{A}
&{A}
&{B}

解释:a是一个指针,一开始指向A,然后通过channel传递给另外一个协程,channel是有缓冲区的,所以在指针还指向A的时候,就将指针的值写入了缓冲区,所以另外一个协程读到的也是指向A的指针,虽然a写入channel缓冲区之后,立刻就被改变成了指向B的指针。

示例二:不带缓冲区
package main

import (
	"fmt"
	"time"
)

type people struct {
	name string
}

var u = people{name: "A"}

func printPeople(u <-chan *people) {
	fmt.Println(<-u)
}

func main() {
	c := make(chan *people)
	var a = &u
	go func() {
		fmt.Println(a)
		c <- a
	}()
	time.Sleep(1 * time.Second)
	a = &people{name: "B"}

	go printPeople(c)
	time.Sleep(3 * time.Second)
	fmt.Println(a)
}

输出结果如下:

&{A}
&{B}
&{B}

可以看到当不带缓冲区的时候,不会把值拷贝到缓冲区中,而是等接收方接收时,直接将值拷贝给接收方,此时拷贝的是最新的值。

三、使用channel实现的简单功能

1、实现锁

原理就是往缓冲区已满的channel中写入数据,将发生阻塞。那么定义一个channel,数据类型任意。获取锁对应往channel中写入数据,释放锁对应从channel中取出数据。如果当前获取锁成功,那么其他协程在获取锁时,将因为无法写入数据而发生阻塞,直到当前协程取出数据,释放缓冲区。

package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup

func add(ch chan bool, value *int) {
    defer wg.Done()
    ch <- true // 加锁
    *value++
    <-ch // 解锁
}

func main() {
    ch := make(chan bool, 1)
    value := 0
    wg.Add(1000)
    for i := 0; i < 1000; i++ {
       go add(ch, &value)
    }
    wg.Wait()
    fmt.Println(value)
}

在上述的案例代码中,如果没有加锁和解锁操作,最终value的值将小于1000

2、实现线程池

go实现线程池的方法请参考:手搓线程池(C++&Go)——生产消费者模型

3、4个goroutine间隔1s分别打印1、2、3、4

实现方法:goroutine1打印出来1后,然后sleep 1s,然后通过channel唤醒协程2,协程2打印出来2后,sleep 1s,然后唤醒协程3,以此类推

代码
package main

import (
	"fmt"
	"time"
)

func main() {
	chs := make([]chan int, 4)
	for i := 0; i < 4; i++ {
		chs[i] = make(chan int)
		go func(i int) {
			for {
				<-chs[i]
				fmt.Println(i + 1)
				time.Sleep(1 * time.Second)
				chs[(i+1)%4] <- 1
			}
		}(i)
	}
	chs[0] <- 1
	select {}
}

4、用channel实现一个限流器

原理:限流就是限定处理的协程数量,原理就是往channel中写入数据,缓冲区满就会阻塞,所以可以设计一个缓冲区容量为3的channel,同时启动20个协程,往channel中写入数据,只有3个协程能写入成功,进入处理,处理完成后,把数据读出来,这样另一个线程就可以进入处理,从而实现了限流功能。

四、Channel的底层实现原理

1、Channel的数据结构

channel的功能通过hchan结构体实现,其定义如下

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	timer    *timer // timer feeding this chan
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

字段解释如下,重要字段用加粗表示:

sendx、recvx:分别指向下一次写的位置和下一次读的位置

recvq:等待从channel接受消息的sudog队列

sendq:等待从channel写入消息的sudog队列

lock:互斥锁

qcount、dataqsiz:循环队列的大小和容量

2、向Channel发送数据和从Channel中读取数据的流程

channel初始化

(下面这些内容我还没有完全明白,hchan和buffer是否一起分配似乎涉及go的内存回收机制)

1、channel没有缓冲区或者元素大小为0,只需要分配hchan结构体本身的大小

2、有缓冲区buffer,但元素类型不包含指针,hchan和buffer一起分配

3、有缓冲区buffer,且元素类型包含指针类型:hchan和buffer分开分配

向Channel发送数据

1、Channel中有读等待goroutine

先加锁,从recvq队列取出头部的sudog,进入send流程,send的时候,不管channel有没有缓冲区,不会把数据写入缓冲区,直接写入这个sudog对应的elem数据容器上,释放锁,唤醒这个sudog对应的goroutine

2、channel中没有读等待goroutine,并且环形缓冲数组中有剩余空间

先加锁,将数据写入缓冲区,解锁

3、channel中没有读等待goroutine,并且无剩余空间存放数据

我理解是先加锁,然后发现缓冲区满了,无法继续写入了,这时候会把锁释放掉。然后去构造一个sudog结构,sudog结构需要绑定channel(sudog将进入这个channel的sendq队列),goroutine(sudog对应着一个goroutine,后面要唤醒的),还有ep指针(这是指向原始数据的指针,这时候不会有数据拷贝的动作,直到该goroutine被唤醒,才会从原始数据那里拷贝数据),然后sudog进入channel的sendq队列,阻塞等待。

4、channel为nil(阻塞)

5、channel已经关闭(panic)

从Channel读取数据

1、Channel中有写等待goroutine

先加锁,从sendq队列中弹出头部的sudog,进入recv流程。如果channel没有缓冲区,那么直接读取sendq里面ep指针对应的原始数据,并且唤醒sudog对应的goroutine。如果channel有缓冲区,那么读取缓冲区头部的元素,然后把sudog的ep指针指向的元素写入缓冲区,唤醒goroutine。释放锁。

2、Channel中没有写等待goroutine,并且环形数据里面有剩余元素

先加锁,读取缓冲区里面的数据,然后解锁

3、channel中没有写等待goroutine,并且环形数组里面无剩余元素

我理解是先加锁,读取缓冲区,发现没有数据,然后解锁。同样的,构造一个sudog,绑定channel、goroutine、还有ep指针,将sudog放入recvq队列,开始阻塞等待。

4、读取的channel为nil(阻塞)

5、channel已经关闭,并且buf里面没有元素(读取零值)

channel关闭

channel关闭时,sendq或者recvq中可能存在等待goroutine(但不会同时存在等待goroutine),关闭时会将所有的goroutine加入glist中,并且唤醒全部的goroutine,如果sendq中有goroutine写等待,那么将会发生panic,如果有读等待goroutine,那么将读取零值。

写等待goroutine的panic示例
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 1)
	go func() {
		ch <- 3
		ch <- 4
	}()
	go func() {
		time.Sleep(2 * time.Second)
		fmt.Println(<-ch)
	}()
	time.Sleep(1 * time.Second)
	close(ch)
	fmt.Println("没有panic")
}

输出结果如下:

没有panic
panic: send on closed channel

感谢阅读!如有疑问请留言
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇