assistant_base_runner.py 24 KB


  1. import logging
  2. import json
  3. from typing import Optional, List, Tuple, Union
  4. from datetime import datetime
  5. from mimetypes import guess_extension
  6. from core.app_runner.app_runner import AppRunner
  7. from extensions.ext_database import db
  8. from models.model import MessageAgentThought, Message, MessageFile
  9. from models.tools import ToolConversationVariables
  10. from core.tools.entities.tool_entities import ToolInvokeMessage, ToolInvokeMessageBinary, \
  11. ToolRuntimeVariablePool, ToolParamter
  12. from core.tools.tool.tool import Tool
  13. from core.tools.tool_manager import ToolManager
  14. from core.tools.tool_file_manager import ToolFileManager
  15. from core.tools.tool.dataset_retriever_tool import DatasetRetrieverTool
  16. from core.app_runner.app_runner import AppRunner
  17. from core.callback_handler.agent_tool_callback_handler import DifyAgentCallbackHandler
  18. from core.callback_handler.index_tool_callback_handler import DatasetIndexToolCallbackHandler
  19. from core.entities.application_entities import ModelConfigEntity, AgentEntity, AgentToolEntity
  20. from core.application_queue_manager import ApplicationQueueManager
  21. from core.memory.token_buffer_memory import TokenBufferMemory
  22. from core.entities.application_entities import ModelConfigEntity, \
  23. AgentEntity, AppOrchestrationConfigEntity, ApplicationGenerateEntity, InvokeFrom
  24. from core.model_runtime.entities.message_entities import PromptMessage, PromptMessageTool
  25. from core.model_runtime.entities.llm_entities import LLMUsage
  26. from core.model_runtime.utils.encoders import jsonable_encoder
  27. from core.file.message_file_parser import FileTransferMethod
  28. logger = logging.getLogger(__name__)
  29. class BaseAssistantApplicationRunner(AppRunner):
  30. def __init__(self, tenant_id: str,
  31. application_generate_entity: ApplicationGenerateEntity,
  32. app_orchestration_config: AppOrchestrationConfigEntity,
  33. model_config: ModelConfigEntity,
  34. config: AgentEntity,
  35. queue_manager: ApplicationQueueManager,
  36. message: Message,
  37. user_id: str,
  38. memory: Optional[TokenBufferMemory] = None,
  39. prompt_messages: Optional[List[PromptMessage]] = None,
  40. variables_pool: Optional[ToolRuntimeVariablePool] = None,
  41. db_variables: Optional[ToolConversationVariables] = None,
  42. ) -> None:
  43. """
  44. Agent runner
  45. :param tenant_id: tenant id
  46. :param app_orchestration_config: app orchestration config
  47. :param model_config: model config
  48. :param config: dataset config
  49. :param queue_manager: queue manager
  50. :param message: message
  51. :param user_id: user id
  52. :param agent_llm_callback: agent llm callback
  53. :param callback: callback
  54. :param memory: memory
  55. """
  56. self.tenant_id = tenant_id
  57. self.application_generate_entity = application_generate_entity
  58. self.app_orchestration_config = app_orchestration_config
  59. self.model_config = model_config
  60. self.config = config
  61. self.queue_manager = queue_manager
  62. self.message = message
  63. self.user_id = user_id
  64. self.memory = memory
  65. self.history_prompt_messages = prompt_messages
  66. self.variables_pool = variables_pool
  67. self.db_variables_pool = db_variables
  68. # init callback
  69. self.agent_callback = DifyAgentCallbackHandler()
  70. # init dataset tools
  71. hit_callback = DatasetIndexToolCallbackHandler(
  72. queue_manager=queue_manager,
  73. app_id=self.application_generate_entity.app_id,
  74. message_id=message.id,
  75. user_id=user_id,
  76. invoke_from=self.application_generate_entity.invoke_from,
  77. )
  78. self.dataset_tools = DatasetRetrieverTool.get_dataset_tools(
  79. tenant_id=tenant_id,
  80. dataset_ids=app_orchestration_config.dataset.dataset_ids if app_orchestration_config.dataset else [],
  81. retrieve_config=app_orchestration_config.dataset.retrieve_config if app_orchestration_config.dataset else None,
  82. return_resource=app_orchestration_config.show_retrieve_source,
  83. invoke_from=application_generate_entity.invoke_from,
  84. hit_callback=hit_callback
  85. )
  86. # get how many agent thoughts have been created
  87. self.agent_thought_count = db.session.query(MessageAgentThought).filter(
  88. MessageAgentThought.message_id == self.message.id,
  89. ).count()
  90. def _repacket_app_orchestration_config(self, app_orchestration_config: AppOrchestrationConfigEntity) -> AppOrchestrationConfigEntity:
  91. """
  92. Repacket app orchestration config
  93. """
  94. if app_orchestration_config.prompt_template.simple_prompt_template is None:
  95. app_orchestration_config.prompt_template.simple_prompt_template = ''
  96. return app_orchestration_config
  97. def _convert_tool_response_to_str(self, tool_response: List[ToolInvokeMessage]) -> str:
  98. """
  99. Handle tool response
  100. """
  101. result = ''
  102. for response in tool_response:
  103. if response.type == ToolInvokeMessage.MessageType.TEXT:
  104. result += response.message
  105. elif response.type == ToolInvokeMessage.MessageType.LINK:
  106. result += f"result link: {response.message}. please dirct user to check it."
  107. elif response.type == ToolInvokeMessage.MessageType.IMAGE_LINK or \
  108. response.type == ToolInvokeMessage.MessageType.IMAGE:
  109. result += f"image has been created and sent to user already, you should tell user to check it now."
  110. else:
  111. result += f"tool response: {response.message}."
  112. return result
  113. def _convert_tool_to_prompt_message_tool(self, tool: AgentToolEntity) -> Tuple[PromptMessageTool, Tool]:
  114. """
  115. convert tool to prompt message tool
  116. """
  117. tool_entity = ToolManager.get_tool_runtime(
  118. provider_type=tool.provider_type, provider_name=tool.provider_id, tool_name=tool.tool_name,
  119. tanent_id=self.application_generate_entity.tenant_id,
  120. agent_callback=self.agent_callback
  121. )
  122. tool_entity.load_variables(self.variables_pool)
  123. message_tool = PromptMessageTool(
  124. name=tool.tool_name,
  125. description=tool_entity.description.llm,
  126. parameters={
  127. "type": "object",
  128. "properties": {},
  129. "required": [],
  130. }
  131. )
  132. runtime_parameters = {}
  133. parameters = tool_entity.parameters or []
  134. user_parameters = tool_entity.get_runtime_parameters() or []
  135. # override parameters
  136. for parameter in user_parameters:
  137. # check if parameter in tool parameters
  138. found = False
  139. for tool_parameter in parameters:
  140. if tool_parameter.name == parameter.name:
  141. found = True
  142. break
  143. if found:
  144. # override parameter
  145. tool_parameter.type = parameter.type
  146. tool_parameter.form = parameter.form
  147. tool_parameter.required = parameter.required
  148. tool_parameter.default = parameter.default
  149. tool_parameter.options = parameter.options
  150. tool_parameter.llm_description = parameter.llm_description
  151. else:
  152. # add new parameter
  153. parameters.append(parameter)
  154. for parameter in parameters:
  155. parameter_type = 'string'
  156. enum = []
  157. if parameter.type == ToolParamter.ToolParameterType.STRING:
  158. parameter_type = 'string'
  159. elif parameter.type == ToolParamter.ToolParameterType.BOOLEAN:
  160. parameter_type = 'boolean'
  161. elif parameter.type == ToolParamter.ToolParameterType.NUMBER:
  162. parameter_type = 'number'
  163. elif parameter.type == ToolParamter.ToolParameterType.SELECT:
  164. for option in parameter.options:
  165. enum.append(option.value)
  166. parameter_type = 'string'
  167. else:
  168. raise ValueError(f"parameter type {parameter.type} is not supported")
  169. if parameter.form == ToolParamter.ToolParameterForm.FORM:
  170. # get tool parameter from form
  171. tool_parameter_config = tool.tool_parameters.get(parameter.name)
  172. if not tool_parameter_config:
  173. # get default value
  174. tool_parameter_config = parameter.default
  175. if not tool_parameter_config and parameter.required:
  176. raise ValueError(f"tool parameter {parameter.name} not found in tool config")
  177. if parameter.type == ToolParamter.ToolParameterType.SELECT:
  178. # check if tool_parameter_config in options
  179. options = list(map(lambda x: x.value, parameter.options))
  180. if tool_parameter_config not in options:
  181. raise ValueError(f"tool parameter {parameter.name} value {tool_parameter_config} not in options {options}")
  182. # convert tool parameter config to correct type
  183. try:
  184. if parameter.type == ToolParamter.ToolParameterType.NUMBER:
  185. # check if tool parameter is integer
  186. if isinstance(tool_parameter_config, int):
  187. tool_parameter_config = tool_parameter_config
  188. elif isinstance(tool_parameter_config, float):
  189. tool_parameter_config = tool_parameter_config
  190. elif isinstance(tool_parameter_config, str):
  191. if '.' in tool_parameter_config:
  192. tool_parameter_config = float(tool_parameter_config)
  193. else:
  194. tool_parameter_config = int(tool_parameter_config)
  195. elif parameter.type == ToolParamter.ToolParameterType.BOOLEAN:
  196. tool_parameter_config = bool(tool_parameter_config)
  197. elif parameter.type not in [ToolParamter.ToolParameterType.SELECT, ToolParamter.ToolParameterType.STRING]:
  198. tool_parameter_config = str(tool_parameter_config)
  199. elif parameter.type == ToolParamter.ToolParameterType:
  200. tool_parameter_config = str(tool_parameter_config)
  201. except Exception as e:
  202. raise ValueError(f"tool parameter {parameter.name} value {tool_parameter_config} is not correct type")
  203. # save tool parameter to tool entity memory
  204. runtime_parameters[parameter.name] = tool_parameter_config
  205. elif parameter.form == ToolParamter.ToolParameterForm.LLM:
  206. message_tool.parameters['properties'][parameter.name] = {
  207. "type": parameter_type,
  208. "description": parameter.llm_description or '',
  209. }
  210. if len(enum) > 0:
  211. message_tool.parameters['properties'][parameter.name]['enum'] = enum
  212. if parameter.required:
  213. message_tool.parameters['required'].append(parameter.name)
  214. tool_entity.runtime.runtime_parameters.update(runtime_parameters)
  215. return message_tool, tool_entity
  216. def _convert_dataset_retriever_tool_to_prompt_message_tool(self, tool: DatasetRetrieverTool) -> PromptMessageTool:
  217. """
  218. convert dataset retriever tool to prompt message tool
  219. """
  220. prompt_tool = PromptMessageTool(
  221. name=tool.identity.name,
  222. description=tool.description.llm,
  223. parameters={
  224. "type": "object",
  225. "properties": {},
  226. "required": [],
  227. }
  228. )
  229. for parameter in tool.get_runtime_parameters():
  230. parameter_type = 'string'
  231. prompt_tool.parameters['properties'][parameter.name] = {
  232. "type": parameter_type,
  233. "description": parameter.llm_description or '',
  234. }
  235. if parameter.required:
  236. if parameter.name not in prompt_tool.parameters['required']:
  237. prompt_tool.parameters['required'].append(parameter.name)
  238. return prompt_tool
  239. def update_prompt_message_tool(self, tool: Tool, prompt_tool: PromptMessageTool) -> PromptMessageTool:
  240. """
  241. update prompt message tool
  242. """
  243. # try to get tool runtime parameters
  244. tool_runtime_parameters = tool.get_runtime_parameters() or []
  245. for parameter in tool_runtime_parameters:
  246. parameter_type = 'string'
  247. enum = []
  248. if parameter.type == ToolParamter.ToolParameterType.STRING:
  249. parameter_type = 'string'
  250. elif parameter.type == ToolParamter.ToolParameterType.BOOLEAN:
  251. parameter_type = 'boolean'
  252. elif parameter.type == ToolParamter.ToolParameterType.NUMBER:
  253. parameter_type = 'number'
  254. elif parameter.type == ToolParamter.ToolParameterType.SELECT:
  255. for option in parameter.options:
  256. enum.append(option.value)
  257. parameter_type = 'string'
  258. else:
  259. raise ValueError(f"parameter type {parameter.type} is not supported")
  260. if parameter.form == ToolParamter.ToolParameterForm.LLM:
  261. prompt_tool.parameters['properties'][parameter.name] = {
  262. "type": parameter_type,
  263. "description": parameter.llm_description or '',
  264. }
  265. if len(enum) > 0:
  266. prompt_tool.parameters['properties'][parameter.name]['enum'] = enum
  267. if parameter.required:
  268. if parameter.name not in prompt_tool.parameters['required']:
  269. prompt_tool.parameters['required'].append(parameter.name)
  270. return prompt_tool
  271. def extract_tool_response_binary(self, tool_response: List[ToolInvokeMessage]) -> List[ToolInvokeMessageBinary]:
  272. """
  273. Extract tool response binary
  274. """
  275. result = []
  276. for response in tool_response:
  277. if response.type == ToolInvokeMessage.MessageType.IMAGE_LINK or \
  278. response.type == ToolInvokeMessage.MessageType.IMAGE:
  279. result.append(ToolInvokeMessageBinary(
  280. mimetype=response.meta.get('mime_type', 'octet/stream'),
  281. url=response.message,
  282. save_as=response.save_as,
  283. ))
  284. elif response.type == ToolInvokeMessage.MessageType.BLOB:
  285. result.append(ToolInvokeMessageBinary(
  286. mimetype=response.meta.get('mime_type', 'octet/stream'),
  287. url=response.message,
  288. save_as=response.save_as,
  289. ))
  290. elif response.type == ToolInvokeMessage.MessageType.LINK:
  291. # check if there is a mime type in meta
  292. if response.meta and 'mime_type' in response.meta:
  293. result.append(ToolInvokeMessageBinary(
  294. mimetype=response.meta.get('mime_type', 'octet/stream') if response.meta else 'octet/stream',
  295. url=response.message,
  296. save_as=response.save_as,
  297. ))
  298. return result
  299. def create_message_files(self, messages: List[ToolInvokeMessageBinary]) -> List[Tuple[MessageFile, bool]]:
  300. """
  301. Create message file
  302. :param messages: messages
  303. :return: message files, should save as variable
  304. """
  305. result = []
  306. for message in messages:
  307. file_type = 'bin'
  308. if 'image' in message.mimetype:
  309. file_type = 'image'
  310. elif 'video' in message.mimetype:
  311. file_type = 'video'
  312. elif 'audio' in message.mimetype:
  313. file_type = 'audio'
  314. elif 'text' in message.mimetype:
  315. file_type = 'text'
  316. elif 'pdf' in message.mimetype:
  317. file_type = 'pdf'
  318. elif 'zip' in message.mimetype:
  319. file_type = 'archive'
  320. # ...
  321. invoke_from = self.application_generate_entity.invoke_from
  322. message_file = MessageFile(
  323. message_id=self.message.id,
  324. type=file_type,
  325. transfer_method=FileTransferMethod.TOOL_FILE.value,
  326. belongs_to='assistant',
  327. url=message.url,
  328. upload_file_id=None,
  329. created_by_role=('account'if invoke_from in [InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER] else 'end_user'),
  330. created_by=self.user_id,
  331. )
  332. db.session.add(message_file)
  333. result.append((
  334. message_file,
  335. message.save_as
  336. ))
  337. db.session.commit()
  338. return result
  339. def create_agent_thought(self, message_id: str, message: str,
  340. tool_name: str, tool_input: str, messages_ids: List[str]
  341. ) -> MessageAgentThought:
  342. """
  343. Create agent thought
  344. """
  345. thought = MessageAgentThought(
  346. message_id=message_id,
  347. message_chain_id=None,
  348. thought='',
  349. tool=tool_name,
  350. tool_input=tool_input,
  351. message=message,
  352. message_token=0,
  353. message_unit_price=0,
  354. message_price_unit=0,
  355. message_files=json.dumps(messages_ids) if messages_ids else '',
  356. answer='',
  357. observation='',
  358. answer_token=0,
  359. answer_unit_price=0,
  360. answer_price_unit=0,
  361. tokens=0,
  362. total_price=0,
  363. position=self.agent_thought_count + 1,
  364. currency='USD',
  365. latency=0,
  366. created_by_role='account',
  367. created_by=self.user_id,
  368. )
  369. db.session.add(thought)
  370. db.session.commit()
  371. self.agent_thought_count += 1
  372. return thought
  373. def save_agent_thought(self,
  374. agent_thought: MessageAgentThought,
  375. tool_name: str,
  376. tool_input: Union[str, dict],
  377. thought: str,
  378. observation: str,
  379. answer: str,
  380. messages_ids: List[str],
  381. llm_usage: LLMUsage = None) -> MessageAgentThought:
  382. """
  383. Save agent thought
  384. """
  385. if thought is not None:
  386. agent_thought.thought = thought
  387. if tool_name is not None:
  388. agent_thought.tool = tool_name
  389. if tool_input is not None:
  390. if isinstance(tool_input, dict):
  391. try:
  392. tool_input = json.dumps(tool_input, ensure_ascii=False)
  393. except Exception as e:
  394. tool_input = json.dumps(tool_input)
  395. agent_thought.tool_input = tool_input
  396. if observation is not None:
  397. agent_thought.observation = observation
  398. if answer is not None:
  399. agent_thought.answer = answer
  400. if messages_ids is not None and len(messages_ids) > 0:
  401. agent_thought.message_files = json.dumps(messages_ids)
  402. if llm_usage:
  403. agent_thought.message_token = llm_usage.prompt_tokens
  404. agent_thought.message_price_unit = llm_usage.prompt_price_unit
  405. agent_thought.message_unit_price = llm_usage.prompt_unit_price
  406. agent_thought.answer_token = llm_usage.completion_tokens
  407. agent_thought.answer_price_unit = llm_usage.completion_price_unit
  408. agent_thought.answer_unit_price = llm_usage.completion_unit_price
  409. agent_thought.tokens = llm_usage.total_tokens
  410. agent_thought.total_price = llm_usage.total_price
  411. db.session.commit()
  412. def get_history_prompt_messages(self) -> List[PromptMessage]:
  413. """
  414. Get history prompt messages
  415. """
  416. if self.history_prompt_messages is None:
  417. self.history_prompt_messages = db.session.query(PromptMessage).filter(
  418. PromptMessage.message_id == self.message.id,
  419. ).order_by(PromptMessage.position.asc()).all()
  420. return self.history_prompt_messages
  421. def transform_tool_invoke_messages(self, messages: List[ToolInvokeMessage]) -> List[ToolInvokeMessage]:
  422. """
  423. Transform tool message into agent thought
  424. """
  425. result = []
  426. for message in messages:
  427. if message.type == ToolInvokeMessage.MessageType.TEXT:
  428. result.append(message)
  429. elif message.type == ToolInvokeMessage.MessageType.LINK:
  430. result.append(message)
  431. elif message.type == ToolInvokeMessage.MessageType.IMAGE:
  432. # try to download image
  433. try:
  434. file = ToolFileManager.create_file_by_url(user_id=self.user_id, tenant_id=self.tenant_id,
  435. conversation_id=self.message.conversation_id,
  436. file_url=message.message)
  437. url = f'/files/tools/{file.id}{guess_extension(file.mimetype) or ".png"}'
  438. result.append(ToolInvokeMessage(
  439. type=ToolInvokeMessage.MessageType.IMAGE_LINK,
  440. message=url,
  441. save_as=message.save_as,
  442. meta=message.meta.copy() if message.meta is not None else {},
  443. ))
  444. except Exception as e:
  445. logger.exception(e)
  446. result.append(ToolInvokeMessage(
  447. type=ToolInvokeMessage.MessageType.TEXT,
  448. message=f"Failed to download image: {message.message}, you can try to download it yourself.",
  449. meta=message.meta.copy() if message.meta is not None else {},
  450. save_as=message.save_as,
  451. ))
  452. elif message.type == ToolInvokeMessage.MessageType.BLOB:
  453. # get mime type and save blob to storage
  454. mimetype = message.meta.get('mime_type', 'octet/stream')
  455. # if message is str, encode it to bytes
  456. if isinstance(message.message, str):
  457. message.message = message.message.encode('utf-8')
  458. file = ToolFileManager.create_file_by_raw(user_id=self.user_id, tenant_id=self.tenant_id,
  459. conversation_id=self.message.conversation_id,
  460. file_binary=message.message,
  461. mimetype=mimetype)
  462. url = f'/files/tools/{file.id}{guess_extension(file.mimetype) or ".bin"}'
  463. # check if file is image
  464. if 'image' in mimetype:
  465. result.append(ToolInvokeMessage(
  466. type=ToolInvokeMessage.MessageType.IMAGE_LINK,
  467. message=url,
  468. save_as=message.save_as,
  469. meta=message.meta.copy() if message.meta is not None else {},
  470. ))
  471. else:
  472. result.append(ToolInvokeMessage(
  473. type=ToolInvokeMessage.MessageType.LINK,
  474. message=url,
  475. save_as=message.save_as,
  476. meta=message.meta.copy() if message.meta is not None else {},
  477. ))
  478. else:
  479. result.append(message)
  480. return result
  481. def update_db_variables(self, tool_variables: ToolRuntimeVariablePool, db_variables: ToolConversationVariables):
  482. """
  483. convert tool variables to db variables
  484. """
  485. db_variables.updated_at = datetime.utcnow()
  486. db_variables.variables_str = json.dumps(jsonable_encoder(tool_variables.pool))
  487. db.session.commit()