model_manager.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
  1. import logging
  2. from collections.abc import Callable, Generator, Iterable, Sequence
  3. from typing import IO, Any, Literal, Optional, Union, cast, overload
  4. from configs import dify_config
  5. from core.entities.embedding_type import EmbeddingInputType
  6. from core.entities.provider_configuration import ProviderConfiguration, ProviderModelBundle
  7. from core.entities.provider_entities import ModelLoadBalancingConfiguration
  8. from core.errors.error import ProviderTokenNotInitError
  9. from core.model_runtime.callbacks.base_callback import Callback
  10. from core.model_runtime.entities.llm_entities import LLMResult
  11. from core.model_runtime.entities.message_entities import PromptMessage, PromptMessageTool
  12. from core.model_runtime.entities.model_entities import ModelType
  13. from core.model_runtime.entities.rerank_entities import RerankResult
  14. from core.model_runtime.entities.text_embedding_entities import TextEmbeddingResult
  15. from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeConnectionError, InvokeRateLimitError
  16. from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
  17. from core.model_runtime.model_providers.__base.moderation_model import ModerationModel
  18. from core.model_runtime.model_providers.__base.rerank_model import RerankModel
  19. from core.model_runtime.model_providers.__base.speech2text_model import Speech2TextModel
  20. from core.model_runtime.model_providers.__base.text_embedding_model import TextEmbeddingModel
  21. from core.model_runtime.model_providers.__base.tts_model import TTSModel
  22. from core.provider_manager import ProviderManager
  23. from extensions.ext_redis import redis_client
  24. from models.provider import ProviderType
