assistant_base_runner.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587
  1. import logging
  2. import json
  3. from typing import Optional, List, Tuple, Union, cast
  4. from datetime import datetime
  5. from mimetypes import guess_extension
  6. from core.app_runner.app_runner import AppRunner
  7. from extensions.ext_database import db
  8. from models.model import MessageAgentThought, Message, MessageFile
  9. from models.tools import ToolConversationVariables
  10. from core.tools.entities.tool_entities import ToolInvokeMessage, ToolInvokeMessageBinary, \
  11. ToolRuntimeVariablePool, ToolParameter
  12. from core.tools.tool.tool import Tool
  13. from core.tools.tool_manager import ToolManager
  14. from core.tools.tool_file_manager import ToolFileManager
  15. from core.tools.tool.dataset_retriever_tool import DatasetRetrieverTool
  16. from core.app_runner.app_runner import AppRunner
  17. from core.callback_handler.agent_tool_callback_handler import DifyAgentCallbackHandler
  18. from core.callback_handler.index_tool_callback_handler import DatasetIndexToolCallbackHandler
  19. from core.entities.application_entities import ModelConfigEntity, AgentEntity, AgentToolEntity
  20. from core.application_queue_manager import ApplicationQueueManager
  21. from core.memory.token_buffer_memory import TokenBufferMemory
  22. from core.entities.application_entities import ModelConfigEntity, \
  23. AgentEntity, AppOrchestrationConfigEntity, ApplicationGenerateEntity, InvokeFrom
  24. from core.model_runtime.entities.message_entities import PromptMessage, PromptMessageTool
  25. from core.model_runtime.entities.llm_entities import LLMUsage
  26. from core.model_runtime.entities.model_entities import ModelFeature
  27. from core.model_runtime.utils.encoders import jsonable_encoder
  28. from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
  29. from core.model_manager import ModelInstance
  30. from core.file.message_file_parser import FileTransferMethod
  31. logger = logging.getLogger(__name__)
  32. class BaseAssistantApplicationRunner(AppRunner):
  33. def __init__(self, tenant_id: str,
  34. application_generate_entity: ApplicationGenerateEntity,
  35. app_orchestration_config: AppOrchestrationConfigEntity,
  36. model_config: ModelConfigEntity,
  37. config: AgentEntity,
  38. queue_manager: ApplicationQueueManager,
  39. message: Message,
  40. user_id: str,
  41. memory: Optional[TokenBufferMemory] = None,
  42. prompt_messages: Optional[List[PromptMessage]] = None,
  43. variables_pool: Optional[ToolRuntimeVariablePool] = None,
  44. db_variables: Optional[ToolConversationVariables] = None,
  45. model_instance: ModelInstance = None
  46. ) -> None:
  47. """
  48. Agent runner
  49. :param tenant_id: tenant id
  50. :param app_orchestration_config: app orchestration config
  51. :param model_config: model config
  52. :param config: dataset config
  53. :param queue_manager: queue manager
  54. :param message: message
  55. :param user_id: user id
  56. :param agent_llm_callback: agent llm callback
  57. :param callback: callback
  58. :param memory: memory
  59. """
  60. self.tenant_id = tenant_id
  61. self.application_generate_entity = application_generate_entity
  62. self.app_orchestration_config = app_orchestration_config
  63. self.model_config = model_config
  64. self.config = config
  65. self.queue_manager = queue_manager
  66. self.message = message
  67. self.user_id = user_id
  68. self.memory = memory
  69. self.history_prompt_messages = prompt_messages
  70. self.variables_pool = variables_pool
  71. self.db_variables_pool = db_variables
  72. self.model_instance = model_instance
  73. # init callback
  74. self.agent_callback = DifyAgentCallbackHandler()
  75. # init dataset tools
  76. hit_callback = DatasetIndexToolCallbackHandler(
  77. queue_manager=queue_manager,
  78. app_id=self.application_generate_entity.app_id,
  79. message_id=message.id,
  80. user_id=user_id,
  81. invoke_from=self.application_generate_entity.invoke_from,
  82. )
  83. self.dataset_tools = DatasetRetrieverTool.get_dataset_tools(
  84. tenant_id=tenant_id,
  85. dataset_ids=app_orchestration_config.dataset.dataset_ids if app_orchestration_config.dataset else [],
  86. retrieve_config=app_orchestration_config.dataset.retrieve_config if app_orchestration_config.dataset else None,
  87. return_resource=app_orchestration_config.show_retrieve_source,
  88. invoke_from=application_generate_entity.invoke_from,
  89. hit_callback=hit_callback
  90. )
  91. # get how many agent thoughts have been created
  92. self.agent_thought_count = db.session.query(MessageAgentThought).filter(
  93. MessageAgentThought.message_id == self.message.id,
  94. ).count()
  95. # check if model supports stream tool call
  96. llm_model = cast(LargeLanguageModel, model_instance.model_type_instance)
  97. model_schema = llm_model.get_model_schema(model_instance.model, model_instance.credentials)
  98. if model_schema and ModelFeature.STREAM_TOOL_CALL in (model_schema.features or []):
  99. self.stream_tool_call = True
  100. else:
  101. self.stream_tool_call = False
  102. def _repack_app_orchestration_config(self, app_orchestration_config: AppOrchestrationConfigEntity) -> AppOrchestrationConfigEntity:
  103. """
  104. Repack app orchestration config
  105. """
  106. if app_orchestration_config.prompt_template.simple_prompt_template is None:
  107. app_orchestration_config.prompt_template.simple_prompt_template = ''
  108. return app_orchestration_config
  109. def _convert_tool_response_to_str(self, tool_response: List[ToolInvokeMessage]) -> str:
  110. """
  111. Handle tool response
  112. """
  113. result = ''
  114. for response in tool_response:
  115. if response.type == ToolInvokeMessage.MessageType.TEXT:
  116. result += response.message
  117. elif response.type == ToolInvokeMessage.MessageType.LINK:
  118. result += f"result link: {response.message}. please tell user to check it."
  119. elif response.type == ToolInvokeMessage.MessageType.IMAGE_LINK or \
  120. response.type == ToolInvokeMessage.MessageType.IMAGE:
  121. result += f"image has been created and sent to user already, you should tell user to check it now."
  122. else:
  123. result += f"tool response: {response.message}."
  124. return result
  125. def _convert_tool_to_prompt_message_tool(self, tool: AgentToolEntity) -> Tuple[PromptMessageTool, Tool]:
  126. """
  127. convert tool to prompt message tool
  128. """
  129. tool_entity = ToolManager.get_tool_runtime(
  130. provider_type=tool.provider_type, provider_name=tool.provider_id, tool_name=tool.tool_name,
  131. tenant_id=self.application_generate_entity.tenant_id,
  132. agent_callback=self.agent_callback
  133. )
  134. tool_entity.load_variables(self.variables_pool)
  135. message_tool = PromptMessageTool(
  136. name=tool.tool_name,
  137. description=tool_entity.description.llm,
  138. parameters={
  139. "type": "object",
  140. "properties": {},
  141. "required": [],
  142. }
  143. )
  144. runtime_parameters = {}
  145. parameters = tool_entity.parameters or []
  146. user_parameters = tool_entity.get_runtime_parameters() or []
  147. # override parameters
  148. for parameter in user_parameters:
  149. # check if parameter in tool parameters
  150. found = False
  151. for tool_parameter in parameters:
  152. if tool_parameter.name == parameter.name:
  153. found = True
  154. break
  155. if found:
  156. # override parameter
  157. tool_parameter.type = parameter.type
  158. tool_parameter.form = parameter.form
  159. tool_parameter.required = parameter.required
  160. tool_parameter.default = parameter.default
  161. tool_parameter.options = parameter.options
  162. tool_parameter.llm_description = parameter.llm_description
  163. else:
  164. # add new parameter
  165. parameters.append(parameter)
  166. for parameter in parameters:
  167. parameter_type = 'string'
  168. enum = []
  169. if parameter.type == ToolParameter.ToolParameterType.STRING:
  170. parameter_type = 'string'
  171. elif parameter.type == ToolParameter.ToolParameterType.BOOLEAN:
  172. parameter_type = 'boolean'
  173. elif parameter.type == ToolParameter.ToolParameterType.NUMBER:
  174. parameter_type = 'number'
  175. elif parameter.type == ToolParameter.ToolParameterType.SELECT:
  176. for option in parameter.options:
  177. enum.append(option.value)
  178. parameter_type = 'string'
  179. else:
  180. raise ValueError(f"parameter type {parameter.type} is not supported")
  181. if parameter.form == ToolParameter.ToolParameterForm.FORM:
  182. # get tool parameter from form
  183. tool_parameter_config = tool.tool_parameters.get(parameter.name)
  184. if not tool_parameter_config:
  185. # get default value
  186. tool_parameter_config = parameter.default
  187. if not tool_parameter_config and parameter.required:
  188. raise ValueError(f"tool parameter {parameter.name} not found in tool config")
  189. if parameter.type == ToolParameter.ToolParameterType.SELECT:
  190. # check if tool_parameter_config in options
  191. options = list(map(lambda x: x.value, parameter.options))
  192. if tool_parameter_config not in options:
  193. raise ValueError(f"tool parameter {parameter.name} value {tool_parameter_config} not in options {options}")
  194. # convert tool parameter config to correct type
  195. try:
  196. if parameter.type == ToolParameter.ToolParameterType.NUMBER:
  197. # check if tool parameter is integer
  198. if isinstance(tool_parameter_config, int):
  199. tool_parameter_config = tool_parameter_config
  200. elif isinstance(tool_parameter_config, float):
  201. tool_parameter_config = tool_parameter_config
  202. elif isinstance(tool_parameter_config, str):
  203. if '.' in tool_parameter_config:
  204. tool_parameter_config = float(tool_parameter_config)
  205. else:
  206. tool_parameter_config = int(tool_parameter_config)
  207. elif parameter.type == ToolParameter.ToolParameterType.BOOLEAN:
  208. tool_parameter_config = bool(tool_parameter_config)
  209. elif parameter.type not in [ToolParameter.ToolParameterType.SELECT, ToolParameter.ToolParameterType.STRING]:
  210. tool_parameter_config = str(tool_parameter_config)
  211. elif parameter.type == ToolParameter.ToolParameterType:
  212. tool_parameter_config = str(tool_parameter_config)
  213. except Exception as e:
  214. raise ValueError(f"tool parameter {parameter.name} value {tool_parameter_config} is not correct type")
  215. # save tool parameter to tool entity memory
  216. runtime_parameters[parameter.name] = tool_parameter_config
  217. elif parameter.form == ToolParameter.ToolParameterForm.LLM:
  218. message_tool.parameters['properties'][parameter.name] = {
  219. "type": parameter_type,
  220. "description": parameter.llm_description or '',
  221. }
  222. if len(enum) > 0:
  223. message_tool.parameters['properties'][parameter.name]['enum'] = enum
  224. if parameter.required:
  225. message_tool.parameters['required'].append(parameter.name)
  226. tool_entity.runtime.runtime_parameters.update(runtime_parameters)
  227. return message_tool, tool_entity
  228. def _convert_dataset_retriever_tool_to_prompt_message_tool(self, tool: DatasetRetrieverTool) -> PromptMessageTool:
  229. """
  230. convert dataset retriever tool to prompt message tool
  231. """
  232. prompt_tool = PromptMessageTool(
  233. name=tool.identity.name,
  234. description=tool.description.llm,
  235. parameters={
  236. "type": "object",
  237. "properties": {},
  238. "required": [],
  239. }
  240. )
  241. for parameter in tool.get_runtime_parameters():
  242. parameter_type = 'string'
  243. prompt_tool.parameters['properties'][parameter.name] = {
  244. "type": parameter_type,
  245. "description": parameter.llm_description or '',
  246. }
  247. if parameter.required:
  248. if parameter.name not in prompt_tool.parameters['required']:
  249. prompt_tool.parameters['required'].append(parameter.name)
  250. return prompt_tool
  251. def update_prompt_message_tool(self, tool: Tool, prompt_tool: PromptMessageTool) -> PromptMessageTool:
  252. """
  253. update prompt message tool
  254. """
  255. # try to get tool runtime parameters
  256. tool_runtime_parameters = tool.get_runtime_parameters() or []
  257. for parameter in tool_runtime_parameters:
  258. parameter_type = 'string'
  259. enum = []
  260. if parameter.type == ToolParameter.ToolParameterType.STRING:
  261. parameter_type = 'string'
  262. elif parameter.type == ToolParameter.ToolParameterType.BOOLEAN:
  263. parameter_type = 'boolean'
  264. elif parameter.type == ToolParameter.ToolParameterType.NUMBER:
  265. parameter_type = 'number'
  266. elif parameter.type == ToolParameter.ToolParameterType.SELECT:
  267. for option in parameter.options:
  268. enum.append(option.value)
  269. parameter_type = 'string'
  270. else:
  271. raise ValueError(f"parameter type {parameter.type} is not supported")
  272. if parameter.form == ToolParameter.ToolParameterForm.LLM:
  273. prompt_tool.parameters['properties'][parameter.name] = {
  274. "type": parameter_type,
  275. "description": parameter.llm_description or '',
  276. }
  277. if len(enum) > 0:
  278. prompt_tool.parameters['properties'][parameter.name]['enum'] = enum
  279. if parameter.required:
  280. if parameter.name not in prompt_tool.parameters['required']:
  281. prompt_tool.parameters['required'].append(parameter.name)
  282. return prompt_tool
  283. def extract_tool_response_binary(self, tool_response: List[ToolInvokeMessage]) -> List[ToolInvokeMessageBinary]:
  284. """
  285. Extract tool response binary
  286. """
  287. result = []
  288. for response in tool_response:
  289. if response.type == ToolInvokeMessage.MessageType.IMAGE_LINK or \
  290. response.type == ToolInvokeMessage.MessageType.IMAGE:
  291. result.append(ToolInvokeMessageBinary(
  292. mimetype=response.meta.get('mime_type', 'octet/stream'),
  293. url=response.message,
  294. save_as=response.save_as,
  295. ))
  296. elif response.type == ToolInvokeMessage.MessageType.BLOB:
  297. result.append(ToolInvokeMessageBinary(
  298. mimetype=response.meta.get('mime_type', 'octet/stream'),
  299. url=response.message,
  300. save_as=response.save_as,
  301. ))
  302. elif response.type == ToolInvokeMessage.MessageType.LINK:
  303. # check if there is a mime type in meta
  304. if response.meta and 'mime_type' in response.meta:
  305. result.append(ToolInvokeMessageBinary(
  306. mimetype=response.meta.get('mime_type', 'octet/stream') if response.meta else 'octet/stream',
  307. url=response.message,
  308. save_as=response.save_as,
  309. ))
  310. return result
  311. def create_message_files(self, messages: List[ToolInvokeMessageBinary]) -> List[Tuple[MessageFile, bool]]:
  312. """
  313. Create message file
  314. :param messages: messages
  315. :return: message files, should save as variable
  316. """
  317. result = []
  318. for message in messages:
  319. file_type = 'bin'
  320. if 'image' in message.mimetype:
  321. file_type = 'image'
  322. elif 'video' in message.mimetype:
  323. file_type = 'video'
  324. elif 'audio' in message.mimetype:
  325. file_type = 'audio'
  326. elif 'text' in message.mimetype:
  327. file_type = 'text'
  328. elif 'pdf' in message.mimetype:
  329. file_type = 'pdf'
  330. elif 'zip' in message.mimetype:
  331. file_type = 'archive'
  332. # ...
  333. invoke_from = self.application_generate_entity.invoke_from
  334. message_file = MessageFile(
  335. message_id=self.message.id,
  336. type=file_type,
  337. transfer_method=FileTransferMethod.TOOL_FILE.value,
  338. belongs_to='assistant',
  339. url=message.url,
  340. upload_file_id=None,
  341. created_by_role=('account'if invoke_from in [InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER] else 'end_user'),
  342. created_by=self.user_id,
  343. )
  344. db.session.add(message_file)
  345. result.append((
  346. message_file,
  347. message.save_as
  348. ))
  349. db.session.commit()
  350. return result
  351. def create_agent_thought(self, message_id: str, message: str,
  352. tool_name: str, tool_input: str, messages_ids: List[str]
  353. ) -> MessageAgentThought:
  354. """
  355. Create agent thought
  356. """
  357. thought = MessageAgentThought(
  358. message_id=message_id,
  359. message_chain_id=None,
  360. thought='',
  361. tool=tool_name,
  362. tool_labels_str='{}',
  363. tool_input=tool_input,
  364. message=message,
  365. message_token=0,
  366. message_unit_price=0,
  367. message_price_unit=0,
  368. message_files=json.dumps(messages_ids) if messages_ids else '',
  369. answer='',
  370. observation='',
  371. answer_token=0,
  372. answer_unit_price=0,
  373. answer_price_unit=0,
  374. tokens=0,
  375. total_price=0,
  376. position=self.agent_thought_count + 1,
  377. currency='USD',
  378. latency=0,
  379. created_by_role='account',
  380. created_by=self.user_id,
  381. )
  382. db.session.add(thought)
  383. db.session.commit()
  384. self.agent_thought_count += 1
  385. return thought
  386. def save_agent_thought(self,
  387. agent_thought: MessageAgentThought,
  388. tool_name: str,
  389. tool_input: Union[str, dict],
  390. thought: str,
  391. observation: str,
  392. answer: str,
  393. messages_ids: List[str],
  394. llm_usage: LLMUsage = None) -> MessageAgentThought:
  395. """
  396. Save agent thought
  397. """
  398. if thought is not None:
  399. agent_thought.thought = thought
  400. if tool_name is not None:
  401. agent_thought.tool = tool_name
  402. if tool_input is not None:
  403. if isinstance(tool_input, dict):
  404. try:
  405. tool_input = json.dumps(tool_input, ensure_ascii=False)
  406. except Exception as e:
  407. tool_input = json.dumps(tool_input)
  408. agent_thought.tool_input = tool_input
  409. if observation is not None:
  410. agent_thought.observation = observation
  411. if answer is not None:
  412. agent_thought.answer = answer
  413. if messages_ids is not None and len(messages_ids) > 0:
  414. agent_thought.message_files = json.dumps(messages_ids)
  415. if llm_usage:
  416. agent_thought.message_token = llm_usage.prompt_tokens
  417. agent_thought.message_price_unit = llm_usage.prompt_price_unit
  418. agent_thought.message_unit_price = llm_usage.prompt_unit_price
  419. agent_thought.answer_token = llm_usage.completion_tokens
  420. agent_thought.answer_price_unit = llm_usage.completion_price_unit
  421. agent_thought.answer_unit_price = llm_usage.completion_unit_price
  422. agent_thought.tokens = llm_usage.total_tokens
  423. agent_thought.total_price = llm_usage.total_price
  424. # check if tool labels is not empty
  425. labels = agent_thought.tool_labels or {}
  426. tools = agent_thought.tool.split(';') if agent_thought.tool else []
  427. for tool in tools:
  428. if not tool:
  429. continue
  430. if tool not in labels:
  431. tool_label = ToolManager.get_tool_label(tool)
  432. if tool_label:
  433. labels[tool] = tool_label.to_dict()
  434. else:
  435. labels[tool] = {'en_US': tool, 'zh_Hans': tool}
  436. agent_thought.tool_labels_str = json.dumps(labels)
  437. db.session.commit()
  438. def get_history_prompt_messages(self) -> List[PromptMessage]:
  439. """
  440. Get history prompt messages
  441. """
  442. if self.history_prompt_messages is None:
  443. self.history_prompt_messages = db.session.query(PromptMessage).filter(
  444. PromptMessage.message_id == self.message.id,
  445. ).order_by(PromptMessage.position.asc()).all()
  446. return self.history_prompt_messages
  447. def transform_tool_invoke_messages(self, messages: List[ToolInvokeMessage]) -> List[ToolInvokeMessage]:
  448. """
  449. Transform tool message into agent thought
  450. """
  451. result = []
  452. for message in messages:
  453. if message.type == ToolInvokeMessage.MessageType.TEXT:
  454. result.append(message)
  455. elif message.type == ToolInvokeMessage.MessageType.LINK:
  456. result.append(message)
  457. elif message.type == ToolInvokeMessage.MessageType.IMAGE:
  458. # try to download image
  459. try:
  460. file = ToolFileManager.create_file_by_url(user_id=self.user_id, tenant_id=self.tenant_id,
  461. conversation_id=self.message.conversation_id,
  462. file_url=message.message)
  463. url = f'/files/tools/{file.id}{guess_extension(file.mimetype) or ".png"}'
  464. result.append(ToolInvokeMessage(
  465. type=ToolInvokeMessage.MessageType.IMAGE_LINK,
  466. message=url,
  467. save_as=message.save_as,
  468. meta=message.meta.copy() if message.meta is not None else {},
  469. ))
  470. except Exception as e:
  471. logger.exception(e)
  472. result.append(ToolInvokeMessage(
  473. type=ToolInvokeMessage.MessageType.TEXT,
  474. message=f"Failed to download image: {message.message}, you can try to download it yourself.",
  475. meta=message.meta.copy() if message.meta is not None else {},
  476. save_as=message.save_as,
  477. ))
  478. elif message.type == ToolInvokeMessage.MessageType.BLOB:
  479. # get mime type and save blob to storage
  480. mimetype = message.meta.get('mime_type', 'octet/stream')
  481. # if message is str, encode it to bytes
  482. if isinstance(message.message, str):
  483. message.message = message.message.encode('utf-8')
  484. file = ToolFileManager.create_file_by_raw(user_id=self.user_id, tenant_id=self.tenant_id,
  485. conversation_id=self.message.conversation_id,
  486. file_binary=message.message,
  487. mimetype=mimetype)
  488. url = f'/files/tools/{file.id}{guess_extension(file.mimetype) or ".bin"}'
  489. # check if file is image
  490. if 'image' in mimetype:
  491. result.append(ToolInvokeMessage(
  492. type=ToolInvokeMessage.MessageType.IMAGE_LINK,
  493. message=url,
  494. save_as=message.save_as,
  495. meta=message.meta.copy() if message.meta is not None else {},
  496. ))
  497. else:
  498. result.append(ToolInvokeMessage(
  499. type=ToolInvokeMessage.MessageType.LINK,
  500. message=url,
  501. save_as=message.save_as,
  502. meta=message.meta.copy() if message.meta is not None else {},
  503. ))
  504. else:
  505. result.append(message)
  506. return result
  507. def update_db_variables(self, tool_variables: ToolRuntimeVariablePool, db_variables: ToolConversationVariables):
  508. """
  509. convert tool variables to db variables
  510. """
  511. db_variables.updated_at = datetime.utcnow()
  512. db_variables.variables_str = json.dumps(jsonable_encoder(tool_variables.pool))
  513. db.session.commit()