openaiAgent.py

# -*- coding: utf-8 -*-
from ..builder import AGENTS
from ..agentBase import BaseAgent
from digitalHuman.protocol import *
from digitalHuman.utils import logger, resonableStreamingParser
from digitalHuman.core import OpenaiLLM

__all__ = ["OpenaiApiAgent"]


@AGENTS.register("OpenAI")
class OpenaiApiAgent(BaseAgent):
    async def run(
        self,
        user: UserDesc,
        input: TextMessage,
        streaming: bool = True,
        conversation_id: str = "",
        **kwargs
    ):
        try:
            if not isinstance(input, TextMessage):
                raise RuntimeError("OpenAI Agent only supports TextMessage")
            # Parameter validation
            parameters = self.checkParameter(**kwargs)
            API_URL = parameters["base_url"]
            API_KEY = parameters["api_key"]
            API_MODEL = parameters["model"]
            # Create a new conversation when the caller did not supply an id
            conversationIdRequired = not conversation_id
            if conversationIdRequired:
                conversation_id = await self.createConversation()
                yield eventStreamConversationId(conversation_id)

            async def generator(user_id: str, conversation_id: str, query: str):
                thinkResponses = ""
                responses = ""
                # Single-turn history: only the current user query is sent to the model
                currentMessage = [RoleMessage(role=ROLE_TYPE.USER, content=query)]
                messages = currentMessage
                async for chunk in OpenaiLLM.chat(
                    base_url=API_URL,
                    api_key=API_KEY,
                    model=API_MODEL,
                    messages=messages
                ):
                    # Ignore empty chunks and chunks with no choices
                    if not chunk:
                        continue
                    if len(chunk.choices) == 0:
                        continue
                    delta = chunk.choices[0].delta.model_dump()
                    # Reasoning ("think") tokens are streamed separately from answer text
                    if 'reasoning_content' in delta and delta['reasoning_content']:
                        reasoning_content = delta['reasoning_content']
                        thinkResponses += reasoning_content
                        yield (EVENT_TYPE.THINK, reasoning_content)
                    elif 'content' in delta and delta['content']:
                        content = delta['content']
                        responses += content
                        yield (EVENT_TYPE.TEXT, content)
                # Record the full assistant reply in the local history once streaming ends
                currentMessage.append(RoleMessage(role=ROLE_TYPE.ASSISTANT, content=responses))
            async for parseResult in resonableStreamingParser(generator(user.user_id, conversation_id, input.data)):
                yield parseResult
            yield eventStreamDone()
        except Exception as e:
            logger.error(f"[OpenaiApiAgent] Exception: {e}", exc_info=True)
            yield eventStreamError(str(e))
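
For orientation, a minimal sketch of how this agent might be driven. The no-argument constructor and the keyword fields of UserDesc and TextMessage are assumptions about the surrounding digitalHuman framework, not confirmed by this file; only the user_id / data attributes and the base_url / api_key / model parameters (read back from checkParameter(**kwargs)) appear in the code above.

import asyncio

async def _demo():
    agent = OpenaiApiAgent()  # assumed: no constructor arguments required
    async for event in agent.run(
        user=UserDesc(user_id="demo-user"),      # assumed constructor signature
        input=TextMessage(data="Hello!"),        # assumed constructor signature
        base_url="https://api.openai.com/v1",    # assumed to reach checkParameter via **kwargs
        api_key="sk-demo",
        model="gpt-4o-mini",
    ):
        print(event)  # streamed events: conversation id, parsed chunks, then the done event

asyncio.run(_demo())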