Go语言进阶(一)并发同步实现的四种方法

本文主要介绍golang源代码实现了四种并发同步的方法。

关键词:golang,并发实现

大家都知道Golang天生支持并发,任何一个函数前面使用go关键字都能实现并发。go给我们提供了详细的并发原语,总体来说有四种实现并发同步的方法。

waitBySleep

主线程显式调用sleep函数等待子线程完成

1
2
3
4
5
6
7
8
9
10
11
func waitBySleep() {
for i := 0; i < 10; i++ {
go fmt.Println("this is", i)
}
time.Sleep(time.Second)
}

func main() {
waitBySleep()
}

sleep的时间根据任务的复杂程度难以估算,不是最佳实践

waitByChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func waitByChannel() {
c := make(chan struct{}, 10)
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Println("this is", i)
c <- struct{}{}
}(i)
}
for i := 0; i < 10; i++ {
<-c
}
}
func main() {
waitByChannel()
}

通过channl可以并发读写的机制,并发的子线程将执行完毕的标志位送到channel里,主线程将channel内的标志位消费完,说明子线程已经执行完毕。

适合明确子线程的任务有多少个情况下,也不是最佳实践。

使用Channel实现的生产消费者模型

一个简易的生产者消费者模型代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func worker(id int, jobs <-chan int, results chan<- int) { // jobs:只读通道,result:只写通道
for j := range jobs {
fmt.Printf("worker:%d start jobs:%d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("worker:%d end jobs:%d\n", id, j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 开启3个goroutine
for w := 1; w < 3; w++ {
go worker(w, jobs, results)
}
// 分配5个任务
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs) // 关闭通道
// 输出结果
for a := 1; a <= 5; a++ {
<-results // 取出的过程是阻塞的,要等待每一个worker执行完
}
}

waitByWaitGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func waitByWaitGroup() {
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ { // 创建几个线程干活
go func(i int) {
fmt.Println("this is", i)
wg.Done()
}(i)
}
wg.Wait()
}
func main() {
waitByWaitGroup()
}

本质上还是等待,只是等待的时刻缩短到子线程结束调用的那一时刻。

依靠sync.WaitGroup不能实现生产者消费者模型,需要配合channel完成

waitByCond

sync.Cond是go语言原生提供的生产者消费者模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
type Queue struct {
queue []int
cond *sync.Cond
}

func (q *Queue) Producer(in int) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.queue = append(q.queue, in)
q.cond.Broadcast()

}

func (q *Queue) Consumer() (result int) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if len(q.queue) == 0 {
fmt.Println("no data available, waiting")
q.cond.Wait()
}
result, q.queue = q.queue[0], q.queue[1:]
return result
}

func waitByCond() {
q := Queue{
queue: []int{},
cond: sync.NewCond(&sync.Mutex{}),
}
for j := 1; j <= 5; j++ {
go q.Producer(j)
}
for a := 1; a <= 5; a++ {
fmt.Println("this is", q.Consumer())
}
}

func main() {
waitByCond()
}