Golang之Mutex

概述

互斥锁(英语:Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。临界区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。一个程序、进程、线程可以拥有多个临界区域,但是并不一定会应用互斥锁。

golang的sync.mutex就是一种互斥锁的实现。

Go 号称是为了高并发而生的,在高并发场景下,势必会涉及到对公共资源的竞争。互斥锁是在并发程序中对共享资源进行访问控制的主要手段。

源码解析

// 位于sync/mutex.go,基于go1.20.7

// Copyright 2009 The Go Authors. All rights reserved.

// Use of this source code is governed by a BSD-style

// license that can be found in the LICENSE file.

// Package sync provides basic synchronization primitives such as mutual

// exclusion locks. Other than the Once and WaitGroup types, most are intended

// for use by low-level library routines. Higher-level synchronization is

// better done via channels and communication.

//

// Values containing the types defined in this package should not be copied.

package sync

import (

	"internal/race"

	"sync/atomic"

	"unsafe"

)

// Provided by runtime via linkname.

func throw(string)

func fatal(string)

// A Mutex is a mutual exclusion lock.

// The zero value for a Mutex is an unlocked mutex.

//

// A Mutex must not be copied after first use.

//

// In the terminology of the Go memory model,

// the n'th call to Unlock “synchronizes before” the m'th call to Lock

// for any n < m.

// A successful call to TryLock is equivalent to a call to Lock.

// A failed call to TryLock does not establish any “synchronizes before”

// relation at all.

//上面那段话,说的是 Mutex 是一种互斥锁,默认零值是未上锁的状态。锁在第一次使用之后,就不应该被复制。在 n<m 时,第 n 次对 Unlock 的调用在时间上“先于”第 m 次对 Lock 的调用。即不可重复加锁,必须先解锁后加锁。成功的 TryLock 调用等同于 Lock 调用。TryLock 调用失败则不会建立任何“先于”关系。

type Mutex struct {

	state int32 //mutex的状态,二进制低三位对应锁的状态,将state右移三位代表mutex的数量

	sema  uint32 //信号量,用于唤醒goroutine

}

// A Locker represents an object that can be locked and unlocked.

//锁的一个接口

type Locker interface {

	Lock()

	Unlock()

}

const (

	mutexLocked = 1 << iota // mutex is locked 0b001 锁处于加锁状态

	mutexWoken   // 0b010  锁处于唤醒状态

	mutexStarving  //0b100 锁处于饥饿状态

	mutexWaiterShift = iota //3 偏移量

	// Mutex fairness.

	//

	// Mutex can be in 2 modes of operations: normal and starvation.

	// In normal mode waiters are queued in FIFO order, but a woken up waiter

	// does not own the mutex and competes with new arriving goroutines over

	// the ownership. New arriving goroutines have an advantage -- they are

	// already running on CPU and there can be lots of them, so a woken up

	// waiter has good chances of losing. In such case it is queued at front

	// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,

	// it switches mutex to the starvation mode.

	//

	// In starvation mode ownership of the mutex is directly handed off from

	// the unlocking goroutine to the waiter at the front of the queue.

	// New arriving goroutines don't try to acquire the mutex even if it appears

	// to be unlocked, and don't try to spin. Instead they queue themselves at

	// the tail of the wait queue.

	//

	// If a waiter receives ownership of the mutex and sees that either

	// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,

	// it switches mutex back to normal operation mode.

	//

	// Normal mode has considerably better performance as a goroutine can acquire

	// a mutex several times in a row even if there are blocked waiters.

	// Starvation mode is important to prevent pathological cases of tail latency.

  // 这里在解释锁的公平性。锁有两种运行模式,正常模式和饥饿模式在普通模式下,等待者按照先进先出(FIFO)的顺序排队,但是一个被唤醒的等待者并不拥有互斥锁,并且需要与新到达的 goroutine 竞争互斥锁的所有权。新到达的 goroutine 有优势 -- 它们已经在 CPU 上运行,并且可能有很多,所以一个被唤醒的等待者很有可能会失去竞争。在这种情况下,它会被排在等待队列的前面。如果一个等待者在超过 1 毫秒的时间内都没有获取到互斥锁,它会将互斥锁切换到饥饿模式。

	// 在饥饿模式下,互斥锁的所有权直接从解锁的 goroutine 转交给队列前端的等待者。新到达的 goroutine 即使看起来互斥锁是未锁定的,也不会尝试获取它,并且不会尝试自旋。相反,它们会将自己排在等待队列的末尾。

	// 如果一个等待者得到了互斥锁的所有权,并且发现(1)它是队列中的最后一个等待者,或者(2)它等待的时间少于 1 毫秒,它会将互斥锁切换回普通操作模式。

	// 普通模式的性能要好得多(针对正在运行的goroutine来说),因为一个 goroutine 可以连续多次获取互斥锁,即使有等待者被阻塞。饥饿模式对于防止尾部延迟的病态情况非常重要。

  // tail latency(尾部延迟) 指的是在一个系统中处理请求的延迟分布的尾部部分。在性能敏感的应用中,通常会关注99%或者99.9%(称为P99或P99.9)的响应时间。尾部延迟通常指的是那些极少数请求的处理时间远高于平均水平的情况。在分布式系统设计和微服务架构中,尾部延迟尤其受到关注,因为它可能会放大并影响到整个系统的性能。尽管一个请求可能只需要经过多个服务中的一个或两个服务时遇到高延迟,但这个高延迟会影响整个请求链路的响应时间,导致用户体验下降。

 	//在这里应该指的是某个goroutine一直在排队(如不断有当前进程获得锁),因此影响性能的场景。

	starvationThresholdNs = 1e6 //转换成饥饿模式的时间限制,在源码里写死为1e6 ns,也就是1ms

)

// Lock locks m.

// If the lock is already in use, the calling goroutine

// blocks until the mutex is available.

//如果锁已经被使用,调用该函数的 goroutine 会阻塞,直到互斥锁可用为止。

func (m *Mutex) Lock() {

	// Fast path: grab unlocked mutex.

  //快速处理,state为0,代表这个锁还没有人占用,直接使用就可以.这是方法的快速路径,目的是尽快地锁定未被锁定的互斥锁。

	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {

    //race.Enabled用于确定当前代码是否处于竟态检测模式下运行,本文不涉及

		if race.Enabled {

			race.Acquire(unsafe.Pointer(m))

		}

		return

	}

	// Slow path (outlined so that the fast path can be inlined)

	m.lockSlow()

}

// TryLock tries to lock m and reports whether it succeeded.

//

// Note that while correct uses of TryLock do exist, they are rare,

// and use of TryLock is often a sign of a deeper problem

// in a particular use of mutexes.

//这个很少用,尝试锁定mutex,并会报告是否成功

func (m *Mutex) TryLock() bool {

	old := m.state

  //如果锁是锁定或者饥饿状态,就返回false

	if old&(mutexLocked|mutexStarving) != 0 {

		return false

	}

	// There may be a goroutine waiting for the mutex, but we are

	// running now and can try to grab the mutex before that

	// goroutine wakes up.

  //到这里,意味着这个锁被唤醒了,将会有进程抢占这个锁,这个方法会尝试在别的进程被唤醒前,直接抢占这个锁

	if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {

		return false

	}

	if race.Enabled {

		race.Acquire(unsafe.Pointer(m))

	}

	return true

}

//刚刚的慢方法抢占锁

func (m *Mutex) lockSlow() {

	var waitStartTime int64

	starving := false

	awoke := false

	iter := 0

	old := m.state

	for {

		// Don't spin in starvation mode, ownership is handed off to waiters

		// so we won't be able to acquire the mutex anyway.

    //锁定且非饥饿状态,而且可以自旋

		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {

			// Active spinning makes sense.

			// Try to set mutexWoken flag to inform Unlock

			// to not wake other blocked goroutines.

      // 当前goroutine未唤醒,锁未唤醒,且有等待者,尝试直接唤醒,不希望唤醒其他goroutine

			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&

				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {

				awoke = true

			}

     //进入自旋

			runtime_doSpin()

			iter++

			old = m.state

			continue

		}

		new := old

		// Don't try to acquire starving mutex, new arriving goroutines must queue.

    //mutex不是饥饿模式,尝试获取锁

		if old&mutexStarving == 0 {

			new |= mutexLocked

		}

    //mutex锁定或者饥饿,增加一个等待者

		if old&(mutexLocked|mutexStarving) != 0 {

			new += 1 << mutexWaiterShift

		}

		// The current goroutine switches mutex to starvation mode.

		// But if the mutex is currently unlocked, don't do the switch.

		// Unlock expects that starving mutex has waiters, which will not

		// be true in this case.

    //如果当前goroutine需要饥饿,而且mutex没锁定,设置锁为饥饿状态

		if starving && old&mutexLocked != 0 {

			new |= mutexStarving

		}

    //如果当前goroutine被唤醒,设置mutex的唤醒状态

		if awoke {

			// The goroutine has been woken from sleep,

			// so we need to reset the flag in either case.

			if new&mutexWoken == 0 {

				throw("sync: inconsistent mutex state")

			}

			new &^= mutexWoken

		}

    //当前goroutine计算完成,尝试更新mutex的状态

		if atomic.CompareAndSwapInt32(&m.state, old, new) {

      //更新成功,且上一次不是锁定和饥饿,那么代表当前goroutine获取了锁,退出循环

			if old&(mutexLocked|mutexStarving) == 0 {

				break // locked the mutex with CAS

			}

      //竞争失败或者其他场景,开始排队,如果之前已经等待,就把当前goroutine放入队列的前列

			// If we were already waiting before, queue at the front of the queue.

			queueLifo := waitStartTime != 0

			if waitStartTime == 0 {

        //第一次来,记录一下第一次等待开始的时间

				waitStartTime = runtime_nanotime()

			}

      //阻塞当前进程,放入一个fifo的队列,第二个参数为true的话,放入队首

			runtime_SemacquireMutex(&m.sema, queueLifo, 1)

      //当前goroutine重新唤醒,根据当前goroutine的等待时间设置饥饿状态,如果当前进程已经饥饿,就不用计算了

			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

			old = m.state

			if old&mutexStarving != 0 {

				// If this goroutine was woken and mutex is in starvation mode,

				// ownership was handed off to us but mutex is in somewhat

				// inconsistent state: mutexLocked is not set and we are still

				// accounted as waiter. Fix that.

        //当前goroutine被唤醒,而且锁是饥饿状态且处于唤醒或者锁定状态,或者已经没有等待者了,就会报错

        //因为当前进程被唤醒,说明在队列首,而且锁是饥饿状态,锁应该直接交给当前goroutine,不会置任何状态,而且当前进程还在,等待者不会是0,否则就是异常

				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {

					throw("sync: inconsistent mutex state")

				}

				delta := int32(mutexLocked - 1<<mutexWaiterShift)// 锁状态-饥饿状态左移一位 -7,减少一个等待者,并且设置mutex为锁定状态

        //锁是饥饿状态,当前goroutine不饥饿,或者只有一个等待者了,就回到普通模式

				if !starving || old>>mutexWaiterShift == 1 {

					// Exit starvation mode.

					// Critical to do it here and consider wait time.

					// Starvation mode is so inefficient, that two goroutines

					// can go lock-step infinitely once they switch mutex

					// to starvation mode.

					delta -= mutexStarving -7-4 //去掉饥饿状态

				}

				atomic.AddInt32(&m.state, delta) //去掉饥饿状态,并且减少一个等待者,并且获得这个锁

				break

			}

      //锁不是饥饿状态

			awoke = true // 当前goroutine进入唤醒

			iter = 0

		} else {

      //更新失败了,抢占失败

			old = m.state

		}

	}

	if race.Enabled {

		race.Acquire(unsafe.Pointer(m))

	}

}

// Unlock unlocks m.

// It is a run-time error if m is not locked on entry to Unlock.

//

// A locked Mutex is not associated with a particular goroutine.

// It is allowed for one goroutine to lock a Mutex and then

// arrange for another goroutine to unlock it.

func (m *Mutex) Unlock() {

	if race.Enabled {

		_ = m.state

		race.Release(unsafe.Pointer(m))

	}

  //快速方案,直接解锁

	// Fast path: drop lock bit.

	new := atomic.AddInt32(&m.state, -mutexLocked)

	if new != 0 {

		// Outlined slow path to allow inlining the fast path.

		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.

    //锁未归零,意味着有人等待,或者其他场景,需要慢方法处理

		m.unlockSlow(new)

	}

}

func (m *Mutex) unlockSlow(new int32) {

  //重复解锁,状态变成了负数

	if (new+mutexLocked)&mutexLocked == 0 {

		fatal("sync: unlock of unlocked mutex")

	}

  //不是饥饿状态

	if new&mutexStarving == 0 {

		old := new

		for {

			// If there are no waiters or a goroutine has already

			// been woken or grabbed the lock, no need to wake anyone.

			// In starvation mode ownership is directly handed off from unlocking

			// goroutine to the next waiter. We are not part of this chain,

			// since we did not observe mutexStarving when we unlocked the mutex above.

			// So get off the way.

      //没有等待者或者锁的状态不全为0,认定为其他锁直接处理了,直接return就行

			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {

				return

			}

			// Grab the right to wake someone.

      //尝试减少一个等待者,并且唤醒一个goroutine,成功的话,直接return

			new = (old - 1<<mutexWaiterShift) | mutexWoken

			if atomic.CompareAndSwapInt32(&m.state, old, new) {

				runtime_Semrelease(&m.sema, false, 1)

				return

			}

			old = m.state

		}

	} else {

		// Starving mode: handoff mutex ownership to the next waiter, and yield

		// our time slice so that the next waiter can start to run immediately.

		// Note: mutexLocked is not set, the waiter will set it after wakeup.

		// But mutex is still considered locked if mutexStarving is set,

		// so new coming goroutines won't acquire it.

    //是饥饿状态,直接唤醒队首

		runtime_Semrelease(&m.sema, true, 1)

    //饥饿状态的等待者减1,在Lock方法里

	}

}

扩展-RWMutex

整体基于Mutex实现

// Copyright 2009 The Go Authors. All rights reserved.

// Use of this source code is governed by a BSD-style

// license that can be found in the LICENSE file.

package sync

import (

	"internal/race"

	"sync/atomic"

	"unsafe"

)

// There is a modified copy of this file in runtime/rwmutex.go.

// If you make any changes here, see if you should make them there.

// A RWMutex is a reader/writer mutual exclusion lock.

// The lock can be held by an arbitrary number of readers or a single writer.

// The zero value for a RWMutex is an unlocked mutex.

//

// A RWMutex must not be copied after first use.

//

// If a goroutine holds a RWMutex for reading and another goroutine might

// call Lock, no goroutine should expect to be able to acquire a read lock

// until the initial read lock is released. In particular, this prohibits

// recursive read locking. This is to ensure that the lock eventually becomes

// available; a blocked Lock call excludes new readers from acquiring the

// lock.

//

// In the terminology of the Go memory model,

// the n'th call to Unlock “synchronizes before” the m'th call to Lock

// for any n < m, just as for Mutex.

// For any call to RLock, there exists an n such that

// the n'th call to Unlock “synchronizes before” that call to RLock,

// and the corresponding call to RUnlock “synchronizes before”

// the n+1'th call to Lock.

type RWMutex struct {

	w           Mutex        // held if there are pending writers  

	writerSem   uint32       // semaphore for writers to wait for completing readers

	readerSem   uint32       // semaphore for readers to wait for completing writers

	readerCount atomic.Int32 // number of pending readers 一个表示当前有多少读操作正在进行的计数器,有人在写,会变成负数

	readerWait  atomic.Int32 // number of departing readers 一个表示有多少写操作在等待的计数器

}

const rwmutexMaxReaders = 1 << 30 //读锁最大持有数

// Happens-before relationships are indicated to the race detector via:

// - Unlock  -> Lock:  readerSem

// - Unlock  -> RLock: readerSem

// - RUnlock -> Lock:  writerSem

//

// The methods below temporarily disable handling of race synchronization

// events in order to provide the more precise model above to the race

// detector.

//

// For example, atomic.AddInt32 in RLock should not appear to provide

// acquire-release semantics, which would incorrectly synchronize racing

// readers, thus potentially missing races.

// RLock locks rw for reading.

//

// It should not be used for recursive read locking; a blocked Lock

// call excludes new readers from acquiring the lock. See the

// documentation on the RWMutex type.

func (rw *RWMutex) RLock() {

	if race.Enabled {

		_ = rw.w.state

		race.Disable()

	}

  //读的计数加1还小于0,意味着有人在写,进入等待,等价于Mutex的FIFO

	if rw.readerCount.Add(1) < 0 {

		// A writer is pending, wait for it.

		runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)

	}

	if race.Enabled {

		race.Enable()

		race.Acquire(unsafe.Pointer(&rw.readerSem))

	}

}

