异步,同步,阻塞,非阻塞程序的实现
终于用透支生命的方法把这一课学完了。感动。以后不这样了。
实现异步非阻塞是一个大命题,这里只从原理出发。我会慢慢修改这篇文章。
本文将从异步sleep的实现入手,来讲解异步非阻塞程序的原理。
什么是异步,同步,阻塞,非阻塞
在写这篇文章前,我对这四个概念是非常模糊的。
同步,异步
异步同步的差异,在于当线程调用函数的时候,线程获取消息的方式.
如果是同步,线程会等待接受函数的返回值(或者轮循函数结果,直到查出它的返回状态和返回值)。如果是异步,线程不需要做任何处理,在函数执行完毕后会推送通知或者调用回调函数。
同步: 线程 ——我主动来拿结果——> 函数
异步: 线程 <—-你把结果拿给我—— 函数
阻塞,非阻塞
阻塞非阻塞的差异,在于线程调用函数的时候,线程的状态。
当线程调用函数,线程就被挂起,在函数结束前什么都干不了。这就是阻塞。
反之,当线程调用函数,线程还能干其它事。这就是非阻塞。此时,函数一般会立即返回状态,而不是等待求值。以免阻塞住线程。
他们没有关系
异步同步和阻塞非阻塞没有什么本质关联。一个讲的是消息方式,一个讲的是线程状态。
线程在同步调用下,也能非阻塞(同步轮循非阻塞函数的状态),在异步下,也能阻塞(调用一个阻塞函数,然后在函数中调用回调,虽然没有什么意义)。
下面,我会慢慢实现一个异步非阻塞的sleep。最后利用Python的特性,将callback调用方式改为yield的伪同步调用。
场景一:同步阻塞
import time
def wait(name):
print(name, " start")
time.sleep(1)
print(name," is over")
wait("yzh")
wait("zhh")
上面的程序执行完毕后,想都不用想,输出如下:
打印 yzh start
# 等待1s
打印 yzh is over
打印 zhh start
# 等待1s
打印 zhh is over
阻塞的后果
上面的代码,如果调用次数很多,则最后一个人要等待之前所有的人阻塞结束,才能被响应。在web项目中,这是很可怕的。所以我们需要引入非阻塞。非阻塞就是为了让一个响应的操作,不影响另一个响应。否则,当A用户在访问某个耗时巨大的网页时,B用户只能对着白板发呆。
在tornado中,有一个gen.sleep函数。它能让响应神奇的变成:
打印 yzh start
打印 zhh start
# 等待1s左右
打印 yzh is over
打印 zhh is over
这个异步sleep函数,似乎在单进程下,让每个函数互相不影响,而又在内部停留了1S。
那么,我们该如何实现自己的非阻塞sleep呢。
(tornado的sleep,原理十分复杂。以后再细说。)
场景二:轮循非阻塞
实现非阻塞场景,关键在于函数不能阻塞住当前线程。也就是说,要启用新的线程让系统帮忙调度,或者以自己的方式确保所有任务都能被调度(比如yield切换来切换去)。
使用线程
import time
from multiprocessing.dummy import Pool as ThreadPool
class Status(object):
pass
p = ThreadPool(4)
def my_sleep():
status = Status()
status.status = 0
def _inner():
time.sleep(2)
status.status = 1
p.apply_async(_inner)
return status
def wait(name):
print(name, " start")
yield my_sleep()
print(name, "over")
gen1 = wait("yzh") # wait是一个生成器,保存为gen1
gen2 = wait("zhh")
timer1 = next(gen1)
timer2 = next(gen2)
tasks = []
tasks.append([gen1,timer1])
tasks.append([gen2,timer2])
while tasks:
for task in tasks:
if task[1].status == 1:
try:
next(task[0]) # 状态正确则继续执行父生成器
except StopIteration:
tasks.remove(task)
使用线程没什么好说的,线程会更新状态,当状态更新后,在下次轮循会触发生成器继续执行后面的动作。
不使用线程
import time
def my_sleep(now):
"""
这个函数本来就是一个生成器。所以可以在单线程下切换运行状态。
"""
while time.time() < now + 2:
yield
def wait(name):
print(name, " start")
now = time.time()
yield my_sleep(now)
print(name, " is over")
gen1 = wait("yzh") # wait是一个生成器,保存为gen1
gen2 = wait("zhh")
timer1 = next(gen1) # 当执行gen的时候,它会Yield一个timer生成器。
# timer是生成器,这是我们可以在单线程下切换timer上下文的关键。
timer2 = next(gen2)
tasks = []
tasks.append([gen1,timer1])
tasks.append([gen2,timer2])
while tasks:
for task in tasks:
try:
next(task[1]) # 不断的轮循每个生成器关连的timer。直到timer执行完毕,引发异常。
except StopIteration:
try:
next(task[0]) # 当timer异常,我们可以知道它的父生成器要继续执行了。
# 对应的yield my_sleep(now) 执行完毕。可以继续下一步,所以我们对父生成器发送继续执行指令
except StopIteration:
tasks.remove(task) # 当父生成器也执行完毕,整个任务终止。把当前任务移除任务队列。
上面的代码中,在一个while循环中轮循timer的状态。由于timer存在于wait中。所以需要把timer“提取”出来。
又因为,没有使用多线程,所以必须自己实现一些简单的调度处理,也就是说,要能自由的切换各个timer的上下文。在单线程下可以使用yield。
- 把timer 从生存器gen yield返回出来
- 轮循timer的状态(实质是切换进出timer,看它有没有引发StopIteration异常)
- 如果发生了异常说明gen应该执行下一步操作了。next(gen)
- 如果gen也发生了StopIteration异常,说明这个任务完毕。
场景三:异步非阻塞
实现异步的经典方式是使用回调,实现非阻塞的经典方式是使用线程。
所以,代码就呼之欲出了。
import time
from multiprocessing.dummy import Pool as ThreadPool
def my_sleep(callback, *callback_args):
def _inner():
time.sleep(2)
callback(*callback_args)
p.apply_async(_inner)
def wait(name):
print(name, " start")
my_sleep(wait_callback,name)
def wait_callback(name):
print(name, " is over")
p = ThreadPool(4)
wait("yzh")
wait("zhh")
p.close()
p.join()
在wait中,唤起my_sleep函数。由于my_sleep在新线程中执行,所以它不会阻塞住主线程。
在my_sleep结束时,调用回调函数。使得任务继续进行。
也就是说,在每个要处理阻塞的地方,都人为的把函数切成三个部分:
- 执行函数前半部
- 执行新线程,把后半部作为回调函数传入。函数退出。
- 等待后半部在线程完毕后被执行。
场景四:终极,伪同步实现异步非阻塞
这个以后再写。先吃饭。
来自 大脸猪 写于 2017-04-26 14:18 -- 更新于2020-10-19 13:06 -- 2 条评论