Python 异步 IO:从事件循环到实战爬虫 的文章封面
返回文章列表
Neuroblue writing

Python 异步 IO:从事件循环到实战爬虫

系统讲清 Python 异步 IO 的心智模型、async/await、Task、TaskGroup、并发控制、超时、队列、阻塞坑和实战写法。

异步 IO 是 Python 里最容易被误解的东西之一。很多人第一次看到 async defawait,会以为它等于“自动变快”或者“多线程平替”。其实不是。

异步 IO 的核心不是让 CPU 同时执行多段 Python 代码,而是让程序在等待网络、磁盘、数据库、HTTP 请求时,把控制权交还给事件循环,让其他任务先跑。它解决的是“等”的浪费,不是“算”的瓶颈。

一句话:

异步 IO 适合大量等待,不适合大量计算。

1. 先分清三种并发

Python 里常见三种并发方式:

方式适合场景关键词
多线程阻塞 IO、旧同步库threadingThreadPoolExecutor
多进程CPU 密集计算multiprocessingProcessPoolExecutor
异步 IO高并发网络 IOasyncioasync/await

举例:

  • 爬 1000 个网页:异步 IO 很合适。
  • 调 1000 次 HTTP API:异步 IO 很合适。
  • 等数据库返回:异步 IO 很合适。
  • 计算 1000 张图片的特征:多进程更合适。
  • 解压大文件、跑模型推理:异步 IO 不会神奇提速。

异步 IO 的优势来自等待期间切换任务,而不是突破 CPU 限制。

2. 同步代码为什么慢

先看同步版本:

import time


def fetch(name: str, delay: int):
    print(f"{name} start")
    time.sleep(delay)
    print(f"{name} done")
    return name


def main():
    fetch("a", 2)
    fetch("b", 2)
    fetch("c", 2)


main()

总耗时大约 6 秒。

问题不在于 CPU 忙,而在于程序每次 sleep 都停在那里。真实网络请求也是类似的:发出请求后,绝大多数时间都在等远端响应。

3. 异步版本为什么快

异步版本:

import asyncio


async def fetch(name: str, delay: int):
    print(f"{name} start")
    await asyncio.sleep(delay)
    print(f"{name} done")
    return name


async def main():
    results = await asyncio.gather(
        fetch("a", 2),
        fetch("b", 2),
        fetch("c", 2),
    )
    print(results)


asyncio.run(main())

总耗时大约 2 秒。

关键点:

  • async def 定义协程函数。
  • 调用协程函数不会立刻执行完,而是得到一个 coroutine 对象。
  • await 表示这里可能要等待。
  • 等待期间,事件循环可以调度其他任务。
  • asyncio.run() 创建并运行事件循环。
  • asyncio.gather() 并发等待多个任务完成。

这不是三段代码真的同时占用 CPU,而是它们都在等待时互相让路。

4. 事件循环是什么

事件循环可以理解成一个调度器:

  1. 它维护一批任务。
  2. 哪个任务准备好了,就让哪个任务继续执行。
  3. 某个任务遇到 await,就暂时挂起。
  4. 等 IO 完成后,再把任务放回可执行队列。

粗略心智模型:

任务 A 执行 -> 遇到 await -> 让出控制权
任务 B 执行 -> 遇到 await -> 让出控制权
任务 C 执行 -> 完成
任务 A 的 IO 完成 -> 继续执行
任务 B 的 IO 完成 -> 继续执行

事件循环通常不需要你手动创建。应用入口用:

asyncio.run(main())

在已经处于异步环境里,例如 FastAPI 路由、Jupyter 或某些框架内部,不要随手再套一层 asyncio.run(),否则可能遇到事件循环已运行的问题。

5. coroutine、Task、Future 的区别

这三个概念经常混在一起。

5.1 coroutine

协程对象来自调用 async def 函数:

async def hello():
    return "hello"


coro = hello()

此时 hello() 还没有真正跑完。你需要:

result = await coro

5.2 Task

Task 是被事件循环调度的协程。

task = asyncio.create_task(hello())
result = await task

create_task() 的意思是:把这个协程丢给事件循环,让它开始排队运行。

5.3 Future

Future 更底层,表示一个未来会有结果的对象。日常业务代码里,你更多接触的是 coroutine 和 Task。

简单记:

  • coroutine 是“可等待的函数执行过程”。
  • Task 是“已经交给事件循环调度的 coroutine”。
  • Future 是“未来某个时刻会完成的结果容器”。

