Go Cond源码剖析

12-04 3,660 views

        今天来说说sync库中的信号量(condition),在Java、Python等有线程概念的语言中,信号量是用在多线程多任务同步的。一个线程完成了某一个动作,通过信号量告诉其它线程,其它线程再进行某些动作(像不像一种事务的编排)。Go没有线程的概念,而是通过G和M绑定获取CPU执行的权限。Go是使用”通知列表”进行信号量管理。

        首先来一个信号量的使用案例。

package main

import (
    "fmt"
    "sync"
    "time"
)


func cond(){
    var wg sync.WaitGroup
    var c = sync.NewCond(&sync.Mutex{})

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int){
            defer wg.Done()

            // Wait函数c.L.Lock()和该函数的c.L.Unlock()是一对关系
            c.L.Lock()

            c.Wait()				// 进入阻塞队列,等待被唤醒
            fmt.Println(id, "done.")	// 在Wait函数已经上锁,该任务可以安全被执行

            c.L.Unlock()			// 并发任务执行完成,解锁
        }(i)
    }

    go func() {
        time.Sleep(time.Second)
        c.Signal()

        time.Sleep(time.Second * 2)
        c.Broadcast()
    }()

    wg.Wait()
}

func main() {
    cond()
}

        使用Cond时要小心,其实在该实例中c.L.Unlock()并不是该实例c.L.Lock()的解锁。在c.Wait()中做了详细的操作,具体做了如下工作。

            1. 将ticket(票据)添加到通知等待列表。

          2. 解锁,此处才是给cond函数c.L.Lock解锁。
          3. ticket(票据)进入休眠,等待唤醒。
          4. 上锁,此处的上锁才是给cond函数c.L.Unlock加锁。

        只有了解了原理之后,才能更愉快使用condition。拿该例子来说,c.L.Unlock()放到fmt.Println(id, “done.”)之前和之后完全是不一样的。写在fmt.Println(id, “done.”)之后,并发任务是安全的.放在fmt.Println(id, “done.”)之前并发任务是非安全的。

        接下来,咱们来说说Cond源码的具体实现。

type Cond struct {
        noCopy noCopy
        L Locker
        notify  notifyList		// 通知列表
        checker copyChecker  // 复制检查
}

         比较有意思的是复制检查,接下来咱们看看作者如何实现copyChecker的。

type copyChecker uintptr

func (c *copyChecker) check() {
        if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&	// 初始化时, *c == 0,自然和c pointer不等
                !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) && 	                               // 第一次成功,c保存自身地址,终止
                uintptr(*c) != uintptr(unsafe.Pointer(c)) {
                panic("sync.Cond is copied")
        }
}

        copyChecker的做法有点儿意思将一个uintptr字段地址用自身保存起来,如果当前对象被复制,那么存储地址和当前地址自然不等,从而判断该对象被复制。

func (c *Cond) Wait() {
	// 等待操作内必须处理Locker操作
	// 因此在Wait前,须先执行锁定操作
	// 当处于等待状态时,并未持有锁

        c.checker.check()
        t := runtime_notifyListAdd(&c.notify)	// 添加到通知等待队列
        c.L.Unlock()
        runtime_notifyListWait(&c.notify, t)	// 休眠阻塞,直到被通知唤醒
        c.L.Lock()						// 任务被唤醒之后,马上进行上锁。只有上锁之后,才可以安全执行并发任务
}

          所以在使用Cond时一定要小心,必须在使用Wait之前,须先执行锁定操作。这种设计比价诡异,不清楚作者这样设计目的。在Wait函数中主要调用了runtime_notifyListAdd和runtime_notifyListWait函数,接下来主要剖析一下这两个函数的具体实现。

          notifyListAdd主要是将票据添加到通知等待列表,并返回wait票据序号。

func notifyListAdd(l *notifyList) uint32 {
        return atomic.Xadd(&l.wait, 1) - 1 	// 返回票据号
}
// notifyList 是一个用计数器实现的票据队列管理
runtime/sema.go
type notifyList struct {
        wait uint32		// 等待计数器
        notify uint32	// 通知计数器

        // List of parked waiters.
        lock mutex
        head *sudog
        tail *sudog
}

        notifyListWait 主要是将等待票据和当前G 打包成sudog,放入链表后休眠,等待被唤醒。

func notifyListWait(l *notifyList, t uint32) {
	// l.notify 通知列表
        // t 票据号

        lock(&l.lock)

        // 如果等待票据号小于通知序号,则意味着已经触发通知操作,直接返回
        if less(t, l.notify) {
                unlock(&l.lock)
                return
        }

        // 将当前G打包成sudog
        // sudog 表示等待列表中的goroutine
        s := acquireSudog()
        s.g = getg()
        s.ticket = t
        s.releasetime = 0
        
        // 放入等待链表
        if l.tail == nil {
                l.head = s
        } else {
                l.tail.next = s
        }
        l.tail = s

        // 休眠,等待唤醒
        goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3)
        releaseSudog(s)
}

        通知队列本身就是并发安全的,所以加锁的目的无非是:

            1. 阻塞通知期间有新的等待者加入
          2. 在Wait逻辑完成前,没有新的事件发生

// 单播
func (c *Cond) Signal() {
        c.checker.check()
        runtime_notifyListNotifyOne(&c.notify)
}

// 广播
func (c *Cond) Broadcast() {
        c.checker.check()
        runtime_notifyListNotifyAll(&c.notify)
}

         无论是广播还是单个通知都很简单,接下来说说广播和单播是如何实现的。

         广播通知遍历整个等待链表,然后唤醒即可。

func notifyListNotifyAll(l *notifyList) {
        
        // 如果通知序号赶上等待序号,那么意味着没有剩余需要通知的等待者
        if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
                return
        }

	// 取出链表。(本地暂存,不影响链表被外部使用)        
        lock(&l.lock)
        s := l.head
        l.head = nil
        l.tail = nil

        // 更新通知序号,使其和等待序号相同
        atomic.Store(&l.notify, atomic.Load(&l.wait))
        unlock(&l.lock)

        //遍历链表,唤醒所有sudog
        for s != nil {
                next := s.next
                s.next = nil
                readyWithTime(s, 4)
                s = next
        }
}

          单个通知,只需要唤醒和通知序号相同的sudog即可。

func notifyListNotifyOne(l *notifyList) {
        
        // 如果通知序号赶上等待序号,那么意味着没有剩余需要通知的等待者
        if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
                return
        }

        lock(&l.lock)

        // 再次检查
        // 看来锁还是很耗资源的
        t := l.notify
        if t == atomic.Load(&l.wait) {
                unlock(&l.lock)
                return
        }

        // 累加通知序号
        atomic.Store(&l.notify, t+1)

        // 遍历列表,找到对应通知序号的sudog唤醒
        for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
                if s.ticket == t {
                	// 调整链表
                        n := s.next
                        if p != nil {
                                p.next = n
                        } else {
                                l.head = n
                        }
                        if n == nil {
                                l.tail = p
                        }
                        unlock(&l.lock)
                        s.next = nil

                        // 唤醒
                        readyWithTime(s, 4)
                        return
                }
        }
        unlock(&l.lock)
}

        现在Golang 关于Condition的实现原理是不是清楚了,并没有想象的那么负责。

(完)