网站/小程序/APP个性化定制开发,二开,改版等服务,加扣:8582-36016

这篇文章主要为大家介绍了Golang Mutex互斥锁深入理解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

    引言

    Golang的并发编程令人着迷,使用轻量的协程、基于CSP的channel、简单的go func()就可以开始并发编程,在并发编程中,往往离不开锁的概念。

    本文介绍了常用的同步原语 sync.Mutex,同时从源码剖析它的结构与实现原理,最后简单介绍了mutex在日常使用中可能遇到的问题,希望大家读有所获。

    Mutex结构

    Mutex运行时数据结构位于sync/mutex.go

    type Mutex struct {
       state int32
       sema  uint32
    }

    其中state表示当前互斥锁的状态,sema表示 控制锁状态的信号量.

    互斥锁的状态定义在常量中:

    const (
       mutexLocked = 1 << iota // 1 ,处于锁定状态; 2^0
       mutexWoken // 2 ;从正常模式被从唤醒;  2^1
       mutexStarving // 4 ;处于饥饿状态;    2^2
       mutexWaiterShift = iota // 3 ;获得互斥锁上等待的Goroutine个数需要左移的位数: 1 << mutexWaiterShift
       starvationThresholdNs = 1e6 // 锁进入饥饿状态的等待时间
    )

    0即其他状态。

    sema是一个组合,低三位分别表示锁的三种状态,高29位表示正在等待互斥锁释放的gorountine个数,和Java表示线程池状态那部分有点类似

    一个mutex对象仅占用8个字节,让人不禁感叹其设计的巧妙

    饥饿模式和正常模式

    正常模式

    在正常模式下,等待的协程会按照先进先出的顺序得到锁 在正常模式下,刚被唤醒的goroutine与新创建的goroutine竞争时,大概率无法获得锁。

    饥饿模式

    为了避免正常模式下,goroutine被“饿死”的情况,go在1.19版本引入了饥饿模式,保证了Mutex的公平性

    在饥饿模式中,互斥锁会直接交给等待队列最前面的goroutine。新的goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。

    状态的切换

    在正常模式下,一旦Goroutine超过1ms没有获取到锁,它就会将当前互斥锁切换饥饿模式

    如果一个goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

    加锁和解锁

    加锁

    func (m *Mutex) Lock() {
       // Fast path: grab unlocked mutex.
       if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
          return
       }
       // 原注释: Slow path (outlined so that the fast path can be inlined)
       // 将
       m.lockSlow()
    }

    可以看到,当前互斥锁的状态为0时,尝试将当前锁状态设置为更新锁定状态,且这些操作是原子的。

    若当前状态不为0,则进入lockSlow方法
    先定义了几个参数

    var waitStartTime int64
    starving := false //
    awoke := false
    iter := 0
    old := m.state

    随后进入一个很大的for循环,让我们来逐步分析

    自旋

    for {
         // 1 && 2
       if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
          //  3.
          if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
             atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
             awoke = true
          }
          runtime_doSpin()
          iter++
          old = m.state
          continue
       }

    old&(mutexLocked|mutexStarving) == mutexLocked

    当且仅当当前锁状态为mutexLocked时,表达式为true

    runtime_canSpin(iter) 是否满足自旋条件

    • 运行在拥有多个CPU的机器上;

    • 当前Goroutine为了获取该锁进入自旋的次数小于四次;

    • 当前机器上至少存在一个正在运行的处理器 P,并且处理的运行队列为空;

    如果当前状态下自旋是合理的,将awoke置为true,同时设置锁状态为mutexWoken,进入自旋逻辑

    runtime_doSpin()会执行30次PAUSE指令,并且仅占用CPU资源 代码位于:runtime\asm_amd64.s +567

    //go:linkname sync_runtime_doSpin sync.runtime_doSpin
    //go:nosplit
    func sync_runtime_doSpin() {
       procyield(active_spin_cnt)
    }
    TEXT runtime·procyield(SB),NOSPLIT,$0-0
        MOVL    cycles+0(FP), AX
    again:
        PAUSE
        SUBL    $1, AX
        JNZ again
        RET


    计算锁的新状态

    停止了自旋后,

    new := old
    // 1.
    if old&mutexStarving == 0 {
       new |= mutexLocked
    }
    // 2.
    if old&(mutexLocked|mutexStarving) != 0 {
       new += 1 << mutexWaiterShift
    }
    // 3 && 4.
    if starving && old&mutexLocked != 0 {
       new |= mutexStarving
    }
    // 5.
    if awoke {
       if new&mutexWoken == 0 {
          throw("sync: inconsistent mutex state")
       }
       new &^= mutexWoken
    }
    • old&mutexStarving == 0 表明原来不是饥饿模式。如果是饥饿模式的话,其他goroutine不会执行接下来的代码,直接进入等待队列队尾

    • 如果原来是 mutexLocked 或者 mutexStarving模式,waiterCounts数加一

    • 如果被标记为饥饿状态,且锁状态为mutexLocked的话,设置锁的新状态为饥饿状态。

    • 被标记为饥饿状态的前提是 被唤醒过且抢锁失败

    • 计算新状态

    更新锁状态

    // 1.
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
          if old&(mutexLocked|mutexStarving) == 0 {
             break // locked the mutex with CAS
          }
          // 2.
          queueLifo := waitStartTime != 0
          if waitStartTime == 0 {
             waitStartTime = runtime_nanotime()
          }
          // 3.
          runtime_SemacquireMutex(&m.sema, queueLifo, 1)
          // 4.
          starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
          old = m.state
          // 5.
          if old&mutexStarving != 0 {
             /
             if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                throw("sync: inconsistent mutex state")
             }
             delta := int32(mutexLocked - 1<<mutexWaiterShift)
             if !starving || old>>mutexWaiterShift == 1 {
                delta -= mutexStarving
             }
             atomic.AddInt32(&m.state, delta)
             break
          }
          awoke = true
          iter = 0
       } else {
          old = m.state
       }
    }
    • 尝试将锁状态设置为new 。这里设置成功不代表上锁成功,有可能new不为mutexLocked 或者是waiterCount数量的改变

    • waitStartTime不为0 说明当前goroutine已经等待过了,将当前goroutine放到等待队列的队头

    • 走到这里,会调用runtime_SemacquireMutex 方法使当前协程阻塞,runtime_SemacquireMutex方法中会不断尝试获得锁,并会陷入休眠 等待信号量释放。

    • 当前协程可以获得信号量,从runtime_SemacquireMutex方法中返回。此时协程会去更新starving标志位:如果当前starving标志位为true或者等待时间超过starvationThresholdNs ,将starving置为true

    之后会按照饥饿模式与正常模式,走不同的逻辑

    • - 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;  

    • - 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出;

    解锁

    func (m *Mutex) Unlock() {
       // 1.
       new := atomic.AddInt32(&m.state, -mutexLocked)
       if new != 0 {
          // 2.
          m.unlockSlow(new)
       }
    }
    • 将锁状态的值增加 -mutexLocked 。如果新状态不等于0,进入unlockSlow方法

    func (m *Mutex) unlockSlow(new int32) {
        // 1.
       if (new+mutexLocked)&mutexLocked == 0 {
          throw("sync: unlock of unlocked mutex")
       }
       if new&mutexStarving == 0 {
          old := new
          for {
          // 2.
             if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
             }
             // 2.1.
             new = (old - 1<<mutexWaiterShift) | mutexWoken
             if atomic.CompareAndSwapInt32(&m.state, old, new) {
             // 2.2.
                runtime_Semrelease(&m.sema, false, 1)
                return
             }
             old = m.state
          }
       } else {
       // 3.
          runtime_Semrelease(&m.sema, true, 1)
       }
    }

    1.new+mutexLocked代表将锁置为1,如果两个状态& 不为0,则说明重复解锁.如果重复解锁则抛出panic

    2. 如果等待者数量等于0,或者锁的状态已经变为mutexWoken、mutexStarving、mutexStarving,则直接返回

    • 将waiterCount数量-1,尝试选择一个goroutine唤醒

    • 尝试更新锁状态,如果更新锁状态成功,则唤醒队尾的一个gorountine

    3. 如果不满足 2的判断条件,则进入饥饿模式,同时交出锁的使用权

    可能遇到的问题

    锁拷贝

    mu1 := &sync.Mutex{}
    mu1.Lock()
    mu2 := mu1
    mu2.Unlock()

    此时mu2能够正常解锁,那么我们再试试解锁mu1呢

    mu1 := &sync.Mutex{}
    mu1.Lock()
    mu2 := mu1
    mu2.Unlock()
    mu1.Unlock()

    可以看到发生了error

    panic导致没有unlock

    当lock()之后,可能由于代码问题导致程序发生了panic,那么mutex无法被及时unlock(),由于其他协程还在等待锁,此时可能触发死锁

    func TestWithLock() {
       nums := 100
       wg := &sync.WaitGroup{}
       safeSlice := SafeSlice{
          s:    []int{},
          lock: new(sync.RWMutex),
       }
       i := 0
       for idx := 0; idx < nums; idx++ { // 并行nums个协程做append
          wg.Add(1)
          go func() {
             defer func() {
                if r := recover(); r != nil {
                   log.Println("recover")
                }
                wg.Done()
             }()
             safeSlice.lock.Lock()
             safeSlice.s = append(safeSlice.s, i)
             if i == 98{
                panic("123")
             }
             i++
             safeSlice.lock.Unlock()
          }()
       }
       wg.Wait()
       log.Println(len(safeSlice.s))
    }

    修改:

    func TestWithLock() {
       nums := 100
       wg := &sync.WaitGroup{}
       safeSlice := SafeSlice{
          s:    []int{},
          lock: new(sync.RWMutex),
       }
       i := 0
       for idx := 0; idx < nums; idx++ { // 并行nums个协程做append
          wg.Add(1)
          go func() {
             defer func() {
                if r := recover(); r != nil {
                }
                safeSlice.lock.Unlock()
                wg.Done()
             }()
             safeSlice.lock.Lock()
             safeSlice.s = append(safeSlice.s, i)
             if i == 98{
                panic("123")
             }
             i++
          }()
       }
       wg.Wait()
       log.Println(len(safeSlice.s))
    }


    评论 0

    暂无评论
    0
    0
    0
    立即
    投稿
    发表
    评论
    返回
    顶部