异步 IO 是 Python 里最容易被误解的东西之一。很多人第一次看到 async def 和 await,会以为它等于“自动变快”或者“多线程平替”。其实不是。
异步 IO 的核心不是让 CPU 同时执行多段 Python 代码,而是让程序在等待网络、磁盘、数据库、HTTP 请求时,把控制权交还给事件循环,让其他任务先跑。它解决的是“等”的浪费,不是“算”的瓶颈。
一句话:
异步 IO 适合大量等待,不适合大量计算。
1. 先分清三种并发
Python 里常见三种并发方式:
| 方式 | 适合场景 | 关键词 |
|---|---|---|
| 多线程 | 阻塞 IO、旧同步库 | threading、ThreadPoolExecutor |
| 多进程 | CPU 密集计算 | multiprocessing、ProcessPoolExecutor |
| 异步 IO | 高并发网络 IO | asyncio、async/await |
举例:
- 爬 1000 个网页:异步 IO 很合适。
- 调 1000 次 HTTP API:异步 IO 很合适。
- 等数据库返回:异步 IO 很合适。
- 计算 1000 张图片的特征:多进程更合适。
- 解压大文件、跑模型推理:异步 IO 不会神奇提速。
异步 IO 的优势来自等待期间切换任务,而不是突破 CPU 限制。
2. 同步代码为什么慢
先看同步版本:
import time
def fetch(name: str, delay: int):
print(f"{name} start")
time.sleep(delay)
print(f"{name} done")
return name
def main():
fetch("a", 2)
fetch("b", 2)
fetch("c", 2)
main()
总耗时大约 6 秒。
问题不在于 CPU 忙,而在于程序每次 sleep 都停在那里。真实网络请求也是类似的:发出请求后,绝大多数时间都在等远端响应。
3. 异步版本为什么快
异步版本:
import asyncio
async def fetch(name: str, delay: int):
print(f"{name} start")
await asyncio.sleep(delay)
print(f"{name} done")
return name
async def main():
results = await asyncio.gather(
fetch("a", 2),
fetch("b", 2),
fetch("c", 2),
)
print(results)
asyncio.run(main())
总耗时大约 2 秒。
关键点:
async def定义协程函数。- 调用协程函数不会立刻执行完,而是得到一个 coroutine 对象。
await表示这里可能要等待。- 等待期间,事件循环可以调度其他任务。
asyncio.run()创建并运行事件循环。asyncio.gather()并发等待多个任务完成。
这不是三段代码真的同时占用 CPU,而是它们都在等待时互相让路。
4. 事件循环是什么
事件循环可以理解成一个调度器:
- 它维护一批任务。
- 哪个任务准备好了,就让哪个任务继续执行。
- 某个任务遇到
await,就暂时挂起。 - 等 IO 完成后,再把任务放回可执行队列。
粗略心智模型:
任务 A 执行 -> 遇到 await -> 让出控制权
任务 B 执行 -> 遇到 await -> 让出控制权
任务 C 执行 -> 完成
任务 A 的 IO 完成 -> 继续执行
任务 B 的 IO 完成 -> 继续执行
事件循环通常不需要你手动创建。应用入口用:
asyncio.run(main())
在已经处于异步环境里,例如 FastAPI 路由、Jupyter 或某些框架内部,不要随手再套一层 asyncio.run(),否则可能遇到事件循环已运行的问题。
5. coroutine、Task、Future 的区别
这三个概念经常混在一起。
5.1 coroutine
协程对象来自调用 async def 函数:
async def hello():
return "hello"
coro = hello()
此时 hello() 还没有真正跑完。你需要:
result = await coro
5.2 Task
Task 是被事件循环调度的协程。
task = asyncio.create_task(hello())
result = await task
create_task() 的意思是:把这个协程丢给事件循环,让它开始排队运行。
5.3 Future
Future 更底层,表示一个未来会有结果的对象。日常业务代码里,你更多接触的是 coroutine 和 Task。
简单记:
- coroutine 是“可等待的函数执行过程”。
- Task 是“已经交给事件循环调度的 coroutine”。
- Future 是“未来某个时刻会完成的结果容器”。
6. await 不是并发
很多人会写出这种代码:
async def main():
await fetch("a", 2)
await fetch("b", 2)
await fetch("c", 2)
这仍然接近串行,总耗时约 6 秒。
因为你是在等 a 完成,再等 b,再等 c。
真正并发需要创建多个任务:
async def main():
task_a = asyncio.create_task(fetch("a", 2))
task_b = asyncio.create_task(fetch("b", 2))
task_c = asyncio.create_task(fetch("c", 2))
results = await asyncio.gather(task_a, task_b, task_c)
print(results)
或者直接:
async def main():
results = await asyncio.gather(
fetch("a", 2),
fetch("b", 2),
fetch("c", 2),
)
print(results)
7. gather:一组任务一起等
asyncio.gather() 适合“发出一组任务,最后要拿到所有结果”的场景。
async def download(url: str):
await asyncio.sleep(1)
return f"content from {url}"
async def main():
urls = [
"https://example.com/a",
"https://example.com/b",
"https://example.com/c",
]
results = await asyncio.gather(*(download(url) for url in urls))
print(results)
特点:
- 返回结果顺序和传入任务顺序一致。
- 默认情况下,某个任务抛异常,整体也会抛异常。
- 适合批量请求、批量 IO、批量任务编排。
如果想让异常也作为结果返回:
results = await asyncio.gather(*tasks, return_exceptions=True)
但不要滥用它吞掉异常。生产代码里,异常应该有日志和处理策略。
8. TaskGroup:更现代的结构化并发
Python 3.11 引入了 asyncio.TaskGroup。它适合把一组子任务放进一个明确的生命周期里。
import asyncio
async def worker(name: str):
await asyncio.sleep(1)
print(f"{name} done")
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(worker("a"))
tg.create_task(worker("b"))
tg.create_task(worker("c"))
asyncio.run(main())
TaskGroup 的好处是结构更清晰:进入上下文,创建任务;退出上下文,等待这组任务结束。如果其中任务失败,错误传播和取消语义也更适合结构化并发。
经验:
- 简单批量取结果:
gather()很顺手。 - 复杂任务生命周期:优先考虑
TaskGroup。
9. as_completed:谁先完成先处理谁
有些场景不是等所有结果再处理,而是谁先回来先处理谁。
async def fetch_number(n: int):
await asyncio.sleep(4 - n)
return n
async def main():
tasks = [fetch_number(1), fetch_number(2), fetch_number(3)]
for done in asyncio.as_completed(tasks):
result = await done
print(result)
适合:
- 搜索多个镜像源,谁先返回先用。
- 批量下载,先完成的先写入。
- 多个慢接口并发请求,边收边处理。
10. 并发控制:别把对方服务器打爆
异步很容易写出“瞬间发出 10000 个请求”的代码。程序可能很兴奋,服务器和你自己的网络栈可能不兴奋。
用 Semaphore 控制并发:
import asyncio
sem = asyncio.Semaphore(5)
async def fetch(url: str):
async with sem:
print(f"fetch {url}")
await asyncio.sleep(1)
return url
async def main():
urls = [f"https://example.com/{i}" for i in range(20)]
results = await asyncio.gather(*(fetch(url) for url in urls))
print(results)
这里最多同时跑 5 个 fetch。
并发数不是越大越好。经验值:
- 本机小任务:几十到几百。
- 请求外部网站:先从 5 到 20 试起。
- 请求自己的 API:根据服务限流和数据库压力调整。
- 数据库连接:不要超过连接池上限。
11. 超时:永远不要无限等
网络请求、远程 API、数据库调用都可能卡住。异步代码里一定要有超时意识。
import asyncio
async def slow_task():
await asyncio.sleep(10)
return "done"
async def main():
try:
result = await asyncio.wait_for(slow_task(), timeout=2)
print(result)
except TimeoutError:
print("timeout")
Python 3.11+ 还可以用 asyncio.timeout():
async def main():
try:
async with asyncio.timeout(2):
result = await slow_task()
print(result)
except TimeoutError:
print("timeout")
建议:
- 外部 HTTP 请求设置超时。
- 数据库查询设置超时。
- 队列消费者设置退出条件。
- 后台任务设置取消策略。
12. 取消任务:任务不是创建了就不用管
创建 Task 后,如果不 await、不取消、不记录异常,就可能变成失控后台任务。
取消任务:
async def long_running():
try:
while True:
print("working")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("cleanup")
raise
async def main():
task = asyncio.create_task(long_running())
await asyncio.sleep(3)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("cancelled")
注意:
- 捕获
CancelledError后,通常要重新raise。 - 在取消时做清理,比如关闭连接、写日志、释放锁。
- 不要随便吞掉取消异常。
13. Queue:生产者消费者模型
异步爬虫、日志处理、批量任务很适合用队列。
import asyncio
async def producer(queue: asyncio.Queue[int]):
for i in range(10):
await queue.put(i)
print(f"produce {i}")
await queue.put(None)
async def consumer(queue: asyncio.Queue[int]):
while True:
item = await queue.get()
try:
if item is None:
break
print(f"consume {item}")
await asyncio.sleep(0.5)
finally:
queue.task_done()
async def main():
queue = asyncio.Queue()
await asyncio.gather(
producer(queue),
consumer(queue),
)
更常见的是多个消费者:
async def main():
queue = asyncio.Queue()
consumers = [asyncio.create_task(consumer(queue)) for _ in range(3)]
for i in range(20):
await queue.put(i)
for _ in consumers:
await queue.put(None)
await asyncio.gather(*consumers)
队列的价值:
- 削峰。
- 控制处理节奏。
- 解耦生产和消费。
- 让任务流水线更清晰。
14. 阻塞函数:异步里最隐蔽的坑
异步函数里不能随便放阻塞代码。
错误示例:
import time
async def bad():
time.sleep(3)
time.sleep() 会阻塞整个事件循环。应该写:
async def good():
await asyncio.sleep(3)
常见阻塞来源:
time.sleep()- 同步 HTTP 请求库
requests - 同步数据库驱动
- 大量 CPU 计算
- 大文件同步读写
如果不得不用同步阻塞函数,可以丢到线程里:
import asyncio
import time
def blocking_work():
time.sleep(3)
return "done"
async def main():
result = await asyncio.to_thread(blocking_work)
print(result)
asyncio.to_thread() 适合把 IO 型同步函数挪到线程里,避免卡住事件循环。CPU 密集任务仍然更适合进程池或专门的计算服务。
15. 实战:异步批量请求
实际项目里,最常见的异步 IO 是批量请求 API。这里用 httpx 做例子。
安装:
pip install httpx
代码:
import asyncio
import httpx
async def fetch(client: httpx.AsyncClient, url: str):
response = await client.get(url)
response.raise_for_status()
return {
"url": url,
"status_code": response.status_code,
"length": len(response.text),
}
async def main():
urls = [
"https://example.com",
"https://www.python.org",
"https://fastapi.tiangolo.com",
]
timeout = httpx.Timeout(10.0)
async with httpx.AsyncClient(timeout=timeout) as client:
results = await asyncio.gather(*(fetch(client, url) for url in urls))
for item in results:
print(item)
asyncio.run(main())
为什么要复用 AsyncClient?
- 复用连接池。
- 统一超时、请求头、代理等配置。
- 避免每个请求都重复创建客户端。
再加并发限制:
import asyncio
import httpx
async def fetch(client: httpx.AsyncClient, sem: asyncio.Semaphore, url: str):
async with sem:
response = await client.get(url)
response.raise_for_status()
return response.status_code
async def main():
urls = [f"https://example.com/?q={i}" for i in range(100)]
sem = asyncio.Semaphore(10)
async with httpx.AsyncClient(timeout=10) as client:
results = await asyncio.gather(
*(fetch(client, sem, url) for url in urls),
return_exceptions=True,
)
ok = [item for item in results if not isinstance(item, Exception)]
errors = [item for item in results if isinstance(item, Exception)]
print(f"ok={len(ok)}, errors={len(errors)}")
asyncio.run(main())
这个版本已经接近真实可用:
- 有连接复用。
- 有超时。
- 有并发限制。
- 有异常收集。
16. 实战:异步任务流水线
假设你要做一个 URL 抓取流水线:
- 生产 URL。
- 下载页面。
- 解析标题。
- 保存结果。
可以这样组织:
import asyncio
import re
import httpx
async def producer(queue: asyncio.Queue[str], urls: list[str]):
for url in urls:
await queue.put(url)
async def worker(
name: str,
queue: asyncio.Queue[str],
client: httpx.AsyncClient,
results: list[dict],
):
while True:
url = await queue.get()
try:
response = await client.get(url)
title_match = re.search(r"<title>(.*?)</title>", response.text, re.I | re.S)
title = title_match.group(1).strip() if title_match else ""
results.append({"url": url, "title": title})
print(f"{name} done {url}")
except Exception as error:
results.append({"url": url, "error": str(error)})
finally:
queue.task_done()
async def main():
urls = [
"https://example.com",
"https://www.python.org",
"https://fastapi.tiangolo.com",
]
queue: asyncio.Queue[str] = asyncio.Queue()
results: list[dict] = []
async with httpx.AsyncClient(timeout=10) as client:
await producer(queue, urls)
workers = [
asyncio.create_task(worker(f"worker-{i}", queue, client, results))
for i in range(3)
]
await queue.join()
for task in workers:
task.cancel()
await asyncio.gather(*workers, return_exceptions=True)
print(results)
asyncio.run(main())
这个模式适合:
- 异步爬虫。
- 批量 API 同步。
- 日志消费。
- 任务队列原型。
17. FastAPI 里的 async
FastAPI 支持同步路由和异步路由。
from fastapi import FastAPI
app = FastAPI()
@app.get("/sync")
def sync_route():
return {"mode": "sync"}
@app.get("/async")
async def async_route():
return {"mode": "async"}
什么时候写 async def?
- 你在路由里 await 异步数据库。
- 你在路由里 await 异步 HTTP 客户端。
- 你在路由里组合多个异步任务。
什么时候普通 def 就够?
- 你调用的是同步库。
- 代码很简单,没有 await。
- 你不确定依赖是否异步。
不要为了“看起来高级”把所有路由都写成 async def。如果里面又调用了大量阻塞同步函数,反而会卡住事件循环。
18. 调试异步代码
几个实用技巧:
18.1 开启 debug
asyncio.run(main(), debug=True)
debug 模式能帮助发现一些事件循环阻塞、未等待协程等问题。
18.2 给任务命名
task = asyncio.create_task(fetch("https://example.com"), name="fetch-example")
排查日志时更友好。
18.3 打印当前任务
task = asyncio.current_task()
print(task.get_name() if task else "no task")
18.4 不要忽略 warning
如果看到:
RuntimeWarning: coroutine was never awaited
通常说明你调用了异步函数,但忘记 await 或没有创建任务。
错误:
fetch("https://example.com")
正确:
await fetch("https://example.com")
或者:
asyncio.create_task(fetch("https://example.com"))
19. 常见误区
19.1 async 会让所有代码变快
不会。它主要优化 IO 等待。
19.2 await 就是并发
不是。连续 await 仍然可能是串行。
19.3 create_task 后可以不管
不建议。任务异常可能丢失,生命周期也会混乱。
19.4 异步能替代线程和进程
不能。线程、进程、异步解决的问题不同。
19.5 异步代码里用 requests 没问题
有问题。requests 是同步库,会阻塞事件循环。异步 HTTP 请求优先用 httpx.AsyncClient 或 aiohttp。
20. 学习路线
建议按这个顺序练:
async def、await、asyncio.run()。asyncio.gather()并发执行。asyncio.create_task()手动创建任务。asyncio.TaskGroup()理解结构化并发。Semaphore控制并发量。wait_for()或timeout()设置超时。Queue写生产者消费者。httpx.AsyncClient写真实异步 HTTP 请求。- FastAPI 中写异步路由。
- 学会处理取消、异常、日志和资源关闭。
掌握到第 8 步,异步 IO 就不再只是语法玩具,而是能真正帮你写工具和后端服务的能力。
21. 最小作业
做一个异步 URL 检测器:
输入:urls.txt
输出:每个 URL 的状态码、响应时间、标题
要求:
1. 并发数可配置。
2. 每个请求有超时。
3. 错误不影响其他任务。
4. 结果保存为 JSON。
5. 支持 Ctrl+C 时优雅退出。
这个小项目能覆盖异步 IO 的大部分核心能力:
- 批量任务。
- 并发控制。
- 超时。
- 异常处理。
- 队列或 gather。
- 文件输出。
- 资源关闭。
如果你能写完它,再去看 FastAPI、异步爬虫、异步数据库,就会顺很多。