difyAgent.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # -*- coding: utf-8 -*-
  2. from ..builder import AGENTS
  3. from ..agentBase import BaseAgent
  4. import re
  5. import json
  6. from digitalHuman.protocol import *
  7. from digitalHuman.utils import httpxAsyncClient, logger, resonableStreamingParser
  8. __all__ = ["DifyApiAgent"]
  9. @AGENTS.register("Dify")
  10. class DifyApiAgent(BaseAgent):
  11. async def createConversation(self, **kwargs) -> str:
  12. # 参数校验
  13. paramters = self.checkParameter(**kwargs)
  14. api_server = paramters["api_server"]
  15. api_key = paramters["api_key"]
  16. username = paramters["username"]
  17. headers = {
  18. 'Content-Type': 'application/json',
  19. 'Authorization': f'Bearer {api_key}'
  20. }
  21. payload = {
  22. "inputs": {},
  23. "query": "hello",
  24. "response_mode": "blocking",
  25. "user": username,
  26. "conversation_id": "",
  27. "files":[]
  28. }
  29. response = await httpxAsyncClient.post(api_server + "/chat-messages", headers=headers, json=payload)
  30. if response.status_code != 200:
  31. raise RuntimeError(f"DifyAPI agent api error: {response.status_code}")
  32. data = json.loads(response.text)
  33. if 'conversation_id' not in data:
  34. logger.error(f"[AGENT] Engine create conversation failed: {data}")
  35. return ""
  36. return data['conversation_id']
  37. async def run(
  38. self,
  39. input: TextMessage,
  40. streaming: bool,
  41. **kwargs
  42. ):
  43. try:
  44. if not streaming:
  45. raise KeyError("Dify Agent only supports streaming mode")
  46. # 参数校验
  47. paramters = self.checkParameter(**kwargs)
  48. api_server = paramters["api_server"]
  49. api_key = paramters["api_key"]
  50. username = paramters["username"]
  51. conversation_id = paramters["conversation_id"] if "conversation_id" in paramters else ""
  52. headers = {
  53. 'Content-Type': 'application/json',
  54. 'Authorization': f'Bearer {api_key}'
  55. }
  56. responseMode = "streaming" if streaming else "blocking"
  57. payload = {
  58. "inputs": {},
  59. "query": input.data,
  60. "response_mode": responseMode,
  61. "user": username,
  62. "conversation_id": conversation_id,
  63. "files":[]
  64. }
  65. pattern = re.compile(r'data:\s*({.*})')
  66. async with httpxAsyncClient.stream('POST', api_server + "/chat-messages", headers=headers, json=payload) as response:
  67. coversaiotnIdRequire = False if conversation_id else True
  68. async def generator(coversaiotnIdRequire):
  69. message_id = ""
  70. async for chunk in response.aiter_lines():
  71. chunkStr = chunk.strip()
  72. if not chunkStr: continue
  73. chunkData = pattern.search(chunkStr)
  74. # 返回不完整,该模板匹配会失效
  75. if not chunkStr.endswith('}') or not chunkData:
  76. logger.warning(f"[AGENT] Engine return truncated data: {chunkStr}")
  77. continue
  78. chunkData = chunkData.group(1)
  79. # 处理流式返回字符串
  80. data = json.loads(chunkData)
  81. # 首次返回conversation_id
  82. if coversaiotnIdRequire and 'conversation_id' in data:
  83. yield (EVENT_TYPE.CONVERSATION_ID, data['conversation_id'])
  84. coversaiotnIdRequire = False
  85. if not message_id and 'message_id' in data:
  86. message_id = data['message_id']
  87. if "message" in data["event"] and 'answer' in data:
  88. logger.debug(f"[AGENT] Engine response: {data}")
  89. yield (EVENT_TYPE.TEXT, data['answer'])
  90. yield (EVENT_TYPE.MESSAGE_ID, message_id)
  91. async for parseResult in resonableStreamingParser(generator(coversaiotnIdRequire)):
  92. yield parseResult
  93. yield eventStreamDone()
  94. except Exception as e:
  95. logger.error(f"[DifyApiAgent] Exception: {e}", exc_info=True)
  96. yield eventStreamError(str(e))