# ops_trace_manager.py (31 KB)
  1. import json
  2. import logging
  3. import os
  4. import queue
  5. import threading
  6. import time
  7. from datetime import timedelta
  8. from typing import Any, Optional, Union
  9. from uuid import UUID, uuid4
  10. from flask import current_app
  11. from sqlalchemy import select
  12. from sqlalchemy.orm import Session
  13. from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
  14. from core.ops.entities.config_entity import (
  15. OPS_FILE_PATH,
  16. LangfuseConfig,
  17. LangSmithConfig,
  18. OpikConfig,
  19. TracingProviderEnum,
  20. )
  21. from core.ops.entities.trace_entity import (
  22. DatasetRetrievalTraceInfo,
  23. GenerateNameTraceInfo,
  24. MessageTraceInfo,
  25. ModerationTraceInfo,
  26. SuggestedQuestionTraceInfo,
  27. TaskData,
  28. ToolTraceInfo,
  29. TraceTaskName,
  30. WorkflowTraceInfo,
  31. )
  32. from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace
  33. from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace
  34. from core.ops.opik_trace.opik_trace import OpikDataTrace
  35. from core.ops.utils import get_message_data
  36. from extensions.ext_database import db
  37. from extensions.ext_storage import storage
  38. from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig
  39. from models.workflow import WorkflowAppLog, WorkflowRun
  40. from tasks.ops_trace_task import process_trace_tasks
  41. provider_config_map: dict[str, dict[str, Any]] = {
  42. TracingProviderEnum.LANGFUSE.value: {
  43. "config_class": LangfuseConfig,
  44. "secret_keys": ["public_key", "secret_key"],
  45. "other_keys": ["host", "project_key"],
  46. "trace_instance": LangFuseDataTrace,
  47. },
  48. TracingProviderEnum.LANGSMITH.value: {
  49. "config_class": LangSmithConfig,
  50. "secret_keys": ["api_key"],
  51. "other_keys": ["project", "endpoint"],
  52. "trace_instance": LangSmithDataTrace,
  53. },
  54. TracingProviderEnum.OPIK.value: {
  55. "config_class": OpikConfig,
  56. "secret_keys": ["api_key"],
  57. "other_keys": ["project", "url", "workspace"],
  58. "trace_instance": OpikDataTrace,
  59. },
  60. }
  61. class OpsTraceManager:
  62. @classmethod
  63. def encrypt_tracing_config(
  64. cls, tenant_id: str, tracing_provider: str, tracing_config: dict, current_trace_config=None
  65. ):
  66. """
  67. Encrypt tracing config.
  68. :param tenant_id: tenant id
  69. :param tracing_provider: tracing provider
  70. :param tracing_config: tracing config dictionary to be encrypted
  71. :param current_trace_config: current tracing configuration for keeping existing values
  72. :return: encrypted tracing configuration
  73. """
  74. # Get the configuration class and the keys that require encryption
  75. config_class, secret_keys, other_keys = (
  76. provider_config_map[tracing_provider]["config_class"],
  77. provider_config_map[tracing_provider]["secret_keys"],
  78. provider_config_map[tracing_provider]["other_keys"],
  79. )
  80. new_config = {}
  81. # Encrypt necessary keys
  82. for key in secret_keys:
  83. if key in tracing_config:
  84. if "*" in tracing_config[key]:
  85. # If the key contains '*', retain the original value from the current config
  86. new_config[key] = current_trace_config.get(key, tracing_config[key])
  87. else:
  88. # Otherwise, encrypt the key
  89. new_config[key] = encrypt_token(tenant_id, tracing_config[key])
  90. for key in other_keys:
  91. new_config[key] = tracing_config.get(key, "")
  92. # Create a new instance of the config class with the new configuration
  93. encrypted_config = config_class(**new_config)
  94. return encrypted_config.model_dump()
  95. @classmethod
  96. def decrypt_tracing_config(cls, tenant_id: str, tracing_provider: str, tracing_config: dict):
  97. """
  98. Decrypt tracing config
  99. :param tenant_id: tenant id
  100. :param tracing_provider: tracing provider
  101. :param tracing_config: tracing config
  102. :return:
  103. """
  104. config_class, secret_keys, other_keys = (
  105. provider_config_map[tracing_provider]["config_class"],
  106. provider_config_map[tracing_provider]["secret_keys"],
  107. provider_config_map[tracing_provider]["other_keys"],
  108. )
  109. new_config = {}
  110. for key in secret_keys:
  111. if key in tracing_config:
  112. new_config[key] = decrypt_token(tenant_id, tracing_config[key])
  113. for key in other_keys:
  114. new_config[key] = tracing_config.get(key, "")
  115. return config_class(**new_config).model_dump()
  116. @classmethod
  117. def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config: dict):
  118. """
  119. Decrypt tracing config
  120. :param tracing_provider: tracing provider
  121. :param decrypt_tracing_config: tracing config
  122. :return:
  123. """
  124. config_class, secret_keys, other_keys = (
  125. provider_config_map[tracing_provider]["config_class"],
  126. provider_config_map[tracing_provider]["secret_keys"],
  127. provider_config_map[tracing_provider]["other_keys"],
  128. )
  129. new_config = {}
  130. for key in secret_keys:
  131. if key in decrypt_tracing_config:
  132. new_config[key] = obfuscated_token(decrypt_tracing_config[key])
  133. for key in other_keys:
  134. new_config[key] = decrypt_tracing_config.get(key, "")
  135. return config_class(**new_config).model_dump()
  136. @classmethod
  137. def get_decrypted_tracing_config(cls, app_id: str, tracing_provider: str):
  138. """
  139. Get decrypted tracing config
  140. :param app_id: app id
  141. :param tracing_provider: tracing provider
  142. :return:
  143. """
  144. trace_config_data: Optional[TraceAppConfig] = (
  145. db.session.query(TraceAppConfig)
  146. .filter(TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider)
  147. .first()
  148. )
  149. if not trace_config_data:
  150. return None
  151. # decrypt_token
  152. app = db.session.query(App).filter(App.id == app_id).first()
  153. if not app:
  154. raise ValueError("App not found")
  155. tenant_id = app.tenant_id
  156. decrypt_tracing_config = cls.decrypt_tracing_config(
  157. tenant_id, tracing_provider, trace_config_data.tracing_config
  158. )
  159. return decrypt_tracing_config
  160. @classmethod
  161. def get_ops_trace_instance(
  162. cls,
  163. app_id: Optional[Union[UUID, str]] = None,
  164. ):
  165. """
  166. Get ops trace through model config
  167. :param app_id: app_id
  168. :return:
  169. """
  170. if isinstance(app_id, UUID):
  171. app_id = str(app_id)
  172. if app_id is None:
  173. return None
  174. app: Optional[App] = db.session.query(App).filter(App.id == app_id).first()
  175. if app is None:
  176. return None
  177. app_ops_trace_config = json.loads(app.tracing) if app.tracing else None
  178. if app_ops_trace_config is None:
  179. return None
  180. tracing_provider = app_ops_trace_config.get("tracing_provider")
  181. if tracing_provider is None or tracing_provider not in provider_config_map:
  182. return None
  183. # decrypt_token
  184. decrypt_trace_config = cls.get_decrypted_tracing_config(app_id, tracing_provider)
  185. if app_ops_trace_config.get("enabled"):
  186. trace_instance, config_class = (
  187. provider_config_map[tracing_provider]["trace_instance"],
  188. provider_config_map[tracing_provider]["config_class"],
  189. )
  190. tracing_instance = trace_instance(config_class(**decrypt_trace_config))
  191. return tracing_instance
  192. return None
  193. @classmethod
  194. def get_app_config_through_message_id(cls, message_id: str):
  195. app_model_config = None
  196. message_data = db.session.query(Message).filter(Message.id == message_id).first()
  197. if not message_data:
  198. return None
  199. conversation_id = message_data.conversation_id
  200. conversation_data = db.session.query(Conversation).filter(Conversation.id == conversation_id).first()
  201. if not conversation_data:
  202. return None
  203. if conversation_data.app_model_config_id:
  204. app_model_config = (
  205. db.session.query(AppModelConfig)
  206. .filter(AppModelConfig.id == conversation_data.app_model_config_id)
  207. .first()
  208. )
  209. elif conversation_data.app_model_config_id is None and conversation_data.override_model_configs:
  210. app_model_config = conversation_data.override_model_configs
  211. return app_model_config
  212. @classmethod
  213. def update_app_tracing_config(cls, app_id: str, enabled: bool, tracing_provider: str):
  214. """
  215. Update app tracing config
  216. :param app_id: app id
  217. :param enabled: enabled
  218. :param tracing_provider: tracing provider
  219. :return:
  220. """
  221. # auth check
  222. if tracing_provider not in provider_config_map and tracing_provider is not None:
  223. raise ValueError(f"Invalid tracing provider: {tracing_provider}")
  224. app_config: Optional[App] = db.session.query(App).filter(App.id == app_id).first()
  225. if not app_config:
  226. raise ValueError("App not found")
  227. app_config.tracing = json.dumps(
  228. {
  229. "enabled": enabled,
  230. "tracing_provider": tracing_provider,
  231. }
  232. )
  233. db.session.commit()
  234. @classmethod
  235. def get_app_tracing_config(cls, app_id: str):
  236. """
  237. Get app tracing config
  238. :param app_id: app id
  239. :return:
  240. """
  241. app: Optional[App] = db.session.query(App).filter(App.id == app_id).first()
  242. if not app:
  243. raise ValueError("App not found")
  244. if not app.tracing:
  245. return {"enabled": False, "tracing_provider": None}
  246. app_trace_config = json.loads(app.tracing)
  247. return app_trace_config
  248. @staticmethod
  249. def check_trace_config_is_effective(tracing_config: dict, tracing_provider: str):
  250. """
  251. Check trace config is effective
  252. :param tracing_config: tracing config
  253. :param tracing_provider: tracing provider
  254. :return:
  255. """
  256. config_type, trace_instance = (
  257. provider_config_map[tracing_provider]["config_class"],
  258. provider_config_map[tracing_provider]["trace_instance"],
  259. )
  260. tracing_config = config_type(**tracing_config)
  261. return trace_instance(tracing_config).api_check()
  262. @staticmethod
  263. def get_trace_config_project_key(tracing_config: dict, tracing_provider: str):
  264. """
  265. get trace config is project key
  266. :param tracing_config: tracing config
  267. :param tracing_provider: tracing provider
  268. :return:
  269. """
  270. config_type, trace_instance = (
  271. provider_config_map[tracing_provider]["config_class"],
  272. provider_config_map[tracing_provider]["trace_instance"],
  273. )
  274. tracing_config = config_type(**tracing_config)
  275. return trace_instance(tracing_config).get_project_key()
  276. @staticmethod
  277. def get_trace_config_project_url(tracing_config: dict, tracing_provider: str):
  278. """
  279. get trace config is project key
  280. :param tracing_config: tracing config
  281. :param tracing_provider: tracing provider
  282. :return:
  283. """
  284. config_type, trace_instance = (
  285. provider_config_map[tracing_provider]["config_class"],
  286. provider_config_map[tracing_provider]["trace_instance"],
  287. )
  288. tracing_config = config_type(**tracing_config)
  289. return trace_instance(tracing_config).get_project_url()
  290. class TraceTask:
  291. def __init__(
  292. self,
  293. trace_type: Any,
  294. message_id: Optional[str] = None,
  295. workflow_run: Optional[WorkflowRun] = None,
  296. conversation_id: Optional[str] = None,
  297. user_id: Optional[str] = None,
  298. timer: Optional[Any] = None,
  299. **kwargs,
  300. ):
  301. self.trace_type = trace_type
  302. self.message_id = message_id
  303. self.workflow_run_id = workflow_run.id if workflow_run else None
  304. self.conversation_id = conversation_id
  305. self.user_id = user_id
  306. self.timer = timer
  307. self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
  308. self.app_id = None
  309. self.kwargs = kwargs
  310. def execute(self):
  311. return self.preprocess()
  312. def preprocess(self):
  313. preprocess_map = {
  314. TraceTaskName.CONVERSATION_TRACE: lambda: self.conversation_trace(**self.kwargs),
  315. TraceTaskName.WORKFLOW_TRACE: lambda: self.workflow_trace(
  316. workflow_run_id=self.workflow_run_id, conversation_id=self.conversation_id, user_id=self.user_id
  317. ),
  318. TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(message_id=self.message_id),
  319. TraceTaskName.MODERATION_TRACE: lambda: self.moderation_trace(
  320. message_id=self.message_id, timer=self.timer, **self.kwargs
  321. ),
  322. TraceTaskName.SUGGESTED_QUESTION_TRACE: lambda: self.suggested_question_trace(
  323. message_id=self.message_id, timer=self.timer, **self.kwargs
  324. ),
  325. TraceTaskName.DATASET_RETRIEVAL_TRACE: lambda: self.dataset_retrieval_trace(
  326. message_id=self.message_id, timer=self.timer, **self.kwargs
  327. ),
  328. TraceTaskName.TOOL_TRACE: lambda: self.tool_trace(
  329. message_id=self.message_id, timer=self.timer, **self.kwargs
  330. ),
  331. TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
  332. conversation_id=self.conversation_id, timer=self.timer, **self.kwargs
  333. ),
  334. }
  335. return preprocess_map.get(self.trace_type, lambda: None)()
  336. # process methods for different trace types
  337. def conversation_trace(self, **kwargs):
  338. return kwargs
  339. def workflow_trace(
  340. self,
  341. *,
  342. workflow_run_id: str | None,
  343. conversation_id: str | None,
  344. user_id: str | None,
  345. ):
  346. if not workflow_run_id:
  347. return {}
  348. with Session(db.engine) as session:
  349. workflow_run_stmt = select(WorkflowRun).where(WorkflowRun.id == workflow_run_id)
  350. workflow_run = session.scalars(workflow_run_stmt).first()
  351. if not workflow_run:
  352. raise ValueError("Workflow run not found")
  353. workflow_id = workflow_run.workflow_id
  354. tenant_id = workflow_run.tenant_id
  355. workflow_run_id = workflow_run.id
  356. workflow_run_elapsed_time = workflow_run.elapsed_time
  357. workflow_run_status = workflow_run.status
  358. workflow_run_inputs = workflow_run.inputs_dict
  359. workflow_run_outputs = workflow_run.outputs_dict
  360. workflow_run_version = workflow_run.version
  361. error = workflow_run.error or ""
  362. total_tokens = workflow_run.total_tokens
  363. file_list = workflow_run_inputs.get("sys.file") or []
  364. query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or ""
  365. # get workflow_app_log_id
  366. workflow_app_log_data_stmt = select(WorkflowAppLog.id).where(
  367. WorkflowAppLog.tenant_id == tenant_id,
  368. WorkflowAppLog.app_id == workflow_run.app_id,
  369. WorkflowAppLog.workflow_run_id == workflow_run.id,
  370. )
  371. workflow_app_log_id = session.scalar(workflow_app_log_data_stmt)
  372. # get message_id
  373. message_id = None
  374. if conversation_id:
  375. message_data_stmt = select(Message.id).where(
  376. Message.conversation_id == conversation_id,
  377. Message.workflow_run_id == workflow_run_id,
  378. )
  379. message_id = session.scalar(message_data_stmt)
  380. metadata = {
  381. "workflow_id": workflow_id,
  382. "conversation_id": conversation_id,
  383. "workflow_run_id": workflow_run_id,
  384. "tenant_id": tenant_id,
  385. "elapsed_time": workflow_run_elapsed_time,
  386. "status": workflow_run_status,
  387. "version": workflow_run_version,
  388. "total_tokens": total_tokens,
  389. "file_list": file_list,
  390. "triggered_form": workflow_run.triggered_from,
  391. "user_id": user_id,
  392. }
  393. workflow_trace_info = WorkflowTraceInfo(
  394. workflow_data=workflow_run.to_dict(),
  395. conversation_id=conversation_id,
  396. workflow_id=workflow_id,
  397. tenant_id=tenant_id,
  398. workflow_run_id=workflow_run_id,
  399. workflow_run_elapsed_time=workflow_run_elapsed_time,
  400. workflow_run_status=workflow_run_status,
  401. workflow_run_inputs=workflow_run_inputs,
  402. workflow_run_outputs=workflow_run_outputs,
  403. workflow_run_version=workflow_run_version,
  404. error=error,
  405. total_tokens=total_tokens,
  406. file_list=file_list,
  407. query=query,
  408. metadata=metadata,
  409. workflow_app_log_id=workflow_app_log_id,
  410. message_id=message_id,
  411. start_time=workflow_run.created_at,
  412. end_time=workflow_run.finished_at,
  413. )
  414. return workflow_trace_info
  415. def message_trace(self, message_id: str | None):
  416. if not message_id:
  417. return {}
  418. message_data = get_message_data(message_id)
  419. if not message_data:
  420. return {}
  421. conversation_mode_stmt = select(Conversation.mode).where(Conversation.id == message_data.conversation_id)
  422. conversation_mode = db.session.scalars(conversation_mode_stmt).all()
  423. if not conversation_mode or len(conversation_mode) == 0:
  424. return {}
  425. conversation_mode = conversation_mode[0]
  426. created_at = message_data.created_at
  427. inputs = message_data.message
  428. # get message file data
  429. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  430. file_list = []
  431. if message_file_data and message_file_data.url is not None:
  432. file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else ""
  433. file_list.append(file_url)
  434. metadata = {
  435. "conversation_id": message_data.conversation_id,
  436. "ls_provider": message_data.model_provider,
  437. "ls_model_name": message_data.model_id,
  438. "status": message_data.status,
  439. "from_end_user_id": message_data.from_end_user_id,
  440. "from_account_id": message_data.from_account_id,
  441. "agent_based": message_data.agent_based,
  442. "workflow_run_id": message_data.workflow_run_id,
  443. "from_source": message_data.from_source,
  444. "message_id": message_id,
  445. }
  446. message_tokens = message_data.message_tokens
  447. message_trace_info = MessageTraceInfo(
  448. message_id=message_id,
  449. message_data=message_data.to_dict(),
  450. conversation_model=conversation_mode,
  451. message_tokens=message_tokens,
  452. answer_tokens=message_data.answer_tokens,
  453. total_tokens=message_tokens + message_data.answer_tokens,
  454. error=message_data.error or "",
  455. inputs=inputs,
  456. outputs=message_data.answer,
  457. file_list=file_list,
  458. start_time=created_at,
  459. end_time=created_at + timedelta(seconds=message_data.provider_response_latency),
  460. metadata=metadata,
  461. message_file_data=message_file_data,
  462. conversation_mode=conversation_mode,
  463. )
  464. return message_trace_info
  465. def moderation_trace(self, message_id, timer, **kwargs):
  466. moderation_result = kwargs.get("moderation_result")
  467. if not moderation_result:
  468. return {}
  469. inputs = kwargs.get("inputs")
  470. message_data = get_message_data(message_id)
  471. if not message_data:
  472. return {}
  473. metadata = {
  474. "message_id": message_id,
  475. "action": moderation_result.action,
  476. "preset_response": moderation_result.preset_response,
  477. "query": moderation_result.query,
  478. }
  479. # get workflow_app_log_id
  480. workflow_app_log_id = None
  481. if message_data.workflow_run_id:
  482. workflow_app_log_data = (
  483. db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first()
  484. )
  485. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  486. moderation_trace_info = ModerationTraceInfo(
  487. message_id=workflow_app_log_id or message_id,
  488. inputs=inputs,
  489. message_data=message_data.to_dict(),
  490. flagged=moderation_result.flagged,
  491. action=moderation_result.action,
  492. preset_response=moderation_result.preset_response,
  493. query=moderation_result.query,
  494. start_time=timer.get("start"),
  495. end_time=timer.get("end"),
  496. metadata=metadata,
  497. )
  498. return moderation_trace_info
  499. def suggested_question_trace(self, message_id, timer, **kwargs):
  500. suggested_question = kwargs.get("suggested_question", [])
  501. message_data = get_message_data(message_id)
  502. if not message_data:
  503. return {}
  504. metadata = {
  505. "message_id": message_id,
  506. "ls_provider": message_data.model_provider,
  507. "ls_model_name": message_data.model_id,
  508. "status": message_data.status,
  509. "from_end_user_id": message_data.from_end_user_id,
  510. "from_account_id": message_data.from_account_id,
  511. "agent_based": message_data.agent_based,
  512. "workflow_run_id": message_data.workflow_run_id,
  513. "from_source": message_data.from_source,
  514. }
  515. # get workflow_app_log_id
  516. workflow_app_log_id = None
  517. if message_data.workflow_run_id:
  518. workflow_app_log_data = (
  519. db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first()
  520. )
  521. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  522. suggested_question_trace_info = SuggestedQuestionTraceInfo(
  523. message_id=workflow_app_log_id or message_id,
  524. message_data=message_data.to_dict(),
  525. inputs=message_data.message,
  526. outputs=message_data.answer,
  527. start_time=timer.get("start"),
  528. end_time=timer.get("end"),
  529. metadata=metadata,
  530. total_tokens=message_data.message_tokens + message_data.answer_tokens,
  531. status=message_data.status,
  532. error=message_data.error,
  533. from_account_id=message_data.from_account_id,
  534. agent_based=message_data.agent_based,
  535. from_source=message_data.from_source,
  536. model_provider=message_data.model_provider,
  537. model_id=message_data.model_id,
  538. suggested_question=suggested_question,
  539. level=message_data.status,
  540. status_message=message_data.error,
  541. )
  542. return suggested_question_trace_info
  543. def dataset_retrieval_trace(self, message_id, timer, **kwargs):
  544. documents = kwargs.get("documents")
  545. message_data = get_message_data(message_id)
  546. if not message_data:
  547. return {}
  548. metadata = {
  549. "message_id": message_id,
  550. "ls_provider": message_data.model_provider,
  551. "ls_model_name": message_data.model_id,
  552. "status": message_data.status,
  553. "from_end_user_id": message_data.from_end_user_id,
  554. "from_account_id": message_data.from_account_id,
  555. "agent_based": message_data.agent_based,
  556. "workflow_run_id": message_data.workflow_run_id,
  557. "from_source": message_data.from_source,
  558. }
  559. dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
  560. message_id=message_id,
  561. inputs=message_data.query or message_data.inputs,
  562. documents=[doc.model_dump() for doc in documents] if documents else [],
  563. start_time=timer.get("start"),
  564. end_time=timer.get("end"),
  565. metadata=metadata,
  566. message_data=message_data.to_dict(),
  567. )
  568. return dataset_retrieval_trace_info
  569. def tool_trace(self, message_id, timer, **kwargs):
  570. tool_name = kwargs.get("tool_name", "")
  571. tool_inputs = kwargs.get("tool_inputs", {})
  572. tool_outputs = kwargs.get("tool_outputs", {})
  573. message_data = get_message_data(message_id)
  574. if not message_data:
  575. return {}
  576. tool_config = {}
  577. time_cost = 0
  578. error = None
  579. tool_parameters = {}
  580. created_time = message_data.created_at
  581. end_time = message_data.updated_at
  582. agent_thoughts = message_data.agent_thoughts
  583. for agent_thought in agent_thoughts:
  584. if tool_name in agent_thought.tools:
  585. created_time = agent_thought.created_at
  586. tool_meta_data = agent_thought.tool_meta.get(tool_name, {})
  587. tool_config = tool_meta_data.get("tool_config", {})
  588. time_cost = tool_meta_data.get("time_cost", 0)
  589. end_time = created_time + timedelta(seconds=time_cost)
  590. error = tool_meta_data.get("error", "")
  591. tool_parameters = tool_meta_data.get("tool_parameters", {})
  592. metadata = {
  593. "message_id": message_id,
  594. "tool_name": tool_name,
  595. "tool_inputs": tool_inputs,
  596. "tool_outputs": tool_outputs,
  597. "tool_config": tool_config,
  598. "time_cost": time_cost,
  599. "error": error,
  600. "tool_parameters": tool_parameters,
  601. }
  602. file_url = ""
  603. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  604. if message_file_data:
  605. message_file_id = message_file_data.id if message_file_data else None
  606. type = message_file_data.type
  607. created_by_role = message_file_data.created_by_role
  608. created_user_id = message_file_data.created_by
  609. file_url = f"{self.file_base_url}/{message_file_data.url}"
  610. metadata.update(
  611. {
  612. "message_file_id": message_file_id,
  613. "created_by_role": created_by_role,
  614. "created_user_id": created_user_id,
  615. "type": type,
  616. }
  617. )
  618. tool_trace_info = ToolTraceInfo(
  619. message_id=message_id,
  620. message_data=message_data.to_dict(),
  621. tool_name=tool_name,
  622. start_time=timer.get("start") if timer else created_time,
  623. end_time=timer.get("end") if timer else end_time,
  624. tool_inputs=tool_inputs,
  625. tool_outputs=tool_outputs,
  626. metadata=metadata,
  627. message_file_data=message_file_data,
  628. error=error,
  629. inputs=message_data.message,
  630. outputs=message_data.answer,
  631. tool_config=tool_config,
  632. time_cost=time_cost,
  633. tool_parameters=tool_parameters,
  634. file_url=file_url,
  635. )
  636. return tool_trace_info
  637. def generate_name_trace(self, conversation_id, timer, **kwargs):
  638. generate_conversation_name = kwargs.get("generate_conversation_name")
  639. inputs = kwargs.get("inputs")
  640. tenant_id = kwargs.get("tenant_id")
  641. if not tenant_id:
  642. return {}
  643. start_time = timer.get("start")
  644. end_time = timer.get("end")
  645. metadata = {
  646. "conversation_id": conversation_id,
  647. "tenant_id": tenant_id,
  648. }
  649. generate_name_trace_info = GenerateNameTraceInfo(
  650. conversation_id=conversation_id,
  651. inputs=inputs,
  652. outputs=generate_conversation_name,
  653. start_time=start_time,
  654. end_time=end_time,
  655. metadata=metadata,
  656. tenant_id=tenant_id,
  657. )
  658. return generate_name_trace_info
  659. trace_manager_timer: Optional[threading.Timer] = None
  660. trace_manager_queue: queue.Queue = queue.Queue()
  661. trace_manager_interval = int(os.getenv("TRACE_QUEUE_MANAGER_INTERVAL", 5))
  662. trace_manager_batch_size = int(os.getenv("TRACE_QUEUE_MANAGER_BATCH_SIZE", 100))
  663. class TraceQueueManager:
  664. def __init__(self, app_id=None, user_id=None):
  665. global trace_manager_timer
  666. self.app_id = app_id
  667. self.user_id = user_id
  668. self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
  669. self.flask_app = current_app._get_current_object() # type: ignore
  670. if trace_manager_timer is None:
  671. self.start_timer()
  672. def add_trace_task(self, trace_task: TraceTask):
  673. global trace_manager_timer, trace_manager_queue
  674. try:
  675. if self.trace_instance:
  676. trace_task.app_id = self.app_id
  677. trace_manager_queue.put(trace_task)
  678. except Exception as e:
  679. logging.exception(f"Error adding trace task, trace_type {trace_task.trace_type}")
  680. finally:
  681. self.start_timer()
  682. def collect_tasks(self):
  683. global trace_manager_queue
  684. tasks: list[TraceTask] = []
  685. while len(tasks) < trace_manager_batch_size and not trace_manager_queue.empty():
  686. task = trace_manager_queue.get_nowait()
  687. tasks.append(task)
  688. trace_manager_queue.task_done()
  689. return tasks
  690. def run(self):
  691. try:
  692. tasks = self.collect_tasks()
  693. if tasks:
  694. self.send_to_celery(tasks)
  695. except Exception as e:
  696. logging.exception("Error processing trace tasks")
  697. def start_timer(self):
  698. global trace_manager_timer
  699. if trace_manager_timer is None or not trace_manager_timer.is_alive():
  700. trace_manager_timer = threading.Timer(trace_manager_interval, self.run)
  701. trace_manager_timer.name = f"trace_manager_timer_{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
  702. trace_manager_timer.daemon = False
  703. trace_manager_timer.start()
  704. def send_to_celery(self, tasks: list[TraceTask]):
  705. with self.flask_app.app_context():
  706. for task in tasks:
  707. if task.app_id is None:
  708. continue
  709. file_id = uuid4().hex
  710. trace_info = task.execute()
  711. task_data = TaskData(
  712. app_id=task.app_id,
  713. trace_info_type=type(trace_info).__name__,
  714. trace_info=trace_info.model_dump() if trace_info else None,
  715. )
  716. file_path = f"{OPS_FILE_PATH}{task.app_id}/{file_id}.json"
  717. storage.save(file_path, task_data.model_dump_json().encode("utf-8"))
  718. file_info = {
  719. "file_id": file_id,
  720. "app_id": task.app_id,
  721. }
  722. process_trace_tasks.delay(file_info)