ai
  • index
  • 1.欢迎来到LlamaIndex
  • 2.高级概念
  • 3.安装与设置
  • 4.入门教程(使用OpenAI)
  • 5.入门教程(使用本地LLMs)
  • 6.构建一个LLM应用
  • 7.使用LLMs
  • pydantic
  • asyncio
  • apikey
  • 8.RAG简介
  • 9.加载数据
  • 10.索引
  • 11.存储
  • 12.查询
  • weaviate
  • Cohere
  • warnings
  • WeaviateStart
  • spacy
  • 使用LlamaIndex构建全栈Web应用指南
  • back2
  • back4
  • front2
  • front4
  • front6
  • front8
  • llamaindex_backend
  • llamaindex_frontend
  • 1. 基本概念
    • 1.1 协程 (Coroutine)
    • 1.2 事件循环 (Event Loop)
    • 1.3 Awaitables
  • 2. 核心 API
    • 2.1 运行协程
    • 2.2 创建任务
    • 2.3 并发运行多个协程
    • 2.4 超时控制
  • 3. 高级特性
    • 3.1 共享数据
    • 3.2 同步原语
    • 3.3 子进程
    • 3.4 流 (Streams)
  • 4. 常见模式
    • 4.1 生产者-消费者模式
    • 4.2 限制并发数
  • 5. 调试与测试
    • 5.1 调试模式
    • 5.2 测试协程
  • 6. 最佳实践
  • 7. 常见陷阱
  • 总结

Asyncio 是 Python 中用于编写并发代码的库,使用 async/await 语法。它特别适合 I/O 密集型和高并发网络应用。下面我将从基础概念到高级用法全面讲解 asyncio。

1. 基本概念 #

1.1 协程 (Coroutine) #

协程是 asyncio 的核心概念,通过 async def 定义的函数。协程调用时不会立即执行,而是返回一个协程对象,需要被事件循环调度执行。协程可以使用 await 暂停执行,等待其他协程完成。

# 定义一个简单的协程函数
async def my_coroutine():
    # 返回一个固定值
    return 123

# 运行协程的完整示例
import asyncio

async def main():
    # 调用协程并获取结果
    result = await my_coroutine()
    print(f"协程返回值: {result}")

# 运行主协程
if __name__ == "__main__":
    asyncio.run(main())

协程有以下特点:

  • 调用时不会立即执行,而是返回一个协程对象
  • 需要被事件循环调度执行
  • 可以使用 await 暂停执行,等待其他协程完成

1.2 事件循环 (Event Loop) #

事件循环是 asyncio 的核心执行器,负责调度协程的执行、处理回调、执行网络 I/O 操作和运行子进程。它是异步编程的基础设施,管理所有异步任务的执行顺序和时机。

# 导入asyncio模块
import asyncio

# 定义一个简单的协程函数
async def main():
    # 打印开始信息
    print('Hello')
    # 暂停1秒,模拟异步操作
    await asyncio.sleep(1)
    # 打印结束信息
    print('World')

# 运行主协程(Python 3.7+ 推荐用法)
if __name__ == "__main__":
    asyncio.run(main())

1.3 Awaitables #

可等待对象是可以在 await 表达式中使用的对象。主要有三种类型:协程、Task 和 Future。理解这些概念对于掌握 asyncio 编程至关重要。

# 导入asyncio模块
import asyncio

# 定义一个协程函数
async def coroutine_function():
    # 暂停0.1秒
    await asyncio.sleep(0.1)
    # 返回一个字符串
    return "协程结果"

# 定义一个主协程函数来演示不同类型的awaitable对象
async def main():
    # 1. 协程 - 直接await协程函数
    result1 = await coroutine_function()
    print(f"协程结果: {result1}")

    # 2. Task - 通过create_task包装的协程
    task = asyncio.create_task(coroutine_function())
    result2 = await task
    print(f"Task结果: {result2}")

    # 3. Future - 低层级的可等待对象(通常不需要直接使用)
    future = asyncio.Future()
    # 设置future的结果
    future.set_result("Future结果")
    result3 = await future
    print(f"Future结果: {result3}")

# 运行主协程
if __name__ == "__main__":
    asyncio.run(main())

2. 核心 API #

2.1 运行协程 #

