select多路复用

在某些场景下我们可能需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以被接收那么当前 goroutine 将会发生阻塞。你也许会写出如下代码尝试使用遍历的方式来实现从多个通道中接收值。

1
2
3
4
5
6
7
for{
// 尝试从ch1接收值
data, ok := <-ch1
// 尝试从ch2接收值
data, ok := <-ch2

}

这种方式虽然可以实现从多个通道接收值的需求,但是程序的运行性能会差很多。Go 语言内置了select关键字,使用它可以同时响应多个通道的操作。

Select 的使用方式类似于之前学到的 switch 语句,它也有一系列 case 分支和一个默认的分支。每个 case 分支会对应一个通道的通信(接收或发送)过程。select 会一直等待,直到其中的某个 case 的通信操作完成时,就会执行该 case 分支对应的语句。具体格式如下:

1
2
3
4
5
6
7
8
9
10
select {
case <-ch1:
//...
case data := <-ch2:
//...
case ch3 <- 10:
//...
default:
//默认操作
}

Select 语句具有以下特点。

  • 可处理一个或多个 channel 的发送/接收操作。
  • 如果多个 case 同时满足,select 会随机选择一个执行。
  • 对于没有 case 的 select 会一直阻塞,可用于阻塞 main 函数,防止退出。

下面的示例代码能够在终端打印出10以内的奇数,我们借助这个代码片段来看一下 select 的具体使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

import "fmt"

func main() {
ch := make(chan int, 1)
for i := 1; i <= 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
}
}
}

上面的代码输出内容如下。

1
2
3
4
5
1
3
5
7
9

示例中的代码首先是创建了一个缓冲区大小为1的通道 ch,进入 for 循环后:

  • 第一次循环时 i = 1,select 语句中包含两个 case 分支,此时由于通道中没有值可以接收,所以x := <-ch 这个 case 分支不满足,而ch <- i这个分支可以执行,会把1发送到通道中,结束本次 for 循环;
  • 第二次 for 循环时,i = 2,由于通道缓冲区已满,所以ch <- i这个分支不满足,而x := <-ch这个分支可以执行,从通道接收值1并赋值给变量 x ,所以会在终端打印出 1;
  • 后续的 for 循环以此类推会依次打印出3、5、7、9。

通道误用示例

接下来,我们将展示两个因误用通道导致程序出现 bug 的代码片段,希望能够加深读者对通道操作的印象。
示例1

各位读者可以查看以下示例代码,尝试找出其中存在的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// demo1 通道误用导致的bug
func demo1() {
wg := sync.WaitGroup{}

ch := make(chan int, 10)
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)

wg.Add(3)
for j := 0; j < 3; j++ {
go func() {
for {
task := <-ch
// 这里假设对接收的数据执行某些操作
fmt.Println(task)
}
wg.Done()
}()
}
wg.Wait()
}

将上述代码编译执行后,匿名函数所在的 goroutine 并不会按照预期在通道被关闭后退出。因为task := <- ch的接收操作在通道被关闭后会一直接收到零值,而不会退出。此处的接收操作应该使用task, ok := <- ch ,通过判断布尔值ok为假时退出;或者使用select 来处理通道。
示例2

各位读者阅读下方代码片段,尝试找出其中存在的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// demo2 通道误用导致的bug
func demo2() {
ch := make(chan string)
go func() {
// 这里假设执行一些耗时的操作
time.Sleep(3 * time.Second)
ch <- "job result"
}()

select {
case result := <-ch:
fmt.Println(result)
case <-time.After(time.Second): // 较小的超时时间
return
}
}

上述代码片段可能导致 goroutine 泄露(goroutine 并未按预期退出并销毁)。由于 select 命中了超时逻辑,导致通道没有消费者(无接收操作),而其定义的通道为无缓冲通道,因此 goroutine 中的ch <- “job result”操作会一直阻塞,最终导致 goroutine 泄露。

并发安全和锁

