本文最后更新于:2024年1月14日 晚上

异步和高并发在一些服务器场景需求很多,从2013年起由 Python 之父 Guido 亲自操刀主持了Tulip(asyncio)项目的开发,使得 Python 具备了优雅的异步编程库。

简介

  • 之前介绍了使用 Python Threading 实现异步编程,使用的是多线程的思路,语法不够优雅而且对于简单的工作来说还是太重了。

  • 比较合适的程序运行单位叫做协程,协程能够在IO等待时间就去切换执行其他任务,当IO操作结束后再自动回调,那么就会大大节省资源并提供性能。

协程

协程(Coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块相互切换执行。

  • Asyncio 并不能带来真正的并行(parallelism)。当然,因为 GIL(全局解释器锁)的存在,Python 的多线程也不能带来真正的并行。可交给 asyncio 执行的任务,称为协程(coroutine)。一个协程可以放弃执行,把机会让给其它协程(即 yield from 或 await)。

  • 在Python中有多种方式可以实现协程,例如:

  1. greenlet,是一个第三方模块,用于实现协程代码(Gevent协程就是基于greenlet实现);
  2. yield,生成器,借助生成器的特点也可以实现协程代码;
  3. asyncio,在 Python3.4 中引入的模块用于编写协程代码;
  4. async & awiat,在 Python3.5 中引入的两个关键字,结合 asyncio 模块可以更方便的编写协程代码。

目前python异步相关的主流技术是通过包含关键字async&await的async模块实现,因此我们重点关注 asyncio 模块。

asyncio

asyncio 是 Python 3.4 版本引入的标准库,直接内置了对异步IO的支持。

代码示例
  • 示例 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio

@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(2)

@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(4)

tasks = [
asyncio.ensure_future( func1() ),
asyncio.ensure_future( func2() )
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

-->
1
3
2
4
  • 示例 2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio

async def func1():
print(1)
await asyncio.sleep(2) # 耗时操作
print(2)

async def func2():
print(3)
await asyncio.sleep(2) # 耗时操作
print(4)

tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

-->
1
3
2
4
  • 示例 3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# aiohttp 为支持异步编程的http请求库
import aiohttp
import asyncio

async def fetch(session, url):
print("发送请求:", url)
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(content)

async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www.1.jpg',
'https://www.2.jpg',
'https://www.3.jpg'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
await asyncio.wait(tasks)


if __name__ == '__main__':
asyncio.run(main())

输出:一次发送三个下载请求,同时下载,假如每次下载花费1s,完成任务仅需要1s 左右

async 关键字

async & await关键字在 Python 3.5 版本中正式引入,代替了asyncio.coroutine 装饰器,基于他编写的协程代码其实就是上一示例的加强版,让代码可以更加简便可读。

  • 协程函数:定义函数时候由async关键字装饰的函数 async def 函数名 (定义协程)
  • 协程对象:执行协程函数得到的协程对象。
1
2
3
4
5
# 协程函数
async def func():
pass
# 协程对象
result = func()

注意:执行协程函数只会创建协程对象,函数内部代码不会执行。如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理。

1
2
3
4
5
6
7
8
9
10
11
import asyncio 
async def func():
print("执行协程函数内部代码!")
result = func()

# 调用方法1:
# loop = asyncio.get_event_loop()
# loop.run_until_complete( result )

# 调用方法2:
asyncio.run( result )
  • 协程能做的事情
    • 等待一个 future 结束
    • 等待另一个协程(产生一个结果,或引发一个异常)
    • 产生一个结果给正在等它的协程
    • 引发一个异常给正在等它的协程

await 关键字

await + 可等待的对象(协程对象、Future、Task对象 -> IO等待),遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def others():
print("start")
await asyncio.sleep(2)
print('end')
return '返回值'

async def func():
print("执行协程函数内部代码")
# await等待对象的值得到结果之后再继续向下走
response = await others()
print("IO请求结束,结果为:", response)

asyncio.run( func() )

task 对象

Task 对象的作用是在事件循环中添加多个任务,用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async def module_a():
print("start module_a")
await asyncio.sleep(2) # 模拟 module_a 的io操作
print('end module_a')
return 'module_a 完成'

async def module_b():
print("start module_b")
await asyncio.sleep(1) # 模拟 module_a 的io操作
print('end module_b')
return 'module_b 完成'

task_list = [
module_a(),
module_b(),
]

done,pending = asyncio.run( asyncio.wait(task_list) )
print(done)

future 回调

假如协程是一个 IO 的读操作,等它读完数据后,我们希望得到通知,以便下一步数据的处理。这一需求可以通过往 future 添加回调来实现。

1
2
3
4
5
6
7
def done_callback(futu):
print('Done')

futu = asyncio.ensure_future(do_some_work(3))
futu.add_done_callback(done_callback)

loop.run_until_complete(futu)

多个协程

实际项目中,往往有多个协程,同时在一个 loop 里运行。为了把多个协程交给 loop,需要借助 asyncio.gather 函数。

1
loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))