// TryRLock tries to lock rw for reading and reports whether it succeeded.

//

// Note that while correct uses of TryRLock do exist, they are rare,

// and use of TryRLock is often a sign of a deeper problem

// in a particular use of mutexes.

func (rw *RWMutex) TryRLock() bool {

	if race.Enabled {

		_ = rw.w.state

		race.Disable()

	}

	for {

		c := rw.readerCount.Load()

    //代表有人持有写锁

		if c < 0 {

			if race.Enabled {

				race.Enable()

			}

			return false

		}

    //没人持有写锁,读锁持有者加一

		if rw.readerCount.CompareAndSwap(c, c+1) {

			if race.Enabled {

				race.Enable()

				race.Acquire(unsafe.Pointer(&rw.readerSem))

			}

			return true

		}

	}

}

// RUnlock undoes a single RLock call;

// it does not affect other simultaneous readers.

// It is a run-time error if rw is not locked for reading

// on entry to RUnlock.

func (rw *RWMutex) RUnlock() {

	if race.Enabled {

		_ = rw.w.state

		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))

		race.Disable()

	}

  //读锁解锁,如果小于0,代表存在特殊情况,可能有人拥有写锁,可能没有锁

	if r := rw.readerCount.Add(-1); r < 0 {

		// Outlined slow-path to allow the fast-path to be inlined

		rw.rUnlockSlow(r)

	}

	if race.Enabled {

		race.Enable()

	}

}

