cozeAgent.py 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. # -*- coding: utf-8 -*-
  2. from ..builder import AGENTS
  3. from ..agentBase import BaseAgent
  4. import re
  5. import json
  6. from digitalHuman.protocol import *
  7. from digitalHuman.utils import httpxAsyncClient, logger, resonableStreamingParser, checkResponse
  8. __all__ = ["CozeApiAgent"]
  9. @AGENTS.register("Coze")
  10. class CozeApiAgent(BaseAgent):
  async def createConversation(self, **kwargs) -> str:
      """Create a new Coze conversation and return its id.

      Args:
          **kwargs: Forwarded to ``self.checkParameter``; the validated
              result must contain a ``token`` entry, which is sent as the
              bearer token.

      Returns:
          The ``data.id`` field of the checked API response.
      """
      # Validate the caller-supplied parameters.
      validated = self.checkParameter(**kwargs)
      token = validated["token"]
      request_headers = {
          'Authorization': f'Bearer {token}',
          'Content-Type': 'application/json',
      }
      http_response = await httpxAsyncClient.post(
          'https://api.coze.cn/v1/conversation/create',
          headers=request_headers,
      )
      # NOTE(review): checkResponse presumably raises on an API-level failure
      # and returns the decoded JSON body - confirm against digitalHuman.utils.
      checked = checkResponse(http_response, "CozeApiAgent", "create conversation")
      return checked['data']['id']
  async def run(
      self,
      input: TextMessage,
      streaming: bool,
      **kwargs
  ):
      """Stream a Coze chat reply for ``input`` as agent events.

      This is an async generator. It yields a conversation-id event when a
      new conversation had to be created, then a think/text event for each
      non-empty ``reasoning_content`` / ``content`` field found in
      ``conversation.message.delta`` server-sent events, and finally a done
      event. Any exception is logged and reported as a single error event
      instead of propagating to the caller.

      Args:
          input: User message; ``input.data`` is sent as the text content.
          streaming: Must be truthy; non-streaming mode is rejected.
          **kwargs: Forwarded to ``self.checkParameter``; the validated
              result must provide ``token`` and ``bot_id`` and may provide
              ``conversation_id``.
      """
      try:
          if not streaming:
              # ValueError rather than KeyError: str(KeyError(msg)) wraps the
              # message in quotes, which would leak into the error event.
              raise ValueError("Coze Agent only supports streaming mode")
          # Validate the caller-supplied parameters.
          parameters = self.checkParameter(**kwargs)
          token = parameters["token"]
          bot_id = parameters["bot_id"]
          conversation_id = parameters["conversation_id"] if "conversation_id" in parameters else ""
          if not conversation_id:
              # Create the conversation BEFORE building the request URL so the
              # chat is posted to the same conversation whose id is reported
              # back to the client.
              conversation_id = await self.createConversation(**kwargs)
              yield eventStreamConversationId(conversation_id)
          api_url = f'https://api.coze.cn/v3/chat?conversation_id={conversation_id}'
          headers = {
              'Authorization': f'Bearer {token}',
              'Content-Type': 'application/json'
          }
          payload = {
              'bot_id': bot_id,
              'user_id': 'adh',
              'stream': True,
              'auto_save_history': True,
              'additional_messages': [{
                  'role': 'user',
                  'content': input.data,
                  "content_type": "text"
              }]
          }
          async with httpxAsyncClient.stream('POST', api_url, headers=headers, json=payload) as response:
              # Surface HTTP-level failures as an error event instead of
              # silently producing an empty reply followed by "done".
              # NOTE(review): assumes httpxAsyncClient is an httpx client.
              response.raise_for_status()
              event = None
              async for chunk in response.aiter_lines():
                  line = chunk.strip()
                  if not line:
                      continue
                  if line.startswith('event:'):
                      # Remember the event name; it applies to the data
                      # line(s) that follow.
                      event = line.split(':', 1)[1].strip()
                      continue
                  # Only data fields of message-delta events carry reply text.
                  if event != 'conversation.message.delta' or not line.startswith('data:'):
                      continue
                  message_data = line[len('data:'):].strip()
                  if not message_data:
                      continue
                  message_json = json.loads(message_data)
                  reasoning_content = message_json.get('reasoning_content', '')
                  if reasoning_content:
                      yield eventStreamThink(reasoning_content)
                  content = message_json.get('content', '')
                  if content:
                      yield eventStreamText(content)
          yield eventStreamDone()
      except Exception as e:
          logger.error(f"[CozeApiAgent] Exception: {e}", exc_info=True)
          yield eventStreamError(str(e))