# metadata_service.py (11 KB)
  1. import copy
  2. import datetime
  3. import logging
  4. from typing import Optional
  5. from flask_login import current_user # type: ignore
  6. from core.rag.index_processor.constant.built_in_field import BuiltInField, MetadataDataSource
  7. from extensions.ext_database import db
  8. from extensions.ext_redis import redis_client
  9. from models.dataset import Dataset, DatasetMetadata, DatasetMetadataBinding
  10. from services.dataset_service import DocumentService
  11. from services.entities.knowledge_entities.knowledge_entities import (
  12. MetadataArgs,
  13. MetadataOperationData,
  14. )
  15. class MetadataService:
  @staticmethod
  def create_metadata(dataset_id: str, metadata_args: MetadataArgs) -> DatasetMetadata:
      """Create and persist a new metadata field for a dataset.

      Args:
          dataset_id: ID of the dataset the field is attached to.
          metadata_args: Carries the ``name`` and ``type`` of the new field.

      Returns:
          The committed ``DatasetMetadata`` row.

      Raises:
          ValueError: If a field with the same name already exists for the
              current tenant and dataset, or the name equals a
              ``BuiltInField`` value.
      """
      tenant_id = current_user.current_tenant_id
      requested_name = metadata_args.name

      # Reject names already used by a custom field of this tenant/dataset.
      duplicate = DatasetMetadata.query.filter_by(
          tenant_id=tenant_id, dataset_id=dataset_id, name=requested_name
      ).first()
      if duplicate:
          raise ValueError("Metadata name already exists.")

      # Reject names reserved by the built-in fields.
      if any(builtin.value == requested_name for builtin in BuiltInField):
          raise ValueError("Metadata name already exists in Built-in fields.")

      new_metadata = DatasetMetadata(
          tenant_id=tenant_id,
          dataset_id=dataset_id,
          type=metadata_args.type,
          name=requested_name,
          created_by=current_user.id,
      )
      db.session.add(new_metadata)
      db.session.commit()
      return new_metadata
  @staticmethod
  def update_metadata_name(dataset_id: str, metadata_id: str, name: str) -> DatasetMetadata:  # type: ignore
      """Rename a metadata field and re-key it in every bound document.

      The name checks run before the dataset lock is taken and raise directly.
      Everything after the lock is taken is best-effort: failures are logged,
      the session is rolled back, and ``None`` is returned.

      Args:
          dataset_id: Dataset that owns the field; also selects the lock key.
          metadata_id: ID of the ``DatasetMetadata`` row to rename.
          name: The new field name.

      Returns:
          The updated ``DatasetMetadata``, or ``None`` if the locked section
          failed (lock busy, metadata not found, database error).

      Raises:
          ValueError: If ``name`` is already used in this tenant/dataset or
              equals a ``BuiltInField`` value.
      """
      lock_key = f"dataset_metadata_lock_{dataset_id}"
      # check if metadata name already exists
      if DatasetMetadata.query.filter_by(
          tenant_id=current_user.current_tenant_id, dataset_id=dataset_id, name=name
      ).first():
          raise ValueError("Metadata name already exists.")
      if any(field.value == name for field in BuiltInField):
          raise ValueError("Metadata name already exists in Built-in fields.")

      # Track ownership so a failed lock check never deletes a lock that a
      # different, still-running operation holds.
      lock_acquired = False
      try:
          MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)
          lock_acquired = True

          metadata = DatasetMetadata.query.filter_by(id=metadata_id).first()
          if metadata is None:
              raise ValueError("Metadata not found.")
          old_name = metadata.name
          metadata.name = name
          metadata.updated_by = current_user.id
          # Stored as a naive timestamp expressed in UTC.
          metadata.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)

          # update related documents
          dataset_metadata_bindings = DatasetMetadataBinding.query.filter_by(metadata_id=metadata_id).all()
          if dataset_metadata_bindings:
              document_ids = [binding.document_id for binding in dataset_metadata_bindings]
              documents = DocumentService.get_document_by_ids(document_ids)
              for document in documents:
                  # Work on a copy and reassign so the attribute change is
                  # registered; tolerate documents with no metadata yet.
                  doc_metadata = copy.deepcopy(document.doc_metadata) if document.doc_metadata else {}
                  value = doc_metadata.pop(old_name, None)
                  doc_metadata[name] = value
                  document.doc_metadata = doc_metadata
                  db.session.add(document)
          db.session.commit()
          return metadata  # type: ignore
      except Exception:
          logging.exception("Update metadata name failed")
          # Discard the half-applied rename so a later commit on this session
          # cannot persist it.
          db.session.rollback()
      finally:
          if lock_acquired:
              redis_client.delete(lock_key)
  @staticmethod
  def delete_metadata(dataset_id: str, metadata_id: str):
      """Delete a metadata field and strip its key from every bound document.

      The whole operation runs under the dataset metadata lock and is
      best-effort: failures are logged, the session is rolled back, and
      ``None`` is returned.

      Args:
          dataset_id: Dataset that owns the field; also selects the lock key.
          metadata_id: ID of the ``DatasetMetadata`` row to delete.

      Returns:
          The deleted ``DatasetMetadata`` object, or ``None`` on failure
          (lock busy, metadata not found, database error).
      """
      lock_key = f"dataset_metadata_lock_{dataset_id}"
      # Track ownership so a failed lock check never deletes a lock that a
      # different, still-running operation holds.
      lock_acquired = False
      try:
          MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)
          lock_acquired = True

          metadata = DatasetMetadata.query.filter_by(id=metadata_id).first()
          if metadata is None:
              raise ValueError("Metadata not found.")
          db.session.delete(metadata)

          # deal related documents
          dataset_metadata_bindings = DatasetMetadataBinding.query.filter_by(metadata_id=metadata_id).all()
          if dataset_metadata_bindings:
              document_ids = [binding.document_id for binding in dataset_metadata_bindings]
              documents = DocumentService.get_document_by_ids(document_ids)
              for document in documents:
                  # Work on a copy and reassign so the attribute change is
                  # registered; tolerate documents with no metadata yet.
                  doc_metadata = copy.deepcopy(document.doc_metadata) if document.doc_metadata else {}
                  doc_metadata.pop(metadata.name, None)
                  document.doc_metadata = doc_metadata
                  db.session.add(document)
          db.session.commit()
          return metadata
      except Exception:
          logging.exception("Delete metadata failed")
          # Discard the pending delete and document edits so a later commit
          # on this session cannot persist a partial result.
          db.session.rollback()
      finally:
          if lock_acquired:
              redis_client.delete(lock_key)
  @staticmethod
  def get_built_in_fields():
      """Return the built-in fields as a list of ``{"name", "type"}`` dicts.

      The list order is fixed: document name, uploader, upload date,
      last update date, source.
      """
      typed_fields = (
          (BuiltInField.document_name, "string"),
          (BuiltInField.uploader, "string"),
          (BuiltInField.upload_date, "time"),
          (BuiltInField.last_update_date, "time"),
          (BuiltInField.source, "string"),
      )
      return [{"name": member.value, "type": kind} for member, kind in typed_fields]
  @staticmethod
  def enable_built_in_field(dataset: Dataset):
      """Turn on built-in fields for a dataset and write them to its documents.

      Does nothing if the dataset already has built-in fields enabled.
      Otherwise, under the dataset metadata lock, sets the flag and adds the
      five built-in keys to the metadata of every working document. The
      operation is best-effort: failures are logged and the session is rolled
      back.

      Args:
          dataset: The dataset to update.
      """
      if dataset.built_in_field_enabled:
          return
      lock_key = f"dataset_metadata_lock_{dataset.id}"
      # Track ownership so a failed lock check never deletes a lock that a
      # different, still-running operation holds.
      lock_acquired = False
      try:
          MetadataService.knowledge_base_metadata_lock_check(dataset.id, None)
          lock_acquired = True

          dataset.built_in_field_enabled = True
          db.session.add(dataset)
          documents = DocumentService.get_working_documents_by_dataset_id(dataset.id)
          for document in documents or []:
              # Work on a copy and reassign so the attribute change is registered.
              doc_metadata = copy.deepcopy(document.doc_metadata) if document.doc_metadata else {}
              doc_metadata[BuiltInField.document_name.value] = document.name
              doc_metadata[BuiltInField.uploader.value] = document.uploader
              doc_metadata[BuiltInField.upload_date.value] = document.upload_date.timestamp()
              doc_metadata[BuiltInField.last_update_date.value] = document.last_update_date.timestamp()
              doc_metadata[BuiltInField.source.value] = MetadataDataSource[document.data_source_type].value
              document.doc_metadata = doc_metadata
              db.session.add(document)
          db.session.commit()
      except Exception:
          logging.exception("Enable built-in field failed")
          # Discard the pending flag and document edits so a later commit on
          # this session cannot persist a partial result.
          db.session.rollback()
      finally:
          if lock_acquired:
              redis_client.delete(lock_key)
  @staticmethod
  def disable_built_in_field(dataset: Dataset):
      """Turn off built-in fields for a dataset and remove them from its documents.

      Does nothing if built-in fields are not enabled. Otherwise, under the
      dataset metadata lock, clears the flag and removes the five built-in
      keys from the metadata of every working document. The operation is
      best-effort: failures are logged and the session is rolled back.

      Args:
          dataset: The dataset to update.
      """
      if not dataset.built_in_field_enabled:
          return
      lock_key = f"dataset_metadata_lock_{dataset.id}"
      built_in_keys = (
          BuiltInField.document_name.value,
          BuiltInField.uploader.value,
          BuiltInField.upload_date.value,
          BuiltInField.last_update_date.value,
          BuiltInField.source.value,
      )
      # Track ownership so a failed lock check never deletes a lock that a
      # different, still-running operation holds.
      lock_acquired = False
      try:
          MetadataService.knowledge_base_metadata_lock_check(dataset.id, None)
          lock_acquired = True

          dataset.built_in_field_enabled = False
          db.session.add(dataset)
          documents = DocumentService.get_working_documents_by_dataset_id(dataset.id)
          for document in documents or []:
              # A document without metadata must not abort the whole run;
              # treat it as an empty mapping, as enable_built_in_field does.
              doc_metadata = copy.deepcopy(document.doc_metadata) if document.doc_metadata else {}
              for key in built_in_keys:
                  doc_metadata.pop(key, None)
              document.doc_metadata = doc_metadata
              db.session.add(document)
          db.session.commit()
      except Exception:
          logging.exception("Disable built-in field failed")
          # Discard the pending flag and document edits so a later commit on
          # this session cannot persist a partial result.
          db.session.rollback()
      finally:
          if lock_acquired:
              redis_client.delete(lock_key)
  @staticmethod
  def update_documents_metadata(dataset: Dataset, metadata_args: MetadataOperationData):
      """Replace the metadata and metadata bindings of one or more documents.

      Each entry in ``metadata_args.operation_data`` is handled independently
      under its own per-document lock. For each document, the metadata dict is
      rebuilt from the supplied values (plus the built-in fields when the
      dataset has them enabled) and its bindings are replaced. Both changes
      are committed together, so a failure leaves neither applied.

      Failures are logged and rolled back per document; processing then
      continues with the next operation.

      Args:
          dataset: Dataset that owns the documents.
          metadata_args: Operations, each with a ``document_id`` and a
              ``metadata_list`` of items exposing ``id``, ``name`` and ``value``.
      """
      for operation in metadata_args.operation_data:
          lock_key = f"document_metadata_lock_{operation.document_id}"
          # Track ownership so a failed lock check never deletes a lock that
          # a different, still-running operation holds.
          lock_acquired = False
          try:
              MetadataService.knowledge_base_metadata_lock_check(None, operation.document_id)
              lock_acquired = True

              document = DocumentService.get_document(dataset.id, operation.document_id)
              if document is None:
                  raise ValueError("Document not found.")

              doc_metadata = {item.name: item.value for item in operation.metadata_list}
              if dataset.built_in_field_enabled:
                  doc_metadata[BuiltInField.document_name.value] = document.name
                  doc_metadata[BuiltInField.uploader.value] = document.uploader
                  doc_metadata[BuiltInField.upload_date.value] = document.upload_date.timestamp()
                  doc_metadata[BuiltInField.last_update_date.value] = document.last_update_date.timestamp()
                  doc_metadata[BuiltInField.source.value] = MetadataDataSource[document.data_source_type].value
              document.doc_metadata = doc_metadata
              db.session.add(document)

              # deal metadata binding: drop the old rows, then recreate one
              # binding per supplied metadata value.
              DatasetMetadataBinding.query.filter_by(document_id=operation.document_id).delete()
              for metadata_value in operation.metadata_list:
                  dataset_metadata_binding = DatasetMetadataBinding(
                      tenant_id=current_user.current_tenant_id,
                      dataset_id=dataset.id,
                      document_id=operation.document_id,
                      metadata_id=metadata_value.id,
                      created_by=current_user.id,
                  )
                  db.session.add(dataset_metadata_binding)
              # Single commit: document metadata and bindings succeed or fail together.
              db.session.commit()
          except Exception:
              logging.exception("Update documents metadata failed")
              # Discard this document's pending changes so the next
              # operation's commit cannot persist them.
              db.session.rollback()
          finally:
              if lock_acquired:
                  redis_client.delete(lock_key)
  @staticmethod
  def knowledge_base_metadata_lock_check(dataset_id: Optional[str], document_id: Optional[str]):
      """Acquire the Redis metadata lock for a dataset and/or a document.

      Each lock is taken with one ``SET ... NX EX 3600`` command, so checking
      for an existing lock and creating it happen atomically. Two concurrent
      callers cannot both acquire the same key. The lock expires after one
      hour if it is never released.

      Args:
          dataset_id: If truthy, acquire ``dataset_metadata_lock_<dataset_id>``.
          document_id: If truthy, acquire ``document_metadata_lock_<document_id>``.

      Raises:
          ValueError: If the requested lock already exists.
      """
      if dataset_id:
          lock_key = f"dataset_metadata_lock_{dataset_id}"
          # With nx=True the key is only written when absent; a falsy result
          # means another operation already holds the lock.
          if not redis_client.set(lock_key, 1, ex=3600, nx=True):
              raise ValueError("Another knowledge base metadata operation is running, please wait a moment.")
      if document_id:
          lock_key = f"document_metadata_lock_{document_id}"
          if not redis_client.set(lock_key, 1, ex=3600, nx=True):
              raise ValueError("Another document metadata operation is running, please wait a moment.")
  @staticmethod
  def get_dataset_metadatas(dataset: Dataset):
      """Summarise a dataset's custom metadata fields and built-in flag.

      Args:
          dataset: Dataset whose ``doc_metadata`` entries are listed.

      Returns:
          A dict with two keys. ``"doc_metadata"`` is a list of
          ``{"id", "name", "type", "count"}`` dicts, one per entry whose id is
          not ``"built-in"``; ``count`` is the number of
          ``DatasetMetadataBinding`` rows for that field in this dataset.
          ``"built_in_field_enabled"`` is the dataset's flag.
      """
      custom_fields = []
      for entry in dataset.doc_metadata or []:
          field_id = entry.get("id")
          # Built-in entries are reported through the flag, not the list.
          if field_id == "built-in":
              continue
          usage_count = DatasetMetadataBinding.query.filter_by(
              metadata_id=field_id, dataset_id=dataset.id
          ).count()
          custom_fields.append(
              {
                  "id": field_id,
                  "name": entry.get("name"),
                  "type": entry.get("type"),
                  "count": usage_count,
              }
          )
      return {
          "doc_metadata": custom_fields,
          "built_in_field_enabled": dataset.built_in_field_enabled,
      }