assistant_base_runner.py 27 KB


  1. import json
  2. import logging
  3. import uuid
  4. from datetime import datetime
  5. from mimetypes import guess_extension
  6. from typing import Optional, Union, cast
  7. from core.app_runner.app_runner import AppRunner
  8. from core.application_queue_manager import ApplicationQueueManager
  9. from core.callback_handler.agent_tool_callback_handler import DifyAgentCallbackHandler
  10. from core.callback_handler.index_tool_callback_handler import DatasetIndexToolCallbackHandler
  11. from core.entities.application_entities import (
  12. AgentEntity,
  13. AgentToolEntity,
  14. ApplicationGenerateEntity,
  15. AppOrchestrationConfigEntity,
  16. InvokeFrom,
  17. ModelConfigEntity,
  18. )
  19. from core.file.message_file_parser import FileTransferMethod
  20. from core.memory.token_buffer_memory import TokenBufferMemory
  21. from core.model_manager import ModelInstance
  22. from core.model_runtime.entities.llm_entities import LLMUsage
  23. from core.model_runtime.entities.message_entities import (
  24. AssistantPromptMessage,
  25. PromptMessage,
  26. PromptMessageTool,
  27. SystemPromptMessage,
  28. ToolPromptMessage,
  29. UserPromptMessage,
  30. )
  31. from core.model_runtime.entities.model_entities import ModelFeature
  32. from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
  33. from core.model_runtime.utils.encoders import jsonable_encoder
  34. from core.tools.entities.tool_entities import (
  35. ToolInvokeMessage,
  36. ToolInvokeMessageBinary,
  37. ToolParameter,
  38. ToolRuntimeVariablePool,
  39. )
  40. from core.tools.tool.dataset_retriever_tool import DatasetRetrieverTool
  41. from core.tools.tool.tool import Tool
  42. from core.tools.tool_file_manager import ToolFileManager
  43. from core.tools.tool_manager import ToolManager
  44. from extensions.ext_database import db
  45. from models.model import Message, MessageAgentThought, MessageFile
  46. from models.tools import ToolConversationVariables
  47. logger = logging.getLogger(__name__)
  48. class BaseAssistantApplicationRunner(AppRunner):
  49. def __init__(self, tenant_id: str,
  50. application_generate_entity: ApplicationGenerateEntity,
  51. app_orchestration_config: AppOrchestrationConfigEntity,
  52. model_config: ModelConfigEntity,
  53. config: AgentEntity,
  54. queue_manager: ApplicationQueueManager,
  55. message: Message,
  56. user_id: str,
  57. memory: Optional[TokenBufferMemory] = None,
  58. prompt_messages: Optional[list[PromptMessage]] = None,
  59. variables_pool: Optional[ToolRuntimeVariablePool] = None,
  60. db_variables: Optional[ToolConversationVariables] = None,
  61. model_instance: ModelInstance = None
  62. ) -> None:
  63. """
  64. Agent runner
  65. :param tenant_id: tenant id
  66. :param app_orchestration_config: app orchestration config
  67. :param model_config: model config
  68. :param config: dataset config
  69. :param queue_manager: queue manager
  70. :param message: message
  71. :param user_id: user id
  72. :param agent_llm_callback: agent llm callback
  73. :param callback: callback
  74. :param memory: memory
  75. """
  76. self.tenant_id = tenant_id
  77. self.application_generate_entity = application_generate_entity
  78. self.app_orchestration_config = app_orchestration_config
  79. self.model_config = model_config
  80. self.config = config
  81. self.queue_manager = queue_manager
  82. self.message = message
  83. self.user_id = user_id
  84. self.memory = memory
  85. self.history_prompt_messages = self.organize_agent_history(
  86. prompt_messages=prompt_messages or []
  87. )
  88. self.variables_pool = variables_pool
  89. self.db_variables_pool = db_variables
  90. self.model_instance = model_instance
  91. # init callback
  92. self.agent_callback = DifyAgentCallbackHandler()
  93. # init dataset tools
  94. hit_callback = DatasetIndexToolCallbackHandler(
  95. queue_manager=queue_manager,
  96. app_id=self.application_generate_entity.app_id,
  97. message_id=message.id,
  98. user_id=user_id,
  99. invoke_from=self.application_generate_entity.invoke_from,
  100. )
  101. self.dataset_tools = DatasetRetrieverTool.get_dataset_tools(
  102. tenant_id=tenant_id,
  103. dataset_ids=app_orchestration_config.dataset.dataset_ids if app_orchestration_config.dataset else [],
  104. retrieve_config=app_orchestration_config.dataset.retrieve_config if app_orchestration_config.dataset else None,
  105. return_resource=app_orchestration_config.show_retrieve_source,
  106. invoke_from=application_generate_entity.invoke_from,
  107. hit_callback=hit_callback
  108. )
  109. # get how many agent thoughts have been created
  110. self.agent_thought_count = db.session.query(MessageAgentThought).filter(
  111. MessageAgentThought.message_id == self.message.id,
  112. ).count()
  113. # check if model supports stream tool call
  114. llm_model = cast(LargeLanguageModel, model_instance.model_type_instance)
  115. model_schema = llm_model.get_model_schema(model_instance.model, model_instance.credentials)
  116. if model_schema and ModelFeature.STREAM_TOOL_CALL in (model_schema.features or []):
  117. self.stream_tool_call = True
  118. else:
  119. self.stream_tool_call = False
  120. def _repack_app_orchestration_config(self, app_orchestration_config: AppOrchestrationConfigEntity) -> AppOrchestrationConfigEntity:
  121. """
  122. Repack app orchestration config
  123. """
  124. if app_orchestration_config.prompt_template.simple_prompt_template is None:
  125. app_orchestration_config.prompt_template.simple_prompt_template = ''
  126. return app_orchestration_config
  127. def _convert_tool_response_to_str(self, tool_response: list[ToolInvokeMessage]) -> str:
  128. """
  129. Handle tool response
  130. """
  131. result = ''
  132. for response in tool_response:
  133. if response.type == ToolInvokeMessage.MessageType.TEXT:
  134. result += response.message
  135. elif response.type == ToolInvokeMessage.MessageType.LINK:
  136. result += f"result link: {response.message}. please tell user to check it."
  137. elif response.type == ToolInvokeMessage.MessageType.IMAGE_LINK or \
  138. response.type == ToolInvokeMessage.MessageType.IMAGE:
  139. result += "image has been created and sent to user already, you should tell user to check it now."
  140. else:
  141. result += f"tool response: {response.message}."
  142. return result
  143. def _convert_tool_to_prompt_message_tool(self, tool: AgentToolEntity) -> tuple[PromptMessageTool, Tool]:
  144. """
  145. convert tool to prompt message tool
  146. """
  147. tool_entity = ToolManager.get_tool_runtime(
  148. provider_type=tool.provider_type, provider_name=tool.provider_id, tool_name=tool.tool_name,
  149. tenant_id=self.application_generate_entity.tenant_id,
  150. agent_callback=self.agent_callback
  151. )
  152. tool_entity.load_variables(self.variables_pool)
  153. message_tool = PromptMessageTool(
  154. name=tool.tool_name,
  155. description=tool_entity.description.llm,
  156. parameters={
  157. "type": "object",
  158. "properties": {},
  159. "required": [],
  160. }
  161. )
  162. runtime_parameters = {}
  163. parameters = tool_entity.parameters or []
  164. user_parameters = tool_entity.get_runtime_parameters() or []
  165. # override parameters
  166. for parameter in user_parameters:
  167. # check if parameter in tool parameters
  168. found = False
  169. for tool_parameter in parameters:
  170. if tool_parameter.name == parameter.name:
  171. found = True
  172. break
  173. if found:
  174. # override parameter
  175. tool_parameter.type = parameter.type
  176. tool_parameter.form = parameter.form
  177. tool_parameter.required = parameter.required
  178. tool_parameter.default = parameter.default
  179. tool_parameter.options = parameter.options
  180. tool_parameter.llm_description = parameter.llm_description
  181. else:
  182. # add new parameter
  183. parameters.append(parameter)
  184. for parameter in parameters:
  185. parameter_type = 'string'
  186. enum = []
  187. if parameter.type == ToolParameter.ToolParameterType.STRING:
  188. parameter_type = 'string'
  189. elif parameter.type == ToolParameter.ToolParameterType.BOOLEAN:
  190. parameter_type = 'boolean'
  191. elif parameter.type == ToolParameter.ToolParameterType.NUMBER:
  192. parameter_type = 'number'
  193. elif parameter.type == ToolParameter.ToolParameterType.SELECT:
  194. for option in parameter.options:
  195. enum.append(option.value)
  196. parameter_type = 'string'
  197. else:
  198. raise ValueError(f"parameter type {parameter.type} is not supported")
  199. if parameter.form == ToolParameter.ToolParameterForm.FORM:
  200. # get tool parameter from form
  201. tool_parameter_config = tool.tool_parameters.get(parameter.name)
  202. if not tool_parameter_config:
  203. # get default value
  204. tool_parameter_config = parameter.default
  205. if not tool_parameter_config and parameter.required:
  206. raise ValueError(f"tool parameter {parameter.name} not found in tool config")
  207. if parameter.type == ToolParameter.ToolParameterType.SELECT:
  208. # check if tool_parameter_config in options
  209. options = list(map(lambda x: x.value, parameter.options))
  210. if tool_parameter_config not in options:
  211. raise ValueError(f"tool parameter {parameter.name} value {tool_parameter_config} not in options {options}")
  212. # convert tool parameter config to correct type
  213. try:
  214. if parameter.type == ToolParameter.ToolParameterType.NUMBER:
  215. # check if tool parameter is integer
  216. if isinstance(tool_parameter_config, int):
  217. tool_parameter_config = tool_parameter_config
  218. elif isinstance(tool_parameter_config, float):
  219. tool_parameter_config = tool_parameter_config
  220. elif isinstance(tool_parameter_config, str):
  221. if '.' in tool_parameter_config:
  222. tool_parameter_config = float(tool_parameter_config)
  223. else:
  224. tool_parameter_config = int(tool_parameter_config)
  225. elif parameter.type == ToolParameter.ToolParameterType.BOOLEAN:
  226. tool_parameter_config = bool(tool_parameter_config)
  227. elif parameter.type not in [ToolParameter.ToolParameterType.SELECT, ToolParameter.ToolParameterType.STRING]:
  228. tool_parameter_config = str(tool_parameter_config)
  229. elif parameter.type == ToolParameter.ToolParameterType:
  230. tool_parameter_config = str(tool_parameter_config)
  231. except Exception as e:
  232. raise ValueError(f"tool parameter {parameter.name} value {tool_parameter_config} is not correct type")
  233. # save tool parameter to tool entity memory
  234. runtime_parameters[parameter.name] = tool_parameter_config
  235. elif parameter.form == ToolParameter.ToolParameterForm.LLM:
  236. message_tool.parameters['properties'][parameter.name] = {
  237. "type": parameter_type,
  238. "description": parameter.llm_description or '',
  239. }
  240. if len(enum) > 0:
  241. message_tool.parameters['properties'][parameter.name]['enum'] = enum
  242. if parameter.required:
  243. message_tool.parameters['required'].append(parameter.name)
  244. tool_entity.runtime.runtime_parameters.update(runtime_parameters)
  245. return message_tool, tool_entity
  246. def _convert_dataset_retriever_tool_to_prompt_message_tool(self, tool: DatasetRetrieverTool) -> PromptMessageTool:
  247. """
  248. convert dataset retriever tool to prompt message tool
  249. """
  250. prompt_tool = PromptMessageTool(
  251. name=tool.identity.name,
  252. description=tool.description.llm,
  253. parameters={
  254. "type": "object",
  255. "properties": {},
  256. "required": [],
  257. }
  258. )
  259. for parameter in tool.get_runtime_parameters():
  260. parameter_type = 'string'
  261. prompt_tool.parameters['properties'][parameter.name] = {
  262. "type": parameter_type,
  263. "description": parameter.llm_description or '',
  264. }
  265. if parameter.required:
  266. if parameter.name not in prompt_tool.parameters['required']:
  267. prompt_tool.parameters['required'].append(parameter.name)
  268. return prompt_tool
  269. def update_prompt_message_tool(self, tool: Tool, prompt_tool: PromptMessageTool) -> PromptMessageTool:
  270. """
  271. update prompt message tool
  272. """
  273. # try to get tool runtime parameters
  274. tool_runtime_parameters = tool.get_runtime_parameters() or []
  275. for parameter in tool_runtime_parameters:
  276. parameter_type = 'string'
  277. enum = []
  278. if parameter.type == ToolParameter.ToolParameterType.STRING:
  279. parameter_type = 'string'
  280. elif parameter.type == ToolParameter.ToolParameterType.BOOLEAN:
  281. parameter_type = 'boolean'
  282. elif parameter.type == ToolParameter.ToolParameterType.NUMBER:
  283. parameter_type = 'number'
  284. elif parameter.type == ToolParameter.ToolParameterType.SELECT:
  285. for option in parameter.options:
  286. enum.append(option.value)
  287. parameter_type = 'string'
  288. else:
  289. raise ValueError(f"parameter type {parameter.type} is not supported")
  290. if parameter.form == ToolParameter.ToolParameterForm.LLM:
  291. prompt_tool.parameters['properties'][parameter.name] = {
  292. "type": parameter_type,
  293. "description": parameter.llm_description or '',
  294. }
  295. if len(enum) > 0:
  296. prompt_tool.parameters['properties'][parameter.name]['enum'] = enum
  297. if parameter.required:
  298. if parameter.name not in prompt_tool.parameters['required']:
  299. prompt_tool.parameters['required'].append(parameter.name)
  300. return prompt_tool
  301. def extract_tool_response_binary(self, tool_response: list[ToolInvokeMessage]) -> list[ToolInvokeMessageBinary]:
  302. """
  303. Extract tool response binary
  304. """
  305. result = []
  306. for response in tool_response:
  307. if response.type == ToolInvokeMessage.MessageType.IMAGE_LINK or \
  308. response.type == ToolInvokeMessage.MessageType.IMAGE:
  309. result.append(ToolInvokeMessageBinary(
  310. mimetype=response.meta.get('mime_type', 'octet/stream'),
  311. url=response.message,
  312. save_as=response.save_as,
  313. ))
  314. elif response.type == ToolInvokeMessage.MessageType.BLOB:
  315. result.append(ToolInvokeMessageBinary(
  316. mimetype=response.meta.get('mime_type', 'octet/stream'),
  317. url=response.message,
  318. save_as=response.save_as,
  319. ))
  320. elif response.type == ToolInvokeMessage.MessageType.LINK:
  321. # check if there is a mime type in meta
  322. if response.meta and 'mime_type' in response.meta:
  323. result.append(ToolInvokeMessageBinary(
  324. mimetype=response.meta.get('mime_type', 'octet/stream') if response.meta else 'octet/stream',
  325. url=response.message,
  326. save_as=response.save_as,
  327. ))
  328. return result
  329. def create_message_files(self, messages: list[ToolInvokeMessageBinary]) -> list[tuple[MessageFile, bool]]:
  330. """
  331. Create message file
  332. :param messages: messages
  333. :return: message files, should save as variable
  334. """
  335. result = []
  336. for message in messages:
  337. file_type = 'bin'
  338. if 'image' in message.mimetype:
  339. file_type = 'image'
  340. elif 'video' in message.mimetype:
  341. file_type = 'video'
  342. elif 'audio' in message.mimetype:
  343. file_type = 'audio'
  344. elif 'text' in message.mimetype:
  345. file_type = 'text'
  346. elif 'pdf' in message.mimetype:
  347. file_type = 'pdf'
  348. elif 'zip' in message.mimetype:
  349. file_type = 'archive'
  350. # ...
  351. invoke_from = self.application_generate_entity.invoke_from
  352. message_file = MessageFile(
  353. message_id=self.message.id,
  354. type=file_type,
  355. transfer_method=FileTransferMethod.TOOL_FILE.value,
  356. belongs_to='assistant',
  357. url=message.url,
  358. upload_file_id=None,
  359. created_by_role=('account'if invoke_from in [InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER] else 'end_user'),
  360. created_by=self.user_id,
  361. )
  362. db.session.add(message_file)
  363. result.append((
  364. message_file,
  365. message.save_as
  366. ))
  367. db.session.commit()
  368. return result
  369. def create_agent_thought(self, message_id: str, message: str,
  370. tool_name: str, tool_input: str, messages_ids: list[str]
  371. ) -> MessageAgentThought:
  372. """
  373. Create agent thought
  374. """
  375. thought = MessageAgentThought(
  376. message_id=message_id,
  377. message_chain_id=None,
  378. thought='',
  379. tool=tool_name,
  380. tool_labels_str='{}',
  381. tool_input=tool_input,
  382. message=message,
  383. message_token=0,
  384. message_unit_price=0,
  385. message_price_unit=0,
  386. message_files=json.dumps(messages_ids) if messages_ids else '',
  387. answer='',
  388. observation='',
  389. answer_token=0,
  390. answer_unit_price=0,
  391. answer_price_unit=0,
  392. tokens=0,
  393. total_price=0,
  394. position=self.agent_thought_count + 1,
  395. currency='USD',
  396. latency=0,
  397. created_by_role='account',
  398. created_by=self.user_id,
  399. )
  400. db.session.add(thought)
  401. db.session.commit()
  402. self.agent_thought_count += 1
  403. return thought
  404. def save_agent_thought(self,
  405. agent_thought: MessageAgentThought,
  406. tool_name: str,
  407. tool_input: Union[str, dict],
  408. thought: str,
  409. observation: str,
  410. answer: str,
  411. messages_ids: list[str],
  412. llm_usage: LLMUsage = None) -> MessageAgentThought:
  413. """
  414. Save agent thought
  415. """
  416. if thought is not None:
  417. agent_thought.thought = thought
  418. if tool_name is not None:
  419. agent_thought.tool = tool_name
  420. if tool_input is not None:
  421. if isinstance(tool_input, dict):
  422. try:
  423. tool_input = json.dumps(tool_input, ensure_ascii=False)
  424. except Exception as e:
  425. tool_input = json.dumps(tool_input)
  426. agent_thought.tool_input = tool_input
  427. if observation is not None:
  428. agent_thought.observation = observation
  429. if answer is not None:
  430. agent_thought.answer = answer
  431. if messages_ids is not None and len(messages_ids) > 0:
  432. agent_thought.message_files = json.dumps(messages_ids)
  433. if llm_usage:
  434. agent_thought.message_token = llm_usage.prompt_tokens
  435. agent_thought.message_price_unit = llm_usage.prompt_price_unit
  436. agent_thought.message_unit_price = llm_usage.prompt_unit_price
  437. agent_thought.answer_token = llm_usage.completion_tokens
  438. agent_thought.answer_price_unit = llm_usage.completion_price_unit
  439. agent_thought.answer_unit_price = llm_usage.completion_unit_price
  440. agent_thought.tokens = llm_usage.total_tokens
  441. agent_thought.total_price = llm_usage.total_price
  442. # check if tool labels is not empty
  443. labels = agent_thought.tool_labels or {}
  444. tools = agent_thought.tool.split(';') if agent_thought.tool else []
  445. for tool in tools:
  446. if not tool:
  447. continue
  448. if tool not in labels:
  449. tool_label = ToolManager.get_tool_label(tool)
  450. if tool_label:
  451. labels[tool] = tool_label.to_dict()
  452. else:
  453. labels[tool] = {'en_US': tool, 'zh_Hans': tool}
  454. agent_thought.tool_labels_str = json.dumps(labels)
  455. db.session.commit()
  456. def transform_tool_invoke_messages(self, messages: list[ToolInvokeMessage]) -> list[ToolInvokeMessage]:
  457. """
  458. Transform tool message into agent thought
  459. """
  460. result = []
  461. for message in messages:
  462. if message.type == ToolInvokeMessage.MessageType.TEXT:
  463. result.append(message)
  464. elif message.type == ToolInvokeMessage.MessageType.LINK:
  465. result.append(message)
  466. elif message.type == ToolInvokeMessage.MessageType.IMAGE:
  467. # try to download image
  468. try:
  469. file = ToolFileManager.create_file_by_url(user_id=self.user_id, tenant_id=self.tenant_id,
  470. conversation_id=self.message.conversation_id,
  471. file_url=message.message)
  472. url = f'/files/tools/{file.id}{guess_extension(file.mimetype) or ".png"}'
  473. result.append(ToolInvokeMessage(
  474. type=ToolInvokeMessage.MessageType.IMAGE_LINK,
  475. message=url,
  476. save_as=message.save_as,
  477. meta=message.meta.copy() if message.meta is not None else {},
  478. ))
  479. except Exception as e:
  480. logger.exception(e)
  481. result.append(ToolInvokeMessage(
  482. type=ToolInvokeMessage.MessageType.TEXT,
  483. message=f"Failed to download image: {message.message}, you can try to download it yourself.",
  484. meta=message.meta.copy() if message.meta is not None else {},
  485. save_as=message.save_as,
  486. ))
  487. elif message.type == ToolInvokeMessage.MessageType.BLOB:
  488. # get mime type and save blob to storage
  489. mimetype = message.meta.get('mime_type', 'octet/stream')
  490. # if message is str, encode it to bytes
  491. if isinstance(message.message, str):
  492. message.message = message.message.encode('utf-8')
  493. file = ToolFileManager.create_file_by_raw(user_id=self.user_id, tenant_id=self.tenant_id,
  494. conversation_id=self.message.conversation_id,
  495. file_binary=message.message,
  496. mimetype=mimetype)
  497. url = f'/files/tools/{file.id}{guess_extension(file.mimetype) or ".bin"}'
  498. # check if file is image
  499. if 'image' in mimetype:
  500. result.append(ToolInvokeMessage(
  501. type=ToolInvokeMessage.MessageType.IMAGE_LINK,
  502. message=url,
  503. save_as=message.save_as,
  504. meta=message.meta.copy() if message.meta is not None else {},
  505. ))
  506. else:
  507. result.append(ToolInvokeMessage(
  508. type=ToolInvokeMessage.MessageType.LINK,
  509. message=url,
  510. save_as=message.save_as,
  511. meta=message.meta.copy() if message.meta is not None else {},
  512. ))
  513. else:
  514. result.append(message)
  515. return result
  516. def update_db_variables(self, tool_variables: ToolRuntimeVariablePool, db_variables: ToolConversationVariables):
  517. """
  518. convert tool variables to db variables
  519. """
  520. db_variables.updated_at = datetime.utcnow()
  521. db_variables.variables_str = json.dumps(jsonable_encoder(tool_variables.pool))
  522. db.session.commit()
  523. def organize_agent_history(self, prompt_messages: list[PromptMessage]) -> list[PromptMessage]:
  524. """
  525. Organize agent history
  526. """
  527. result = []
  528. # check if there is a system message in the beginning of the conversation
  529. if prompt_messages and isinstance(prompt_messages[0], SystemPromptMessage):
  530. result.append(prompt_messages[0])
  531. messages: list[Message] = db.session.query(Message).filter(
  532. Message.conversation_id == self.message.conversation_id,
  533. ).order_by(Message.created_at.asc()).all()
  534. for message in messages:
  535. result.append(UserPromptMessage(content=message.query))
  536. agent_thoughts: list[MessageAgentThought] = message.agent_thoughts
  537. for agent_thought in agent_thoughts:
  538. tools = agent_thought.tool
  539. if tools:
  540. tools = tools.split(';')
  541. tool_calls: list[AssistantPromptMessage.ToolCall] = []
  542. tool_call_response: list[ToolPromptMessage] = []
  543. tool_inputs = json.loads(agent_thought.tool_input)
  544. for tool in tools:
  545. # generate a uuid for tool call
  546. tool_call_id = str(uuid.uuid4())
  547. tool_calls.append(AssistantPromptMessage.ToolCall(
  548. id=tool_call_id,
  549. type='function',
  550. function=AssistantPromptMessage.ToolCall.ToolCallFunction(
  551. name=tool,
  552. arguments=json.dumps(tool_inputs.get(tool, {})),
  553. )
  554. ))
  555. tool_call_response.append(ToolPromptMessage(
  556. content=agent_thought.observation,
  557. name=tool,
  558. tool_call_id=tool_call_id,
  559. ))
  560. result.extend([
  561. AssistantPromptMessage(
  562. content=agent_thought.thought,
  563. tool_calls=tool_calls,
  564. ),
  565. *tool_call_response
  566. ])
  567. return result