python笔记:python中实现异步 实现异步最经典的方法是起一个线程,然后调用回调函数。在python中的yield关键字,可以简单的切换代码的上下文。这为优雅的实现异步提供了可能。 ## 系统协程处理 在python中,也能使用协程来进行任务的处理。由于python不能利用多核优势,协程在某种程度上比线程的效率更高。然而,在协程中,任务不能是阻塞的。因为协程的任务不能并行。阻塞一个任务将阻塞住整个流程。 ``` import time import asyncio async def money_counter(): time.sleep(5) return 1000 async def guest(username): print("guest {} ask for money".format(username)) money = await money_counter() print("get money {}".format(money)) loop = asyncio.get_event_loop() tasks = [ asyncio.ensure_future(guest("yzh")), asyncio.ensure_future(guest("zhh"))] loop.run_until_complete(asyncio.gather(*tasks)) loop.close() print("continue") input() # 很遗憾,结果是这样的: # guest yzh ask for money # get money 1000 # guest zhh ask for money # get money 1000 # continue ``` 上面的示例中由于使用的是sleep,并不能节省时间。但是如果换成IO操作(asyncio.open_connection),则可以提升性能。 ###那是不是无解了? 当然不是 我们要用Loop的函数处理ThreadPoolExecutor对象,让它能把future的result传回await。 在Py的异步中起线程调用阻塞函数通常没有什么意义。 看代码吧: ``` import time import asyncio from concurrent import futures thread_pool = futures.ThreadPoolExecutor(4) def money_counter(): time.sleep(2) return 1000 async def guest(username): print("guest {} ask for money".format(username)) # money = await money_counter() money = await loop.run_in_executor(thread_pool, money_counter) print("get money {}".format(money)) loop = asyncio.get_event_loop() tasks = [ asyncio.ensure_future(guest("yzh")), asyncio.ensure_future(guest("zhh"))] loop.run_until_complete(asyncio.gather(*tasks)) loop.close() print("continue") input() ``` ## 包装一个callback的异步函数 如何把一个使用callback的函数包装成能使用await语法的乖宝宝呢? 答案是,使用future来进行封装。下面有一个小例子。 ``` import asyncio import time from multiprocessing.dummy import Pool as ThreadPool loop = asyncio.get_event_loop() tasks = [] def tasks_run(): while True: if tasks: time.sleep(1) temp_task = tasks.pop(0) loop.call_soon_threadsafe(temp_task) # 在其它线程调用callback需要使用这个函数 time.sleep(1) p = ThreadPool(2) p.apply_async(tasks_run) def asyncfn(clb): tasks.append(clb) def wraper(): myfuture = asyncio.Future() print("clb0 {}".format(myfuture)) def clb(): print("clb2 {}".format(myfuture)) myfuture.set_result("hello") asyncfn(clb) return myfuture async def main1(): data = await wraper() print("data is {}".format(data)) loop.run_until_complete(main1()) loop.close() ``` ##如果让我来实现 如果让我来实现单线程异步框架,我要怎么做呢? 其实很简单,所谓异步,一定要有调度,要能并行。要并行就一定不能阻塞,要有多线程,或者调用其它的异步接口(比如IO,数据库)。 最简单的方式如下: ``` loop = Loop() loop.add(task1) loop.add(task2) def task1(): dosth_before() result = yield dosth_async1() return result def task2(): dosth_before() result = yield dosth_async2() return result ``` loop中,循环这两个任务,首先执行task1,当执行到`result = yield dosth_async1()`的时候,`yield dosth_async1()`执行(此时还没result的事)。这里一定不能阻塞。所以,它立即就返回了`dosth_async1()`对象。 重点来了,此时loop不要去管task1了,马上起动task2,执行到`yield dosth_async2()`后,又跳出了task2,此时,所有的任务都循环了一遍。该执行第二遍loop了。 在第二次循环中,如果`dosth_async1()`已经有结果了。我们点艹task1(用task1.send(result)),让它继续执行,task1得到了result,该干嘛干嘛。执行完毕后,轮到了task2,故技重施,如果`dosth_async2`有结果了,那就点艹task2。 整个loop执行完毕。 来自 大脸猪 写于 2016-10-30 11:34 -- 更新于2020-10-19 13:06 -- 3 条评论