协程(1)

Python 小记 2019-05-21 6593 字 1247 浏览 点赞

起步

协程平时开发都不用,既是增加编程难度,也是确实用不上。因此不如何关注。偶然看到陈祥安老师一篇《5 分钟入门 Python 协程》的文章,逻辑清晰,看后颇觉受益,于是有了这篇笔记。

以下代码基于 Python3.6.1

协程函数

在函数定义最左加上async,表示该函数是协程函数。直接调用一个协程函数,得到的是一个 coroutine 对象,并且,该函数不会被执行

async def foo():
    print("foo")

if __name__ == "__main__":
    coro = foo()
    print(coro)

# 输出:
<coroutine object foo at 0x00855B70>
sys:1: RuntimeWarning: coroutine 'foo' was never awaited

执行一个协程函数的正确姿势:

import asyncio

async def foo():
    print("foo")

if __name__ == "__main__":
    coro = foo()
    loop = asyncio.get_event_loop()  # 定义一个循环事件
    loop.run_until_complete(coro)  # 运行协程函数

# 输出:
foo

可如果我有一堆协程函数需要执行该怎么办,一个一个的塞进run_until_complete()中去?可以呀兄弟,还真能这样,但 Python 当然为我们提供了更好法子,现在暂不说。

链式调用

当我们需要在协程函数里边执行一个协程函数时,并且要求:在我(调用函数)地盘上,你(被调函数)得先执行完了,我才继续往下走。这个时候我们可以在协程函数中使用关键字await。你可以认为 await 就是把非阻塞代码变成了阻塞代码,增强程序员对代码的控制力。

import asyncio

async def bar():
    await asyncio.sleep(1)  # 同 time.sleep(1),但在协程中,asyncio.sleep()才起作用
    print("bar")

async def foo():
    await bar()  # 等待 bar() 执行结束,再向下执行
    print("foo")

if __name__ == "__main__":
    coro = foo()
    loop = asyncio.get_event_loop()  # 定义一个循环事件
    loop.run_until_complete(coro)  # 运行协程函数

# 输出:
bar
foo

普遍来说,我们更习惯“先 A 后 B 再 C”这种思考方式。并且实际应用中,我们也需要有了第一个结果之后,再拿着这个结果去计算第二个结果。所以看上去链式调用是在削弱异步执行,但又不可或缺。

并发执行

如果现在我需要执行一堆协程函数,我该怎么办?这个时候 run_until_complete() 就不好使了,它可以处理一个 coroutine 对象,但不能处理一个队列的 coroutine。也就是说,下面这个做法是错误的:

# 错误示范
...
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    coros = [foo(i) for i in range(10)]
    loop.run_until_complete(coros)

解释器抛异常:

TypeError: A Future, a coroutine or an awaitable is required

Python 解释器说得很明白了:“A ... , a ... an ... is required”(“啊我只要一个 ... ,也可以一个 ... 或者一个 ... ”)。run_until_complete() 方法只能处理一个。所以现在我们应该抽出一个协程主函数,让这个主函数去执行一个队列的协程函数,run_until_complete() 负责启动主函数就好。代码框架大致可以如下这般:

import asyncio

async def foo(i):
    # do something

async def main():
    # do foos 

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    main_async = main()
    loop.run_until_complete(main_async)  # 启动协程主函数

于是问题转变成:如何在协程中执行一堆协程函数?

基于这个问题,Python 官方库提供了两种解决方案,也就是有两个 api 供你选择:

  • wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
  • gather(*coros_or_futures, loop=None, return_exceptions=False)

二者的区别不单是接受参数不同,返回结果的顺序也会有差异。为了直观“找不同”,于是有了下面这段代码。

import time
import random
import asyncio

async def foo(i):
    delay = random.randrange(10)
    await asyncio.sleep(delay)  # 随机睡眠
    return f"foo-{i}  sleep {delay} s"

async def main():
    coros = [foo(i) for i in range(10)]

    start = time.time()

    ## 使用 asyncio.wait()
    # done, pending = await asyncio.wait(coros)
    # for res in done:
    #     print(res.result())

    ## 使用 asyncio.gather()
    reses = await asyncio.gather(*coros)
    for res in reses:
        print(res)

    end = time.time()
    print(f"一共花费时间:{end - start}")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    main_async = main()
    loop.run_until_complete(main_async)

asyncio.wait()方法会返回两个集合,一个 done,表示做完了;一个是 pending,表示没做完。默认情况下我们不必关心后者——因为 pending 总是空。另一个不同则是,done 中的元素是一个个的都是 Task 对象,所以如果需要获得协程函数 return 出来的值,需要调用 result() 方法。asyncio.gather()的运行结果则是:参与执行的、协程函数们的、运行结果。因此上述代码直接遍历 asyncio.gather() 的返回值。

