Python那些事——异步!数据分析的重中之重!

如果你已经决定要理解 python 的异步部分,欢迎来到我们的asyncio how-to 。
注:哪怕连异步范式的存在都不知道的情况下,你也可以成功地使用 python。但是,如果你对底层运行模式感兴趣的话,asyncio 绝对值得查看。
异步是怎么一回事?
在传统的顺序编程中, 所有发送给解释器的指令会一条条被执行。此类代码的输出容易显现和预测。 但是…
譬如说你有一个脚本向 3 个不同服务器请求数据。 有时,谁知什么原因,发送给其中一个服务器的请求可能意外地执行了很长时间。想象一下从第二个服务器获取数据用了 10 秒钟。在你等待的时候,整个脚本实际上什么也没干。如果你可以写一个脚本可以不去等待第二个请求而是仅仅跳过它,然后开始执行第三个请求,然后回到第二个请求,执行之前离开的位置会怎么样呢。就是这样。你通过切换任务最小化了空转时间。尽管如此,当你需要一个几乎没有 i/o 的简单脚本时,你不想用异步代码。
还有一件重要的事情要提,所有代码在一个线程中运行。所以如果你想让程序的一部分在后台执行同时干一些其他事情,那是不可能的。
准备开始
这是 asyncio 主概念最基本的定义:
协程 — 消费数据的生成器,但是不生成数据。python 2.5 介绍了一种新的语法让发送数据到生成器成为可能。我推荐查阅 david beazley a curious course on coroutines and concurrency 关于协程的详细介绍。
任务 — 协程调度器。如果你观察下面的代码,你会发现它只是让 eventloop 尽快调用它的step ,同时 _step 只是调用协程的下一步。
class task(futures.future): def __init__(self, coro, loop=none): super().__init__(loop=loop) ... self._loop.call_soon(self._step) def _step(self): ... try: ... result = next(self._coro) except stopiteration as exc: self.set_result(exc.value) except baseexception as exc: self.set_exception(exc) raise else: ... self._loop.call_soon(self._step)
事件循环 — 把它想成 asyncio 的中心执行器。
现在我们看一下所有这些如何融为一体。正如我之前提到的,异步代码在一个线程中运行。
从上图可知:
1.消息循环是在线程中执行
2.从队列中取得任务
3.每个任务在协程中执行下一步动作
4.如果在一个协程中调用另一个协程(await ),会触发上下文切换,挂起当前协程,并保存现场环境(变量,状态),然后载入被调用协程
5.如果协程的执行到阻塞部分(阻塞 i/o,sleep),当前协程会挂起,并将控制权返回到线程的消息循环中,然后消息循环继续从队列中执行下一个任务...以此类推
6.队列中的所有任务执行完毕后,消息循环返回第一个任务
异步和同步的代码对比
现在我们实际验证异步模式的切实有效,我会比较两段 python 脚本,这两个脚本除了_sleep _方法外,其余部分完全相同。在第一个脚本里,我会用标准的 time.sleep 方法,在第二个脚本里使用 asyncio.sleep 的异步方法。
这里使用 sleep 是因为它是一个用来展示异步方法如何操作 i/o 的最简单办法。
使用同步 sleep 方法的代码:
import asyncio import time from datetime import datetimeasync def custom_sleep(): print('sleep', datetime.now()) time.sleep(1)async def factorial(name, number): f = 1 for i in range(2, number 1): print('task {}: compute factorial({})'.format(name, i)) await custom_sleep() f *= i print('task {}: factorial({}) is {}n'.format(name, number, f))start = time.time() loop = asyncio.get_event_loop()tasks = [ asyncio.ensure_future(factorial(a, 3)), asyncio.ensure_future(factorial(b, 4)),]loop.run_until_complete(asyncio.wait(tasks)) loop.close()end = time.time() print(total time: {}.format(end - start))
脚本输出:
task a: compute factorial(2) sleep 2017-04-06 13:39:56.207479 task a: compute factorial(3) sleep 2017-04-06 13:39:57.210128 task a: factorial(3) is 6task b: compute factorial(2) sleep 2017-04-06 13:39:58.210778 task b: compute factorial(3) sleep 2017-04-06 13:39:59.212510 task b: compute factorial(4) sleep 2017-04-06 13:40:00.217308 task b: factorial(4) is 24total time: 5.016386032104492
使用异步 sleep 的代码:
import asyncio import time from datetime import datetimeasync def custom_sleep(): print('sleep {}n'.format(datetime.now())) await asyncio.sleep(1)async def factorial(name, number): f = 1 for i in range(2, number 1): print('task {}: compute factorial({})'.format(name, i)) await custom_sleep() f *= i print('task {}: factorial({}) is {}n'.format(name, number, f))start = time.time() loop = asyncio.get_event_loop()tasks = [ asyncio.ensure_future(factorial(a, 3)), asyncio.ensure_future(factorial(b, 4)),]loop.run_until_complete(asyncio.wait(tasks)) loop.close()end = time.time() print(total time: {}.format(end - start))
脚本输出:
task a: compute factorial(2) sleep 2017-04-06 13:44:40.648665task b: compute factorial(2) sleep 2017-04-06 13:44:40.648859task a: compute factorial(3) sleep 2017-04-06 13:44:41.649564task b: compute factorial(3) sleep 2017-04-06 13:44:41.649943task a: factorial(3) is 6task b: compute factorial(4) sleep 2017-04-06 13:44:42.651755task b: factorial(4) is 24total time: 3.008226156234741
从输出可以看到,异步模式的代码执行速度快了大概两秒。当使用异步模式的时候(每次调用 await asyncio.sleep(1) ),进程控制权会返回到主程序的消息循环里,并开始运行队列的其他任务(任务a或者任务b)。
当使用标准的 sleep 方法时,当前线程会挂起等待。什么也不会做。实际上,标准的 sleep 过程中,当前线程也会返回一个 python 的解释器,可以操作现有的其他线程,但这是另一个话题了。
推荐使用异步模式编程的几个理由
很多公司的产品都广泛的使用了异步模式,如 facebook 旗下著名的 react native 和 rocksdb 。像 twitter 每天可以承载 50 亿的用户访问,靠的也是异步模式编程。所以说,通过代码重构,或者改变模式方法,就能让系统工作的更快,为什么不去试一下呢?