有时候我们的代码中可能会存在多个 goroutine 同时操作一个资源(临界区)的情况,这种情况下就会发生竞态问题(数据竞态)。这就好比现实生活中十字路口被各个方向的汽车竞争,还有火车上的卫生间被车厢里的人竞争。

我们用下面的代码演示一个数据竞争的示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"sync"
)

var (
x int64

wg sync.WaitGroup // 等待组
)

// add 对全局变量x执行5000次加1操作
func add() {
for i := 0; i < 5000; i++ {
x = x + 1
}
wg.Done()
}

func main() {
wg.Add(2)

go add()
go add()

wg.Wait()
fmt.Println(x)
}

我们将上面的代码编译后执行,不出意外每次执行都会输出诸如9537、5865、6527等不同的结果。这是为什么呢?

在上面的示例代码片中,我们开启了两个 goroutine 分别执行 add 函数,这两个 goroutine 在访问和修改全局的x变量时就会存在数据竞争,某个 goroutine 中对全局变量x的修改可能会覆盖掉另一个 goroutine 中的操作,所以导致最后的结果与预期不符。

互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间只有一个 goroutine 可以访问共享资源。Go 语言中使用sync包中提供的Mutex类型来实现互斥锁。

sync.Mutex提供了两个方法供我们使用。
方法名 功能
func (m *Mutex) Lock() 获取互斥锁
func (m *Mutex) Unlock() 释放互斥锁

我们在下面的示例代码中使用互斥锁限制每次只有一个 goroutine 才能修改全局变量x,从而修复上面代码中的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
"sync"
)

// sync.Mutex

var (
x int64

wg sync.WaitGroup // 等待组

m sync.Mutex // 互斥锁
)

// add 对全局变量x执行5000次加1操作
func add() {
for i := 0; i < 5000; i++ {
m.Lock() // 修改x前加锁
x = x + 1
m.Unlock() // 改完解锁
}
wg.Done()
}

func main() {
wg.Add(2)

go add()
go add()

wg.Wait()
fmt.Println(x)
}

将上面的代码编译后多次执行,每一次都会得到预期中的结果——10000。

使用互斥锁能够保证同一时间有且只有一个 goroutine 进入临界区,其他的 goroutine 则在等待锁;当互斥锁释放后,等待的 goroutine 才可以获取锁进入临界区,多个 goroutine 同时等待一个锁时,唤醒的策略是随机的。

读写互斥锁

互斥锁是完全互斥的,但是实际上有很多场景是读多写少的,当我们并发的去读取一个资源而不涉及资源修改的时候是没有必要加互斥锁的,这种场景下使用读写锁是更好的一种选择。读写锁在 Go 语言中使用sync包中的RWMutex类型。

sync.RWMutex提供了以下5个方法。
方法名 功能
func (rw *RWMutex) Lock() 获取写锁
func (rw *RWMutex) Unlock() 释放写锁
func (rw *RWMutex) RLock() 获取读锁
func (rw *RWMutex) RUnlock() 释放读锁
func (rw *RWMutex) RLocker() Locker 返回一个实现Locker接口的读写锁

读写锁分为两种:读锁和写锁。当一个 goroutine 获取到读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;而当一个 goroutine 获取写锁之后,其他的 goroutine 无论是获取读锁还是写锁都会等待。

下面我们使用代码构造一个读多写少的场景,然后分别使用互斥锁和读写锁查看它们的性能差异。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
var (
x int64
wg sync.WaitGroup
mutex sync.Mutex
rwMutex sync.RWMutex
)

// writeWithLock 使用互斥锁的写操作
func writeWithLock() {
mutex.Lock() // 加互斥锁
x = x + 1
time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
mutex.Unlock() // 解互斥锁
wg.Done()
}

// readWithLock 使用互斥锁的读操作
func readWithLock() {
mutex.Lock() // 加互斥锁
time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
mutex.Unlock() // 释放互斥锁
wg.Done()
}