以上仍不是重中之重,我们运行示例代码看看结果吧。先试试 asyncio.gather() ,然后注释掉,再跑跑 asyncio.wait() 。在我环境上运行结果如下:

# asyncio.gather() 执行结果
foo-0  sleep 0 s
foo-1  sleep 1 s
foo-2  sleep 2 s
foo-3  sleep 6 s
foo-4  sleep 4 s
foo-5  sleep 8 s
foo-6  sleep 9 s  # 最大耗费时间
foo-7  sleep 5 s
foo-8  sleep 6 s
foo-9  sleep 6 s
一共花费时间:9.003868818283081
# asyncio.wait() 执行结果
foo-2  sleep 1 s
foo-3  sleep 8 s
foo-4  sleep 1 s
foo-8  sleep 5 s
foo-5  sleep 9 s  # 最大耗费时间
foo-9  sleep 2 s
foo-6  sleep 3 s
foo-0  sleep 1 s
foo-7  sleep 9 s
foo-1  sleep 5 s
一共花费时间:9.002875328063965

可以看到,无论是 asyncio.gather() 还是 asyncio.wait(),它们总的执行时间都依赖于耗费时间最久那个协程。其次则是,前者返回结果是顺序的,后者无序(依据foo-id)。

因此并发执行协程时,可以根据实际需求选择 gather() 还是 wait()。

执行普通函数

协程函数里边可以调用普通函数吗?我想可以试试:

import time
import asyncio

def bar(delay):
    time.sleep(delay)
    print(f"bar sleep {delay}")

async def foo():
    bar(1)  # 第一次调用
    print("foo")
    bar(2)  # 第二次调用
    print("over")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    coro = foo()
    loop.run_until_complete(coro)

# 输出:
bar sleep 1
foo
bar sleep 2
over

019.5.22 补充:

事实上,你应该避免协程函数中调用同步接口,这会导致对整个程序来说,都没法异步执行。

仍拿上述代码为例,现在需要执行 10个 foo(),也就是说,主函数修改为:

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    coros = [foo() for __ in range(10)]
    loop.run_until_complete(asyncio.wait(coros))

请运行修改后的完整代码。你会发现 10个 foo() 整体变成了同步。现在再次对代码做一个调整,把同步函数 bar() 修改为异步 ,然后在 foo() 中 await,即:

async def bar(delay):
    await asyncio.sleep(delay)
    print(f"bar sleep {delay}")

async def foo():
    await bar(1)  # 第一次调用
    print("foo")
    await bar(2)  # 第二次调用
    print("over")

你会发现,尽管第一次调用 bar(1) 与第二次调用 bar(2) 之间仍看起来是阻塞——程序运行到 bar(1) 被阻塞,不再向下执行。但是,对 10个 foo() 来说,它们之间保持着异步关系。


从输出结果来看,普通函数默认以阻塞方式被调用,因此按代码顺序依次输出。但是,事件循环 loop 有一个 call_soon() 方法,可以非阻塞的调用普通函数

def bar(delay):
    time.sleep(delay)
    print(f"bar sleep {delay}")

async def foo(loop):
    loop.call_soon(bar, 2)  # 第一个参数为 函数名,之后是 被调用函数的参数
    print("foo")
    loop.call_soon(bar, 3)
    print("over")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    coro = foo(loop)
    loop.run_until_complete(coro)

# 输出:
foo
over
bar sleep 2
bar sleep 3

然而,被 call_soon() 注册的函数之间,以阻塞方式执行。按照 Python 源码中的注释来说,就是 FIFO(先进先出)。

于是我们将上面的代码再改改,先注册一个沉睡 10s 的 bar,再注册一个沉睡 1s 的 bar。同时设置一个计时器,看看程序整体运行时间。

def bar(delay):
    time.sleep(delay)
    print(f"bar sleep {delay}")

async def foo(loop):
    loop.call_soon(bar, 10)  # sleep 10s
    loop.call_soon(bar, 1)  # sleep 1s

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    coro = foo(loop)

    start = time.time()
    loop.run_until_complete(coro)
    end = time.time()
    print(f"总花费时间:{end - start}")

# 输出:
bar sleep 10
bar sleep 1
总花费时间:11.000614881515503

是的,仍是第一个被注册的函数先返回,总花费时间是二者花费时间之和。



本文由 Guan 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

还不快抢沙发

添加新评论