如何在FastAPI中玩转WebSocket消息处理?

扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长

发现1000+提升效率与开发的AI工具和实用程序https://tools.cmdragon.cn/


一、文本消息接收与发送

# 运行环境:Python 3.8+
# 安装依赖:pip install fastapi==0.68.0 uvicorn==0.15.0 websockets==10.3 pydantic==1.10.7
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
 await websocket.accept()
 try:
 while True:
 # 接收文本消息
 client_msg = await websocket.receive_text()
 # 处理消息(示例:添加时间戳)
 server_response = f"[{datetime.now()}] Server received: {client_msg}"
 # 发送文本响应
 await websocket.send_text(server_response)
 except WebSocketDisconnect:
 print("Client disconnected")

消息处理流程:

  1. 客户端通过ws://协议建立WebSocket连接
  2. 服务端使用await websocket.accept()接受连接
  3. 进入接收循环处理receive_text()send_text()
  4. 异常处理自动断开连接
sequenceDiagram participant C as Client participant S as Server C->>S: Establish WebSocket connection (ws://) activate S S->>S: await websocket.accept() to accept connection activate C loop Receiving and Sending Messages C->>S: receive_text() S-->>C: send_text() end alt Exception Handling S->>C: Automatically disconnect end deactivate S deactivate C

应用场景:实时聊天室、协同编辑系统、实时日志监控


二、二进制数据传输处理

@app.websocket("/ws/file-transfer")
async def websocket_file(websocket: WebSocket):
 await websocket.accept()
 try:
 while True:
 # 接收二进制数据
 binary_data = await websocket.receive_bytes()
 # 保存文件示例
 with open("received_file.bin", "wb") as f:
 f.write(binary_data)
 # 发送确认消息
 await websocket.send_bytes(b"FILE_RECEIVED")
 except WebSocketDisconnect:
 print("File transfer interrupted")

二进制处理要点:

  • 使用receive_bytes()send_bytes()方法
  • 适合传输图片、音频、视频等二进制格式
  • 建议分块传输大文件(结合消息头协议)

三、JSON消息序列化与自动解析

from pydantic import BaseModel
class MessageModel(BaseModel):
 user: str
 content: str
 timestamp: float
@app.websocket("/ws/json-demo")
async def websocket_json(websocket: WebSocket):
 await websocket.accept()
 try:
 while True:
 json_data = await websocket.receive_json()
 # 自动验证JSON结构
 message = MessageModel(**json_data)
 # 处理业务逻辑
 processed_data = message.dict()
 processed_data["status"] = "PROCESSED"
 # 返回处理结果
 await websocket.send_json(processed_data)
 except ValidationError as e:
 await websocket.send_json({"error": str(e)})

自动验证流程:

  1. 接收原始JSON数据
  2. 通过Pydantic模型进行数据清洗
  3. 自动类型转换和字段验证
  4. 结构化错误响应返回

四、消息接收循环与超时控制

from websockets.exceptions import ConnectionClosed
@app.websocket("/ws/with-timeout")
async def websocket_timeout(websocket: WebSocket):
 await websocket.accept()
 try:
 while True:
 try:
 # 设置10秒接收超时
 data = await asyncio.wait_for(websocket.receive_text(), timeout=10)
 await process_message(data)
 except asyncio.TimeoutError:
 # 发送心跳包保持连接
 await websocket.send_text("HEARTBEAT")
 except ConnectionClosed:
 print("Connection closed normally")

超时控制策略:

  • 使用asyncio.wait_for设置单次接收超时
  • 定期发送心跳包维持连接
  • 异常分类处理(正常关闭/异常断开)

课后Quiz

Q1:如何处理同时接收文本和二进制消息的场景?
A:通过receive()方法获取消息类型判断:

message = await websocket.receive()
if message["type"] == "websocket.receive.text":
 handle_text(message["text"])
elif message["type"] == "websocket.receive.bytes":
 handle_bytes(message["bytes"])

Q2:为什么推荐使用Pydantic进行JSON验证?
A:① 自动类型转换 ② 字段约束检查 ③ 防御无效数据 ④ 生成API文档


常见报错解决方案

422 Validation Error

{
 "detail": [
 {
 "loc": [
 "body",
 "timestamp"
 ],
 "msg": "field required",
 "type": "value_error.missing"
 }
 ]
}

解决方法:

  1. 检查客户端发送的JSON字段是否完整
  2. 验证时间戳是否为数字类型
  3. 添加默认值字段:timestamp: float = None

WebSocketTimeoutException

  • 成因:长时间未发送/接收消息
  • 解决:调整wait_for超时参数,添加心跳机制

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何在FastAPI中玩转WebSocket消息处理?

往期文章归档:

免费好用的热门在线工具

作者:Amd794原文地址:https://www.cnblogs.com/Amd794/p/18971961

%s 个评论

要回复文章请先登录注册