6. await 不是并发

很多人会写出这种代码:

async def main():
    await fetch("a", 2)
    await fetch("b", 2)
    await fetch("c", 2)

这仍然接近串行,总耗时约 6 秒。

因为你是在等 a 完成,再等 b,再等 c

真正并发需要创建多个任务:

async def main():
    task_a = asyncio.create_task(fetch("a", 2))
    task_b = asyncio.create_task(fetch("b", 2))
    task_c = asyncio.create_task(fetch("c", 2))

    results = await asyncio.gather(task_a, task_b, task_c)
    print(results)

或者直接:

async def main():
    results = await asyncio.gather(
        fetch("a", 2),
        fetch("b", 2),
        fetch("c", 2),
    )
    print(results)

7. gather:一组任务一起等

asyncio.gather() 适合“发出一组任务,最后要拿到所有结果”的场景。

async def download(url: str):
    await asyncio.sleep(1)
    return f"content from {url}"


async def main():
    urls = [
        "https://example.com/a",
        "https://example.com/b",
        "https://example.com/c",
    ]
    results = await asyncio.gather(*(download(url) for url in urls))
    print(results)

特点:

  • 返回结果顺序和传入任务顺序一致。
  • 默认情况下,某个任务抛异常,整体也会抛异常。
  • 适合批量请求、批量 IO、批量任务编排。

如果想让异常也作为结果返回:

results = await asyncio.gather(*tasks, return_exceptions=True)

但不要滥用它吞掉异常。生产代码里,异常应该有日志和处理策略。

8. TaskGroup:更现代的结构化并发

Python 3.11 引入了 asyncio.TaskGroup。它适合把一组子任务放进一个明确的生命周期里。

import asyncio


async def worker(name: str):
    await asyncio.sleep(1)
    print(f"{name} done")


async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker("a"))
        tg.create_task(worker("b"))
        tg.create_task(worker("c"))


asyncio.run(main())

TaskGroup 的好处是结构更清晰:进入上下文,创建任务;退出上下文,等待这组任务结束。如果其中任务失败,错误传播和取消语义也更适合结构化并发。

经验:

  • 简单批量取结果:gather() 很顺手。
  • 复杂任务生命周期:优先考虑 TaskGroup

9. as_completed:谁先完成先处理谁

有些场景不是等所有结果再处理,而是谁先回来先处理谁。

async def fetch_number(n: int):
    await asyncio.sleep(4 - n)
    return n


async def main():
    tasks = [fetch_number(1), fetch_number(2), fetch_number(3)]

    for done in asyncio.as_completed(tasks):
        result = await done
        print(result)

适合:

  • 搜索多个镜像源,谁先返回先用。
  • 批量下载,先完成的先写入。
  • 多个慢接口并发请求,边收边处理。

10. 并发控制:别把对方服务器打爆

异步很容易写出“瞬间发出 10000 个请求”的代码。程序可能很兴奋,服务器和你自己的网络栈可能不兴奋。

Semaphore 控制并发:

import asyncio


sem = asyncio.Semaphore(5)


async def fetch(url: str):
    async with sem:
        print(f"fetch {url}")
        await asyncio.sleep(1)
        return url


async def main():
    urls = [f"https://example.com/{i}" for i in range(20)]
    results = await asyncio.gather(*(fetch(url) for url in urls))
    print(results)

这里最多同时跑 5 个 fetch

并发数不是越大越好。经验值:

  • 本机小任务:几十到几百。
  • 请求外部网站:先从 5 到 20 试起。
  • 请求自己的 API:根据服务限流和数据库压力调整。
  • 数据库连接:不要超过连接池上限。

11. 超时:永远不要无限等

网络请求、远程 API、数据库调用都可能卡住。异步代码里一定要有超时意识。

import asyncio


async def slow_task():
    await asyncio.sleep(10)
    return "done"


async def main():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=2)
        print(result)
    except TimeoutError:
        print("timeout")

Python 3.11+ 还可以用 asyncio.timeout()

async def main():
    try:
        async with asyncio.timeout(2):
            result = await slow_task()
            print(result)
    except TimeoutError:
        print("timeout")

建议:

  • 外部 HTTP 请求设置超时。
  • 数据库查询设置超时。
  • 队列消费者设置退出条件。
  • 后台任务设置取消策略。

12. 取消任务:任务不是创建了就不用管

创建 Task 后,如果不 await、不取消、不记录异常,就可能变成失控后台任务。

