tool_provider.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. from abc import ABC, abstractmethod
  2. from typing import Any, Dict, List, Optional
  3. from core.tools.entities.tool_entities import (ToolParameter, ToolProviderCredentials, ToolProviderIdentity,
  4. ToolProviderType)
  5. from core.tools.entities.user_entities import UserToolProviderCredentials
  6. from core.tools.errors import ToolNotFoundError, ToolParameterValidationError, ToolProviderCredentialValidationError
  7. from core.tools.tool.tool import Tool
  8. from pydantic import BaseModel
  9. class ToolProviderController(BaseModel, ABC):
  10. identity: Optional[ToolProviderIdentity] = None
  11. tools: Optional[List[Tool]] = None
  12. credentials_schema: Optional[Dict[str, ToolProviderCredentials]] = None
  13. def get_credentials_schema(self) -> Dict[str, ToolProviderCredentials]:
  14. """
  15. returns the credentials schema of the provider
  16. :return: the credentials schema
  17. """
  18. return self.credentials_schema.copy()
  19. def user_get_credentials_schema(self) -> UserToolProviderCredentials:
  20. """
  21. returns the credentials schema of the provider, this method is used for user
  22. :return: the credentials schema
  23. """
  24. credentials = self.credentials_schema.copy()
  25. return UserToolProviderCredentials(credentials=credentials)
  26. @abstractmethod
  27. def get_tools(self) -> List[Tool]:
  28. """
  29. returns a list of tools that the provider can provide
  30. :return: list of tools
  31. """
  32. pass
  33. @abstractmethod
  34. def get_tool(self, tool_name: str) -> Tool:
  35. """
  36. returns a tool that the provider can provide
  37. :return: tool
  38. """
  39. pass
  40. def get_parameters(self, tool_name: str) -> List[ToolParameter]:
  41. """
  42. returns the parameters of the tool
  43. :param tool_name: the name of the tool, defined in `get_tools`
  44. :return: list of parameters
  45. """
  46. tool = next(filter(lambda x: x.identity.name == tool_name, self.get_tools()), None)
  47. if tool is None:
  48. raise ToolNotFoundError(f'tool {tool_name} not found')
  49. return tool.parameters
  50. @property
  51. def app_type(self) -> ToolProviderType:
  52. """
  53. returns the type of the provider
  54. :return: type of the provider
  55. """
  56. return ToolProviderType.BUILT_IN
  57. def validate_parameters(self, tool_id: int, tool_name: str, tool_parameters: Dict[str, Any]) -> None:
  58. """
  59. validate the parameters of the tool and set the default value if needed
  60. :param tool_name: the name of the tool, defined in `get_tools`
  61. :param tool_parameters: the parameters of the tool
  62. """
  63. tool_parameters_schema = self.get_parameters(tool_name)
  64. tool_parameters_need_to_validate: Dict[str, ToolParameter] = {}
  65. for parameter in tool_parameters_schema:
  66. tool_parameters_need_to_validate[parameter.name] = parameter
  67. for parameter in tool_parameters:
  68. if parameter not in tool_parameters_need_to_validate:
  69. raise ToolParameterValidationError(f'parameter {parameter} not found in tool {tool_name}')
  70. # check type
  71. parameter_schema = tool_parameters_need_to_validate[parameter]
  72. if parameter_schema.type == ToolParameter.ToolParameterType.STRING:
  73. if not isinstance(tool_parameters[parameter], str):
  74. raise ToolParameterValidationError(f'parameter {parameter} should be string')
  75. elif parameter_schema.type == ToolParameter.ToolParameterType.NUMBER:
  76. if not isinstance(tool_parameters[parameter], (int, float)):
  77. raise ToolParameterValidationError(f'parameter {parameter} should be number')
  78. if parameter_schema.min is not None and tool_parameters[parameter] < parameter_schema.min:
  79. raise ToolParameterValidationError(f'parameter {parameter} should be greater than {parameter_schema.min}')
  80. if parameter_schema.max is not None and tool_parameters[parameter] > parameter_schema.max:
  81. raise ToolParameterValidationError(f'parameter {parameter} should be less than {parameter_schema.max}')
  82. elif parameter_schema.type == ToolParameter.ToolParameterType.BOOLEAN:
  83. if not isinstance(tool_parameters[parameter], bool):
  84. raise ToolParameterValidationError(f'parameter {parameter} should be boolean')
  85. elif parameter_schema.type == ToolParameter.ToolParameterType.SELECT:
  86. if not isinstance(tool_parameters[parameter], str):
  87. raise ToolParameterValidationError(f'parameter {parameter} should be string')
  88. options = parameter_schema.options
  89. if not isinstance(options, list):
  90. raise ToolParameterValidationError(f'parameter {parameter} options should be list')
  91. if tool_parameters[parameter] not in [x.value for x in options]:
  92. raise ToolParameterValidationError(f'parameter {parameter} should be one of {options}')
  93. tool_parameters_need_to_validate.pop(parameter)
  94. for parameter in tool_parameters_need_to_validate:
  95. parameter_schema = tool_parameters_need_to_validate[parameter]
  96. if parameter_schema.required:
  97. raise ToolParameterValidationError(f'parameter {parameter} is required')
  98. # the parameter is not set currently, set the default value if needed
  99. if parameter_schema.default is not None:
  100. default_value = parameter_schema.default
  101. # parse default value into the correct type
  102. if parameter_schema.type == ToolParameter.ToolParameterType.STRING or \
  103. parameter_schema.type == ToolParameter.ToolParameterType.SELECT:
  104. default_value = str(default_value)
  105. elif parameter_schema.type == ToolParameter.ToolParameterType.NUMBER:
  106. default_value = float(default_value)
  107. elif parameter_schema.type == ToolParameter.ToolParameterType.BOOLEAN:
  108. default_value = bool(default_value)
  109. tool_parameters[parameter] = default_value
  110. def validate_credentials_format(self, credentials: Dict[str, Any]) -> None:
  111. """
  112. validate the format of the credentials of the provider and set the default value if needed
  113. :param credentials: the credentials of the tool
  114. """
  115. credentials_schema = self.credentials_schema
  116. if credentials_schema is None:
  117. return
  118. credentials_need_to_validate: Dict[str, ToolProviderCredentials] = {}
  119. for credential_name in credentials_schema:
  120. credentials_need_to_validate[credential_name] = credentials_schema[credential_name]
  121. for credential_name in credentials:
  122. if credential_name not in credentials_need_to_validate:
  123. raise ToolProviderCredentialValidationError(f'credential {credential_name} not found in provider {self.identity.name}')
  124. # check type
  125. credential_schema = credentials_need_to_validate[credential_name]
  126. if credential_schema == ToolProviderCredentials.CredentialsType.SECRET_INPUT or \
  127. credential_schema == ToolProviderCredentials.CredentialsType.TEXT_INPUT:
  128. if not isinstance(credentials[credential_name], str):
  129. raise ToolProviderCredentialValidationError(f'credential {credential_name} should be string')
  130. elif credential_schema.type == ToolProviderCredentials.CredentialsType.SELECT:
  131. if not isinstance(credentials[credential_name], str):
  132. raise ToolProviderCredentialValidationError(f'credential {credential_name} should be string')
  133. options = credential_schema.options
  134. if not isinstance(options, list):
  135. raise ToolProviderCredentialValidationError(f'credential {credential_name} options should be list')
  136. if credentials[credential_name] not in [x.value for x in options]:
  137. raise ToolProviderCredentialValidationError(f'credential {credential_name} should be one of {options}')
  138. credentials_need_to_validate.pop(credential_name)
  139. for credential_name in credentials_need_to_validate:
  140. credential_schema = credentials_need_to_validate[credential_name]
  141. if credential_schema.required:
  142. raise ToolProviderCredentialValidationError(f'credential {credential_name} is required')
  143. # the credential is not set currently, set the default value if needed
  144. if credential_schema.default is not None:
  145. default_value = credential_schema.default
  146. # parse default value into the correct type
  147. if credential_schema.type == ToolProviderCredentials.CredentialsType.SECRET_INPUT or \
  148. credential_schema.type == ToolProviderCredentials.CredentialsType.TEXT_INPUT or \
  149. credential_schema.type == ToolProviderCredentials.CredentialsType.SELECT:
  150. default_value = str(default_value)
  151. credentials[credential_name] = default_value
  152. def validate_credentials(self, credentials: Dict[str, Any]) -> None:
  153. """
  154. validate the credentials of the provider
  155. :param tool_name: the name of the tool, defined in `get_tools`
  156. :param credentials: the credentials of the tool
  157. """
  158. # validate credentials format
  159. self.validate_credentials_format(credentials)
  160. # validate credentials
  161. self._validate_credentials(credentials)
  162. @abstractmethod
  163. def _validate_credentials(self, credentials: Dict[str, Any]) -> None:
  164. """
  165. validate the credentials of the provider
  166. :param tool_name: the name of the tool, defined in `get_tools`
  167. :param credentials: the credentials of the tool
  168. """
  169. pass