本文最后更新于:2024年5月7日 下午
异步和高并发在一些服务器场景需求很多,从2013年起由 Python 之父 Guido 亲自操刀主持了Tulip(asyncio)项目的开发,使得 Python 具备了优雅的异步编程库。
简介
协程
协程(Coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块相互切换执行。
greenlet,是一个第三方模块,用于实现协程代码(Gevent协程就是基于greenlet实现);
yield,生成器,借助生成器的特点也可以实现协程代码;
asyncio,在 Python3.4 中引入的模块用于编写协程代码;
async & awiat,在 Python3.5 中引入的两个关键字,结合 asyncio 模块可以更方便的编写协程代码。
目前python异步相关的主流技术是通过包含关键字async&await的async模块实现,因此我们重点关注 asyncio 模块。
asyncio
asyncio 是 Python 3.4 版本引入的标准库,直接内置了对异步IO的支持。
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import asyncio@asyncio.coroutine def func1 (): print (1 ) yield from asyncio.sleep(2 ) print (2 )@asyncio.coroutine def func2 (): print (3 ) yield from asyncio.sleep(2 ) print (4 ) tasks = [ asyncio.ensure_future( func1() ), asyncio.ensure_future( func2() ) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) -->1 3 2 4
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import asyncioasync def func1 (): print (1 ) await asyncio.sleep(2 ) print (2 )async def func2 (): print (3 ) await asyncio.sleep(2 ) print (4 ) tasks = [ asyncio.ensure_future(func1()), asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) -->1 3 2 4
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import aiohttpimport asyncioasync def fetch (session, url ): print ("发送请求:" , url) async with session.get(url, verify_ssl=False ) as response: content = await response.content.read() file_name = url.rsplit('_' )[-1 ] with open (file_name, mode='wb' ) as file_object: file_object.write(content)async def main (): async with aiohttp.ClientSession() as session: url_list = [ 'https://www.1.jpg' , 'https://www.2.jpg' , 'https://www.3.jpg' ] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] await asyncio.wait(tasks)if __name__ == '__main__' : asyncio.run(main())
输出 :一次发送三个下载请求,同时下载,假如每次下载花费1s,完成任务仅需要1s 左右
async 关键字
async & await
关键字在 Python 3.5 版本中正式引入,代替了asyncio.coroutine
装饰器,基于他编写的协程代码其实就是上一示例的加强版,让代码可以更加简便可读。
协程函数:定义函数时候由async关键字装饰的函数 async def 函数名
(定义协程)
协程对象:执行协程函数得到的协程对象。
1 2 3 4 5 async def func (): pass result = func()
注意 :执行协程函数只会创建协程对象,函数内部代码不会执行。如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理。
1 2 3 4 5 6 7 8 9 10 11 import asyncio async def func (): print ("执行协程函数内部代码!" ) result = func() asyncio.run( result )
协程能做的事情
等待一个 future 结束
等待另一个协程(产生一个结果,或引发一个异常)
产生一个结果给正在等它的协程
引发一个异常给正在等它的协程
await 关键字
await + 可等待的对象(协程对象、Future、Task对象 -> IO等待),遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import asyncioasync def others (): print ("start" ) await asyncio.sleep(2 ) print ('end' ) return '返回值' async def func (): print ("执行协程函数内部代码" ) response = await others() print ("IO请求结束,结果为:" , response) asyncio.run( func() )
task 对象
Task 对象的作用是在事件循环中添加多个任务,用于并发调度协程,通过asyncio.create_task(协程对象)
的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 async def module_a (): print ("start module_a" ) await asyncio.sleep(2 ) print ('end module_a' ) return 'module_a 完成' async def module_b (): print ("start module_b" ) await asyncio.sleep(1 ) print ('end module_b' ) return 'module_b 完成' task_list = [ module_a(), module_b(), ] done,pending = asyncio.run( asyncio.wait(task_list) )print (done)
future 回调
假如协程是一个 IO 的读操作,等它读完数据后,我们希望得到通知,以便下一步数据的处理。这一需求可以通过往 future 添加回调来实现。
1 2 3 4 5 6 7 def done_callback (futu ): print ('Done' ) futu = asyncio.ensure_future(do_some_work(3 )) futu.add_done_callback(done_callback) loop.run_until_complete(futu)
多个协程
实际项目中,往往有多个协程,同时在一个 loop 里运行。为了把多个协程交给 loop,需要借助 asyncio.gather
函数。
1 loop.run_until_complete(asyncio.gather(do_some_work(1 ), do_some_work(3 )))
官方文档:
1 2 gather(*coros_or_futures, loop=None , return_exceptions=False ) Return 各协程协成的合并结果
或者先把协程存在列表里:
1 2 coros = [do_some_work(1 ), do_some_work(3 )] loop.run_until_complete(asyncio.gather(*coros))
也可以传 futures 给它:
1 2 3 4 5 6 7 8 9 futus = [asyncio.ensure_future(do_some_work(1 )), asyncio.ensure_future(do_some_work(3 ))] loop.run_until_complete(asyncio.gather(*futus)) 如果进程中已经有了loop, 则可以直接等待这几个异步的结果 results = await asyncio.gather(*futus) 返回结果results是一个列表,每个元素是每个异步的返回内容print (str (results))
gather 起聚合的作用,把多个 futures 包装成单个 future,因为 loop.run_until_complete 只接受单个 future。
Close Loop
以上示例都没有调用 loop.close,好像也没有什么问题。所以到底要不要调 loop.close 呢?
简单来说,loop 只要不关闭,就还可以再运行。:
1 2 3 loop .run_until_complete(do_some_work(loop , 1 ))loop .run_until_complete(do_some_work(loop , 3 ))loop .close ()
但是如果关闭了,就不能再运行了:
1 2 3 loop .run_until_complete(do_some_work(loop , 1 ))loop .close()loop .run_until_complete(do_some_work(loop , 3 ))
建议调用 loop.close,以彻底清理 loop 对象防止误用。
多进程+asyncio
由于python本身只能单线程,所以所谓的线程是通过线程锁实现的。现在必须要通过多进程实现更多的并发。
现在demo实现了使用多进程,每个进程都有一个asyncio
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 import asyncioimport threadingimport multiprocessingfrom multiprocessing import Queue ,Pool,Processimport osasync def hello (name ): print ('hello {} {}**********{}' .format (name,os.getpid(),threading.current_thread())) await asyncio.sleep(1 ) print ('end:{} {}' .format (name,os.getpid()))def process_start (*namelist ): tasks=[] loop=asyncio.get_event_loop() for name in namelist: tasks.append(asyncio.ensure_future(hello(name))) loop.run_until_complete(asyncio.wait(tasks))def task_start (namelist ): i=0 lst=[] flag=10 while namelist: i+=1 l=namelist.pop() lst.append(l) if i==flag: p=Process(target=process_start,args=lst) p.start() lst=[] i=0 if namelist!=[]: p=Process(target=process_start,args=lst) p.start() if __name__=='__main__' : namelist=list ('0123456789' *10 ) print (namelist) task_start(namelist)
gather 起聚合的作用,把多个 futures 包装成单个 future,因为 loop.run_until_complete 只接受单个 future。
参考资料
文章链接:
https://www.zywvvd.com/notes/coding/python/asyncio/asyncio/