protocol.py 8.3 KB


  1. # -*- coding: utf-8 -*-
  2. import struct
  3. import asyncio
  4. from enum import Enum
  5. from uuid import uuid4
  6. from typing import Optional, Union, List, Dict, Tuple
  7. from datetime import datetime
  8. from pydantic import BaseModel, Field
  9. from fastapi import WebSocket
  10. # ======================= 枚举类型 =======================
  11. class StrEnum(str, Enum):
  12. def __str__(self):
  13. return str(self.value)
  14. class IntEnum(int, Enum):
  15. def __str__(self):
  16. return str(self.value)
  17. class ENGINE_TYPE(StrEnum):
  18. ASR = "ASR"
  19. TTS = "TTS"
  20. LLM = "LLM"
  21. AGENT = "AGENT"
  22. class GENDER_TYPE(StrEnum):
  23. MALE = 'MALE'
  24. FEMALE = 'FEMALE'
  25. class EVENT_TYPE(StrEnum):
  26. CONVERSATION_ID = 'CONVERSATION_ID'
  27. MESSAGE_ID = 'MESSAGE_ID'
  28. TEXT = 'TEXT'
  29. THINK = 'THINK'
  30. TASK = 'TASK'
  31. DONE = 'DONE'
  32. ERROR = 'ERROR'
  33. class PARAM_TYPE(StrEnum):
  34. STRING = 'string'
  35. INT = 'int'
  36. FLOAT = 'float'
  37. BOOL = 'bool'
  38. LIST = 'list'
  39. class AUDIO_TYPE(StrEnum):
  40. MP3 = 'mp3'
  41. WAV = 'wav'
  42. class DATA_TYPE(StrEnum):
  43. TEXT = 'text'
  44. AUDIO_URL = 'audio_url'
  45. AUDIO_STREAM = 'audio_stream'
  46. class ROLE_TYPE(StrEnum):
  47. SYSTEM = 'system'
  48. USER = 'user'
  49. ASSISTANT = 'assistant'
  50. TOOL = 'tool'
  51. class INFER_TYPE(StrEnum):
  52. NORMAL = 'normal'
  53. STREAM = 'stream'
  54. class RESPONSE_CODE(IntEnum):
  55. OK = 0
  56. ERROR = -1
  57. # ========================== Message =============================
  58. class BaseMessage(BaseModel):
  59. """
  60. Base Protocol
  61. """
  62. # id: str = Field(default_factory=lambda: str(uuid4()))
  63. def __str__(self) -> str:
  64. return f'Message({self.model_dump()})'
  65. class AudioMessage(BaseMessage):
  66. data: Optional[Union[str, bytes]] = None
  67. dataType: DATA_TYPE = DATA_TYPE.AUDIO_STREAM # 数据类型:音频流、音频URL等
  68. type: AUDIO_TYPE = AUDIO_TYPE.WAV # 音频格式:WAV、MP3等
  69. sampleRate: int = 16000
  70. sampleWidth: int = 2
  71. class TextMessage(BaseMessage):
  72. data: Optional[str] = None
  73. dataType: DATA_TYPE = DATA_TYPE.TEXT # 数据类型
  74. class RoleMessage(BaseMessage):
  75. role: ROLE_TYPE
  76. content: str
  77. # ========================== server =============================
  78. class BaseResponse(BaseModel):
  79. code: RESPONSE_CODE
  80. message: str
  81. # ========================== voice =============================
  82. class VoiceDesc(BaseModel):
  83. name: str
  84. gender: GENDER_TYPE
  85. # ========================== param =============================
  86. class ParamDesc(BaseModel):
  87. name: str
  88. description: str
  89. type: PARAM_TYPE
  90. required: bool
  91. range: List[Union[str, int, float]] = []
  92. choices: List[Union[str, int, float]] = []
  93. default: Union[str, int, float, bool, List]
  94. # ========================== engine =============================
  95. class EngineDesc(BaseModel):
  96. name: str
  97. type: ENGINE_TYPE
  98. infer_type: INFER_TYPE
  99. desc: str = ""
  100. meta: Dict = {}
  101. class EngineConfig(BaseModel):
  102. name: str
  103. type: ENGINE_TYPE
  104. config: Dict
  105. # ========================== user =============================
  106. class UserDesc(BaseModel):
  107. user_id: str
  108. request_id: str
  109. cookie: str
  110. # ========================== func =============================
  111. def eventStreamResponse(event: EVENT_TYPE, data: str) -> str:
  112. message = "event: " + str(event) + "\ndata: " + data.replace("\n", "\\n") + "\n\n"
  113. return message
  114. def eventStreamText(data: str) -> str:
  115. return eventStreamResponse(EVENT_TYPE.TEXT, data)
  116. def eventStreamTask(task_id: str) -> str:
  117. return eventStreamResponse(EVENT_TYPE.TASK, task_id)
  118. def eventStreamThink(data: str) -> str:
  119. return eventStreamResponse(EVENT_TYPE.THINK, data)
  120. def eventStreamConversationId(conversation_id: str) -> str:
  121. return eventStreamResponse(EVENT_TYPE.CONVERSATION_ID, conversation_id)
  122. def eventStreamMessageId(message_id: str) -> str:
  123. return eventStreamResponse(EVENT_TYPE.MESSAGE_ID, message_id)
  124. def eventStreamDone() -> str:
  125. return f"event: {EVENT_TYPE.DONE}\ndata: Done\n\n"
  126. def eventStreamError(error: str):
  127. return eventStreamResponse(EVENT_TYPE.ERROR, error)
  128. def isEventStreamResponse(message: str) -> bool:
  129. return message.startswith("event:")
  130. # ========================== websocket =============================
  131. # 协议常量定义
  132. ACTION_HEADER_SIZE = 18 # action字段大小(18字节)
  133. # 协议格式: [Action(18字节)] + [Payload Size(4字节)] + [Payload(可变长度)]
  134. PROTOCOL_HEADER_FORMAT = ">18sI" # 大端序: 18字节action + 4字节无符号整数payload_size
  135. PROTOCOL_HEADER_SIZE = struct.calcsize(PROTOCOL_HEADER_FORMAT) # 22字节
  136. class WS_RECV_ACTION_TYPE(StrEnum):
  137. """客户端请求类型"""
  138. PING = "PING" # 心跳包
  139. ENGINE_START = "ENGINE_START" # 启动引擎
  140. ENGINE_PARTIAL_INPUT = "PARTIAL_INPUT" # 引擎输入
  141. ENGINE_FINAL_INPUT = "FINAL_INPUT" # 引擎输入
  142. ENGINE_STOP = "ENGINE_STOP" # 停止引擎
  143. class WS_SEND_ACTION_TYPE(StrEnum):
  144. """服务端响应类型"""
  145. PONG = "PONG" # 心跳响应
  146. ENGINE_INITIALZING = "ENGINE_INITIALZING" # 引擎初始化
  147. ENGINE_STARTED = "ENGINE_STARTED" # 引擎准备就绪
  148. ENGINE_PARTIAL_OUTPUT = "PARTIAL_OUTPUT" # 引擎输出
  149. ENGINE_FINAL_OUTPUT = "FINAL_OUTPUT" # 引擎输出
  150. ENGINE_STOPPED = "ENGINE_STOPPED" # 关闭引擎
  151. ERROR = "ERROR" # 错误响应
  152. def _format_action(action_name: str) -> bytes:
  153. """格式化action名称为18字节,右侧用空格填充"""
  154. if len(action_name) > ACTION_HEADER_SIZE:
  155. raise ValueError(
  156. f"Action name '{action_name}' exceeds {ACTION_HEADER_SIZE} bytes"
  157. )
  158. return action_name.ljust(ACTION_HEADER_SIZE).encode("utf-8")
  159. def struct_message(action: str, message: str | bytes) -> bytes:
  160. """构造发送消息"""
  161. if isinstance(message, str):
  162. message = message.encode("utf-8")
  163. action_bytes = _format_action(action)
  164. payload_size = len(message)
  165. # 打包协议头部: action(18字节) + payload_size(4字节)
  166. header = struct.pack(PROTOCOL_HEADER_FORMAT, action_bytes, payload_size)
  167. return header + message
  168. def parse_message(message: bytes) -> Tuple[str, bytes]:
  169. """解析接收到的消息"""
  170. if len(message) < PROTOCOL_HEADER_SIZE:
  171. raise ValueError(
  172. f"Message too short: {len(message)} bytes, expected at least {PROTOCOL_HEADER_SIZE}"
  173. )
  174. # 解析协议头部: action(18字节) + payload_size(4字节)
  175. action, payload_size = struct.unpack(
  176. PROTOCOL_HEADER_FORMAT, message[:PROTOCOL_HEADER_SIZE]
  177. )
  178. expected_total_size = PROTOCOL_HEADER_SIZE + payload_size
  179. if len(message) != expected_total_size:
  180. raise ValueError(
  181. f"Message size mismatch: got {len(message)} bytes, expected {expected_total_size}"
  182. )
  183. # 提取payload
  184. payload = message[PROTOCOL_HEADER_SIZE : PROTOCOL_HEADER_SIZE + payload_size] if payload_size > 0 else b""
  185. return (action.decode("utf-8").strip(), payload)
  186. class WebSocketHandler():
  187. """
  188. websocket处理类(协议控制)
  189. """
  190. @staticmethod
  191. async def connect(ws: WebSocket) -> None:
  192. """连接WebSocket"""
  193. await ws.accept()
  194. # logger.debug(f"WebSocket connected: {ws.client.host}")
  195. @staticmethod
  196. async def disconnect(ws: WebSocket):
  197. """断开WebSocket连接"""
  198. try:
  199. await ws.close()
  200. except (RuntimeError, AttributeError, Exception):
  201. # 忽略关闭时的错误,避免在事件循环关闭后尝试关闭连接
  202. # 这是 Windows 上 ProactorEventLoop 的已知问题
  203. # 当事件循环关闭后,WebSocket 连接的析构函数会尝试关闭连接,但此时事件循环已经关闭
  204. pass
  205. # logger.debug(f"WebSocket disconnected: {ws.client.host}")
  206. @staticmethod
  207. async def send_message(ws: WebSocket, action: str, message: str | bytes = b'') -> None:
  208. """发送WebSocket消息"""
  209. data = struct_message(action, message)
  210. await ws.send_bytes(data)
  211. # logger.debug(f"Sent action: {action}, payload size: {len(data) - PROTOCOL_HEADER_SIZE} bytes")
  212. @staticmethod
  213. async def recv_message(ws: WebSocket) -> Tuple[str, bytes]:
  214. """接收WebSocket消息"""
  215. message = await ws.receive_bytes()
  216. action, payload = parse_message(message)
  217. # logger.debug(f"Received action: {action.decode('utf-8').strip()}, payload size: {len(payload)} bytes")
  218. return action, payload