官方文档:

1
2
gather(*coros_or_futures, loop=None, return_exceptions=False)
Return 各协程协成的合并结果

或者先把协程存在列表里:

1
2
coros = [do_some_work(1), do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))

也可以传 futures 给它:

1
2
3
4
5
6
7
8
9
futus = [asyncio.ensure_future(do_some_work(1)),
asyncio.ensure_future(do_some_work(3))]

loop.run_until_complete(asyncio.gather(*futus))

如果进程中已经有了loop, 则可以直接等待这几个异步的结果
results = await asyncio.gather(*futus)
返回结果results是一个列表,每个元素是每个异步的返回内容
print(str(results))

gather 起聚合的作用,把多个 futures 包装成单个 future,因为 loop.run_until_complete 只接受单个 future。

Close Loop

以上示例都没有调用 loop.close,好像也没有什么问题。所以到底要不要调 loop.close 呢?
简单来说,loop 只要不关闭,就还可以再运行。:

1
2
3
loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()

但是如果关闭了,就不能再运行了:

1
2
3
loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3)) # 此处异常

建议调用 loop.close,以彻底清理 loop 对象防止误用。

多进程+asyncio

由于python本身只能单线程,所以所谓的线程是通过线程锁实现的。现在必须要通过多进程实现更多的并发。

现在demo实现了使用多进程,每个进程都有一个asyncio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import asyncio
import threading
import multiprocessing
from multiprocessing import Queue ,Pool,Process
#import aiohttp
import os

async def hello(name):
print('hello {} {}**********{}'.format(name,os.getpid(),threading.current_thread()))
#await asyncio.sleep(int(name))
await asyncio.sleep(1)
print('end:{} {}'.format(name,os.getpid()))


def process_start(*namelist):
tasks=[]
loop=asyncio.get_event_loop()
for name in namelist:
tasks.append(asyncio.ensure_future(hello(name)))
loop.run_until_complete(asyncio.wait(tasks))

def task_start(namelist):
i=0
lst=[]
flag=10
while namelist:
i+=1
l=namelist.pop()
lst.append(l)
if i==flag:
p=Process(target=process_start,args=lst)
p.start()
#p.join()
lst=[]
i=0
if namelist!=[]:
p=Process(target=process_start,args=lst)
p.start()
#p.join()

if __name__=='__main__':
# 测试使用多个进程来实现函数
namelist=list('0123456789'*10)
print(namelist)
task_start(namelist)

# 测试使用一个异步io来实现全部函数
# loop=asyncio.get_event_loop()
# tasks=[]
# namelist=list('0123456789'*10)
# for i in namelist:
# tasks.append(asyncio.ensure_future(hello(i)))
# loop.run_until_complete(asyncio.wait(tasks))


gather 起聚合的作用,把多个 futures 包装成单个 future,因为 loop.run_until_complete 只接受单个 future。

参考资料



文章链接:
https://www.zywvvd.com/notes/coding/python/asyncio/asyncio/


“觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭”

微信二维码

微信支付

支付宝二维码

支付宝支付

Python 异步执行 Asyncio
https://www.zywvvd.com/notes/coding/python/asyncio/asyncio/
作者
Yiwei Zhang
发布于
2023年2月13日
许可协议