external_data_fetch.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. import concurrent
  2. import json
  3. import logging
  4. from concurrent.futures import ThreadPoolExecutor
  5. from typing import Tuple, Optional
  6. from flask import current_app, Flask
  7. from core.entities.application_entities import ExternalDataVariableEntity
  8. from core.external_data_tool.factory import ExternalDataToolFactory
  9. logger = logging.getLogger(__name__)
  10. class ExternalDataFetchFeature:
  11. def fetch(self, tenant_id: str,
  12. app_id: str,
  13. external_data_tools: list[ExternalDataVariableEntity],
  14. inputs: dict,
  15. query: str) -> dict:
  16. """
  17. Fill in variable inputs from external data tools if exists.
  18. :param tenant_id: workspace id
  19. :param app_id: app id
  20. :param external_data_tools: external data tools configs
  21. :param inputs: the inputs
  22. :param query: the query
  23. :return: the filled inputs
  24. """
  25. # Group tools by type and config
  26. grouped_tools = {}
  27. for tool in external_data_tools:
  28. tool_key = (tool.type, json.dumps(tool.config, sort_keys=True))
  29. grouped_tools.setdefault(tool_key, []).append(tool)
  30. results = {}
  31. with ThreadPoolExecutor() as executor:
  32. futures = {}
  33. for tool in external_data_tools:
  34. future = executor.submit(
  35. self._query_external_data_tool,
  36. current_app._get_current_object(),
  37. tenant_id,
  38. app_id,
  39. tool,
  40. inputs,
  41. query
  42. )
  43. futures[future] = tool
  44. for future in concurrent.futures.as_completed(futures):
  45. tool_variable, result = future.result()
  46. results[tool_variable] = result
  47. inputs.update(results)
  48. return inputs
  49. def _query_external_data_tool(self, flask_app: Flask,
  50. tenant_id: str,
  51. app_id: str,
  52. external_data_tool: ExternalDataVariableEntity,
  53. inputs: dict,
  54. query: str) -> Tuple[Optional[str], Optional[str]]:
  55. """
  56. Query external data tool.
  57. :param flask_app: flask app
  58. :param tenant_id: tenant id
  59. :param app_id: app id
  60. :param external_data_tool: external data tool
  61. :param inputs: inputs
  62. :param query: query
  63. :return:
  64. """
  65. with flask_app.app_context():
  66. tool_variable = external_data_tool.variable
  67. tool_type = external_data_tool.type
  68. tool_config = external_data_tool.config
  69. external_data_tool_factory = ExternalDataToolFactory(
  70. name=tool_type,
  71. tenant_id=tenant_id,
  72. app_id=app_id,
  73. variable=tool_variable,
  74. config=tool_config
  75. )
  76. # query external data tool
  77. result = external_data_tool_factory.query(
  78. inputs=inputs,
  79. query=query
  80. )
  81. return tool_variable, result