前言
在Python中的线程锁和Condition源码分析二文的基础之上,对Python多线程数据同步时可能用到的Semaphore、Event、Queue做了一些粗略分析。
Semaphore
从一个小demo入手,要求:让子线程完成打印数字0-8的任务,并且同一时刻,仅允许有三个子线程存在。代码实现如下:
import time
import threading
def print_num(num):
print("this is {}".format(num))
time.sleep(1)
sema.release() # 释放锁
if __name__ == "__main__":
sema = threading.Semaphore(3) # 实例一个信号锁,并设置计数器为3(最大线程数为3)
for i in range(9):
sema.acquire() # 上锁
t = threading.Thread(target=print_num, args=(i,)) # 让线程执行打印数字的任务
t.start()
为使效果明显,特意增加time.sleep(1)
让所有子线程进入短暂睡眠。运行程序会发现,打印结果会三个三个的出现,也就是说,我们做到了“仅允许有三个子线程存在”的要求。
Semaphore的实现基于条件锁(Condition),只不过,它的状态锁默认使用Lock类型。
# Semaphore源码
class Semaphore:
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock()) # 使用Lock类型的条件锁
self._value = value
...
Semaphore类在实例化时接收一个value参数(默认value=1),这个参数用于限制锁中运行的线程数。整个限制逻辑在acquire()
中实现:
# Semaphore源码
class Semaphore:
...
def acquire(self, blocking=True, timeout=None):
...
rc = False
...
with self._cond:
while self._value == 0: # 当计数器为0时
...
self._cond.wait(timeout) # 进入等待,程序被阻塞
else: # 如果计数器不为0,每次调用acquire后,value自减1
self._value -= 1
rc = True
return rc
通过release()
方法解开一个Lock锁(之前Condition类源码分析时说过,条件锁的wait()
方法会产生一个Lock锁),每解开一个锁,计数器自加1。尽管release()
方法并不一定会做“解锁”的逻辑,因为它解锁是基于条件锁的notify()
方法实现,只有在队列中有锁时才会解锁。
# Semaphore源码
class Semaphore:
...
def release(self):
...
with self._cond:
self._value += 1 # 自加1
self._cond.notify() # 解锁
因为acquire()
可能会阻塞程序,建议对应的release()
最好在其他线程中调用。
Event
Event锁是Python中的事件锁,实例方式:
import threading
event = threading.Event() # 获得一个事件锁
查看源码,会发现Event也是基于条件锁实现的。整个Event的逻辑,围绕self._flag标识符展开。self._flag为False时,表示wait会阻塞线程;如果为True,wait不阻塞线程。通过Event实例对象提供的clear()
方法,可手动调控wait的行为。
# Event源码
class Event:
def __init__(self):
self._cond = Condition(Lock())
self._flag = False
...
event对象有以下几个主要方法:
- is_set:返回self._flag当前值
set:唤醒全部被阻塞的线程,同时将self._flag置为True
# Event源码 def set(self): with self._cond: self._flag = True self._cond.notify_all()
clear:重置self._flag的值(不借助任何条件,将self._flag置为False)
# Event源码 def clear(self): with self._cond: self._flag = False
wait:如果self._flag为False,阻塞当前线程;否则不阻塞
# Event源码 def wait(self, timeout=None): with self._cond: signaled = self._flag if not signaled: # self._flag为false时,阻塞程序 signaled = self._cond.wait(timeout) return signaled
Queue
queue模块中的Queue也是线程安全,这是因为它的成员方法总是基于一个信号mutex。事实上,这个mutex就是Lock的实例对象:
# Queue源码
class Queue:
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
self.mutex = threading.Lock() # Lock锁
...
其他时候,Queue与一个寻常队列行为一样。
还不快抢沙发