tool_provider.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. from abc import ABC, abstractmethod
  2. from typing import Any, Optional
  3. from pydantic import BaseModel
  4. from core.tools.entities.tool_entities import (
  5. ToolParameter,
  6. ToolProviderCredentials,
  7. ToolProviderIdentity,
  8. ToolProviderType,
  9. )
  10. from core.tools.errors import ToolNotFoundError, ToolParameterValidationError, ToolProviderCredentialValidationError
  11. from core.tools.tool.tool import Tool
  12. class ToolProviderController(BaseModel, ABC):
  13. identity: Optional[ToolProviderIdentity] = None
  14. tools: Optional[list[Tool]] = None
  15. credentials_schema: Optional[dict[str, ToolProviderCredentials]] = None
  16. def get_credentials_schema(self) -> dict[str, ToolProviderCredentials]:
  17. """
  18. returns the credentials schema of the provider
  19. :return: the credentials schema
  20. """
  21. if self.credentials_schema is None:
  22. return {}
  23. return self.credentials_schema.copy()
  24. @abstractmethod
  25. def get_tools(self, user_id: str = "", tenant_id: str = "") -> Optional[list[Tool]]:
  26. """
  27. returns a list of tools that the provider can provide
  28. :return: list of tools
  29. """
  30. pass
  31. @abstractmethod
  32. def get_tool(self, tool_name: str) -> Optional[Tool]:
  33. """
  34. returns a tool that the provider can provide
  35. :return: tool
  36. """
  37. pass
  38. def get_parameters(self, tool_name: str) -> list[ToolParameter]:
  39. """
  40. returns the parameters of the tool
  41. :param tool_name: the name of the tool, defined in `get_tools`
  42. :return: list of parameters
  43. """
  44. tools = self.get_tools()
  45. if tools is None:
  46. raise ToolNotFoundError(f"tool {tool_name} not found")
  47. tool = next((t for t in tools if t.identity and t.identity.name == tool_name), None)
  48. if tool is None:
  49. raise ToolNotFoundError(f"tool {tool_name} not found")
  50. return tool.parameters or []
  51. @property
  52. def provider_type(self) -> ToolProviderType:
  53. """
  54. returns the type of the provider
  55. :return: type of the provider
  56. """
  57. return ToolProviderType.BUILT_IN
  58. def validate_parameters(self, tool_id: int, tool_name: str, tool_parameters: dict[str, Any]) -> None:
  59. """
  60. validate the parameters of the tool and set the default value if needed
  61. :param tool_name: the name of the tool, defined in `get_tools`
  62. :param tool_parameters: the parameters of the tool
  63. """
  64. tool_parameters_schema = self.get_parameters(tool_name)
  65. tool_parameters_need_to_validate: dict[str, ToolParameter] = {}
  66. for parameter in tool_parameters_schema:
  67. tool_parameters_need_to_validate[parameter.name] = parameter
  68. for tool_parameter in tool_parameters:
  69. if tool_parameter not in tool_parameters_need_to_validate:
  70. raise ToolParameterValidationError(f"parameter {tool_parameter} not found in tool {tool_name}")
  71. # check type
  72. parameter_schema = tool_parameters_need_to_validate[tool_parameter]
  73. if parameter_schema.type == ToolParameter.ToolParameterType.STRING:
  74. if not isinstance(tool_parameters[tool_parameter], str):
  75. raise ToolParameterValidationError(f"parameter {tool_parameter} should be string")
  76. elif parameter_schema.type == ToolParameter.ToolParameterType.NUMBER:
  77. if not isinstance(tool_parameters[tool_parameter], int | float):
  78. raise ToolParameterValidationError(f"parameter {tool_parameter} should be number")
  79. if parameter_schema.min is not None and tool_parameters[tool_parameter] < parameter_schema.min:
  80. raise ToolParameterValidationError(
  81. f"parameter {tool_parameter} should be greater than {parameter_schema.min}"
  82. )
  83. if parameter_schema.max is not None and tool_parameters[tool_parameter] > parameter_schema.max:
  84. raise ToolParameterValidationError(
  85. f"parameter {tool_parameter} should be less than {parameter_schema.max}"
  86. )
  87. elif parameter_schema.type == ToolParameter.ToolParameterType.BOOLEAN:
  88. if not isinstance(tool_parameters[tool_parameter], bool):
  89. raise ToolParameterValidationError(f"parameter {tool_parameter} should be boolean")
  90. elif parameter_schema.type == ToolParameter.ToolParameterType.SELECT:
  91. if not isinstance(tool_parameters[tool_parameter], str):
  92. raise ToolParameterValidationError(f"parameter {tool_parameter} should be string")
  93. options = parameter_schema.options
  94. if not isinstance(options, list):
  95. raise ToolParameterValidationError(f"parameter {tool_parameter} options should be list")
  96. if tool_parameters[tool_parameter] not in [x.value for x in options]:
  97. raise ToolParameterValidationError(f"parameter {tool_parameter} should be one of {options}")
  98. tool_parameters_need_to_validate.pop(tool_parameter)
  99. for tool_parameter_validate in tool_parameters_need_to_validate:
  100. parameter_schema = tool_parameters_need_to_validate[tool_parameter_validate]
  101. if parameter_schema.required:
  102. raise ToolParameterValidationError(f"parameter {tool_parameter_validate} is required")
  103. # the parameter is not set currently, set the default value if needed
  104. if parameter_schema.default is not None:
  105. tool_parameters[tool_parameter_validate] = parameter_schema.type.cast_value(parameter_schema.default)
  106. def validate_credentials_format(self, credentials: dict[str, Any]) -> None:
  107. """
  108. validate the format of the credentials of the provider and set the default value if needed
  109. :param credentials: the credentials of the tool
  110. """
  111. credentials_schema = self.credentials_schema
  112. if credentials_schema is None:
  113. return
  114. credentials_need_to_validate: dict[str, ToolProviderCredentials] = {}
  115. for credential_name in credentials_schema:
  116. credentials_need_to_validate[credential_name] = credentials_schema[credential_name]
  117. for credential_name in credentials:
  118. if credential_name not in credentials_need_to_validate:
  119. if self.identity is None:
  120. raise ValueError("identity is not set")
  121. raise ToolProviderCredentialValidationError(
  122. f"credential {credential_name} not found in provider {self.identity.name}"
  123. )
  124. # check type
  125. credential_schema = credentials_need_to_validate[credential_name]
  126. if not credential_schema.required and credentials[credential_name] is None:
  127. continue
  128. if credential_schema.type in {
  129. ToolProviderCredentials.CredentialsType.SECRET_INPUT,
  130. ToolProviderCredentials.CredentialsType.TEXT_INPUT,
  131. }:
  132. if not isinstance(credentials[credential_name], str):
  133. raise ToolProviderCredentialValidationError(f"credential {credential_name} should be string")
  134. elif credential_schema.type == ToolProviderCredentials.CredentialsType.SELECT:
  135. if not isinstance(credentials[credential_name], str):
  136. raise ToolProviderCredentialValidationError(f"credential {credential_name} should be string")
  137. options = credential_schema.options
  138. if not isinstance(options, list):
  139. raise ToolProviderCredentialValidationError(f"credential {credential_name} options should be list")
  140. if credentials[credential_name] not in [x.value for x in options]:
  141. raise ToolProviderCredentialValidationError(
  142. f"credential {credential_name} should be one of {options}"
  143. )
  144. credentials_need_to_validate.pop(credential_name)
  145. for credential_name in credentials_need_to_validate:
  146. credential_schema = credentials_need_to_validate[credential_name]
  147. if credential_schema.required:
  148. raise ToolProviderCredentialValidationError(f"credential {credential_name} is required")
  149. # the credential is not set currently, set the default value if needed
  150. if credential_schema.default is not None:
  151. default_value = credential_schema.default
  152. # parse default value into the correct type
  153. if credential_schema.type in {
  154. ToolProviderCredentials.CredentialsType.SECRET_INPUT,
  155. ToolProviderCredentials.CredentialsType.TEXT_INPUT,
  156. ToolProviderCredentials.CredentialsType.SELECT,
  157. }:
  158. default_value = str(default_value)
  159. credentials[credential_name] = default_value