前言
除了之前在Python中的线程锁中提到的Lock和RLock,Python中的同步机制还有Conditions,我称其为条件锁。以下是我对Condition类的使用以及相关源码分析。
Condition
条件同步机制中,通过通知的方式(notify)激活其他等待中的线程(wait)。在Python内部,Condition的实现是基于RLock之上(同时也有Lock)。获得一个条件锁的方式:
import threading
cond = threading.Condition()
现在需要一个小demo展示条件锁的用法。我想利用多线程打印如下这段对话:
男:今天吃什么?
女:随便。
男:五花肉好不好?
女:油腻了啦!
男:水煮西兰花呢?
女:会不会清淡了?
男:那你想吃什么?
女:随便。
由于对话应该被人理解,所以需要规定线程们的执行顺序。尽管Lock和RLock可以实现期望的那样,但Python已经针对这种需求做好准备——封装成Condition类,轮到它出场了。
import threading
def grilfriend():
with cond: # 上第一层锁
print("女:随便。")
cond.notify()
cond.wait()
print("女:油腻了啦!")
cond.notify()
cond.wait()
print("女:会不会清淡了?")
cond.notify()
cond.wait()
print("女:随便。")
def boyfriend():
with cond:
print("男:今天吃什么?")
cond.notify()
cond.wait()
print("男:五花肉好不好?")
cond.notify()
cond.wait()
print("男:水煮西兰花呢?")
cond.notify()
cond.wait()
print("男:那你想吃什么?")
cond.notify()
if __name__ == "__main__":
cond = threading.Condition()
bthread = threading.Thread(target=boyfriend)
gthread = threading.Thread(target=grilfriend)
bthread.start() # 为让 男友 先说话,所以先执行boyfriend的线程
gthread.start()
bthread.join()
gthread.join()
上面这个demo中,有三点需要注意:
- 为让“男友”先说话,需要先开启boyfriend的线程;
- 条件锁的
notify()
方法和wait()
方法必须在已经上过条件锁的状态下使用; - 当线程之间进行交互时,合理安排
notify()
的次数——它的出现次数应该总是>=wait()
出现次数。
Condition源码分析
实例化
实例Condition时可以指定一个lock,如果没有指定,默认创建RLock的实例。同时Condition拥有与RLock一样的上锁方法acquire()
和解锁方法release()
。事实上,这两个方法直接来源于RLock类。相关源码如下:
# Condition源码
class Condition:
def __init__(self, lock=None):
if lock is None:
lock = RLock() # 实例化RLock的对象
self._lock = lock
self.acquire = lock.acquire # 借用RLock中的方法
self.release = lock.release # 借用RLock中的方法
...
初探notify和wait
条件锁的两个重要方法是notify()
和wait()
。前面已经说过,notify()
和wait()
必须在条件锁上锁的状态下使用——拿wait的源码举例,当该方法被调用,程序会先去调用self._is_owned()
,判断当前线程号与条件锁中的self._ower
是否一致,如果不一致,抛出异常RuntimeError。
# Condition源码
class Condition:
...
def _is_owned(self):
return self._owner == get_ident()
def wait(self, timeout=None):
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
...
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
...
...
事实上,调用self._is_owned()
时,程序是去lock中找成员变量_ower。又因为lock是RLock的实例,所以追踪进RLock的源码中发现,如果lock不在“锁住”状态,self._owner
一定等于None,那么self._is_owned()
会返回False,必然导致程序抛异常。这种行为对notify()
来说也是一样。
# RLock源码
class _RLock:
def __init__(self):
...
self._owner = None
...
def acquire(self, blocking=True, timeout=-1):
me = get_ident()
# 非第一次上锁逻辑
if self._owner == me:
self._count += 1
return 1
# 第一次上锁逻辑
rc = self._block.acquire(blocking, timeout)
if rc:
self._owner = me # 第一次上锁时,对self._owner赋值
self._count = 1
return rc
def release(self):
if self._owner != get_ident():
raise RuntimeError("cannot release un-acquired lock")
self._count = count = self._count - 1
if not count: # 如果没锁了,将self._owner 置为 None
self._owner = None
self._block.release()
notify
# Condition源码
class Condition:
...
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters # 双端队列,存放lock
waiters_to_notify = _deque(_islice(all_waiters, n))
if not waiters_to_notify: # 如果为空,退出
return
for waiter in waiters_to_notify: # 否则遍历
waiter.release() # 一一释放锁
try:
all_waiters.remove(waiter) # 同时移除队列中已经释放的锁
except ValueError:
pass
notify()
是在一个双端队列中进行操作,这个队列在Condition中名为_waiters。默认情况下,notify只会释放一个锁(按先进先出原则)。如果队列中没有锁,直接退出函数,不报任何异常。
Condition中还有一个notify_all()
方法,调用它会释放队列中全部的锁:
# Condition源码
class Condition:
...
def notify_all(self):
self.notify(len(self._waiters))
wait
事实上wait()
中的代码有那么长,但为了突出重点,这里仅仅截取了需要的部分:
# Condition源码
class Condition:
...
def wait(self, timeout=None):
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock() # 获得一个Lock类型的锁
waiter.acquire() # 上锁
self._waiters.append(waiter) # 加入队列中去
...
也就是说,经过_is_owned()
的校验后,程序向下执行到waiter = _allocate_lock()
,这时需要注意:函数 _allocate_lock()
返回的是一个Lock类型的锁。因此整个结构是:在RLock中,又分布了至少一个Lock锁。多个线程的交互时,就是通过增加队列中的Lock和释放这些Lock来实现阻塞或是激活线程的。
有一点需要理清楚:wait()
用来阻塞调用它的线程本身,而notify()
会激活那个/哪些程序,得看它从队列中取出的Lock锁住的是谁。
wait_for
# Condition源码
class Condition:
...
def wait_for(self, predicate, timeout=None):
endtime = None
waittime = timeout
result = predicate()
while not result: # 当执行结果为True时,
# 或者超过了设置的等待时间,返回predicate()运行的结果
if waittime is not None:
if endtime is None:
endtime = _time() + waittime
else:
waittime = endtime - _time()
if waittime <= 0:
break
self.wait(waittime)
result = predicate()
return result
wait_for()
也是Condition中的方法,它接收的第一个参数是可以调用的函数名,第二个参数是等待的时间(单位秒)。从源码来看,在wait_for()
中会执行你传入的函数。当满足两个条件中的其一,wait_for()
执行结束。条件1:传入的函数返回有效值时会退出wait_for,并返回这个有效值;条件2:超出预设的等待时长,退出wait_for并返回非有效值。由于其中借助了Condition的wait()
方法,所以需要先上条件锁才可以保证程序的正常运行。
有一点也需要明确,在等待过程中,程序不是等到经过timeout秒后,才去第二次执行predicate()。而是等待期间会一直在循环中执行predicate(),一旦拿到有效结果,立马退出循环,并返回这个结果。测试代码可以这样写:
import time
import threading
def return_num():
start = time.time()
while not (time.time() - start >= 3):
pass
return 1
if __name__ == "__main__":
cond = threading.Condition()
with cond:
print(cond.wait_for(return_num, 10))
运行会发现,程序在4s内(3s+其他时间)执行完毕。
还不快抢沙发