Golang之PV原语
概念
PV原语是通过操作信号量来处理进程间的同步与互斥的一段不可分割的程序
介绍
P原语
(1) 信号量减1; (2) 若信号量减1后仍大于或等于零,则该程序继续执行; (3) 若信号量减1后小于零,则该程序被阻塞后进入等待队列中。
这里等于0,意味着当前goroutine获得了“锁”
V原语
(1) 信号量加1; (2) 若结果大于零,则程序继续执行; (3) 若结果小于或等于零,则从等待队列中唤醒一等待程序;
这里等于0,意味着在当前gorotuine执行的时候,又有goroutine申请了,因为当前goroutine先加1了,所以要唤醒一个goroutine。
默认应该是存在1
方法
//这个函数用于获取一个信号量。它会阻塞调用者的 goroutine 直到信号量 s 变为可用。它可能会使 goroutine 进入等待状态,并在信号量被释放时唤醒它。
func runtime_Semacquire(s *uint32)
//这个函数类似于 runtime_Semacquire,但它专门用于获取互斥锁。参数 lifo 决定了等待获取互斥锁的 goroutine 是否应该按照后进先出(LIFO)的顺序被唤醒。这可以用于实现更公平的锁,防止饥饿。skipframes 参数用于调试,它影响运行时栈的跟踪,可以帮助确定锁被持有的位置。
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
//这个函数用于释放一个信号量,允许其他正在等待的 goroutine 继续执行。如果 handoff 参数为 true,运行时会直接将信号量的所有权传递给等待队列中的下一个 goroutine,而不是简单地增加信号量的计数。这有助于减少唤醒的 goroutine 数量,可以提高性能。同样,skipframes 参数用于调试。
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
包Semaphore
这个包是一个带权重的信号量,mutex可以看作初始值为1的semaphore,不过Mutex实际用的时候信号量是0(具体可以看Mutex介绍那一篇)。
type Weighted struct {
size int64 // 设置一个最大权值
cur int64 // 标识当前已被使用的资源数
mu sync.Mutex // 提供临界区保护
waiters list.List // 阻塞等待的调用者列表
}
semaphore库核心结构就是Weighted,主要有4个字段:
size:这个代表的是最大权值,在创建Weighted对象指定cur:相当于一个游标,来记录当前已使用的权值mu:互斥锁,并发情况下做临界区保护waiters:阻塞等待的调用者列表,使用链表数据结构保证先进先出的顺序,存储的数据是waiter对象,waiter数据结构如下:
type waiter struct {
n int64 // 等待调用者权重值
ready chan<- struct{} // close channel就是唤醒
}
这里只有两个字段:
n:这个就是等待调用者的权重值ready:这就是一个channel,利用channel的close机制实现唤醒
semaphore还提供了一个创建Weighted对象的方法,在初始化时需要给定最大权值:
// NewWeighted为并发访问创建一个新的加权信号量,该信号量具有给定的最大权值。
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
阻塞获取权值的方法 - Acquire
先直接看代码吧:
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock() // 加锁保护临界区
// 有资源可用并且没有等待获取权值的goroutine
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n // 加权
s.mu.Unlock() // 释放锁
return nil
}
// 要获取的权值n大于最大的权值了
if n > s.size {
// 先释放锁,确保其他goroutine调用Acquire的地方不被阻塞
s.mu.Unlock()
// 阻塞等待context的返回
<-ctx.Done()
return ctx.Err()
}
// 走到这里就说明现在没有资源可用了
// 创建一个channel用来做通知唤醒
ready := make(chan struct{})
// 创建waiter对象
w := waiter{n: n, ready: ready}
// waiter按顺序入队
elem := s.waiters.PushBack(w)
// 释放锁,等待唤醒,别阻塞其他goroutine
s.mu.Unlock()
// 阻塞等待唤醒
select {
// context关闭
case <-ctx.Done():
err := ctx.Err() // 先获取context的错误信息
s.mu.Lock()
select {
case <-ready:
// 在context被关闭后被唤醒了,那么试图修复队列,假装我们没有取消
err = nil
default:
// 判断是否是第一个元素
isFront := s.waiters.Front() == elem
// 移除第一个元素
s.waiters.Remove(elem)
// 如果是第一个元素且有资源可用通知其他waiter
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
// 被唤醒了
case <-ready:
return nil
}
}
注释已经加到代码中了,总结一下这个方法主要有三个流程:
-
流程一:有资源可用时并且没有等待权值的
goroutine,走正常加权流程; -
流程二:想要获取的权值
n大于初始化时设置最大的权值了,这个goroutine永远不会获取到信号量,所以阻塞等待context的关闭; -
流程三:前两步都没问题的话,就说明现在系统没有资源可用了,这时就需要阻塞等待唤醒,在阻塞等待唤醒这里有特殊逻辑;
-
特殊逻辑一:如果在
context被关闭后被唤醒了,那么就先忽略掉这个cancel,试图修复队列。 -
特殊逻辑二:
context关闭后,则根据是否有可用资源决定通知后面等待唤醒的调用者,这样做的目的其实是为了避免当不同的context控制不同的goroutine时,未关闭的goroutine不会被阻塞住,依然执行,来看这样一个例子(因为goroutine的抢占式调度,所以这个例子也会具有偶然性):
-
func main() {
s := semaphore.NewWeighted(3)
ctx,cancel := context.WithTimeout(context.Background(), time.Second * 2)
defer cancel() //最好手动cancel,即使是timeout的context,手动的cancel也会更快的释放资源
ct,cancel1 := context.WithTimeout(context.Background(), time.Second * 3)
defer cancel1() //defer是先入后出,这个先执行,上面那个后执行
for i :=0; i < 3; i++{
if i != 0{
go func(num int) {
if err := s.Acquire(ctx,3); err != nil{
fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
return
}
time.Sleep(2 * time.Second)
fmt.Printf("goroutine: %d run over\n",num)
s.Release(3)
}(i)
}else {
go func(num int) {
if err := s.Acquire(ct,3); err != nil{
fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
return
}
time.Sleep(3 * time.Second)
fmt.Printf("goroutine: %d run over\n",num)
s.Release(3)
}(i)
}
}
time.Sleep(10 * time.Second)
}
上面的例子中goroutine:0 使用ct对象来做控制,超时时间为3s,goroutine:1和goroutine:2对象使用ctx对象来做控制,超时时间为2s,这三个goroutine占用的资源都等于最大资源数,也就是说只能有一个goruotine运行成功,另外两个goroutine都会被阻塞,因为goroutine是抢占式调度,所以我们不能确定哪个gouroutine会第一个被执行,这里我们假设第一个获取到信号量的是gouroutine:2,阻塞等待的调用者列表顺序是:goroutine:1 -> goroutine:0,因为在goroutine:2中有一个2s的延时,所以会触发ctx的超时,ctx会下发Done信号,因为goroutine:2和goroutine:1都是被ctx控制的,所以就会把goroutine:1从等待者队列中取消,但是因为goroutine:1属于队列的第一个队员,并且因为goroutine:2已经释放资源,那么就会唤醒goroutine:0继续执行。
使用这种方式可以避免goroutine永久失眠。
不阻塞获取权值的方法 - TryAcquire
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock() // 加锁
// 有资源可用并且没有等待获取资源的goroutine
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
这个方法就简单很多了,不阻塞地获取权重为n的信号量,成功时返回true,失败时返回false并保持信号量不变。
释放权重
func (s *Weighted) Release(n int64) {
s.mu.Lock()
// 释放资源
s.cur -= n
// 释放资源大于持有的资源,则会发生panic
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
// 通知其他等待的调用者
s.notifyWaiters()
s.mu.Unlock()
}
这里就是很常规的操作,主要就是资源释放,同时进行安全性判断,如果释放资源大于持有的资源,则会发生panic。
唤醒waiter
在Acquire和Release方法中都调用了notifyWaiters,我们来分析一下这个方法:
func (s *Weighted) notifyWaiters() {
for {
// 获取等待调用者队列中的首位
next := s.waiters.Front()
// 没有要通知的调用者了
if next == nil {
break // No more waiters blocked.
}
// 断言出waiter信息
w := next.Value.(waiter)
if s.size-s.cur < w.n {
// 没有足够资源为下一个调用者使用时,继续阻塞该调用者,遵循先进先出的原则,
// 避免需要资源数比较大的waiter被饿死
// 这里没有再次调用,还有一个原因,这个唤醒始终取的是队首元素,而且它无法执行的情况,也无法直接删除,因此再次调用
// 无意义
//
// 考虑一个场景,使用信号量作为读写锁,现有N个令牌,N个reader和一个writer
// 每个reader都可以通过Acquire(1)获取读锁,writer写入可以通过Acquire(N)获得写锁定
// 但不包括所有的reader,如果我们允许reader在队列中前进,writer将会饿死-总是有一个令牌可供每个reader
// 这个场景也挺有意思的,仅允许没有人读的时候完成写,写的优先级非常低。一个很特殊的读写锁
break
}
// 获取资源
s.cur += w.n
// 从waiter列表中移除
s.waiters.Remove(next)
// 使用channel的close机制唤醒waiter
close(w.ready)
}
}
这里只需要注意一个点:唤醒waiter始终采用先进先出的原则,避免需要资源数比较大的waiter被饿死。
何时使用Semaphore
到这里我们就把Semaphore的源代码看了一篇,代码行数不多,封装的也很巧妙,那么我们该什么时候选在使用它呢?
目前能想到一个场景就是Semaphore配合上errgroup实现一个"工作池",使用Semaphore限制goroutine的数量,配合上errgroup做并发控制,示例如下:
const (
limit = 2
)
func main() {
serviceName := []string{
"cart",
"order",
"account",
"item",
"menu",
}
eg,ctx := errgroup.WithContext(context.Background())
s := semaphore.NewWeighted(limit)
for index := range serviceName{
name := serviceName[index]
if err := s.Acquire(ctx,1); err != nil{
fmt.Printf("Acquire failed and err is %s\n", err.Error())
break
}
eg.Go(func() error {
defer s.Release(1)
return callService(name)
})
}
if err := eg.Wait(); err != nil{
fmt.Printf("err is %s\n", err.Error())
return
}
fmt.Printf("run success\n")
}
func callService(name string) error {
fmt.Println("call ",name)
time.Sleep(1 * time.Second)
return nil
}
结果如下:
call order
call cart
call account
call item
call menu
run success
总结
本文主要介绍了Go官方扩展库Semaphore的实现以及PV原语的相关内容,他的设计思路简单,仅仅用几十行就完成了完美的封装,值得借鉴学习。不过在实际业务场景中,使用信号量的场景并不多。大多数场景都可以使用channel来替代,但是有些场景使用Semaphore来实现会更好。比如参考文章【[警惕] 请勿滥用goroutine】,使用channel+sync来控制goroutine数量,这种实现方式并不好。因为实际已经起来了多个goroutine,只不过控制了工作的goroutine数量,如果改用semaphore实现才是真正的控制了goroutine数量。