取消任务:

async def long_running():
    try:
        while True:
            print("working")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("cleanup")
        raise


async def main():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(3)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("cancelled")

注意:

  • 捕获 CancelledError 后,通常要重新 raise
  • 在取消时做清理,比如关闭连接、写日志、释放锁。
  • 不要随便吞掉取消异常。

13. Queue:生产者消费者模型

异步爬虫、日志处理、批量任务很适合用队列。

import asyncio


async def producer(queue: asyncio.Queue[int]):
    for i in range(10):
        await queue.put(i)
        print(f"produce {i}")
    await queue.put(None)


async def consumer(queue: asyncio.Queue[int]):
    while True:
        item = await queue.get()
        try:
            if item is None:
                break
            print(f"consume {item}")
            await asyncio.sleep(0.5)
        finally:
            queue.task_done()


async def main():
    queue = asyncio.Queue()
    await asyncio.gather(
        producer(queue),
        consumer(queue),
    )

更常见的是多个消费者:

async def main():
    queue = asyncio.Queue()
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(3)]

    for i in range(20):
        await queue.put(i)

    for _ in consumers:
        await queue.put(None)

    await asyncio.gather(*consumers)

队列的价值:

  • 削峰。
  • 控制处理节奏。
  • 解耦生产和消费。
  • 让任务流水线更清晰。

14. 阻塞函数:异步里最隐蔽的坑

异步函数里不能随便放阻塞代码。

错误示例:

import time


async def bad():
    time.sleep(3)

time.sleep() 会阻塞整个事件循环。应该写:

async def good():
    await asyncio.sleep(3)

常见阻塞来源:

  • time.sleep()
  • 同步 HTTP 请求库 requests
  • 同步数据库驱动
  • 大量 CPU 计算
  • 大文件同步读写

如果不得不用同步阻塞函数,可以丢到线程里:

import asyncio
import time


def blocking_work():
    time.sleep(3)
    return "done"


async def main():
    result = await asyncio.to_thread(blocking_work)
    print(result)

asyncio.to_thread() 适合把 IO 型同步函数挪到线程里,避免卡住事件循环。CPU 密集任务仍然更适合进程池或专门的计算服务。

15. 实战:异步批量请求

实际项目里,最常见的异步 IO 是批量请求 API。这里用 httpx 做例子。

安装:

pip install httpx

代码:

import asyncio
import httpx


async def fetch(client: httpx.AsyncClient, url: str):
    response = await client.get(url)
    response.raise_for_status()
    return {
        "url": url,
        "status_code": response.status_code,
        "length": len(response.text),
    }


