本文主要介绍golang源代码实现了四种并发同步的方法。
关键词:golang,并发实现
大家都知道Golang天生支持并发,任何一个函数前面使用go关键字都能实现并发。go给我们提供了详细的并发原语,总体来说有四种实现并发同步的方法。
waitBySleep
主线程显式调用sleep函数等待子线程完成
1 2 3 4 5 6 7 8 9 10 11
| func waitBySleep() { for i := 0; i < 10; i++ { go fmt.Println("this is", i) } time.Sleep(time.Second) }
func main() { waitBySleep() }
|
sleep的时间根据任务的复杂程度难以估算,不是最佳实践
waitByChannel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func waitByChannel() { c := make(chan struct{}, 10) for i := 0; i < 10; i++ { go func(i int) { fmt.Println("this is", i) c <- struct{}{} }(i) } for i := 0; i < 10; i++ { <-c } } func main() { waitByChannel() }
|
通过channl可以并发读写的机制,并发的子线程将执行完毕的标志位送到channel里,主线程将channel内的标志位消费完,说明子线程已经执行完毕。
适合明确子线程的任务有多少个情况下,也不是最佳实践。
使用Channel实现的生产消费者模型
一个简易的生产者消费者模型代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| func worker(id int, jobs <-chan int, results chan<- int) { for j := range jobs { fmt.Printf("worker:%d start jobs:%d\n", id, j) time.Sleep(time.Second) fmt.Printf("worker:%d end jobs:%d\n", id, j) results <- j * 2 } } func main() { jobs := make(chan int, 100) results := make(chan int, 100) for w := 1; w < 3; w++ { go worker(w, jobs, results) } for j := 1; j <= 5; j++ { jobs <- j } close(jobs) for a := 1; a <= 5; a++ { <-results } }
|
waitByWaitGroup
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func waitByWaitGroup() { var wg sync.WaitGroup wg.Add(10) for i := 0; i < 10; i++ { go func(i int) { fmt.Println("this is", i) wg.Done() }(i) } wg.Wait() } func main() { waitByWaitGroup() }
|
本质上还是等待,只是等待的时刻缩短到子线程结束调用的那一时刻。
依靠sync.WaitGroup不能实现生产者消费者模型,需要配合channel完成
waitByCond
sync.Cond是go语言原生提供的生产者消费者模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| type Queue struct { queue []int cond *sync.Cond }
func (q *Queue) Producer(in int) { q.cond.L.Lock() defer q.cond.L.Unlock() q.queue = append(q.queue, in) q.cond.Broadcast()
}
func (q *Queue) Consumer() (result int) { q.cond.L.Lock() defer q.cond.L.Unlock() if len(q.queue) == 0 { fmt.Println("no data available, waiting") q.cond.Wait() } result, q.queue = q.queue[0], q.queue[1:] return result }
func waitByCond() { q := Queue{ queue: []int{}, cond: sync.NewCond(&sync.Mutex{}), } for j := 1; j <= 5; j++ { go q.Producer(j) } for a := 1; a <= 5; a++ { fmt.Println("this is", q.Consumer()) } }
func main() { waitByCond() }
|