Home 协程 & asyncio & 异步编程
Post
Cancel

协程 & asyncio & 异步编程

1.协程

​ 协程(Coroutine)不是计算机提供,程序员人为创造。 ​ 通过一个线程实现代码块相互切换执行。 实现方法:

1
2
3
4
1. greenlet,早期模块
 	2. yield关键字
 	3. asyncio装饰器(py3.4⬆️)
 	4. async,await关键字(py3.5⬆️)【推荐】

1.1 greenlet实现协程

1
pip install greenlet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from greenlet import greenlet

def func1():
  print(1)
  gr2.switch()
  print(2)
  gr2.switch()

def func2():
  print(3)
  gr1.switch()
  print(4)
gr1 = greenlet(func1)
gr2 = greenlet(func2)

gr1.switch()
# 输出 > 1 3 2 4

1.2 yield关键字

1
2
3
4
5
6
7
8
9
10
def func1():
  yield 1
  yield from func2()
  yield 2
def func2():
  yield 3
  yield 4
f1 = func1()
for item in f1:
  print(item)

1.3 asyncio

python3.4及之后的版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
@asyncio.coroutine
def func1():
  print(1)
  yield from asyncio.sleep(2) 	# 遇到IO耗时操作(阻塞),自动切换到tasks中的其他任务
  print(2)

@asyncio.coroutine
def func2():
  print(3)
  yield from asyncio.sleep(2) 	# 遇到IO耗时操作(阻塞),自动切换到tasks中的其他任务
  print(4)

