2025年5月23日 11:41:34 星期五

异步,同步,阻塞,非阻塞程序的实现

终于用透支生命的方法把这一课学完了。感动。以后不这样了。
实现异步非阻塞是一个大命题,这里只从原理出发。我会慢慢修改这篇文章。
本文将从异步sleep的实现入手,来讲解异步非阻塞程序的原理。

什么是异步,同步,阻塞,非阻塞

在写这篇文章前,我对这四个概念是非常模糊的。

同步,异步

异步同步的差异,在于当线程调用函数的时候,线程获取消息的方式.
如果是同步,线程会等待接受函数的返回值(或者轮循函数结果,直到查出它的返回状态和返回值)。如果是异步,线程不需要做任何处理,在函数执行完毕后会推送通知或者调用回调函数。

同步: 线程 ——我主动来拿结果——> 函数
异步: 线程 <—-你把结果拿给我—— 函数

阻塞,非阻塞

阻塞非阻塞的差异,在于线程调用函数的时候,线程的状态
当线程调用函数,线程就被挂起,在函数结束前什么都干不了。这就是阻塞。
反之,当线程调用函数,线程还能干其它事。这就是非阻塞。此时,函数一般会立即返回状态,而不是等待求值。以免阻塞住线程。

他们没有关系

异步同步和阻塞非阻塞没有什么本质关联。一个讲的是消息方式,一个讲的是线程状态。
线程在同步调用下,也能非阻塞(同步轮循非阻塞函数的状态),在异步下,也能阻塞(调用一个阻塞函数,然后在函数中调用回调,虽然没有什么意义)。

下面,我会慢慢实现一个异步非阻塞的sleep。最后利用Python的特性,将callback调用方式改为yield的伪同步调用。

场景一:同步阻塞

  1. import time
  2. def wait(name):
  3. print(name, " start")
  4. time.sleep(1)
  5. print(name," is over")
  6. wait("yzh")
  7. wait("zhh")

上面的程序执行完毕后,想都不用想,输出如下:

  1. 打印 yzh start
  2. # 等待1s
  3. 打印 yzh is over
  4. 打印 zhh start
  5. # 等待1s
  6. 打印 zhh is over

阻塞的后果

上面的代码,如果调用次数很多,则最后一个人要等待之前所有的人阻塞结束,才能被响应。在web项目中,这是很可怕的。所以我们需要引入非阻塞。非阻塞就是为了让一个响应的操作,不影响另一个响应。否则,当A用户在访问某个耗时巨大的网页时,B用户只能对着白板发呆。

在tornado中,有一个gen.sleep函数。它能让响应神奇的变成:

  1. 打印 yzh start
  2. 打印 zhh start
  3. # 等待1s左右
  4. 打印 yzh is over
  5. 打印 zhh is over

这个异步sleep函数,似乎在单进程下,让每个函数互相不影响,而又在内部停留了1S。
那么,我们该如何实现自己的非阻塞sleep呢。
(tornado的sleep,原理十分复杂。以后再细说。)

场景二:轮循非阻塞

实现非阻塞场景,关键在于函数不能阻塞住当前线程。也就是说,要启用新的线程让系统帮忙调度,或者以自己的方式确保所有任务都能被调度(比如yield切换来切换去)。

使用线程

  1. import time
  2. from multiprocessing.dummy import Pool as ThreadPool
  3. class Status(object):
  4. pass
  5. p = ThreadPool(4)
  6. def my_sleep():
  7. status = Status()
  8. status.status = 0
  9. def _inner():
  10. time.sleep(2)
  11. status.status = 1
  12. p.apply_async(_inner)
  13. return status
  14. def wait(name):
  15. print(name, " start")
  16. yield my_sleep()
  17. print(name, "over")
  18. gen1 = wait("yzh") # wait是一个生成器,保存为gen1
  19. gen2 = wait("zhh")
  20. timer1 = next(gen1)
  21. timer2 = next(gen2)
  22. tasks = []
  23. tasks.append([gen1,timer1])
  24. tasks.append([gen2,timer2])
  25. while tasks:
  26. for task in tasks:
  27. if task[1].status == 1:
  28. try:
  29. next(task[0]) # 状态正确则继续执行父生成器
  30. except StopIteration:
  31. tasks.remove(task)