运行协程有多种方式,Python 3.7+ 推荐使用 asyncio.run(),这是最简单和现代的方式。对于旧版本Python,需要使用事件循环的显式管理。

# 导入asyncio模块
import asyncio

# 定义一个示例协程
async def example_coroutine():
    # 模拟一些异步工作
    await asyncio.sleep(0.1)
    # 返回一个值
    return "协程执行完成"

# 定义主协程函数
async def main():
    # 调用示例协程
    result = await example_coroutine()
    # 打印结果
    print(result)

# Python 3.7+ 推荐方式
if __name__ == "__main__":
    # 使用asyncio.run()运行主协程
    asyncio.run(main())

    # 旧版本方式(如果需要兼容旧版本)
    # loop = asyncio.get_event_loop()
    # try:
    #     loop.run_until_complete(main())
    # finally:
    #     loop.close()

2.2 创建任务 #

任务(Task)是协程的包装器,它调度协程的执行。创建任务后,协程会在事件循环中并发执行,而不需要立即等待其完成。

# 导入asyncio模块
import asyncio

# 定义一个任务协程
async def my_task():
    # 模拟耗时操作
    await asyncio.sleep(1)
    # 打印任务完成信息
    print("Task completed")

# 定义主协程函数
async def main():
    # 创建任务,但不立即等待
    task = asyncio.create_task(my_task())
    # 打印任务创建信息
    print("Task created")
    # 等待任务完成
    await task
    # 打印主协程完成信息
    print("Main coroutine completed")

# 运行主协程
if __name__ == "__main__":
    asyncio.run(main())

2.3 并发运行多个协程 #

asyncio 提供了多种方式来并发运行多个协程。asyncio.gather() 是最常用的方式,它会等待所有协程完成并返回结果列表。asyncio.wait() 提供了更细粒度的控制。

# 导入asyncio模块
import asyncio

# 定义一个模拟数据获取的协程
async def fetch_data(delay, id):
    # 模拟网络延迟
    await asyncio.sleep(delay)
    # 返回数据
    return f"Data {id}"

# 定义主协程函数
async def main():
    # 方式1: 使用gather并发运行多个协程
    print("使用gather方式:")
    # 并发运行三个fetch_data协程
    results = await asyncio.gather(
        fetch_data(1, 1),
        fetch_data(2, 2),
        fetch_data(3, 3)
    )
    # 打印所有结果
    print(results)  # ['Data 1', 'Data 2', 'Data 3']

    # 方式2: 使用wait并发运行多个协程
    print("\n使用wait方式:")
    # 创建任务列表
    tasks = [fetch_data(i, i) for i in range(1, 4)]
    # 等待所有任务完成
    done, pending = await asyncio.wait(tasks)
    # 遍历完成的任务并打印结果
    for task in done:
        print(task.result())

# 运行主协程
if __name__ == "__main__":
    asyncio.run(main())

2.4 超时控制 #

超时控制是异步编程中的重要概念,可以防止程序无限等待。asyncio.wait_for() 是最常用的超时控制方式,它会在指定时间内等待协程完成,超时则抛出异常。

# 导入asyncio模块
import asyncio

# 定义一个慢速操作协程
async def slow_operation():
    # 模拟一个耗时很长的操作(1小时)
    await asyncio.sleep(3600)
    # 返回完成信息
    return "Done"

# 定义主协程函数
async def main():
    try:
        # 设置1秒超时等待slow_operation
        result = await asyncio.wait_for(slow_operation(), timeout=1.0)
        # 如果成功完成,打印结果
        print(result)
    except asyncio.TimeoutError:
        # 如果超时,打印超时信息
        print("Timeout!")

# 运行主协程
if __name__ == "__main__":
    asyncio.run(main())

3. 高级特性 #

3.1 共享数据 #

异步编程中经常需要在多个协程之间共享数据。asyncio.Queue 是最常用的共享数据结构,它提供了线程安全的队列操作,支持生产者-消费者模式。

# 导入asyncio模块
import asyncio

# 定义工作协程,处理队列中的项目
async def worker(queue):
    # 无限循环处理队列项目
    while True:
        # 从队列中获取项目
        item = await queue.get()
        # 打印处理信息
        print(f"Processing {item}")
        # 标记任务完成
        queue.task_done()

