dashscopeStreamingASR.py 9.2 KB


  1. # -*- coding: utf-8 -*-
  2. import os
  3. import json
  4. import asyncio
  5. from http import HTTPStatus
  6. from fastapi import WebSocket, WebSocketDisconnect
  7. from dashscope.audio.asr import RecognitionCallback, Recognition
  8. from digitalHuman.utils import logger
  9. from digitalHuman.engine.builder import ASREngines
  10. from digitalHuman.protocol import *
  11. from digitalHuman.engine.engineBase import StreamBaseEngine
  12. __all__ = ["DashscopeStreamingASR"]
  13. class ASRCallback(RecognitionCallback):
  14. """ASR 回调处理类"""
  15. def __init__(self, websocket: WebSocket):
  16. self.websocket = websocket
  17. self.partial_text = ""
  18. self.final_text = ""
  19. def on_open(self):
  20. logger.debug("[DashscopeStreamingASR] Connection opened")
  21. def on_close(self):
  22. logger.debug("[DashscopeStreamingASR] Connection closed")
  23. def on_event(self, result):
  24. """处理识别事件"""
  25. try:
  26. if result.status_code == HTTPStatus.OK:
  27. sentence = result.get_sentence()
  28. if sentence:
  29. text = sentence.get('text', '')
  30. # 判断是否为最终结果
  31. if sentence.get('end_time'):
  32. # 最终结果
  33. self.final_text = text
  34. logger.debug(f"[DashscopeStreamingASR] Final: {text}")
  35. else:
  36. # 部分结果
  37. self.partial_text = text
  38. logger.debug(f"[DashscopeStreamingASR] Partial: {text}")
  39. else:
  40. logger.error(f"[DashscopeStreamingASR] Error: {result.message}")
  41. except Exception as e:
  42. logger.error(f"[DashscopeStreamingASR] Callback error: {e}")
  43. def on_error(self, error):
  44. logger.error(f"[DashscopeStreamingASR] Error: {error}")
  45. async def get_partial_result(self):
  46. """获取部分识别结果"""
  47. if self.partial_text:
  48. text = self.partial_text
  49. return text
  50. return ""
  51. async def get_final_result(self):
  52. """获取最终识别结果"""
  53. if self.final_text:
  54. text = self.final_text
  55. self.final_text = ""
  56. self.partial_text = ""
  57. return text
  58. return ""
  59. @ASREngines.register("dashscopeStreamingASR")
  60. class DashscopeStreamingASR(StreamBaseEngine):
  61. def setup(self):
  62. """初始化配置"""
  63. try:
  64. import dashscope
  65. # 从配置或环境变量获取 API Key
  66. api_key = self.cfg.get('CUSTOM', {}).get('api_key') or os.getenv('DASHSCOPE_API_KEY')
  67. if api_key:
  68. dashscope.api_key = api_key
  69. logger.info("[DashscopeStreamingASR] API Key configured successfully")
  70. else:
  71. logger.warning("[DashscopeStreamingASR] No API Key found, please set DASHSCOPE_API_KEY environment variable or configure in yaml")
  72. except ImportError:
  73. logger.error("[DashscopeStreamingASR] Please install dashscope: pip install dashscope")
  74. raise
  75. except Exception as e:
  76. logger.error(f"[DashscopeStreamingASR] Setup error: {e}")
  77. raise
  78. async def _task_send(self, adhWebsocket: WebSocket, asr_callback: ASRCallback):
  79. """
  80. 发送识别结果到前端
  81. """
  82. try:
  83. last_partial = ""
  84. while True:
  85. await asyncio.sleep(0.1) # 100ms 检查一次
  86. # 检查是否有最终结果
  87. final_text = await asr_callback.get_final_result()
  88. if final_text:
  89. await WebSocketHandler.send_message(
  90. adhWebsocket,
  91. WS_SEND_ACTION_TYPE.ENGINE_FINAL_OUTPUT,
  92. final_text
  93. )
  94. last_partial = ""
  95. continue
  96. # 检查是否有部分结果
  97. partial_text = await asr_callback.get_partial_result()
  98. if partial_text and partial_text != last_partial:
  99. await WebSocketHandler.send_message(
  100. adhWebsocket,
  101. WS_SEND_ACTION_TYPE.ENGINE_PARTIAL_OUTPUT,
  102. partial_text
  103. )
  104. last_partial = partial_text
  105. except WebSocketDisconnect:
  106. logger.debug("[DashscopeStreamingASR] adhWebsocket closed, task_send exit")
  107. except Exception as e:
  108. logger.error(f"[DashscopeStreamingASR] task_send error: {e}")
  109. await WebSocketHandler.send_message(adhWebsocket, WS_SEND_ACTION_TYPE.ERROR, str(e))
  110. async def _task_recv(self, adhWebsocket: WebSocket, recognition: Recognition):
  111. """
  112. 接收前端音频数据并发送到识别服务
  113. """
  114. try:
  115. await WebSocketHandler.send_message(adhWebsocket, WS_SEND_ACTION_TYPE.ENGINE_STARTED)
  116. while True:
  117. action, payload = await WebSocketHandler.recv_message(adhWebsocket)
  118. match action:
  119. case WS_RECV_ACTION_TYPE.PING:
  120. await WebSocketHandler.send_message(adhWebsocket, WS_SEND_ACTION_TYPE.PONG, b"")
  121. case WS_RECV_ACTION_TYPE.ENGINE_START:
  122. raise RuntimeError("[DashscopeStreamingASR] Engine has been started")
  123. case WS_RECV_ACTION_TYPE.ENGINE_PARTIAL_INPUT:
  124. # 发送音频数据到识别服务
  125. await asyncio.get_event_loop().run_in_executor(
  126. None, recognition.send_audio_frame, payload
  127. )
  128. case WS_RECV_ACTION_TYPE.ENGINE_FINAL_INPUT:
  129. # 发送最后的音频数据
  130. await asyncio.get_event_loop().run_in_executor(
  131. None, recognition.send_audio_frame, payload
  132. )
  133. case WS_RECV_ACTION_TYPE.ENGINE_STOP:
  134. # 停止识别
  135. await asyncio.get_event_loop().run_in_executor(
  136. None, recognition.stop
  137. )
  138. await WebSocketHandler.send_message(adhWebsocket, WS_SEND_ACTION_TYPE.ENGINE_STOPPED)
  139. return
  140. case _:
  141. raise RuntimeError(f"[DashscopeStreamingASR] Unknown action: {action}")
  142. except WebSocketDisconnect:
  143. logger.debug("[DashscopeStreamingASR] adhWebsocket closed, task_recv exit")
  144. except Exception as e:
  145. logger.error(f"[DashscopeStreamingASR] task_recv error: {e}")
  146. await WebSocketHandler.send_message(adhWebsocket, WS_SEND_ACTION_TYPE.ERROR, str(e))
  147. async def run(self, websocket: WebSocket, **kwargs) -> None:
  148. """运行流式识别"""
  149. # 参数校验
  150. paramters = self.checkParameter(**kwargs)
  151. model = paramters.get("model", "fun-asr-realtime")
  152. sample_rate = paramters.get("sample_rate", 16000)
  153. format_type = paramters.get("format", "pcm")
  154. language_hints = paramters.get("language_hints", ["zh", "en"])
  155. await WebSocketHandler.send_message(websocket, WS_SEND_ACTION_TYPE.ENGINE_INITIALZING)
  156. try:
  157. # 创建回调对象
  158. asr_callback = ASRCallback(websocket)
  159. # 创建识别对象
  160. # 注意:language_hints 只支持 paraformer-realtime-v2 和 paraformer-v2 模型
  161. if model in ['paraformer-realtime-v2', 'paraformer-v2']:
  162. recognition = Recognition(
  163. model=model,
  164. format=format_type,
  165. sample_rate=sample_rate,
  166. language_hints=language_hints,
  167. callback=asr_callback
  168. )
  169. else:
  170. # fun-asr-realtime 等模型不支持 language_hints
  171. recognition = Recognition(
  172. model=model,
  173. format=format_type,
  174. sample_rate=sample_rate,
  175. callback=asr_callback
  176. )
  177. # 启动识别
  178. await asyncio.get_event_loop().run_in_executor(
  179. None, recognition.start
  180. )
  181. # 创建发送和接收任务
  182. task_recv = asyncio.create_task(self._task_recv(websocket, recognition))
  183. task_send = asyncio.create_task(self._task_send(websocket, asr_callback))
  184. # 等待任务完成
  185. await asyncio.gather(task_recv, task_send)
  186. except Exception as e:
  187. logger.error(f"[DashscopeStreamingASR] Run error: {e}")
  188. await WebSocketHandler.send_message(websocket, WS_SEND_ACTION_TYPE.ERROR, str(e))
  189. finally:
  190. # 清理资源
  191. try:
  192. if recognition:
  193. await asyncio.get_event_loop().run_in_executor(
  194. None, recognition.stop
  195. )
  196. except:
  197. pass