Asyncio 是 Python 中用于编写并发代码的库,使用 async/await 语法。它特别适合 I/O 密集型和高并发网络应用。下面我将从基础概念到高级用法全面讲解 asyncio。
1. 基本概念 #
1.1 协程 (Coroutine) #
协程是 asyncio 的核心概念,通过 async def 定义的函数。协程调用时不会立即执行,而是返回一个协程对象,需要被事件循环调度执行。协程可以使用 await 暂停执行,等待其他协程完成。
# 定义一个简单的协程函数
async def my_coroutine():
# 返回一个固定值
return 123
# 运行协程的完整示例
import asyncio
async def main():
# 调用协程并获取结果
result = await my_coroutine()
print(f"协程返回值: {result}")
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())协程有以下特点:
- 调用时不会立即执行,而是返回一个协程对象
- 需要被事件循环调度执行
- 可以使用
await暂停执行,等待其他协程完成
1.2 事件循环 (Event Loop) #
事件循环是 asyncio 的核心执行器,负责调度协程的执行、处理回调、执行网络 I/O 操作和运行子进程。它是异步编程的基础设施,管理所有异步任务的执行顺序和时机。
# 导入asyncio模块
import asyncio
# 定义一个简单的协程函数
async def main():
# 打印开始信息
print('Hello')
# 暂停1秒,模拟异步操作
await asyncio.sleep(1)
# 打印结束信息
print('World')
# 运行主协程(Python 3.7+ 推荐用法)
if __name__ == "__main__":
asyncio.run(main())1.3 Awaitables #
可等待对象是可以在 await 表达式中使用的对象。主要有三种类型:协程、Task 和 Future。理解这些概念对于掌握 asyncio 编程至关重要。
# 导入asyncio模块
import asyncio
# 定义一个协程函数
async def coroutine_function():
# 暂停0.1秒
await asyncio.sleep(0.1)
# 返回一个字符串
return "协程结果"
# 定义一个主协程函数来演示不同类型的awaitable对象
async def main():
# 1. 协程 - 直接await协程函数
result1 = await coroutine_function()
print(f"协程结果: {result1}")
# 2. Task - 通过create_task包装的协程
task = asyncio.create_task(coroutine_function())
result2 = await task
print(f"Task结果: {result2}")
# 3. Future - 低层级的可等待对象(通常不需要直接使用)
future = asyncio.Future()
# 设置future的结果
future.set_result("Future结果")
result3 = await future
print(f"Future结果: {result3}")
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())2. 核心 API #
2.1 运行协程 #
运行协程有多种方式,Python 3.7+ 推荐使用 asyncio.run(),这是最简单和现代的方式。对于旧版本Python,需要使用事件循环的显式管理。
# 导入asyncio模块
import asyncio
# 定义一个示例协程
async def example_coroutine():
# 模拟一些异步工作
await asyncio.sleep(0.1)
# 返回一个值
return "协程执行完成"
# 定义主协程函数
async def main():
# 调用示例协程
result = await example_coroutine()
# 打印结果
print(result)
# Python 3.7+ 推荐方式
if __name__ == "__main__":
# 使用asyncio.run()运行主协程
asyncio.run(main())
# 旧版本方式(如果需要兼容旧版本)
# loop = asyncio.get_event_loop()
# try:
# loop.run_until_complete(main())
# finally:
# loop.close()2.2 创建任务 #
任务(Task)是协程的包装器,它调度协程的执行。创建任务后,协程会在事件循环中并发执行,而不需要立即等待其完成。
# 导入asyncio模块
import asyncio
# 定义一个任务协程
async def my_task():
# 模拟耗时操作
await asyncio.sleep(1)
# 打印任务完成信息
print("Task completed")
# 定义主协程函数
async def main():
# 创建任务,但不立即等待
task = asyncio.create_task(my_task())
# 打印任务创建信息
print("Task created")
# 等待任务完成
await task
# 打印主协程完成信息
print("Main coroutine completed")
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())2.3 并发运行多个协程 #
asyncio 提供了多种方式来并发运行多个协程。asyncio.gather() 是最常用的方式,它会等待所有协程完成并返回结果列表。asyncio.wait() 提供了更细粒度的控制。
# 导入asyncio模块
import asyncio
# 定义一个模拟数据获取的协程
async def fetch_data(delay, id):
# 模拟网络延迟
await asyncio.sleep(delay)
# 返回数据
return f"Data {id}"
# 定义主协程函数
async def main():
# 方式1: 使用gather并发运行多个协程
print("使用gather方式:")
# 并发运行三个fetch_data协程
results = await asyncio.gather(
fetch_data(1, 1),
fetch_data(2, 2),
fetch_data(3, 3)
)
# 打印所有结果
print(results) # ['Data 1', 'Data 2', 'Data 3']
# 方式2: 使用wait并发运行多个协程
print("\n使用wait方式:")
# 创建任务列表
tasks = [fetch_data(i, i) for i in range(1, 4)]
# 等待所有任务完成
done, pending = await asyncio.wait(tasks)
# 遍历完成的任务并打印结果
for task in done:
print(task.result())
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())2.4 超时控制 #
超时控制是异步编程中的重要概念,可以防止程序无限等待。asyncio.wait_for() 是最常用的超时控制方式,它会在指定时间内等待协程完成,超时则抛出异常。
# 导入asyncio模块
import asyncio
# 定义一个慢速操作协程
async def slow_operation():
# 模拟一个耗时很长的操作(1小时)
await asyncio.sleep(3600)
# 返回完成信息
return "Done"
# 定义主协程函数
async def main():
try:
# 设置1秒超时等待slow_operation
result = await asyncio.wait_for(slow_operation(), timeout=1.0)
# 如果成功完成,打印结果
print(result)
except asyncio.TimeoutError:
# 如果超时,打印超时信息
print("Timeout!")
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())3. 高级特性 #
3.1 共享数据 #
异步编程中经常需要在多个协程之间共享数据。asyncio.Queue 是最常用的共享数据结构,它提供了线程安全的队列操作,支持生产者-消费者模式。
# 导入asyncio模块
import asyncio
# 定义工作协程,处理队列中的项目
async def worker(queue):
# 无限循环处理队列项目
while True:
# 从队列中获取项目
item = await queue.get()
# 打印处理信息
print(f"Processing {item}")
# 标记任务完成
queue.task_done()
# 定义主协程函数
async def main():
# 创建一个异步队列
queue = asyncio.Queue()
# 启动worker协程
worker_task = asyncio.create_task(worker(queue))
# 向队列中添加10个项目
for i in range(10):
await queue.put(i)
# 等待所有项目处理完成
await queue.join()
# 取消worker任务
worker_task.cancel()
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())3.2 同步原语 #
异步编程中也需要同步原语来协调多个协程的执行。asyncio.Lock 是最基本的同步原语,用于保护共享资源,确保同一时间只有一个协程访问特定资源。
# 导入asyncio模块
import asyncio
# 定义工作协程,使用锁来同步访问
async def worker(lock, id):
# 使用异步上下文管理器获取锁
async with lock:
# 打印获取锁的信息
print(f"Worker {id} acquired lock")
# 模拟一些工作
await asyncio.sleep(1)
# 打印释放锁的信息
print(f"Worker {id} released lock")
# 定义主协程函数
async def main():
# 创建一个异步锁
lock = asyncio.Lock()
# 并发运行3个worker协程
await asyncio.gather(*(worker(lock, i) for i in range(3)))
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())3.3 子进程 #
asyncio 支持异步执行子进程,这对于需要调用外部命令或程序的场景非常有用。asyncio.create_subprocess_shell() 可以异步执行shell命令。
# 导入asyncio模块
import asyncio
# 定义运行命令的协程
async def run_command():
# 创建子进程,执行ls -l命令
proc = await asyncio.create_subprocess_shell(
'ls -l',
# 捕获标准输出
stdout=asyncio.subprocess.PIPE,
# 捕获标准错误
stderr=asyncio.subprocess.PIPE
)
# 等待进程完成并获取输出
stdout, stderr = await proc.communicate()
# 打印退出码
print(f'Exit code: {proc.returncode}')
# 如果有标准输出,打印它
if stdout:
print(f'Stdout:\n{stdout.decode()}')
# 如果有标准错误,打印它
if stderr:
print(f'Stderr:\n{stderr.decode()}')
# 定义主协程函数
async def main():
# 运行命令
await run_command()
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())3.4 流 (Streams) #
asyncio 提供了异步流操作,用于处理网络连接。asyncio.open_connection() 可以建立TCP连接,返回reader和writer对象用于读写数据。
# 导入asyncio模块
import asyncio
# 定义TCP回显客户端协程
async def tcp_echo_client(message):
# 建立到本地8888端口的连接
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
# 打印发送的消息
print(f'Send: {message!r}')
# 将消息编码并写入
writer.write(message.encode())
# 等待数据发送完成
await writer.drain()
# 读取最多100字节的响应数据
data = await reader.read(100)
# 打印接收到的数据
print(f'Received: {data.decode()!r}')
# 关闭写入端
writer.close()
# 等待写入端完全关闭
await writer.wait_closed()
# 定义主协程函数
async def main():
# 发送测试消息
await tcp_echo_client('Hello, World!')
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())4. 常见模式 #
4.1 生产者-消费者模式 #
生产者-消费者模式是并发编程中的经典模式。在asyncio中,可以使用Queue来实现这个模式,生产者向队列添加数据,消费者从队列获取数据并处理。
# 导入asyncio模块
import asyncio
# 定义生产者协程
async def producer(queue):
# 生产5个项目
for i in range(5):
# 将项目放入队列
await queue.put(i)
# 暂停0.1秒
await asyncio.sleep(0.1)
# 定义消费者协程
async def consumer(queue):
# 无限循环消费队列中的项目
while True:
# 从队列中获取项目
item = await queue.get()
# 打印消费的项目
print(f"Consumed {item}")
# 标记任务完成
queue.task_done()
# 定义主协程函数
async def main():
# 创建一个队列
queue = asyncio.Queue()
# 创建2个生产者任务
producers = [asyncio.create_task(producer(queue)) for _ in range(2)]
# 创建3个消费者任务
consumers = [asyncio.create_task(consumer(queue)) for _ in range(3)]
# 等待所有生产者完成
await asyncio.gather(*producers)
# 等待队列中的所有项目被消费
await queue.join()
# 取消所有消费者任务
for c in consumers:
c.cancel()
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())4.2 限制并发数 #
在高并发场景中,需要限制同时执行的协程数量以避免资源耗尽。asyncio.Semaphore 是实现并发限制的有效工具,它维护一个计数器来控制并发数量。
# 导入asyncio模块
import asyncio
# 定义工作协程,使用信号量限制并发
async def worker(semaphore, id):
# 获取信号量
async with semaphore:
# 打印工作开始信息
print(f"Worker {id} started")
# 模拟工作过程
await asyncio.sleep(1)
# 打印工作完成信息
print(f"Worker {id} finished")
# 定义主协程函数
async def main():
# 创建信号量,限制最多3个并发
semaphore = asyncio.Semaphore(3)
# 并发运行10个worker,但最多同时运行3个
await asyncio.gather(*(worker(semaphore, i) for i in range(10)))
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())5. 调试与测试 #
5.1 调试模式 #
asyncio 提供了调试模式来帮助开发者诊断异步代码中的问题。启用调试模式后,会显示更详细的错误信息和执行跟踪。
# 导入asyncio模块
import asyncio
# 定义一个示例协程
async def example_coroutine():
# 模拟一些异步工作
await asyncio.sleep(0.1)
# 返回一个值
return "调试模式测试"
# 定义主协程函数
async def main():
# 调用示例协程
result = await example_coroutine()
# 打印结果
print(result)
# 运行主协程,启用调试模式
if __name__ == "__main__":
# 使用debug=True启用调试模式
asyncio.run(main(), debug=True)5.2 测试协程 #
测试异步代码需要特殊的方法,因为协程需要在事件循环中运行。unittest框架提供了测试异步代码的支持。
# 导入必要的模块
import unittest
import asyncio
# 定义测试类
class TestAsync(unittest.TestCase):
# 设置测试环境
def setUp(self):
# 创建新的事件循环
self.loop = asyncio.new_event_loop()
# 设置为当前事件循环
asyncio.set_event_loop(self.loop)
# 清理测试环境
def tearDown(self):
# 关闭事件循环
self.loop.close()
# 测试协程的方法
def test_coroutine(self):
# 定义测试协程
async def test():
# 返回测试值
return 42
# 在事件循环中运行测试协程
result = self.loop.run_until_complete(test())
# 断言结果等于42
self.assertEqual(result, 42)
# 运行测试
if __name__ == "__main__":
unittest.main()6. 最佳实践 #
asyncio 编程有一些重要的最佳实践,遵循这些实践可以避免常见问题并提高代码质量。
- 避免阻塞调用:不要在协程中使用同步 I/O 或 CPU 密集型操作
- 使用高层 API:优先使用
asyncio.run()和asyncio.create_task() - 合理设置超时:为所有网络操作设置超时
- 资源清理:确保关闭所有打开的资源(连接、文件等)
- 错误处理:妥善处理协程中的异常
7. 常见陷阱 #
异步编程有一些常见的陷阱,了解这些陷阱可以帮助开发者避免错误。
- 忘记 await:调用协程时忘记使用
await - 混合同步和异步代码:在协程中调用阻塞函数
- 不合理的任务创建:创建大量任务而不限制并发数
- 未处理的异常:协程中的异常如果不被捕获可能会被静默忽略
总结 #
Asyncio 提供了一种高效的方式来处理 I/O 密集型任务,通过协程和事件循环实现了高并发。掌握 asyncio 需要理解其异步编程模型,并熟悉各种同步原语和并发模式。随着 Python 版本的更新,asyncio API 也在不断改进,建议始终使用最新稳定版 Python 以获得最佳体验。