// writeWithLock 使用读写互斥锁的写操作
func writeWithRWLock() {
rwMutex.Lock() // 加写锁
x = x + 1
time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
rwMutex.Unlock() // 释放写锁
wg.Done()
}

// readWithRWLock 使用读写互斥锁的读操作
func readWithRWLock() {
rwMutex.RLock() // 加读锁
time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
rwMutex.RUnlock() // 释放读锁
wg.Done()
}

func do(wf, rf func(), wc, rc int) {
start := time.Now()
// wc个并发写操作
for i := 0; i < wc; i++ {
wg.Add(1)
go wf()
}

// rc个并发读操作
for i := 0; i < rc; i++ {
wg.Add(1)
go rf()
}

wg.Wait()
cost := time.Since(start)
fmt.Printf("x:%v cost:%v\n", x, cost)

}

我们假设每一次读操作都会耗时1ms,而每一次写操作会耗时10ms,我们分别测试使用互斥锁和读写互斥锁执行10次并发写和1000次并发读的耗时数据。

1
2
3
4
5
// 使用互斥锁,10并发写,1000并发读
do(writeWithLock, readWithLock, 10, 1000) // x:10 cost:1.466500951s

// 使用读写互斥锁,10并发写,1000并发读
do(writeWithRWLock, readWithRWLock, 10, 1000) // x:10 cost:117.207592ms

从最终的执行结果可以看出,使用读写互斥锁在读多写少的场景下能够极大地提高程序的性能。不过需要注意的是如果一个程序中的读操作和写操作数量级差别不大,那么读写互斥锁的优势就发挥不出来。

sync.WaitGroup

在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:
方法名 功能
func (wg * WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了 N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用 Done 方法将计数器减1。通过调用 Wait 来等待并发任务执行完,当计数器值为 0 时,表示所有并发任务已经完成。

我们利用sync.WaitGroup将上面的代码优化一下:

1
2
3
4
5
6
7
8
9
10
11
12
var wg sync.WaitGroup

func hello() {
defer wg.Done()
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1)
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
wg.Wait()
}

需要注意sync.WaitGroup是一个结构体,进行参数传递的时候要传递指针。

sync.Once

在某些场景下我们需要确保某些操作即使在高并发的场景下也只会被执行一次,例如只加载一次配置文件等。

Go语言中的sync包中提供了一个针对只执行一次场景的解决方案——sync.Once,sync.Once只有一个Do方法,其签名如下:

1
func (o *Once) Do(f func())

注意:如果要执行的函数f需要传递参数就需要搭配闭包来使用。
加载配置文件示例

延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践。因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要做的。我们来看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var icons map[string]image.Image

func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}

// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {
if icons == nil {
loadIcons()
}
return icons[name]
}

多个 goroutine 并发调用Icon函数时不是并发安全的,现代的编译器和CPU可能会在保证每个 goroutine 都满足串行一致的基础上自由地重排访问内存的顺序。loadIcons函数可能会被重排为以下结果:

func loadIcons() {
icons = make(map[string]image.Image)
icons[“left”] = loadIcon(“left.png”)
icons[“up”] = loadIcon(“up.png”)
icons[“right”] = loadIcon(“right.png”)
icons[“down”] = loadIcon(“down.png”)
}

在这种情况下就会出现即使判断了icons不是nil也不意味着变量初始化完成了。考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化icons的时候不会被其他的 goroutine 操作,但是这样做又会引发性能问题。

使用sync.Once改造的示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}

// Icon 是并发安全的
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}

并发安全的单例模式

下面是借助sync.Once实现的并发安全的单例模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package singleton

import (
"sync"
)

type singleton struct {}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
once.Do(func() {
instance = &singleton{}
})
return instance
}

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

sync.Map

Go 语言中内置的 map 不是并发安全的,请看下面这段示例代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"strconv"
"sync"
)

var m = make(map[string]int)

func get(key string) int {
return m[key]
}

func set(key string, value int) {
m[key] = value
}

func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
set(key, n)
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}