func (rw *RWMutex) rUnlockSlow(r int32) {

  //异常情况,解锁了一个未锁的锁或者写锁的锁

	if r+1 == 0 || r+1 == -rwmutexMaxReaders {

		race.Enable()

		fatal("sync: RUnlock of unlocked RWMutex")

	}

  // 一个写锁正在等待,直接唤醒。这里的场景是最后一个读锁的解锁,才会唤醒写锁

	// A writer is pending.

	if rw.readerWait.Add(-1) == 0 {

		// The last reader unblocks the writer.

		runtime_Semrelease(&rw.writerSem, false, 1)

	}

}

// Lock locks rw for writing.

// If the lock is already locked for reading or writing,

// Lock blocks until the lock is available.

func (rw *RWMutex) Lock() {

	if race.Enabled {

		_ = rw.w.state

		race.Disable()

	}

  // 首先,加个真的锁,而且由于这个锁的原因,readerWait最多只能有1个

	// First, resolve competition with other writers.

	rw.w.Lock()

  // 然后设置读Count为负值,并且保留原来的读锁持有数量到r

	// Announce to readers there is a pending writer.

	r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders

	// Wait for active readers.

  //如果当前有还在读的人且增加写锁的等待者加一成功,进入等待状态,类似于waitgroup,等所有读锁退出

	if r != 0 && rw.readerWait.Add(r) != 0 {

		runtime_SemacquireRWMutex(&rw.writerSem, false, 0)

	}

	if race.Enabled {

		race.Enable()

		race.Acquire(unsafe.Pointer(&rw.readerSem))

		race.Acquire(unsafe.Pointer(&rw.writerSem))

	}

}