# Module-level logger named after this module; used by LBModelManager.fetch_next
# to log the selected load balancing config when dify_config.DEBUG is set.
logger = logging.getLogger(__name__)
  26. class ModelInstance:
  27. """
  28. Model instance class
  29. """
  30. def __init__(self, provider_model_bundle: ProviderModelBundle, model: str) -> None:
  31. self.provider_model_bundle = provider_model_bundle
  32. self.model = model
  33. self.provider = provider_model_bundle.configuration.provider.provider
  34. self.credentials = self._fetch_credentials_from_bundle(provider_model_bundle, model)
  35. self.model_type_instance = self.provider_model_bundle.model_type_instance
  36. self.load_balancing_manager = self._get_load_balancing_manager(
  37. configuration=provider_model_bundle.configuration,
  38. model_type=provider_model_bundle.model_type_instance.model_type,
  39. model=model,
  40. credentials=self.credentials,
  41. )
  42. @staticmethod
  43. def _fetch_credentials_from_bundle(provider_model_bundle: ProviderModelBundle, model: str) -> dict:
  44. """
  45. Fetch credentials from provider model bundle
  46. :param provider_model_bundle: provider model bundle
  47. :param model: model name
  48. :return:
  49. """
  50. configuration = provider_model_bundle.configuration
  51. model_type = provider_model_bundle.model_type_instance.model_type
  52. credentials = configuration.get_current_credentials(model_type=model_type, model=model)
  53. if credentials is None:
  54. raise ProviderTokenNotInitError(f"Model {model} credentials is not initialized.")
  55. return credentials
  56. @staticmethod
  57. def _get_load_balancing_manager(
  58. configuration: ProviderConfiguration, model_type: ModelType, model: str, credentials: dict
  59. ) -> Optional["LBModelManager"]:
  60. """
  61. Get load balancing model credentials
  62. :param configuration: provider configuration
  63. :param model_type: model type
  64. :param model: model name
  65. :param credentials: model credentials
  66. :return:
  67. """
  68. if configuration.model_settings and configuration.using_provider_type == ProviderType.CUSTOM:
  69. current_model_setting = None
  70. # check if model is disabled by admin
  71. for model_setting in configuration.model_settings:
  72. if model_setting.model_type == model_type and model_setting.model == model:
  73. current_model_setting = model_setting
  74. break
  75. # check if load balancing is enabled
  76. if current_model_setting and current_model_setting.load_balancing_configs:
  77. # use load balancing proxy to choose credentials
  78. lb_model_manager = LBModelManager(
  79. tenant_id=configuration.tenant_id,
  80. provider=configuration.provider.provider,
  81. model_type=model_type,
  82. model=model,
  83. load_balancing_configs=current_model_setting.load_balancing_configs,
  84. managed_credentials=credentials if configuration.custom_configuration.provider else None,
  85. )
  86. return lb_model_manager
  87. return None
  88. @overload
  89. def invoke_llm(
  90. self,
  91. prompt_messages: list[PromptMessage],
  92. model_parameters: Optional[dict] = None,
  93. tools: Sequence[PromptMessageTool] | None = None,
  94. stop: Optional[list[str]] = None,
  95. stream: Literal[True] = True,
  96. user: Optional[str] = None,
  97. callbacks: Optional[list[Callback]] = None,
  98. ) -> Generator: ...
  99. @overload
  100. def invoke_llm(
  101. self,
  102. prompt_messages: list[PromptMessage],
  103. model_parameters: Optional[dict] = None,
  104. tools: Sequence[PromptMessageTool] | None = None,
  105. stop: Optional[list[str]] = None,
  106. stream: Literal[False] = False,
  107. user: Optional[str] = None,
  108. callbacks: Optional[list[Callback]] = None,
  109. ) -> LLMResult: ...
  110. @overload
  111. def invoke_llm(
  112. self,
  113. prompt_messages: list[PromptMessage],
  114. model_parameters: Optional[dict] = None,
  115. tools: Sequence[PromptMessageTool] | None = None,
  116. stop: Optional[list[str]] = None,
  117. stream: bool = True,
  118. user: Optional[str] = None,
  119. callbacks: Optional[list[Callback]] = None,
  120. ) -> Union[LLMResult, Generator]: ...
  121. def invoke_llm(
  122. self,
  123. prompt_messages: Sequence[PromptMessage],
  124. model_parameters: Optional[dict] = None,
  125. tools: Sequence[PromptMessageTool] | None = None,
  126. stop: Optional[Sequence[str]] = None,
  127. stream: bool = True,
  128. user: Optional[str] = None,
  129. callbacks: Optional[list[Callback]] = None,
  130. ) -> Union[LLMResult, Generator]:
  131. """
  132. Invoke large language model
  133. :param prompt_messages: prompt messages
  134. :param model_parameters: model parameters
  135. :param tools: tools for tool calling
  136. :param stop: stop words
  137. :param stream: is stream response
  138. :param user: unique user id
  139. :param callbacks: callbacks
  140. :return: full response or stream response chunk generator result
  141. """
  142. if not isinstance(self.model_type_instance, LargeLanguageModel):
  143. raise Exception("Model type instance is not LargeLanguageModel")
  144. self.model_type_instance = cast(LargeLanguageModel, self.model_type_instance)
  145. return cast(
  146. Union[LLMResult, Generator],
  147. self._round_robin_invoke(
  148. function=self.model_type_instance.invoke,
  149. model=self.model,
  150. credentials=self.credentials,
  151. prompt_messages=prompt_messages,
  152. model_parameters=model_parameters,
  153. tools=tools,
  154. stop=stop,
  155. stream=stream,
  156. user=user,
  157. callbacks=callbacks,
  158. ),
  159. )
  160. def get_llm_num_tokens(
  161. self, prompt_messages: list[PromptMessage], tools: Optional[list[PromptMessageTool]] = None
  162. ) -> int:
  163. """
  164. Get number of tokens for llm
  165. :param prompt_messages: prompt messages
  166. :param tools: tools for tool calling
  167. :return:
  168. """
  169. if not isinstance(self.model_type_instance, LargeLanguageModel):
  170. raise Exception("Model type instance is not LargeLanguageModel")
  171. self.model_type_instance = cast(LargeLanguageModel, self.model_type_instance)
  172. return cast(
  173. int,
  174. self._round_robin_invoke(
  175. function=self.model_type_instance.get_num_tokens,
  176. model=self.model,
  177. credentials=self.credentials,
  178. prompt_messages=prompt_messages,
  179. tools=tools,
  180. ),
  181. )
  182. def invoke_text_embedding(
  183. self, texts: list[str], user: Optional[str] = None, input_type: EmbeddingInputType = EmbeddingInputType.DOCUMENT
  184. ) -> TextEmbeddingResult:
  185. """
  186. Invoke large language model
  187. :param texts: texts to embed
  188. :param user: unique user id
  189. :param input_type: input type
  190. :return: embeddings result
  191. """
  192. if not isinstance(self.model_type_instance, TextEmbeddingModel):
  193. raise Exception("Model type instance is not TextEmbeddingModel")
  194. self.model_type_instance = cast(TextEmbeddingModel, self.model_type_instance)
  195. return cast(
  196. TextEmbeddingResult,
  197. self._round_robin_invoke(
  198. function=self.model_type_instance.invoke,
  199. model=self.model,
  200. credentials=self.credentials,
  201. texts=texts,
  202. user=user,
  203. input_type=input_type,
  204. ),
  205. )
  206. def get_text_embedding_num_tokens(self, texts: list[str]) -> list[int]:
  207. """
  208. Get number of tokens for text embedding
  209. :param texts: texts to embed
  210. :return:
  211. """
  212. if not isinstance(self.model_type_instance, TextEmbeddingModel):
  213. raise Exception("Model type instance is not TextEmbeddingModel")
  214. self.model_type_instance = cast(TextEmbeddingModel, self.model_type_instance)
  215. return cast(
  216. list[int],
  217. self._round_robin_invoke(
  218. function=self.model_type_instance.get_num_tokens,
  219. model=self.model,
  220. credentials=self.credentials,
  221. texts=texts,
  222. ),
  223. )
  224. def invoke_rerank(
  225. self,
  226. query: str,
  227. docs: list[str],
  228. score_threshold: Optional[float] = None,
  229. top_n: Optional[int] = None,
  230. user: Optional[str] = None,
  231. ) -> RerankResult:
  232. """
  233. Invoke rerank model
  234. :param query: search query
  235. :param docs: docs for reranking
  236. :param score_threshold: score threshold
  237. :param top_n: top n
  238. :param user: unique user id
  239. :return: rerank result
  240. """
  241. if not isinstance(self.model_type_instance, RerankModel):
  242. raise Exception("Model type instance is not RerankModel")
  243. self.model_type_instance = cast(RerankModel, self.model_type_instance)
  244. return cast(
  245. RerankResult,
  246. self._round_robin_invoke(
  247. function=self.model_type_instance.invoke,
  248. model=self.model,
  249. credentials=self.credentials,
  250. query=query,
  251. docs=docs,
  252. score_threshold=score_threshold,
  253. top_n=top_n,
  254. user=user,
  255. ),
  256. )
  257. def invoke_moderation(self, text: str, user: Optional[str] = None) -> bool:
  258. """
  259. Invoke moderation model
  260. :param text: text to moderate
  261. :param user: unique user id
  262. :return: false if text is safe, true otherwise
  263. """
  264. if not isinstance(self.model_type_instance, ModerationModel):
  265. raise Exception("Model type instance is not ModerationModel")
  266. self.model_type_instance = cast(ModerationModel, self.model_type_instance)
  267. return cast(
  268. bool,
  269. self._round_robin_invoke(
  270. function=self.model_type_instance.invoke,
  271. model=self.model,
  272. credentials=self.credentials,
  273. text=text,
  274. user=user,
  275. ),
  276. )
  277. def invoke_speech2text(self, file: IO[bytes], user: Optional[str] = None) -> str:
  278. """
  279. Invoke large language model
  280. :param file: audio file
  281. :param user: unique user id
  282. :return: text for given audio file
  283. """
  284. if not isinstance(self.model_type_instance, Speech2TextModel):
  285. raise Exception("Model type instance is not Speech2TextModel")
  286. self.model_type_instance = cast(Speech2TextModel, self.model_type_instance)
  287. return cast(
  288. str,
  289. self._round_robin_invoke(
  290. function=self.model_type_instance.invoke,
  291. model=self.model,
  292. credentials=self.credentials,
  293. file=file,
  294. user=user,
  295. ),
  296. )
  297. def invoke_tts(self, content_text: str, tenant_id: str, voice: str, user: Optional[str] = None) -> Iterable[bytes]:
  298. """
  299. Invoke large language tts model
  300. :param content_text: text content to be translated
  301. :param tenant_id: user tenant id
  302. :param voice: model timbre
  303. :param user: unique user id
  304. :return: text for given audio file
  305. """
  306. if not isinstance(self.model_type_instance, TTSModel):
  307. raise Exception("Model type instance is not TTSModel")
  308. self.model_type_instance = cast(TTSModel, self.model_type_instance)
  309. return cast(
  310. Iterable[bytes],
  311. self._round_robin_invoke(
  312. function=self.model_type_instance.invoke,
  313. model=self.model,
  314. credentials=self.credentials,
  315. content_text=content_text,
  316. user=user,
  317. tenant_id=tenant_id,
  318. voice=voice,
  319. ),
  320. )
  321. def _round_robin_invoke(self, function: Callable[..., Any], *args, **kwargs) -> Any:
  322. """
  323. Round-robin invoke
  324. :param function: function to invoke
  325. :param args: function args
  326. :param kwargs: function kwargs
  327. :return:
  328. """
  329. if not self.load_balancing_manager:
  330. return function(*args, **kwargs)
  331. last_exception: Union[InvokeRateLimitError, InvokeAuthorizationError, InvokeConnectionError, None] = None
  332. while True:
  333. lb_config = self.load_balancing_manager.fetch_next()
  334. if not lb_config:
  335. if not last_exception:
  336. raise ProviderTokenNotInitError("Model credentials is not initialized.")
  337. else:
  338. raise last_exception
  339. try:
  340. if "credentials" in kwargs:
  341. del kwargs["credentials"]
  342. return function(*args, **kwargs, credentials=lb_config.credentials)
  343. except InvokeRateLimitError as e:
  344. # expire in 60 seconds
  345. self.load_balancing_manager.cooldown(lb_config, expire=60)
  346. last_exception = e
  347. continue
  348. except (InvokeAuthorizationError, InvokeConnectionError) as e:
  349. # expire in 10 seconds
  350. self.load_balancing_manager.cooldown(lb_config, expire=10)
  351. last_exception = e
  352. continue
  353. except Exception as e:
  354. raise e
  355. def get_tts_voices(self, language: Optional[str] = None) -> list:
  356. """
  357. Invoke large language tts model voices
  358. :param language: tts language
  359. :return: tts model voices
  360. """
  361. if not isinstance(self.model_type_instance, TTSModel):
  362. raise Exception("Model type instance is not TTSModel")
  363. self.model_type_instance = cast(TTSModel, self.model_type_instance)
  364. return self.model_type_instance.get_tts_model_voices(
  365. model=self.model, credentials=self.credentials, language=language
  366. )
  367. class ModelManager:
  368. def __init__(self) -> None:
  369. self._provider_manager = ProviderManager()
  370. def get_model_instance(self, tenant_id: str, provider: str, model_type: ModelType, model: str) -> ModelInstance:
  371. """
  372. Get model instance
  373. :param tenant_id: tenant id
  374. :param provider: provider name
  375. :param model_type: model type
  376. :param model: model name
  377. :return:
  378. """
  379. if not provider:
  380. return self.get_default_model_instance(tenant_id, model_type)
  381. provider_model_bundle = self._provider_manager.get_provider_model_bundle(
  382. tenant_id=tenant_id, provider=provider, model_type=model_type
  383. )
  384. return ModelInstance(provider_model_bundle, model)
  385. def get_default_provider_model_name(self, tenant_id: str, model_type: ModelType) -> tuple[str | None, str | None]:
  386. """
  387. Return first provider and the first model in the provider
  388. :param tenant_id: tenant id
  389. :param model_type: model type
  390. :return: provider name, model name
  391. """
  392. return self._provider_manager.get_first_provider_first_model(tenant_id, model_type)
  393. def get_default_model_instance(self, tenant_id: str, model_type: ModelType) -> ModelInstance:
  394. """
  395. Get default model instance
  396. :param tenant_id: tenant id
  397. :param model_type: model type
  398. :return:
  399. """
  400. default_model_entity = self._provider_manager.get_default_model(tenant_id=tenant_id, model_type=model_type)
  401. if not default_model_entity:
  402. raise ProviderTokenNotInitError(f"Default model not found for {model_type}")
  403. return self.get_model_instance(
  404. tenant_id=tenant_id,
  405. provider=default_model_entity.provider.provider,
  406. model_type=model_type,
  407. model=default_model_entity.model,
  408. )
  409. class LBModelManager:
  410. def __init__(
  411. self,
  412. tenant_id: str,
  413. provider: str,
  414. model_type: ModelType,
  415. model: str,
  416. load_balancing_configs: list[ModelLoadBalancingConfiguration],
  417. managed_credentials: Optional[dict] = None,
  418. ) -> None:
  419. """
  420. Load balancing model manager
  421. :param tenant_id: tenant_id
  422. :param provider: provider
  423. :param model_type: model_type
  424. :param model: model name
  425. :param load_balancing_configs: all load balancing configurations
  426. :param managed_credentials: credentials if load balancing configuration name is __inherit__
  427. """
  428. self._tenant_id = tenant_id
  429. self._provider = provider
  430. self._model_type = model_type
  431. self._model = model
  432. self._load_balancing_configs = load_balancing_configs
  433. for load_balancing_config in self._load_balancing_configs[:]: # Iterate over a shallow copy of the list
  434. if load_balancing_config.name == "__inherit__":
  435. if not managed_credentials:
  436. # remove __inherit__ if managed credentials is not provided
  437. self._load_balancing_configs.remove(load_balancing_config)
  438. else:
  439. load_balancing_config.credentials = managed_credentials
  440. def fetch_next(self) -> Optional[ModelLoadBalancingConfiguration]:
  441. """
  442. Get next model load balancing config
  443. Strategy: Round Robin
  444. :return:
  445. """
  446. cache_key = "model_lb_index:{}:{}:{}:{}".format(
  447. self._tenant_id, self._provider, self._model_type.value, self._model
  448. )
  449. cooldown_load_balancing_configs = []
  450. max_index = len(self._load_balancing_configs)
  451. while True:
  452. current_index = redis_client.incr(cache_key)
  453. current_index = cast(int, current_index)
  454. if current_index >= 10000000:
  455. current_index = 1
  456. redis_client.set(cache_key, current_index)
  457. redis_client.expire(cache_key, 3600)
  458. if current_index > max_index:
  459. current_index = current_index % max_index
  460. real_index = current_index - 1
  461. if real_index > max_index:
  462. real_index = 0
  463. config: ModelLoadBalancingConfiguration = self._load_balancing_configs[real_index]
  464. if self.in_cooldown(config):
  465. cooldown_load_balancing_configs.append(config)
  466. if len(cooldown_load_balancing_configs) >= len(self._load_balancing_configs):
  467. # all configs are in cooldown
  468. return None
  469. continue
  470. if dify_config.DEBUG:
  471. logger.info(
  472. f"Model LB\nid: {config.id}\nname:{config.name}\n"
  473. f"tenant_id: {self._tenant_id}\nprovider: {self._provider}\n"
  474. f"model_type: {self._model_type.value}\nmodel: {self._model}"
  475. )
  476. return config
  477. return None
  478. def cooldown(self, config: ModelLoadBalancingConfiguration, expire: int = 60) -> None:
  479. """
  480. Cooldown model load balancing config
  481. :param config: model load balancing config
  482. :param expire: cooldown time
  483. :return:
  484. """
  485. cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
  486. self._tenant_id, self._provider, self._model_type.value, self._model, config.id
  487. )
  488. redis_client.setex(cooldown_cache_key, expire, "true")
  489. def in_cooldown(self, config: ModelLoadBalancingConfiguration) -> bool:
  490. """
  491. Check if model load balancing config is in cooldown
  492. :param config: model load balancing config
  493. :return:
  494. """
  495. cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
  496. self._tenant_id, self._provider, self._model_type.value, self._model, config.id
  497. )
  498. res: bool = redis_client.exists(cooldown_cache_key)
  499. return res
  500. @staticmethod
  501. def get_config_in_cooldown_and_ttl(
  502. tenant_id: str, provider: str, model_type: ModelType, model: str, config_id: str
  503. ) -> tuple[bool, int]:
  504. """
  505. Get model load balancing config is in cooldown and ttl
  506. :param tenant_id: workspace id
  507. :param provider: provider name
  508. :param model_type: model type
  509. :param model: model name
  510. :param config_id: model load balancing config id
  511. :return:
  512. """
  513. cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
  514. tenant_id, provider, model_type.value, model, config_id
  515. )
  516. ttl = redis_client.ttl(cooldown_cache_key)
  517. if ttl == -2:
  518. return False, 0
  519. ttl = cast(int, ttl)
  520. return True, ttl