| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- import json
- import asyncio
- import time
- import websockets
- from fastapi import WebSocket, WebSocketDisconnect
- from digitalHuman.utils import logger
- from digitalHuman.engine.builder import ASREngines
- from digitalHuman.protocol import *
- from digitalHuman.engine.engineBase import StreamBaseEngine
- __all__ = ["FunasrStreamingAsr"]
- @ASREngines.register("funasrStreaming")
- class FunasrStreamingAsr(StreamBaseEngine):
- async def _reset_sentence(self, funasrWebsocket: websockets.ClientConnection):
- """重置说话识别, 防止连续识别添加标点符号"""
- message = json.dumps(
- {
- "is_speaking": False,
- }
- )
- await funasrWebsocket.send(message)
- message = json.dumps(
- {
- "is_speaking": True,
- }
- )
- await funasrWebsocket.send(message)
- async def _task_send(self, adhWebsocket: WebSocket, funasrWebsocket: websockets.ClientConnection):
- """
- funasr server -> adh server -> adh web
- """
- text_send = ""
- text_send_2pass_online = ""
- text_send_2pass_offline = ""
- wake_word = "小天小天"
- is_awake = False
- inactivity_deadline = time.monotonic() + 300 # 5分钟超时
- def process_text_for_wake(text: str) -> tuple[bool, str]:
- nonlocal is_awake
- if not is_awake:
- if wake_word in text:
- is_awake = True
- return True, text.replace(wake_word, "").strip()
- return False, ""
- return True, text.replace(wake_word, "").strip()
- try:
- while True:
- # 超时检查
- if time.monotonic() > inactivity_deadline:
- await WebSocketHandler.send_message(adhWebsocket, WS_SEND_ACTION_TYPE.ENGINE_STOPPED, "inactivity_timeout")
- return
- meg = await funasrWebsocket.recv()
- meg = json.loads(meg)
- wav_name = meg.get("wav_name", "demo")
- text = meg["text"]
- timestamp = ""
- offline_msg_done = meg.get("is_final", False)
- if "timestamp" in meg:
- timestamp = meg["timestamp"]
- if "mode" not in meg:
- continue
- if meg["mode"] == "online":
- text_send += text
- elif meg["mode"] == "offline":
- text_send += text
- offline_msg_done = True
- else:
- if meg["mode"] == "2pass-online":
- text_send_2pass_online += text
- text_send = text_send_2pass_offline + text_send_2pass_online
- else:
- offline_msg_done = True
- text_send_2pass_online = ""
- text_send = text_send_2pass_offline + text
- text_send_2pass_offline += text
- if offline_msg_done:
- awakened, cleaned = process_text_for_wake(text_send)
- if awakened and cleaned:
- await WebSocketHandler.send_message(adhWebsocket, WS_SEND_ACTION_TYPE.ENGINE_FINAL_OUTPUT, cleaned)
- inactivity_deadline = time.monotonic() + 300
- text_send = ""
- text_send_2pass_online = ""
- text_send_2pass_offline = ""
- await self._reset_sentence(funasrWebsocket)
- else:
- awakened, cleaned = process_text_for_wake(text_send)
- if awakened and cleaned:
- await WebSocketHandler.send_message(adhWebsocket, WS_SEND_ACTION_TYPE.ENGINE_PARTIAL_OUTPUT, cleaned)
- inactivity_deadline = time.monotonic() + 300
- except WebSocketDisconnect:
- logger.debug("adhWebsocket closed, task_send exit")
- except websockets.ConnectionClosed:
- logger.debug("funasrWebsocket closed, task_send exit")
- except Exception as e:
- logger.error(f"FunasrStreamingAsr task_send error: {e}")
- await WebSocketHandler.send_message(adhWebsocket, WS_SEND_ACTION_TYPE.ERROR, str(e))
- async def _task_recv(self, adhWebsocket: WebSocket, funasrWebsocket: websockets.ClientConnection, mode: str):
- """
- adh web -> adh server -> funasr server
- """
- try:
- message = json.dumps(
- {
- "mode": mode,
- "chunk_size": [5, 10, 5], # chunk_size: 60 * 10 ms. 左看300ms, 右看300ms
- "chunk_interval": 10,
- "encoder_chunk_look_back": 4,
- "decoder_chunk_look_back": 0,
- "wav_name": "adh",
- "is_speaking": True,
- "hotwords": "",
- "itn": True,
- }
- )
- await funasrWebsocket.send(message)
- await WebSocketHandler.send_message(adhWebsocket, WS_SEND_ACTION_TYPE.ENGINE_STARTED)
- while True:
- action, payload = await WebSocketHandler.recv_message(adhWebsocket)
- match action:
- case WS_RECV_ACTION_TYPE.PING:
- await WebSocketHandler.send_message(adhWebsocket, WS_SEND_ACTION_TYPE.PONG.value, b"")
- case WS_RECV_ACTION_TYPE.ENGINE_START:
- raise RuntimeError("FunasrStreamingAsr has benn started")
- case WS_RECV_ACTION_TYPE.ENGINE_PARTIAL_INPUT:
- await funasrWebsocket.send(payload)
- case WS_RECV_ACTION_TYPE.ENGINE_FINAL_INPUT:
- message = json.dumps(
- {
- "is_speaking": False
- }
- )
- await funasrWebsocket.send(message)
- await funasrWebsocket.send(payload)
- case WS_RECV_ACTION_TYPE.ENGINE_STOP:
- await funasrWebsocket.close()
- await WebSocketHandler.send_message(adhWebsocket, WS_SEND_ACTION_TYPE.ENGINE_STOPPED)
- return
- case _:
- raise RuntimeError(f"FunasrStreamingAsr task_recv error: {action} not found")
- except WebSocketDisconnect:
- logger.debug("funasrWebsocket closed, task_recv exit")
- except Exception as e:
- logger.error(f"FunasrStreamingAsr task_recv error: {e}")
- await WebSocketHandler.send_message(adhWebsocket, WS_SEND_ACTION_TYPE.ERROR, str(e))
- async def run(self, websocket: WebSocket, **kwargs) -> None:
- # 参数校验
- paramters = self.checkParameter(**kwargs)
- API_URL = paramters["api_url"]
- MODE = paramters["mode"]
- await WebSocketHandler.send_message(websocket, WS_SEND_ACTION_TYPE.ENGINE_INITIALZING)
- # 连接服务器
- try:
- async with websockets.connect(API_URL, subprotocols=["binary"], ping_interval=None) as funasrWebsocket:
- # adh web -> adh server -> funasr server
- task_recv = asyncio.create_task(self._task_recv(websocket, funasrWebsocket, MODE))
- # funasr server -> adh server -> adh web
- task_send = asyncio.create_task(self._task_send(websocket, funasrWebsocket))
- await asyncio.gather(task_recv, task_send)
- except Exception as e:
- logger.error(f"FunasrStreamingAsr run error: {e}")
- # 异常会被 async with 自动处理,这里只记录错误
|