1.协程
协程(Coroutine)不是计算机提供,程序员人为创造。 通过一个线程实现代码块相互切换执行。 实现方法:
1
2
3
4
1. greenlet,早期模块
2. yield关键字
3. asyncio装饰器(py3.4⬆️)
4. async,await关键字(py3.5⬆️)【推荐】
1.1 greenlet实现协程
1
pip install greenlet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from greenlet import greenlet
def func1():
print(1)
gr2.switch()
print(2)
gr2.switch()
def func2():
print(3)
gr1.switch()
print(4)
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch()
# 输出 > 1 3 2 4
1.2 yield关键字
1
2
3
4
5
6
7
8
9
10
def func1():
yield 1
yield from func2()
yield 2
def func2():
yield 3
yield 4
f1 = func1()
for item in f1:
print(item)
1.3 asyncio
python3.4及之后的版本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(2) # 遇到IO耗时操作(阻塞),自动切换到tasks中的其他任务
print(2)
@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2) # 遇到IO耗时操作(阻塞),自动切换到tasks中的其他任务
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
遇到 IO 阻塞自动切换
1.4 async & await 关键字
python3.5 及之后的版本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
async def func1():
print(1)
await asyncio.sleep(2) # 遇到IO耗时操作(阻塞),自动切换到tasks中的其他任务
print(2)
async def func2():
print(3)
await asyncio.sleep(2) # 遇到IO耗时操作(阻塞),自动切换到tasks中的其他任务
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
2.协程的意义
在一个线程中如果遇到了 IO 等待时间,线程不会等待,利用空闲时间去干点其他事。 案例: 下载三张图片
- 普通方式(同步)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
""" pip install requests"""
import requests
def download_image(url):
print('开始下载。')
response = requests.get(url)
print('下载完成')
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(response.content)
if __name__ == '__main__':
url_list = ['1','2','3']
for item in url_list:
download_image(item)
协程方式(异步)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
""" 下载图片使用第三方模块 aiohttp pip install aiohttp """ import aiohttp import asyncio async def fetch(session,url): print("发送请求: ", url) async with session.get(url, verify_ssl=False) as response: content = await response.content.read() file_name = url.rsplit('_')[-1] with open(file_name, mode='wb') as file_object: file_object.write(response.content) async def main(): async with aiohttp.ClientSession() as session: url_list = ['1','2','3'] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] await asyncio.wait(task) if "__name__" ==__main__: asyncio.run(main())
3.异步编程
3.1 事件循环
理解为一个死循环,检测并执行某些代码。
1
2
3
4
5
6
7
8
9
10
# 伪代码
任务列表 = [任务1, 任务2, 。。。]
while True:
可执行列表,已完成列表 = 任务列表中检查所有的任务,将‘可执行’和‘已完成’的任务返回
for 就绪任务 in 可执行任务列表:
执行已就绪任务
for 已完成任务 in 已完成任务列表:
在任务列表中删除 已完成任务
if 任务列表 中的任务都已王成:
break
1
2
3
4
5
import asyncio
# 生成或获取一个时间循环
loop = asyncio.get_event_loop()
# 将任务放到 “任务列表”
loop.run_until_complete(任务)
3.2 async 快速上手
协程函数:定义函数的时候 async def 函数名. 协程对象: 执行 协程函数() 得到的协程对象
1
2
3
4
5
# 协程函数
async def func():
pass
# 协程对象
result = func()
注意: 执行协程函数创建协程对象,函数内部代码不会执行。 如果想要执行内部代码,必须将协程对象交给时间循环来处理。
1
2
3
4
5
6
7
async def func():
print('hello world!')
result = func()
# loop = asyncio.get_event_loop()
# loop.run_until_complete(restlt)
asyncio.run(result) # python3.7 以后将两行简化成为一行
3.3 await
await + 可等待的对象(协程对象, Future对象, Task对象 => 理解为 IO 等待)
1
2
3
4
5
6
7
import asyncio
async def func():
print('hello world')
response = await asyncio.sleep(2)
print('end ', response)
asyncio.run(func())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
async def others(ids=0):
print('start:', ids)
await asyncio.sleep(2)
print('end:', ids)
return '返回'
async def func():
print('执行协程函数内部代码')
# response1 = await others()
# print('r1: ',response1)
# response2 = await others()
# print('r2: ',response2)
tasks = [ others(1), others(2)]
await asyncio.wait(tasks)
asyncio.run(func())
await 就是等待对象的值得到结果之后再继续向下执行。
3.4 Task对象
在事件循环中添加多个任务。 Tasks用于并发协程调度,通过asyncio.create_task(协程对象) 的方式创建Task对象,这样可以让协加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task() 或 ensure_future() 函数。建议用asyncio.create_task(),不建议手动实力化 Task 对象。
示例1:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回'
async def main():
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(func())
print('main end')
ret1 = await task1
ret2 = await task2
print(ret1, ret2)
asyncio.run(main())
示例2:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回'
async def main():
task_list = [
asyncio.create_task(func(),name='n1'),
asyncio.create_task(func(),name='n2')
]
print('main end')
done, pending = await asyncio.wait(task_list, timeout=3)
print(done)
asyncio.run(main())
示例3:
1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回'
task_list = [
func(),
func()
]
done, pending = asyncio.run( asyncio.wait(task_list) )
3.5 asyncio.Future对象
Task继承Future, Task对象内部 await 结果的处理基于 Future 对象来的。
示例1:
1
2
3
4
5
6
7
8
9
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象), 这个任务什么都不干
fut = loop.create_future()
# 等待任务最终结果(Future对象),没有结果则会一直等下去
await fut
# 这里创建事件循环
asyncio.run(main())
示例2:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
async def set_after(fut):
await asyncio.sleep(2)
fut.set_result('666')
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象), 这个任务什么都不干
fut = loop.create_future()
# 创建一个任务,绑定了set_after函数,函数会给fut赋值
await loop.create_task(set_after(fut))
# 等待任务最终结果(Future对象),没有结果则会一直等下去
data = await fut
print(data)
# 这里创建事件循环
asyncio.run(main())
3.6 concurrent.futures.Future对象
使用线程池或进程池异步操作时用到的对象。(线程池/进程池中的线程/进程可以使用协同函数)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func(value):
time.sleep(1)
print(value)
return 1 # 返回给future对象
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
fut = pool.submit(func, i)
print(fut)
案例:asyncio + 不支持asyncio异步的模块
3.7 异步迭代器
异步迭代器 实现了__aiter__()和__anext__()方法的对象。 异步可叠戴对象 可在async_for 语句中被使用的对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
class Reader():
""" 定义异步迭代器(同时也是异步可迭代对象) """
def __init__(self):
self.count = 0
async def readline(self):
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = wait self.readline()
if val == None:
raise StopAsyncIteration
return val
async def func():
obj = Reader()
async for item in obs:
print(item)
asyncio.run(func())
3.8 异步上下文管理器
1
2
3
4
5
import asyncio
class AsyncContextManager:
def __init__(self):
self.conn = conn