tasks = [
  asyncio.ensure_future(func1()),
  asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

遇到 IO 阻塞自动切换

1.4 async & await 关键字

python3.5 及之后的版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def func1():
  print(1)
  await asyncio.sleep(2) 	# 遇到IO耗时操作(阻塞),自动切换到tasks中的其他任务
  print(2)

async def func2():
  print(3)
  await asyncio.sleep(2) 	# 遇到IO耗时操作(阻塞),自动切换到tasks中的其他任务
  print(4)

tasks = [
  asyncio.ensure_future(func1()),
  asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

2.协程的意义

​ 在一个线程中如果遇到了 IO 等待时间,线程不会等待,利用空闲时间去干点其他事。 ​ 案例: 下载三张图片

  • 普通方式(同步)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
  """ pip install requests"""
  import requests
  
  def download_image(url):
  	print('开始下载。')
  	response = requests.get(url)
  	print('下载完成')
  	file_name = url.rsplit('_')[-1]
  	with open(file_name, mode='wb') as file_object:
  		file_object.write(response.content)
  if __name__ == '__main__':
  	url_list = ['1','2','3']
  	for item in url_list:
  		download_image(item)
  • 协程方式(异步)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    
    """ 下载图片使用第三方模块 aiohttp
    		pip install aiohttp """
    import aiohttp
    import asyncio
      
    async def fetch(session,url):
      print("发送请求: ", url)
      async with session.get(url, verify_ssl=False) as response:
        content = await response.content.read()
        file_name = url.rsplit('_')[-1]
    		with open(file_name, mode='wb') as file_object:
    			file_object.write(response.content)
    async def main():
      async with aiohttp.ClientSession() as session:
        url_list = ['1','2','3']
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        await asyncio.wait(task)
    if "__name__" ==__main__:
      asyncio.run(main())
    

3.异步编程

3.1 事件循环

​ 理解为一个死循环,检测并执行某些代码。

1
2
3
4
5
6
7
8
9
10
# 伪代码
任务列表 = [任务1, 任务2, 。。。]
while True:
	可执行列表,已完成列表 = 任务列表中检查所有的任务,将‘可执行’和‘已完成’的任务返回
	for 就绪任务 in 可执行任务列表:
			执行已就绪任务
	for 已完成任务 in 已完成任务列表:
			在任务列表中删除 已完成任务
	if 任务列表 中的任务都已王成:
			break
1
2
3
4
5
import asyncio 
# 生成或获取一个时间循环
loop = asyncio.get_event_loop()
# 将任务放到 “任务列表”
loop.run_until_complete(任务)

3.2 async 快速上手

​ 协程函数:定义函数的时候 async def 函数名. ​ 协程对象: 执行 协程函数() 得到的协程对象

1
2
3
4
5
# 协程函数
async def func():
  pass
# 协程对象
result = func()

注意: 执行协程函数创建协程对象,函数内部代码不会执行。 如果想要执行内部代码,必须将协程对象交给时间循环来处理。

1
2
3
4
5
6
7
async def func():
  print('hello world!')
result = func()

# loop = asyncio.get_event_loop()
# loop.run_until_complete(restlt)
asyncio.run(result)	# python3.7 以后将两行简化成为一行

3.3 await

​ await + 可等待的对象(协程对象, Future对象, Task对象 => 理解为 IO 等待)

1
2
3
4
5
6
7
import asyncio

async def func():
  print('hello world')
	response = await asyncio.sleep(2)
  print('end ', response)
asyncio.run(func())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio

async def others(ids=0):
  print('start:', ids)
  await asyncio.sleep(2)
  print('end:', ids)
  return '返回'

async def func():
  print('执行协程函数内部代码')
  # response1 = await others()
  # print('r1: ',response1)
  # response2 = await others()
  # print('r2: ',response2)
  tasks = [ others(1), others(2)]
  await asyncio.wait(tasks)
asyncio.run(func())

​ await 就是等待对象的值得到结果之后再继续向下执行。

3.4 Task对象

​ 在事件循环中添加多个任务。 ​ Tasks用于并发协程调度,通过asyncio.create_task(协程对象) 的方式创建Task对象,这样可以让协加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task() 或 ensure_future() 函数。建议用asyncio.create_task(),不建议手动实力化 Task 对象。

示例1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
async def func():
  print(1)
  await asyncio.sleep(2)
  print(2)
  return '返回'
async def main():
  task1 = asyncio.create_task(func())
  task2 = asyncio.create_task(func())
  print('main end')
  ret1 = await task1
  ret2 = await task2
  print(ret1, ret2)
asyncio.run(main())

示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
async def func():
  print(1)
  await asyncio.sleep(2)
  print(2)
  return '返回'
async def main():
  task_list = [
    asyncio.create_task(func(),name='n1'), 
    asyncio.create_task(func(),name='n2')
  ]
  print('main end')
  done, pending = await asyncio.wait(task_list, timeout=3)
  print(done)

asyncio.run(main())

示例3:

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio
async def func():
  print(1)
  await asyncio.sleep(2)
  print(2)
  return '返回'

task_list = [
  func(), 
  func()
]

done, pending = asyncio.run( asyncio.wait(task_list) )

3.5 asyncio.Future对象

​ Task继承Future, Task对象内部 await 结果的处理基于 Future 对象来的。

​ 示例1:

1
2
3
4
5
6
7
8
9
async def main():
  	# 获取当前事件循环
  	loop = asyncio.get_running_loop()
    # 创建一个任务(Future对象), 这个任务什么都不干
    fut = loop.create_future()
    # 等待任务最终结果(Future对象),没有结果则会一直等下去
    await fut
# 这里创建事件循环
asyncio.run(main())

示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def set_after(fut):
  await asyncio.sleep(2)
  fut.set_result('666')

async def main():
  # 获取当前事件循环
  loop = asyncio.get_running_loop()
  # 创建一个任务(Future对象), 这个任务什么都不干
  fut = loop.create_future()
  # 创建一个任务,绑定了set_after函数,函数会给fut赋值
  await loop.create_task(set_after(fut))
  # 等待任务最终结果(Future对象),没有结果则会一直等下去
  data = await fut
  print(data)

# 这里创建事件循环
asyncio.run(main())

3.6 concurrent.futures.Future对象

​ 使用线程池或进程池异步操作时用到的对象。(线程池/进程池中的线程/进程可以使用协同函数)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func(value):
  time.sleep(1)
  print(value)
	return 1	# 返回给future对象

# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)

# 创建进程池
pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
  fut = pool.submit(func, i)
  print(fut)

案例:asyncio + 不支持asyncio异步的模块

3.7 异步迭代器

异步迭代器 实现了__aiter__()和__anext__()方法的对象。 异步可叠戴对象 可在async_for 语句中被使用的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio

class Reader():
  """ 定义异步迭代器(同时也是异步可迭代对象) """
  def __init__(self):
    self.count = 0
  async def readline(self):
    self.count += 1
    if self.count == 100:
      return None
   	return self.count
  
  def __aiter__(self):
    return self
  
  async def __anext__(self):
    val = wait self.readline()
    if val == None:
      raise StopAsyncIteration
    return val

async def func():
  obj = Reader()
  async for item in obs:
  	print(item)

asyncio.run(func())

3.8 异步上下文管理器

1
2
3
4
5
import asyncio

class AsyncContextManager:
  def __init__(self):
    self.conn = conn
This post is licensed under CC BY 4.0 by the author.