# 定义主协程函数
async def main():
    # 创建一个异步队列
    queue = asyncio.Queue()

    # 启动worker协程
    worker_task = asyncio.create_task(worker(queue))

    # 向队列中添加10个项目
    for i in range(10):
        await queue.put(i)

    # 等待所有项目处理完成
    await queue.join()
    # 取消worker任务
    worker_task.cancel()

# 运行主协程
if __name__ == "__main__":
    asyncio.run(main())

3.2 同步原语 #

异步编程中也需要同步原语来协调多个协程的执行。asyncio.Lock 是最基本的同步原语,用于保护共享资源,确保同一时间只有一个协程访问特定资源。

# 导入asyncio模块
import asyncio

# 定义工作协程,使用锁来同步访问
async def worker(lock, id):
    # 使用异步上下文管理器获取锁
    async with lock:
        # 打印获取锁的信息
        print(f"Worker {id} acquired lock")
        # 模拟一些工作
        await asyncio.sleep(1)
        # 打印释放锁的信息
        print(f"Worker {id} released lock")

# 定义主协程函数
async def main():
    # 创建一个异步锁
    lock = asyncio.Lock()
    # 并发运行3个worker协程
    await asyncio.gather(*(worker(lock, i) for i in range(3)))

# 运行主协程
if __name__ == "__main__":
    asyncio.run(main())

3.3 子进程 #

asyncio 支持异步执行子进程,这对于需要调用外部命令或程序的场景非常有用。asyncio.create_subprocess_shell() 可以异步执行shell命令。

# 导入asyncio模块
import asyncio

# 定义运行命令的协程
async def run_command():
    # 创建子进程,执行ls -l命令
    proc = await asyncio.create_subprocess_shell(
        'ls -l',
        # 捕获标准输出
        stdout=asyncio.subprocess.PIPE,
        # 捕获标准错误
        stderr=asyncio.subprocess.PIPE
    )

    # 等待进程完成并获取输出
    stdout, stderr = await proc.communicate()
    # 打印退出码
    print(f'Exit code: {proc.returncode}')
    # 如果有标准输出,打印它
    if stdout:
        print(f'Stdout:\n{stdout.decode()}')
    # 如果有标准错误,打印它
    if stderr:
        print(f'Stderr:\n{stderr.decode()}')

# 定义主协程函数
async def main():
    # 运行命令
    await run_command()

# 运行主协程
if __name__ == "__main__":
    asyncio.run(main())

3.4 流 (Streams) #

asyncio 提供了异步流操作,用于处理网络连接。asyncio.open_connection() 可以建立TCP连接,返回reader和writer对象用于读写数据。

# 导入asyncio模块
import asyncio

# 定义TCP回显客户端协程
async def tcp_echo_client(message):
    # 建立到本地8888端口的连接
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

    # 打印发送的消息
    print(f'Send: {message!r}')
    # 将消息编码并写入
    writer.write(message.encode())
    # 等待数据发送完成
    await writer.drain()

    # 读取最多100字节的响应数据
    data = await reader.read(100)
    # 打印接收到的数据
    print(f'Received: {data.decode()!r}')

    # 关闭写入端
    writer.close()
    # 等待写入端完全关闭
    await writer.wait_closed()

# 定义主协程函数
async def main():
    # 发送测试消息
    await tcp_echo_client('Hello, World!')

# 运行主协程
if __name__ == "__main__":
    asyncio.run(main())

4. 常见模式 #

4.1 生产者-消费者模式 #

生产者-消费者模式是并发编程中的经典模式。在asyncio中,可以使用Queue来实现这个模式,生产者向队列添加数据,消费者从队列获取数据并处理。

# 导入asyncio模块
import asyncio

# 定义生产者协程
async def producer(queue):
    # 生产5个项目
    for i in range(5):
        # 将项目放入队列
        await queue.put(i)
        # 暂停0.1秒
        await asyncio.sleep(0.1)

# 定义消费者协程
async def consumer(queue):
    # 无限循环消费队列中的项目
    while True:
        # 从队列中获取项目
        item = await queue.get()
        # 打印消费的项目
        print(f"Consumed {item}")
        # 标记任务完成
        queue.task_done()

