# -*- coding: utf-8 -*-
"""Dashscope (Alibaba Cloud) speech-recognition engine plugin."""
import os
import base64
import asyncio
import tempfile
from http import HTTPStatus

from dashscope.audio.asr import Recognition

from digitalHuman.utils import logger
from digitalHuman.engine.builder import ASREngines
from digitalHuman.protocol import AudioMessage, TextMessage, AUDIO_TYPE, DATA_TYPE
from digitalHuman.engine.engineBase import BaseASREngine

__all__ = ["DashscopeASR"]


@ASREngines.register("dashscopeASR")
class DashscopeASR(BaseASREngine):
    """ASR engine backed by the Dashscope `Recognition` file-recognition API."""

    def setup(self):
        """Initialize configuration: install the Dashscope API key.

        The key is taken from the engine's yaml config (``api_key``) with the
        ``DASHSCOPE_API_KEY`` environment variable as fallback.

        Raises:
            ImportError: if the ``dashscope`` package is not installed.
            Exception: any other setup failure is logged and re-raised.
        """
        try:
            import dashscope

            # Config value wins over the environment variable.
            custom_config = self.custom()
            api_key = custom_config.get('api_key') or os.getenv('DASHSCOPE_API_KEY')
            if api_key:
                dashscope.api_key = api_key
                logger.info("[DashscopeASR] API Key configured successfully")
            else:
                logger.warning("[DashscopeASR] No API Key found, please set DASHSCOPE_API_KEY environment variable or configure in yaml")
        except ImportError:
            logger.error("[DashscopeASR] Please install dashscope: pip install dashscope")
            raise
        except Exception as e:
            logger.error(f"[DashscopeASR] Setup error: {e}")
            raise

    async def run(self, input: AudioMessage, **kwargs) -> TextMessage:
        """Perform speech recognition on an audio message.

        Args:
            input: AudioMessage carrying the audio payload; ``input.data`` may
                be raw bytes or a base64-encoded string.
            **kwargs: engine parameters validated by ``checkParameter`` —
                ``model``, ``sample_rate``, ``format``, ``language_hints``.

        Returns:
            TextMessage containing the recognized text.

        Raises:
            RuntimeError: when the Dashscope service reports a non-OK status.
            Exception: any other recognition error is logged and re-raised.
        """
        # Validate and normalize caller-supplied parameters.
        parameters = self.checkParameter(**kwargs)
        model = parameters.get("model", "fun-asr-realtime")
        sample_rate = parameters.get("sample_rate", 16000)
        format_type = parameters.get("format", "wav")
        language_hints = parameters.get("language_hints", ["zh", "en"])

        audio_path = None
        try:
            audio_data = input.data
            if isinstance(audio_data, str):
                # Base64-encoded payloads arrive as str; decode to raw bytes.
                audio_data = base64.b64decode(audio_data)

            # Recognition.call expects a file path, so persist the audio to a
            # temp file. NOTE(review): assumes str(input.type) yields a usable
            # file suffix (e.g. "wav") — confirm against AUDIO_TYPE.
            file_suffix = f'.{input.type}' if hasattr(input, 'type') else '.wav'
            with tempfile.NamedTemporaryFile(delete=False, suffix=file_suffix) as tmp_file:
                tmp_file.write(audio_data)
                audio_path = tmp_file.name

            logger.debug(f"[DashscopeASR] Using model: {model}, format: {format_type}, sample_rate: {sample_rate}")

            # language_hints is only supported by the paraformer v2 models;
            # passing it to others (e.g. fun-asr-realtime) is rejected.
            recognition_kwargs = {
                'model': model,
                'format': format_type,
                'sample_rate': sample_rate,
                'callback': None,
            }
            if model in ['paraformer-realtime-v2', 'paraformer-v2']:
                recognition_kwargs['language_hints'] = language_hints
            recognition = Recognition(**recognition_kwargs)

            logger.debug(f"[DashscopeASR] Starting recognition for audio file: {audio_path}")
            # Recognition.call is synchronous — run it in the default executor
            # so the event loop is not blocked. get_running_loop() replaces the
            # deprecated get_event_loop() inside a coroutine.
            result = await asyncio.get_running_loop().run_in_executor(
                None, recognition.call, audio_path
            )

            if result.status_code != HTTPStatus.OK:
                error_msg = f"Recognition failed: {result.message}"
                logger.error(f"[DashscopeASR] {error_msg}")
                raise RuntimeError(error_msg)

            sentence = result.get_sentence()
            logger.debug(f"[DashscopeASR] Sentence type: {type(sentence)}, content: {sentence}")
            text = self._extract_text(sentence)

            logger.info(f"[DashscopeASR] Recognition result: {text}")
            logger.debug(
                f"[Metric] requestId: {recognition.get_last_request_id()}, "
                f"first package delay ms: {recognition.get_first_package_delay()}, "
                f"last package delay ms: {recognition.get_last_package_delay()}"
            )
            return TextMessage(data=text)
        except Exception as e:
            logger.error(f"[DashscopeASR] Error during recognition: {e}")
            raise
        finally:
            # Always remove the temp file — the original only cleaned up on
            # the success path and leaked the file when recognition raised.
            if audio_path:
                try:
                    os.remove(audio_path)
                except OSError as e:
                    logger.warning(f"[DashscopeASR] Failed to remove temp file: {e}")

    @staticmethod
    def _extract_text(sentence) -> str:
        """Extract recognized text from Dashscope's ``get_sentence`` payload.

        The payload shape varies by model/SDK version: a dict with a ``text``
        field, a list of such dicts (one per sentence), a plain string, or
        something else entirely.
        """
        if isinstance(sentence, dict):
            return sentence.get('text', '')
        if isinstance(sentence, list):
            # Join every sentence rather than taking only the first one, so
            # multi-sentence audio is not truncated.
            parts = []
            for item in sentence:
                parts.append(item.get('text', '') if isinstance(item, dict) else str(item))
            return ''.join(parts)
        if isinstance(sentence, str):
            return sentence
        # Unknown payload type: best-effort stringification, empty on falsy.
        return str(sentence) if sentence else ''