用FastAPI掌握Python异步IO:轻松实现高并发网络请求处理
2025/1/3 23:05:24
本文主要是介绍用FastAPI掌握Python异步IO:轻松实现高并发网络请求处理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
由于 Python 是一种解释型语言,在后端开发中使用,例如 Python + Django 的组合,与 Java + Spring 相比,它的响应时间会稍微长一点。然而,只要代码设计得当,这种差异并不是非常明显。即使 Django 使用多进程模式进行并发处理,其并发处理能力仍然相对较弱。Python 提供了一些解决方案来提升其并发处理能力。例如,可以通过使用异步框架 FastAPI,利用其异步特性,可以显著提高 I/O 密集型任务的并发处理能力。FastAPI 是最快捷的 Python 框架之一。
让我们先简要看看如何使用FastAPI。
安装步骤:
在终端中运行以下命令来安装 fastapi: pip install fastapi
简易的服务器端代码:
# app.py # 这是一个使用FastAPI创建的简单Web应用示例 from typing import Union from fastapi import FastAPI app = FastAPI() # 定义根路径的GET请求处理函数 @app.get("/") async def read_root(): # 返回一个包含问候信息的字典 return {"Hello": "World"}
初创:
uvicorn app:app --reload
可以看到,相比其他框架,FastAPI 的接口仅仅多了一个 async
关键字。async
关键字定义接口为异步。仅从返回结果,我们无法区分 FastAPI 和其它 Python 框架的区别。区别在于并发处理。当 FastAPI 的服务器线程处理路由请求,例如 http://127.0.0.1:8000/ 时,如果遇到网络 I/O,它们不再等待,而是继续处理其他请求。当网络 I/O 完成后,执行会继续。这种异步能力增强了 I/O 密集型任务的处理能力。
我们再来看一个例子。在业务代码中,明确发起了一次异步的网络请求。对于这个网络 I/O 操作,就像处理路由请求时一样,FastAPI 也会像处理路由请求那样异步处理它。
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # 下面是一个用来调用外部API的异步GET请求例子 @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code != 200: raise HTTPException(status_code=response.status_code, detail="获取数据失败") return response.json()
如果你想让数据库I/O异步,你需要数据库驱动支持或ORM提供异步操作支持。
FastAPI的异步核心实现是基于异步I/O。我们可以直接通过异步I/O运行服务器,无需借助FastAPI。
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(1) # 模拟一个 I/O 操作 return web.Response(text='{"Hello": "World"}', content_type='application/json') async def init(loop): # 使用事件循环来处理 web 请求 app = web.Application(loop=loop) app.router.add_route('GET', '/', index) # 启动服务器,并且事件循环处理 web 请求 srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('服务器已启动,访问地址为 http://127.0.0.1:8000...') return srv # 获取事件循环 loop = asyncio.get_event_loop() # 开始事件循环 loop.run_until_complete(init(loop)) loop.run_forever()
当运行这个示例时,返回值是:
http://127.0.0.1:8000/这是一个本地测试网址
与示例1一样,异步I/O的基本原理是基于“协程”和“事件循环”(协程和事件循环)。
async def index(request): await asyncio.sleep(1) # 模拟 I/O 操作 return web.Response(text='{"Hello": "World"}', content_type='application/json')
index
函数通过 async def
定义,这意味着它是一个协程。在 I/O 操作前使用 await
关键字,让执行线程在进行此 I/O 操作时不要等待。普通函数的调用是通过调用栈实现的,它们只能顺序执行。然而,协程是一种特殊的函数(不是协作线程),它允许线程在遇到 await
标记时暂停执行,转而去执行其他任务。当 I/O 操作完成后,执行会继续进行。
我们来看看多个并发协程同时运行会产生什么样的效果。
import asyncio from datetime import datetime async def coroutine3(): print(f"协程3开始于{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") await asyncio.sleep(1) # 模拟I/O操作(等待1秒) print(f"协程3结束于{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") async def coroutine2(): print(f"协程2开始于{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") await asyncio.sleep(1) # 模拟I/O操作(等待1秒) print(f"协程2结束于{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") async def coroutine1(): print(f"协程1开始于{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") await asyncio.sleep(1) # 模拟I/O操作(等待1秒) print(f"协程1结束于{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") async def main(): print("主协程开始") # 创建任务来并行执行协程 task1 = asyncio.create_task(coroutine1()) task2 = asyncio.create_task(coroutine2()) task3 = asyncio.create_task(coroutine3()) # 等待所有任务结束 await task1 await task2 await task3 print("主协程结束") # 运行主协程 asyncio.run(main())
输出:
主程序开始 协程任务1于2024年12月27日 12时28分01秒661251开始运行 协程任务2于2024年12月27日 12时28分01秒661276开始运行 协程任务3于2024年12月27日 12时28分01秒665012开始运行 协程任务1于2024年12月27日 12时28分02秒665125完成运行 协程任务2于2024年12月27日 12时28分02秒665120完成运行 协程任务3于2024年12月27日 12时28分02秒665120完成运行 主程序结束
我们可以看到,线程不是依次执行这三个任务。遇到 I/O 操作时,它会切换到执行其他任务。I/O 操作完成后,它会继续执行。还可以看到,这三个协程基本上是同时开始等待 I/O 操作,所以它们的执行完成时间基本上相同。虽然这里并没有明确使用事件循环,但 asyncio.run
会隐式地使用它。
协程通过生成器来实现。生成器能够暂停和恢复函数的执行,这也是协程的特性。
def 简单的生成器(): print("First value") yield 1 print("Second value") yield 2 print("Third value") yield 3 # 简单的生成器 是一个生成器函数,生成器gen 是一个生成器 生成器gen = 简单的生成器() print(next(生成器gen)) # 输出:First value \n 1, print(next(生成器gen)) # 输出:Second value \n 2, print(next(生成器gen)) # 输出:Third value \n 3
当使用 next()
运行生成器时,遇到 yield
会暂停执行。再次调用 next()
时,会从上次暂停的 yield
处继续执行。在 Python 3.5 之前,协程也是通过 yield
来定义。从 Python 3.5 开始,便使用 async def
+ await
进行定义。
import asyncio from datetime import datetime @asyncio.coroutine def my_coroutine(): print("协程开始", datetime.now()) # 调用 asyncio.sleep(1) 异步地: yield from asyncio.sleep(1) print("协程结束", datetime.now()) # 获取事件循环对象 loop = asyncio.get_event_loop() # 运行协程 loop.run_until_complete(my_coroutine()) loop.close()
生成器的暂停和恢复功能不仅可以用于协程,还可以用于其他许多用途。例如,可以在循环中计算并存储结果。例如,实现帕斯卡三角,每一行的两端都是1,中间的每个数字都是上面两个数字的和。
def pascal_triangle(): row = [1] while True: yield row new_row = [1] # 每行的第一个和最后一个元素总是1 for i in range(1, len(row)): new_row.append(row[i - 1] + row[i]) row = new_row # 生成并打印前5行帕斯卡三角形 triangle = pascal_triangle() for _ in range(5): print(next(triangle))
结果:
帕斯卡三角形的前几行如下所示:
[1] [1, 1] [1, 2, 1] [1, 3, 3, 1] [1, 4, 6, 4, 1]
# 事件循环机制 由于协程可以暂停执行,协程什么时候继续执行呢?这就需要事件循环来通知执行线程。
# 获取事件循环对象 loop = asyncio.get_event_loop() # 运行协程直到完成 loop.run_until_complete(my_coroutine()) # 关闭事件循环 loop.close()
事件循环使用I/O多路复用技术,不断地循环监控可以继续的协程的事件。当可以执行时,线程将继续执行这些协程任务。 # 输入输出: 多路复用技术 要简单理解 I/O 多路复用的概念:对于快递站的老板来说,不用一个个催快递员任务完成了没有,快递员完成任务后会自己过来。这样我的任务处理效率就提高了,我就可以腾出手来做其他事情了。 ![](https://imgapi.imooc.com/6773b508093f223e14000931.jpg) `select`、`poll` 和 `epoll` 都可以实现 I/O 多路复用功能。相比 `select` 和 `poll`,`epoll` 性能更优。Linux 默认使用 `epoll`,而 macOS 则使用类似 `epoll` 的 `kqueue`,性能也差不多。 # 一个使用事件驱动的 Socket 服务器
导入selectors 导入socket # 创建一个selectors对象,相当于在Linux上运行时实现了epoll sel = selectors.DefaultSelector() # 请求接收事件处理函数。接受新的连接并注册读事件 def accept(sock, mask): conn, addr = sock.accept() # 接受连接 print('接受来自', addr, '的连接') conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) # 注册读事件 # 请求读取事件处理函数。读取请求数据并发送HTTP响应,然后关闭连接。 def read(conn, mask): data = conn.recv(100) # 从连接中读取数据 print('响应请求') response = "HTTP/1.1 200 OK\r\n" \ "内容长度: 18\r\n" \ "Content-Type: application/json\r\n" \ "连接: close\r\n" \ "\r\n" \ "{\"Hello": \"World\"}" conn.send(response.encode()) # 将响应编码 print('关闭连接') sel.unregister(conn) # 注销事件 conn.close() # 关闭连接 # 创建一个服务器socket sock = socket.socket() sock.bind(('localhost', 8000)) sock.listen() sock.setblocking(False) # 注册接受事件 sel.register(sock, selectors.EVENT_READ, accept) print("服务器在8000端口运行...") # 事件循环 while True: # 在没有请求时会阻塞 events = sel.select() # 选择准备好的事件 print("事件长度: ", len(events)) for key, mask in events: callback = key.data # 获取事件处理函数 print("处理器名:", callback.__name__) callback(key.fileobj, mask) # 调用事件处理函数
启动监听指定端口的服务器套接字。如果运行在 Linux 系统上,`selectors` 默认使用 `epoll` 作为其实现。代码使用 `epoll` 来注册一个接收请求的事件(即 accept 事件)。当有新请求到达时,`epoll` 会触发执行事件处理函数,同时注册一个读事件(read 事件)以处理和响应请求数据。通过 Web 访问 http://127.0.0.1:8000/ 时,返回的结果与示例 1 一致,服务器运行日志:
服务器正在8000端口运行...等待事件 事件长度: 1 处理程序名称: accept 已接受来自('127.0.0.1', 60941)的连接 事件长度: 1 处理程序名称: read 响应数据包 关闭与('127.0.0.1', 60941)的连接
# 套接字服务端 直接使用 Socket 来启动一个服务器。当你通过浏览器访问 <http://127.0.0.1:8080/> 或者使用命令 `curl http://127.0.0.1:8080/` 时,会返回 `{"Hello": "World"}`
import socket from datetime import datetime # 创建一个TCP套接字 server_socket = socket.socket() # 将套接字绑定到指定的IP地址和端口号 server_socket.bind(('127.0.0.1', 8001)) # 开始监听传入的连接 server_socket.listen(5) # 循环以接受客户端连接 while True: print("%s 等待连接..." % datetime.now()) client_socket, addr = server_socket.accept() # 这会阻塞,等待客户端连接 print(f"{datetime.now()} 收到来自 {addr} 的连接") # 接收客户端数据 data = client_socket.recv(1024) print(f"接收到的数据: {data.decode()}") # 发送响应数据 response = "HTTP/1.1 200 OK\r\n" \ "Content-Type: application/json\r\n" \ "Content-Length: 18\r\n" \ "Connection: close\r\n\r\n" \ "{\"Hello\": \"World\"}" client_socket.sendall(response.encode()) # 关闭客户端的套接字 client_socket.close()
用 `curl http://127.0.0.1:8001/` 访问时,服务器日志如下:
2024-12-27 12:53:36.711732 正在等待连接... 2024-12-27 12:54:30.715928 从 ('127.0.0.1', 64361) 收到连接 收到: GET / HTTP/1.1 Host: 127.0.0.1:8001 User-Agent: curl/8.4.0 Accept: */*
这里是摘要 异步I/O是一种技术,在底层通过“协程”和“事件循环机制”实现。“协程”确保在执行过程中,当线程遇到标记的I/O操作时,不需要等待I/O完成,而是暂停并让线程执行其他任务而不被阻塞。“事件循环机制”利用I/O多路复用技术,不断地循环监控I/O事件。当某个I/O事件完成时,相应的回调被触发,使协程可以继续执行。 # [Leapcell](https://leapcell.io/): FastAPI及其他Python应用的理想平台 最后来介绍一个用于部署 Flask/FastAPI 的理想平台:[**Leapcell**](https://leapcell.io/)(一个很棒的平台)。 **Leapcell**(<https://leapcell.io/>)是一个特别为现代分布式应用设计的云计算平台。其按使用量计费的定价模式确保没有闲置资源的浪费成本,意味着用户只需按实际使用的资源付费。 ![](https://imgapi.imooc.com/6773b50a097d225914000316.jpg) WSGI/ASGI应用程序可以利用**Leapcell**(<https://leapcell.io/>)的独特优势: #1 多语言支持 * 支持用 JavaScript、Python、Go 或 Rust 开发。 # 2\. 免费上线无限项目 * 只按使用计费。没有请求则不收费。 # 3\. 超值的成本效益 * 按使用付费,无闲置服务器费用。 * 比如说,25美元左右可以支持694万次请求量,平均响应时间仅为60毫秒。 # 4\. 开发体验简化 * 直观的用户界面,让设置变得轻松。 * 完全自动化的CI/CD管道和GitOps集成。 * 实时指标和日志,提供可操作的见解。 # 5\. 轻松扩展性和高性能表现 * 自动扩展,轻松应对高并发。 * 零运维负担,让开发者专注于开发。 了解更多内容,请查看[**相关文档!**](https://docs.leapcell.io/) Leapcell的推特主页: [**https://x.com/LeapcellHQ**](https://x.com/LeapcellHQ)
这篇关于用FastAPI掌握Python异步IO:轻松实现高并发网络请求处理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-02封装学习:Python面向对象编程基础教程
- 2024-12-28Python编程基础教程
- 2024-12-27Python编程入门指南
- 2024-12-27Python编程基础
- 2024-12-27Python编程基础教程
- 2024-12-27Python编程基础指南
- 2024-12-24Python编程入门指南
- 2024-12-24Python编程基础入门
- 2024-12-24Python编程基础:变量与数据类型
- 2024-12-23使用python部署一个usdt合约,部署自己的usdt稳定币