// TryLock tries to lock rw for writing and reports whether it succeeded.

//

// Note that while correct uses of TryLock do exist, they are rare,

// and use of TryLock is often a sign of a deeper problem

// in a particular use of mutexes.

func (rw *RWMutex) TryLock() bool {

	if race.Enabled {

		_ = rw.w.state

		race.Disable()

	}

  //使用真锁的tryLock,如果失败,就不用继续了

	if !rw.w.TryLock() {

		if race.Enabled {

			race.Enable()

		}

		return false

	}

  //尝试假设没有读者设置锁状态

	if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {

		rw.w.Unlock()

		if race.Enabled {

			race.Enable()

		}

		return false

	}

	if race.Enabled {

		race.Enable()

		race.Acquire(unsafe.Pointer(&rw.readerSem))

		race.Acquire(unsafe.Pointer(&rw.writerSem))

	}

	return true

}

// Unlock unlocks rw for writing. It is a run-time error if rw is

// not locked for writing on entry to Unlock.

//

// As with Mutexes, a locked RWMutex is not associated with a particular

// goroutine. One goroutine may RLock (Lock) a RWMutex and then

// arrange for another goroutine to RUnlock (Unlock) it.

func (rw *RWMutex) Unlock() {

	if race.Enabled {

		_ = rw.w.state

		race.Release(unsafe.Pointer(&rw.readerSem))

		race.Disable()

	}

	// Announce to readers there is no active writer.

	r := rw.readerCount.Add(rwmutexMaxReaders)

  //出现下面这种情况是因为尝试为一个读锁解写锁,所以值越界了

	if r >= rwmutexMaxReaders {

		race.Enable()

		fatal("sync: Unlock of unlocked RWMutex")

	}

  //唤醒所有读锁

	// Unblock blocked readers, if any.

	for i := 0; i < int(r); i++ {

		runtime_Semrelease(&rw.readerSem, false, 0)

	}

	// Allow other writers to proceed.

  //解锁

  rw.w.Unlock()

	if race.Enabled {

		race.Enable()

	}

}

// RLocker returns a Locker interface that implements

// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.

func (rw *RWMutex) RLocker() Locker {

	return (*rlocker)(rw)

}

type rlocker RWMutex

func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }

func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

参考文档

Go同步原语 — sync.Mutex 详解 - 知乎 (zhihu.com)