sync.Cond的例子

sync.Cond类似于pthread中的条件变量,但等待的为goroutine,而不是线程。比较难理解的为Wait函数,在调用该函数时必须L为Lock状态,调用Wait函数后,goroutine会自动解锁,并等待条件的到来,等条件到来后会重新加锁。

代码量并不多,下面是去掉注释后的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package sync

import (
"sync/atomic"
"unsafe"
)

type Cond struct {
noCopy noCopy

// L is held while observing or changing the condition
L Locker

notify notifyList
checker copyChecker
}

func NewCond(l Locker) *Cond {
return &Cond{L: l}
}

func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}

func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}

// copyChecker holds back pointer to itself to detect object copying.
type copyChecker uintptr

func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}

// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://github.com/golang/go/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}

具体的使用例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package main

import (
"fmt"
"sync"
)

func main() {
mutex := &sync.Mutex{}
cond := sync.NewCond(mutex)

wg := &sync.WaitGroup{}

wait := func(i int, c chan int) {
defer wg.Done()
fmt.Println("start chan ", i)
cond.L.Lock()
defer cond.L.Unlock()
fmt.Printf("chan %d wait before\n", i)
c <- i
// Wait是理解起来稍微麻烦的点,Cond.Wait会自动释放锁等待信号的到来,当信号到来后,第一个获取到信号的Wait将继续往下执行并从新上锁
cond.Wait()
fmt.Printf("chan %d wait end\n", i)
}

signal := func(count int, c chan int) {
defer wg.Done()
for i := 0; i < count; i++ {
fmt.Printf("read chan %d ready\n", <-c)
}
fmt.Println("call signal")
cond.Signal()
}

broadcast := func(count int, c chan int) {
defer wg.Done()
for i := 0; i < count; i++ {
fmt.Printf("read chan %d ready\n", <-c)
}
fmt.Println("call broadcast")
cond.Broadcast()
}

c := make(chan int)
wg.Add(2)
go wait(0, c)
go signal(1, c)
wg.Wait()
fmt.Println("signal test finished\n\n")

count := 3
for i := 0; i < count; i++ {
wg.Add(1)
go wait(i, c)
}
wg.Add(1)
go broadcast(count, c)
wg.Wait()
}