async def main():
    urls = [
        "https://example.com",
        "https://www.python.org",
        "https://fastapi.tiangolo.com",
    ]

    timeout = httpx.Timeout(10.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        results = await asyncio.gather(*(fetch(client, url) for url in urls))

    for item in results:
        print(item)


asyncio.run(main())

为什么要复用 AsyncClient

  • 复用连接池。
  • 统一超时、请求头、代理等配置。
  • 避免每个请求都重复创建客户端。

再加并发限制:

import asyncio
import httpx


async def fetch(client: httpx.AsyncClient, sem: asyncio.Semaphore, url: str):
    async with sem:
        response = await client.get(url)
        response.raise_for_status()
        return response.status_code


async def main():
    urls = [f"https://example.com/?q={i}" for i in range(100)]
    sem = asyncio.Semaphore(10)

    async with httpx.AsyncClient(timeout=10) as client:
        results = await asyncio.gather(
            *(fetch(client, sem, url) for url in urls),
            return_exceptions=True,
        )

    ok = [item for item in results if not isinstance(item, Exception)]
    errors = [item for item in results if isinstance(item, Exception)]
    print(f"ok={len(ok)}, errors={len(errors)}")


asyncio.run(main())

这个版本已经接近真实可用:

  • 有连接复用。
  • 有超时。
  • 有并发限制。
  • 有异常收集。

16. 实战:异步任务流水线

假设你要做一个 URL 抓取流水线:

  1. 生产 URL。
  2. 下载页面。
  3. 解析标题。
  4. 保存结果。

可以这样组织:

import asyncio
import re
import httpx


async def producer(queue: asyncio.Queue[str], urls: list[str]):
    for url in urls:
        await queue.put(url)


async def worker(
    name: str,
    queue: asyncio.Queue[str],
    client: httpx.AsyncClient,
    results: list[dict],
):
    while True:
        url = await queue.get()
        try:
            response = await client.get(url)
            title_match = re.search(r"<title>(.*?)</title>", response.text, re.I | re.S)
            title = title_match.group(1).strip() if title_match else ""
            results.append({"url": url, "title": title})
            print(f"{name} done {url}")
        except Exception as error:
            results.append({"url": url, "error": str(error)})
        finally:
            queue.task_done()


async def main():
    urls = [
        "https://example.com",
        "https://www.python.org",
        "https://fastapi.tiangolo.com",
    ]
    queue: asyncio.Queue[str] = asyncio.Queue()
    results: list[dict] = []

    async with httpx.AsyncClient(timeout=10) as client:
        await producer(queue, urls)
        workers = [
            asyncio.create_task(worker(f"worker-{i}", queue, client, results))
            for i in range(3)
        ]

        await queue.join()

        for task in workers:
            task.cancel()

        await asyncio.gather(*workers, return_exceptions=True)

    print(results)


asyncio.run(main())

这个模式适合:

  • 异步爬虫。
  • 批量 API 同步。
  • 日志消费。
  • 任务队列原型。

17. FastAPI 里的 async

FastAPI 支持同步路由和异步路由。

from fastapi import FastAPI

app = FastAPI()


@app.get("/sync")
def sync_route():
    return {"mode": "sync"}


@app.get("/async")
async def async_route():
    return {"mode": "async"}

什么时候写 async def

  • 你在路由里 await 异步数据库。
  • 你在路由里 await 异步 HTTP 客户端。
  • 你在路由里组合多个异步任务。

什么时候普通 def 就够?

  • 你调用的是同步库。
  • 代码很简单,没有 await。
  • 你不确定依赖是否异步。

不要为了“看起来高级”把所有路由都写成 async def。如果里面又调用了大量阻塞同步函数,反而会卡住事件循环。

18. 调试异步代码

几个实用技巧:

18.1 开启 debug

asyncio.run(main(), debug=True)

debug 模式能帮助发现一些事件循环阻塞、未等待协程等问题。

18.2 给任务命名

task = asyncio.create_task(fetch("https://example.com"), name="fetch-example")

排查日志时更友好。

18.3 打印当前任务

task = asyncio.current_task()
print(task.get_name() if task else "no task")

18.4 不要忽略 warning

如果看到:

RuntimeWarning: coroutine was never awaited

通常说明你调用了异步函数,但忘记 await 或没有创建任务。

错误:

fetch("https://example.com")

正确:

await fetch("https://example.com")

或者:

asyncio.create_task(fetch("https://example.com"))

19. 常见误区

19.1 async 会让所有代码变快

不会。它主要优化 IO 等待。

19.2 await 就是并发

不是。连续 await 仍然可能是串行。

19.3 create_task 后可以不管

不建议。任务异常可能丢失,生命周期也会混乱。

19.4 异步能替代线程和进程

不能。线程、进程、异步解决的问题不同。

19.5 异步代码里用 requests 没问题

有问题。requests 是同步库,会阻塞事件循环。异步 HTTP 请求优先用 httpx.AsyncClientaiohttp

20. 学习路线

建议按这个顺序练:

  1. async defawaitasyncio.run()
  2. asyncio.gather() 并发执行。
  3. asyncio.create_task() 手动创建任务。
  4. asyncio.TaskGroup() 理解结构化并发。
  5. Semaphore 控制并发量。
  6. wait_for()timeout() 设置超时。
  7. Queue 写生产者消费者。
  8. httpx.AsyncClient 写真实异步 HTTP 请求。
  9. FastAPI 中写异步路由。
  10. 学会处理取消、异常、日志和资源关闭。

掌握到第 8 步,异步 IO 就不再只是语法玩具,而是能真正帮你写工具和后端服务的能力。

21. 最小作业

做一个异步 URL 检测器:

输入:urls.txt
输出:每个 URL 的状态码、响应时间、标题
要求:
1. 并发数可配置。
2. 每个请求有超时。
3. 错误不影响其他任务。
4. 结果保存为 JSON。
5. 支持 Ctrl+C 时优雅退出。

这个小项目能覆盖异步 IO 的大部分核心能力:

  • 批量任务。
  • 并发控制。
  • 超时。
  • 异常处理。
  • 队列或 gather。
  • 文件输出。
  • 资源关闭。

如果你能写完它,再去看 FastAPI、异步爬虫、异步数据库,就会顺很多。

22. 参考资料