# 定义主协程函数
async def main():
    # 创建一个队列
    queue = asyncio.Queue()
    # 创建2个生产者任务
    producers = [asyncio.create_task(producer(queue)) for _ in range(2)]
    # 创建3个消费者任务
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(3)]

    # 等待所有生产者完成
    await asyncio.gather(*producers)
    # 等待队列中的所有项目被消费
    await queue.join()

    # 取消所有消费者任务
    for c in consumers:
        c.cancel()

# 运行主协程
if __name__ == "__main__":
    asyncio.run(main())

4.2 限制并发数 #

在高并发场景中,需要限制同时执行的协程数量以避免资源耗尽。asyncio.Semaphore 是实现并发限制的有效工具,它维护一个计数器来控制并发数量。

# 导入asyncio模块
import asyncio

# 定义工作协程,使用信号量限制并发
async def worker(semaphore, id):
    # 获取信号量
    async with semaphore:
        # 打印工作开始信息
        print(f"Worker {id} started")
        # 模拟工作过程
        await asyncio.sleep(1)
        # 打印工作完成信息
        print(f"Worker {id} finished")

# 定义主协程函数
async def main():
    # 创建信号量,限制最多3个并发
    semaphore = asyncio.Semaphore(3)
    # 并发运行10个worker,但最多同时运行3个
    await asyncio.gather(*(worker(semaphore, i) for i in range(10)))

# 运行主协程
if __name__ == "__main__":
    asyncio.run(main())

5. 调试与测试 #

5.1 调试模式 #

asyncio 提供了调试模式来帮助开发者诊断异步代码中的问题。启用调试模式后,会显示更详细的错误信息和执行跟踪。

# 导入asyncio模块
import asyncio

# 定义一个示例协程
async def example_coroutine():
    # 模拟一些异步工作
    await asyncio.sleep(0.1)
    # 返回一个值
    return "调试模式测试"

# 定义主协程函数
async def main():
    # 调用示例协程
    result = await example_coroutine()
    # 打印结果
    print(result)

# 运行主协程,启用调试模式
if __name__ == "__main__":
    # 使用debug=True启用调试模式
    asyncio.run(main(), debug=True)

5.2 测试协程 #

测试异步代码需要特殊的方法,因为协程需要在事件循环中运行。unittest框架提供了测试异步代码的支持。

# 导入必要的模块
import unittest
import asyncio

# 定义测试类
class TestAsync(unittest.TestCase):
    # 设置测试环境
    def setUp(self):
        # 创建新的事件循环
        self.loop = asyncio.new_event_loop()
        # 设置为当前事件循环
        asyncio.set_event_loop(self.loop)

    # 清理测试环境
    def tearDown(self):
        # 关闭事件循环
        self.loop.close()

    # 测试协程的方法
    def test_coroutine(self):
        # 定义测试协程
        async def test():
            # 返回测试值
            return 42

        # 在事件循环中运行测试协程
        result = self.loop.run_until_complete(test())
        # 断言结果等于42
        self.assertEqual(result, 42)

# 运行测试
if __name__ == "__main__":
    unittest.main()

6. 最佳实践 #

asyncio 编程有一些重要的最佳实践,遵循这些实践可以避免常见问题并提高代码质量。

  1. 避免阻塞调用:不要在协程中使用同步 I/O 或 CPU 密集型操作
  2. 使用高层 API:优先使用 asyncio.run() 和 asyncio.create_task()
  3. 合理设置超时:为所有网络操作设置超时
  4. 资源清理:确保关闭所有打开的资源(连接、文件等)
  5. 错误处理:妥善处理协程中的异常

7. 常见陷阱 #

异步编程有一些常见的陷阱,了解这些陷阱可以帮助开发者避免错误。

  1. 忘记 await:调用协程时忘记使用 await
  2. 混合同步和异步代码:在协程中调用阻塞函数
  3. 不合理的任务创建:创建大量任务而不限制并发数
  4. 未处理的异常:协程中的异常如果不被捕获可能会被静默忽略

总结 #

Asyncio 提供了一种高效的方式来处理 I/O 密集型任务,通过协程和事件循环实现了高并发。掌握 asyncio 需要理解其异步编程模型,并熟悉各种同步原语和并发模式。随着 Python 版本的更新,asyncio API 也在不断改进,建议始终使用最新稳定版 Python 以获得最佳体验。

访问验证

请输入访问令牌

Token不正确,请重新输入