将上面的代码编译后执行,会报出fatal error: concurrent map writes错误。我们不能在多个 goroutine 中并发对内置的 map 进行读写操作,否则会存在数据竞争问题。

像这种场景下就需要为 map 加锁来保证并发的安全性了,Go语言的sync包中提供了一个开箱即用的并发安全版 map——sync.Map。开箱即用表示其不用像内置的 map 一样使用 make 函数初始化就能直接使用。同时sync.Map内置了诸如Store、Load、LoadOrStore、Delete、Range等操作方法。
方法名 功能
func (m *Map) Store(key, value interface{}) 存储key-value数据
func (m *Map) Load(key interface{}) (value interface{}, ok bool) 查询key对应的value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) 查询或存储key对应的value
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) 查询并删除key
func (m *Map) Delete(key interface{}) 删除key
func (m *Map) Range(f func(key, value interface{}) bool) 对map中的每个key-value依次调用f

下面的代码示例演示了并发读写sync.Map。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"strconv"
"sync"
)

// 并发安全的map
var m = sync.Map{}

func main() {
wg := sync.WaitGroup{}
// 对m执行20个并发的读写操作
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m.Store(key, n) // 存储key-value
value, _ := m.Load(key) // 根据key取值
fmt.Printf("k=:%v,v:=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}

原子操作

针对整数数据类型(int32、uint32、int64、uint64)我们还可以使用原子操作来保证并发安全,通常直接使用原子操作比使用锁操作效率更高。Go语言中原子操作由内置的标准库sync/atomic提供。
atomic包
方法 解释
func LoadInt32(addr *int32) (val int32)func LoadInt64(addr *int64) (val int64)func LoadUint32(addr *uint32) (val uint32)func LoadUint64(addr *uint64) (val uint64)func LoadUintptr(addr *uintptr) (val uintptr)func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) 读取操作
func StoreInt32(addr *int32, val int32)func StoreInt64(addr *int64, val int64)func StoreUint32(addr *uint32, val uint32)func StoreUint64(addr *uint64, val uint64)func StoreUintptr(addr *uintptr, val uintptr)func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) 写入操作
func AddInt32(addr *int32, delta int32) (new int32)func AddInt64(addr *int64, delta int64) (new int64)func AddUint32(addr *uint32, delta uint32) (new uint32)func AddUint64(addr *uint64, delta uint64) (new uint64)func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) 修改操作
func SwapInt32(addr *int32, new int32) (old int32)func SwapInt64(addr *int64, new int64) (old int64)func SwapUint32(addr *uint32, new uint32) (old uint32)func SwapUint64(addr *uint64, new uint64) (old uint64)func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) 交换操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) 比较并交换操作
示例

我们填写一个示例来比较下互斥锁和原子操作的性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main

import (
"fmt"
"sync"
"sync/atomic"
"time"
)

type Counter interface {
Inc()
Load() int64
}

// 普通版
type CommonCounter struct {
counter int64
}

func (c CommonCounter) Inc() {
c.counter++
}

func (c CommonCounter) Load() int64 {
return c.counter
}

// 互斥锁版
type MutexCounter struct {
counter int64
lock sync.Mutex
}

func (m *MutexCounter) Inc() {
m.lock.Lock()
defer m.lock.Unlock()
m.counter++
}

func (m *MutexCounter) Load() int64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.counter
}

// 原子操作版
type AtomicCounter struct {
counter int64
}

func (a *AtomicCounter) Inc() {
atomic.AddInt64(&a.counter, 1)
}

func (a *AtomicCounter) Load() int64 {
return atomic.LoadInt64(&a.counter)
}

func test(c Counter) {
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
c.Inc()
wg.Done()
}()
}
wg.Wait()
end := time.Now()
fmt.Println(c.Load(), end.Sub(start))
}

func main() {
c1 := CommonCounter{} // 非并发安全
test(c1)
c2 := MutexCounter{} // 使用互斥锁实现并发安全
test(&c2)
c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高
test(&c3)
}

atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者 sync 包的函数/类型实现同步更好。