起步
协程平时开发都不用,既是增加编程难度,也是确实用不上。因此不如何关注。偶然看到陈祥安老师一篇《5 分钟入门 Python 协程》的文章,逻辑清晰,看后颇觉受益,于是有了这篇笔记。
以下代码基于 Python3.6.1
协程函数
在函数定义最左加上async
,表示该函数是协程函数。直接调用一个协程函数,得到的是一个 coroutine 对象,并且,该函数不会被执行。
async def foo():
print("foo")
if __name__ == "__main__":
coro = foo()
print(coro)
# 输出:
<coroutine object foo at 0x00855B70>
sys:1: RuntimeWarning: coroutine 'foo' was never awaited
执行一个协程函数的正确姿势:
import asyncio
async def foo():
print("foo")
if __name__ == "__main__":
coro = foo()
loop = asyncio.get_event_loop() # 定义一个循环事件
loop.run_until_complete(coro) # 运行协程函数
# 输出:
foo
可如果我有一堆协程函数需要执行该怎么办,一个一个的塞进run_until_complete()中去?可以呀兄弟,还真能这样,但 Python 当然为我们提供了更好法子,现在暂不说。
链式调用
当我们需要在协程函数里边执行一个协程函数时,并且要求:在我(调用函数)地盘上,你(被调函数)得先执行完了,我才继续往下走。这个时候我们可以在协程函数中使用关键字await
。你可以认为 await 就是把非阻塞代码变成了阻塞代码,增强程序员对代码的控制力。
import asyncio
async def bar():
await asyncio.sleep(1) # 同 time.sleep(1),但在协程中,asyncio.sleep()才起作用
print("bar")
async def foo():
await bar() # 等待 bar() 执行结束,再向下执行
print("foo")
if __name__ == "__main__":
coro = foo()
loop = asyncio.get_event_loop() # 定义一个循环事件
loop.run_until_complete(coro) # 运行协程函数
# 输出:
bar
foo
普遍来说,我们更习惯“先 A 后 B 再 C”这种思考方式。并且实际应用中,我们也需要有了第一个结果之后,再拿着这个结果去计算第二个结果。所以看上去链式调用是在削弱异步执行,但又不可或缺。
并发执行
如果现在我需要执行一堆协程函数,我该怎么办?这个时候 run_until_complete() 就不好使了,它可以处理一个 coroutine 对象,但不能处理一个队列的 coroutine。也就是说,下面这个做法是错误的:
# 错误示范
...
if __name__ == "__main__":
loop = asyncio.get_event_loop()
coros = [foo(i) for i in range(10)]
loop.run_until_complete(coros)
解释器抛异常:
TypeError: A Future, a coroutine or an awaitable is required
Python 解释器说得很明白了:“A ... , a ... an ... is required”(“啊我只要一个 ... ,也可以一个 ... 或者一个 ... ”)。run_until_complete() 方法只能处理一个。所以现在我们应该抽出一个协程主函数,让这个主函数去执行一个队列的协程函数,run_until_complete() 负责启动主函数就好。代码框架大致可以如下这般:
import asyncio
async def foo(i):
# do something
async def main():
# do foos
if __name__ == "__main__":
loop = asyncio.get_event_loop()
main_async = main()
loop.run_until_complete(main_async) # 启动协程主函数
于是问题转变成:如何在协程中执行一堆协程函数?
基于这个问题,Python 官方库提供了两种解决方案,也就是有两个 api 供你选择:
wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
gather(*coros_or_futures, loop=None, return_exceptions=False)
二者的区别不单是接受参数不同,返回结果的顺序也会有差异。为了直观“找不同”,于是有了下面这段代码。
import time
import random
import asyncio
async def foo(i):
delay = random.randrange(10)
await asyncio.sleep(delay) # 随机睡眠
return f"foo-{i} sleep {delay} s"
async def main():
coros = [foo(i) for i in range(10)]
start = time.time()
## 使用 asyncio.wait()
# done, pending = await asyncio.wait(coros)
# for res in done:
# print(res.result())
## 使用 asyncio.gather()
reses = await asyncio.gather(*coros)
for res in reses:
print(res)
end = time.time()
print(f"一共花费时间:{end - start}")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
main_async = main()
loop.run_until_complete(main_async)
asyncio.wait()
方法会返回两个集合,一个 done,表示做完了;一个是 pending,表示没做完。默认情况下我们不必关心后者——因为 pending 总是空。另一个不同则是,done 中的元素是一个个的都是 Task 对象,所以如果需要获得协程函数 return 出来的值,需要调用 result() 方法。asyncio.gather()
的运行结果则是:参与执行的、协程函数们的、运行结果。因此上述代码直接遍历 asyncio.gather() 的返回值。
以上仍不是重中之重,我们运行示例代码看看结果吧。先试试 asyncio.gather() ,然后注释掉,再跑跑 asyncio.wait() 。在我环境上运行结果如下:
# asyncio.gather() 执行结果
foo-0 sleep 0 s
foo-1 sleep 1 s
foo-2 sleep 2 s
foo-3 sleep 6 s
foo-4 sleep 4 s
foo-5 sleep 8 s
foo-6 sleep 9 s # 最大耗费时间
foo-7 sleep 5 s
foo-8 sleep 6 s
foo-9 sleep 6 s
一共花费时间:9.003868818283081
# asyncio.wait() 执行结果
foo-2 sleep 1 s
foo-3 sleep 8 s
foo-4 sleep 1 s
foo-8 sleep 5 s
foo-5 sleep 9 s # 最大耗费时间
foo-9 sleep 2 s
foo-6 sleep 3 s
foo-0 sleep 1 s
foo-7 sleep 9 s
foo-1 sleep 5 s
一共花费时间:9.002875328063965
可以看到,无论是 asyncio.gather() 还是 asyncio.wait(),它们总的执行时间都依赖于耗费时间最久那个协程。其次则是,前者返回结果是顺序的,后者无序(依据foo-id)。
因此并发执行协程时,可以根据实际需求选择 gather() 还是 wait()。
执行普通函数
协程函数里边可以调用普通函数吗?我想可以试试:
import time
import asyncio
def bar(delay):
time.sleep(delay)
print(f"bar sleep {delay}")
async def foo():
bar(1) # 第一次调用
print("foo")
bar(2) # 第二次调用
print("over")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
coro = foo()
loop.run_until_complete(coro)
# 输出:
bar sleep 1
foo
bar sleep 2
over
019.5.22 补充:
事实上,你应该避免协程函数中调用同步接口,这会导致对整个程序来说,都没法异步执行。
仍拿上述代码为例,现在需要执行 10个 foo(),也就是说,主函数修改为:
if __name__ == "__main__":
loop = asyncio.get_event_loop()
coros = [foo() for __ in range(10)]
loop.run_until_complete(asyncio.wait(coros))
请运行修改后的完整代码。你会发现 10个 foo() 整体变成了同步。现在再次对代码做一个调整,把同步函数 bar() 修改为异步 ,然后在 foo() 中 await,即:
async def bar(delay):
await asyncio.sleep(delay)
print(f"bar sleep {delay}")
async def foo():
await bar(1) # 第一次调用
print("foo")
await bar(2) # 第二次调用
print("over")
你会发现,尽管第一次调用 bar(1) 与第二次调用 bar(2) 之间仍看起来是阻塞——程序运行到 bar(1) 被阻塞,不再向下执行。但是,对 10个 foo() 来说,它们之间保持着异步关系。
从输出结果来看,普通函数默认以阻塞方式被调用,因此按代码顺序依次输出。但是,事件循环 loop 有一个 call_soon() 方法,可以非阻塞的调用普通函数。
def bar(delay):
time.sleep(delay)
print(f"bar sleep {delay}")
async def foo(loop):
loop.call_soon(bar, 2) # 第一个参数为 函数名,之后是 被调用函数的参数
print("foo")
loop.call_soon(bar, 3)
print("over")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
coro = foo(loop)
loop.run_until_complete(coro)
# 输出:
foo
over
bar sleep 2
bar sleep 3
然而,被 call_soon()
注册的函数之间,以阻塞方式执行。按照 Python 源码中的注释来说,就是 FIFO(先进先出)。
于是我们将上面的代码再改改,先注册一个沉睡 10s 的 bar,再注册一个沉睡 1s 的 bar。同时设置一个计时器,看看程序整体运行时间。
def bar(delay):
time.sleep(delay)
print(f"bar sleep {delay}")
async def foo(loop):
loop.call_soon(bar, 10) # sleep 10s
loop.call_soon(bar, 1) # sleep 1s
if __name__ == "__main__":
loop = asyncio.get_event_loop()
coro = foo(loop)
start = time.time()
loop.run_until_complete(coro)
end = time.time()
print(f"总花费时间:{end - start}")
# 输出:
bar sleep 10
bar sleep 1
总花费时间:11.000614881515503
是的,仍是第一个被注册的函数先返回,总花费时间是二者花费时间之和。
还不快抢沙发