import inspect import json import logging from collections.abc import Callable, Generator from typing import TypeVar import requests from pydantic import BaseModel from yarl import URL from configs import dify_config from core.model_runtime.errors.invoke import ( InvokeAuthorizationError, InvokeBadRequestError, InvokeConnectionError, InvokeRateLimitError, InvokeServerUnavailableError, ) from core.model_runtime.errors.validate import CredentialsValidateFailedError from core.plugin.entities.plugin_daemon import PluginDaemonBasicResponse, PluginDaemonError, PluginDaemonInnerError from core.plugin.manager.exc import ( PluginDaemonBadRequestError, PluginDaemonInternalServerError, PluginDaemonNotFoundError, PluginDaemonUnauthorizedError, PluginInvokeError, PluginNotFoundError, PluginPermissionDeniedError, PluginUniqueIdentifierError, ) plugin_daemon_inner_api_baseurl = dify_config.PLUGIN_DAEMON_URL plugin_daemon_inner_api_key = dify_config.PLUGIN_DAEMON_KEY T = TypeVar("T", bound=(BaseModel | dict | list | bool | str)) logger = logging.getLogger(__name__) class BasePluginManager: def _request( self, method: str, path: str, headers: dict | None = None, data: bytes | dict | str | None = None, params: dict | None = None, files: dict | None = None, stream: bool = False, ) -> requests.Response: """ Make a request to the plugin daemon inner API. """ url = URL(str(plugin_daemon_inner_api_baseurl)) / path headers = headers or {} headers["X-Api-Key"] = plugin_daemon_inner_api_key headers["Accept-Encoding"] = "gzip, deflate, br" if headers.get("Content-Type") == "application/json" and isinstance(data, dict): data = json.dumps(data) try: response = requests.request( method=method, url=str(url), headers=headers, data=data, params=params, stream=stream, files=files ) except requests.exceptions.ConnectionError: logger.exception("Request to Plugin Daemon Service failed") raise PluginDaemonInnerError(code=-500, message="Request to Plugin Daemon Service failed") return response def _stream_request( self, method: str, path: str, params: dict | None = None, headers: dict | None = None, data: bytes | dict | None = None, files: dict | None = None, ) -> Generator[bytes, None, None]: """ Make a stream request to the plugin daemon inner API """ response = self._request(method, path, headers, data, params, files, stream=True) for line in response.iter_lines(): line = line.decode("utf-8").strip() if line.startswith("data:"): line = line[5:].strip() if line: yield line def _stream_request_with_model( self, method: str, path: str, type: type[T], headers: dict | None = None, data: bytes | dict | None = None, params: dict | None = None, files: dict | None = None, ) -> Generator[T, None, None]: """ Make a stream request to the plugin daemon inner API and yield the response as a model. """ for line in self._stream_request(method, path, params, headers, data, files): yield type(**json.loads(line)) # type: ignore def _request_with_model( self, method: str, path: str, type: type[T], headers: dict | None = None, data: bytes | None = None, params: dict | None = None, files: dict | None = None, ) -> T: """ Make a request to the plugin daemon inner API and return the response as a model. """ response = self._request(method, path, headers, data, params, files) return type(**response.json()) # type: ignore def _request_with_plugin_daemon_response( self, method: str, path: str, type: type[T], headers: dict | None = None, data: bytes | dict | None = None, params: dict | None = None, files: dict | None = None, transformer: Callable[[dict], dict] | None = None, ) -> T: """ Make a request to the plugin daemon inner API and return the response as a model. """ response = self._request(method, path, headers, data, params, files) json_response = response.json() if transformer: json_response = transformer(json_response) rep = PluginDaemonBasicResponse[type](**json_response) # type: ignore if rep.code != 0: try: error = PluginDaemonError(**json.loads(rep.message)) except Exception: raise ValueError(f"{rep.message}, code: {rep.code}") self._handle_plugin_daemon_error(error.error_type, error.message) if rep.data is None: frame = inspect.currentframe() raise ValueError(f"got empty data from plugin daemon: {frame.f_lineno if frame else 'unknown'}") return rep.data def _request_with_plugin_daemon_response_stream( self, method: str, path: str, type: type[T], headers: dict | None = None, data: bytes | dict | None = None, params: dict | None = None, files: dict | None = None, ) -> Generator[T, None, None]: """ Make a stream request to the plugin daemon inner API and yield the response as a model. """ for line in self._stream_request(method, path, params, headers, data, files): line_data = None try: line_data = json.loads(line) rep = PluginDaemonBasicResponse[type](**line_data) # type: ignore except Exception: # TODO modify this when line_data has code and message if line_data and "error" in line_data: raise ValueError(line_data["error"]) else: raise ValueError(line) if rep.code != 0: if rep.code == -500: try: error = PluginDaemonError(**json.loads(rep.message)) except Exception: raise PluginDaemonInnerError(code=rep.code, message=rep.message) self._handle_plugin_daemon_error(error.error_type, error.message) raise ValueError(f"plugin daemon: {rep.message}, code: {rep.code}") if rep.data is None: frame = inspect.currentframe() raise ValueError(f"got empty data from plugin daemon: {frame.f_lineno if frame else 'unknown'}") yield rep.data def _handle_plugin_daemon_error(self, error_type: str, message: str): """ handle the error from plugin daemon """ match error_type: case PluginDaemonInnerError.__name__: raise PluginDaemonInnerError(code=-500, message=message) case PluginInvokeError.__name__: error_object = json.loads(message) invoke_error_type = error_object.get("error_type") args = error_object.get("args") match invoke_error_type: case InvokeRateLimitError.__name__: raise InvokeRateLimitError(description=args.get("description")) case InvokeAuthorizationError.__name__: raise InvokeAuthorizationError(description=args.get("description")) case InvokeBadRequestError.__name__: raise InvokeBadRequestError(description=args.get("description")) case InvokeConnectionError.__name__: raise InvokeConnectionError(description=args.get("description")) case InvokeServerUnavailableError.__name__: raise InvokeServerUnavailableError(description=args.get("description")) case CredentialsValidateFailedError.__name__: raise CredentialsValidateFailedError(error_object.get("message")) case _: raise PluginInvokeError(description=message) case PluginDaemonInternalServerError.__name__: raise PluginDaemonInternalServerError(description=message) case PluginDaemonBadRequestError.__name__: raise PluginDaemonBadRequestError(description=message) case PluginDaemonNotFoundError.__name__: raise PluginDaemonNotFoundError(description=message) case PluginUniqueIdentifierError.__name__: raise PluginUniqueIdentifierError(description=message) case PluginNotFoundError.__name__: raise PluginNotFoundError(description=message) case PluginDaemonUnauthorizedError.__name__: raise PluginDaemonUnauthorizedError(description=message) case PluginPermissionDeniedError.__name__: raise PluginPermissionDeniedError(description=message) case _: raise Exception(f"got unknown error from plugin daemon: {error_type}, message: {message}")