使用线程没什么好说的,线程会更新状态,当状态更新后,在下次轮循会触发生成器继续执行后面的动作。

不使用线程

  1. import time
  2. def my_sleep(now):
  3. """
  4. 这个函数本来就是一个生成器。所以可以在单线程下切换运行状态。
  5. """
  6. while time.time() < now + 2:
  7. yield
  8. def wait(name):
  9. print(name, " start")
  10. now = time.time()
  11. yield my_sleep(now)
  12. print(name, " is over")
  13. gen1 = wait("yzh") # wait是一个生成器,保存为gen1
  14. gen2 = wait("zhh")
  15. timer1 = next(gen1) # 当执行gen的时候,它会Yield一个timer生成器。
  16. # timer是生成器,这是我们可以在单线程下切换timer上下文的关键。
  17. timer2 = next(gen2)
  18. tasks = []
  19. tasks.append([gen1,timer1])
  20. tasks.append([gen2,timer2])
  21. while tasks:
  22. for task in tasks:
  23. try:
  24. next(task[1]) # 不断的轮循每个生成器关连的timer。直到timer执行完毕,引发异常。
  25. except StopIteration:
  26. try:
  27. next(task[0]) # 当timer异常,我们可以知道它的父生成器要继续执行了。
  28. # 对应的yield my_sleep(now) 执行完毕。可以继续下一步,所以我们对父生成器发送继续执行指令
  29. except StopIteration:
  30. tasks.remove(task) # 当父生成器也执行完毕,整个任务终止。把当前任务移除任务队列。

上面的代码中,在一个while循环中轮循timer的状态。由于timer存在于wait中。所以需要把timer“提取”出来。
又因为,没有使用多线程,所以必须自己实现一些简单的调度处理,也就是说,要能自由的切换各个timer的上下文。在单线程下可以使用yield。

  1. 把timer 从生存器gen yield返回出来
  2. 轮循timer的状态(实质是切换进出timer,看它有没有引发StopIteration异常)
  3. 如果发生了异常说明gen应该执行下一步操作了。next(gen)
  4. 如果gen也发生了StopIteration异常,说明这个任务完毕。

场景三:异步非阻塞

实现异步的经典方式是使用回调,实现非阻塞的经典方式是使用线程。
所以,代码就呼之欲出了。

  1. import time
  2. from multiprocessing.dummy import Pool as ThreadPool
  3. def my_sleep(callback, *callback_args):
  4. def _inner():
  5. time.sleep(2)
  6. callback(*callback_args)
  7. p.apply_async(_inner)
  8. def wait(name):
  9. print(name, " start")
  10. my_sleep(wait_callback,name)
  11. def wait_callback(name):
  12. print(name, " is over")
  13. p = ThreadPool(4)
  14. wait("yzh")
  15. wait("zhh")
  16. p.close()
  17. p.join()

在wait中,唤起my_sleep函数。由于my_sleep在新线程中执行,所以它不会阻塞住主线程。
在my_sleep结束时,调用回调函数。使得任务继续进行。
也就是说,在每个要处理阻塞的地方,都人为的把函数切成三个部分:

  1. 执行函数前半部
  2. 执行新线程,把后半部作为回调函数传入。函数退出。
  3. 等待后半部在线程完毕后被执行。

场景四:终极,伪同步实现异步非阻塞

这个以后再写。先吃饭。

来自 大脸猪 写于 2017-04-26 14:18 -- 更新于2020-10-19 13:06 -- 2 条评论

2条评论

字体
字号


评论:

● 来自 一个路过的学习者 写于 2022-02-07 18:34 回复

吃完了吗

● 来自 大猪 写于 2022-02-10 19:27 回复

@一个路过的学习者 哈哈。吃完了后,几年没写python,忘了。