| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- # -*- coding: utf-8 -*-
- from ..builder import AGENTS
- from ..agentBase import BaseAgent
- import re
- import json
- from digitalHuman.protocol import *
- from digitalHuman.utils import httpxAsyncClient, logger, resonableStreamingParser
- __all__ = ["DifyApiAgent"]
- @AGENTS.register("Dify")
- class DifyApiAgent(BaseAgent):
- async def createConversation(self, **kwargs) -> str:
- # 参数校验
- paramters = self.checkParameter(**kwargs)
- api_server = paramters["api_server"]
- api_key = paramters["api_key"]
- username = paramters["username"]
- headers = {
- 'Content-Type': 'application/json',
- 'Authorization': f'Bearer {api_key}'
- }
- payload = {
- "inputs": {},
- "query": "hello",
- "response_mode": "blocking",
- "user": username,
- "conversation_id": "",
- "files":[]
- }
- response = await httpxAsyncClient.post(api_server + "/chat-messages", headers=headers, json=payload)
- if response.status_code != 200:
- raise RuntimeError(f"DifyAPI agent api error: {response.status_code}")
- data = json.loads(response.text)
- if 'conversation_id' not in data:
- logger.error(f"[AGENT] Engine create conversation failed: {data}")
- return ""
- return data['conversation_id']
- async def run(
- self,
- input: TextMessage,
- streaming: bool,
- **kwargs
- ):
- try:
- if not streaming:
- raise KeyError("Dify Agent only supports streaming mode")
- # 参数校验
- paramters = self.checkParameter(**kwargs)
- api_server = paramters["api_server"]
- api_key = paramters["api_key"]
- username = paramters["username"]
-
- conversation_id = paramters["conversation_id"] if "conversation_id" in paramters else ""
- headers = {
- 'Content-Type': 'application/json',
- 'Authorization': f'Bearer {api_key}'
- }
- responseMode = "streaming" if streaming else "blocking"
- payload = {
- "inputs": {},
- "query": input.data,
- "response_mode": responseMode,
- "user": username,
- "conversation_id": conversation_id,
- "files":[]
- }
- pattern = re.compile(r'data:\s*({.*})')
- async with httpxAsyncClient.stream('POST', api_server + "/chat-messages", headers=headers, json=payload) as response:
- coversaiotnIdRequire = False if conversation_id else True
- async def generator(coversaiotnIdRequire):
- message_id = ""
- async for chunk in response.aiter_lines():
- chunkStr = chunk.strip()
- if not chunkStr: continue
- chunkData = pattern.search(chunkStr)
- # 返回不完整,该模板匹配会失效
- if not chunkStr.endswith('}') or not chunkData:
- logger.warning(f"[AGENT] Engine return truncated data: {chunkStr}")
- continue
- chunkData = chunkData.group(1)
- # 处理流式返回字符串
- data = json.loads(chunkData)
- # 首次返回conversation_id
- if coversaiotnIdRequire and 'conversation_id' in data:
- yield (EVENT_TYPE.CONVERSATION_ID, data['conversation_id'])
- coversaiotnIdRequire = False
- if not message_id and 'message_id' in data:
- message_id = data['message_id']
- if "message" in data["event"] and 'answer' in data:
- logger.debug(f"[AGENT] Engine response: {data}")
- yield (EVENT_TYPE.TEXT, data['answer'])
- yield (EVENT_TYPE.MESSAGE_ID, message_id)
- async for parseResult in resonableStreamingParser(generator(coversaiotnIdRequire)):
- yield parseResult
- yield eventStreamDone()
- except Exception as e:
- logger.error(f"[DifyApiAgent] Exception: {e}", exc_info=True)
- yield eventStreamError(str(e))
|