
update dataset embedding model, update document status to be indexing (#7145)

Jyong, 8 months ago
parent
commit
c6b0dc6a29
1 changed file with 70 additions and 41 deletions

+ 70 - 41
api/tasks/deal_dataset_vector_index_task.py
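
For context: modules under api/tasks in this codebase are Celery tasks, so deal_dataset_vector_index_task is normally dispatched asynchronously rather than called directly. A minimal sketch of a call site, assuming the standard Celery .delay API; the wrapper function name below is illustrative and not taken from this diff:

    # Hypothetical dispatch site (e.g. a dataset-settings service).
    # deal_dataset_vector_index_task is assumed to be a Celery @shared_task,
    # as is conventional for modules under api/tasks.
    from tasks.deal_dataset_vector_index_task import deal_dataset_vector_index_task

    def on_embedding_model_changed(dataset_id: str) -> None:
        # 'update' cleans the old vector index and re-embeds every completed,
        # enabled document; 'add' (re)indexes documents without cleaning first.
        deal_dataset_vector_index_task.delay(dataset_id, 'update')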

@@ -42,31 +42,42 @@ def deal_dataset_vector_index_task(dataset_id: str, action: str):
             ).all()
 
             if dataset_documents:
-                documents = []
-                for dataset_document in dataset_documents:
-                    # delete from vector index
-                    segments = db.session.query(DocumentSegment).filter(
-                        DocumentSegment.document_id == dataset_document.id,
-                        DocumentSegment.enabled == True
-                    ) .order_by(DocumentSegment.position.asc()).all()
-                    for segment in segments:
-                        document = Document(
-                            page_content=segment.content,
-                            metadata={
-                                "doc_id": segment.index_node_id,
-                                "doc_hash": segment.index_node_hash,
-                                "document_id": segment.document_id,
-                                "dataset_id": segment.dataset_id,
-                            }
-                        )
+                dataset_documents_ids = [doc.id for doc in dataset_documents]
+                db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)) \
+                    .update({"indexing_status": "indexing"}, synchronize_session=False)
+                db.session.commit()
 
-                        documents.append(document)
+                for dataset_document in dataset_documents:
+                    try:
+                        # add this document's segments to the vector index
+                        segments = db.session.query(DocumentSegment).filter(
+                            DocumentSegment.document_id == dataset_document.id,
+                            DocumentSegment.enabled == True
+                        ).order_by(DocumentSegment.position.asc()).all()
+                        if segments:
+                            documents = []
+                            for segment in segments:
+                                document = Document(
+                                    page_content=segment.content,
+                                    metadata={
+                                        "doc_id": segment.index_node_id,
+                                        "doc_hash": segment.index_node_hash,
+                                        "document_id": segment.document_id,
+                                        "dataset_id": segment.dataset_id,
+                                    }
+                                )
 
-                # save vector index
-                index_processor.load(dataset, documents, with_keywords=False)
+                                documents.append(document)
+                            # save vector index
+                            index_processor.load(dataset, documents, with_keywords=False)
+                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \
+                            .update({"indexing_status": "completed"}, synchronize_session=False)
+                        db.session.commit()
+                    except Exception as e:
+                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \
+                            .update({"indexing_status": "error", "error": str(e)}, synchronize_session=False)
+                        db.session.commit()
         elif action == 'update':
-            # clean index
-            index_processor.clean(dataset, None, with_keywords=False)
             dataset_documents = db.session.query(DatasetDocument).filter(
                 DatasetDocument.dataset_id == dataset_id,
                 DatasetDocument.indexing_status == 'completed',
@@ -75,28 +86,46 @@ def deal_dataset_vector_index_task(dataset_id: str, action: str):
             ).all()
             # add new index
             if dataset_documents:
-                documents = []
+                # update document status
+                dataset_documents_ids = [doc.id for doc in dataset_documents]
+                db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)) \
+                    .update({"indexing_status": "indexing"}, synchronize_session=False)
+                db.session.commit()
+
+                # clean index
+                index_processor.clean(dataset, None, with_keywords=False)
+
                 for dataset_document in dataset_documents:
-                    # delete from vector index
-                    segments = db.session.query(DocumentSegment).filter(
-                        DocumentSegment.document_id == dataset_document.id,
-                        DocumentSegment.enabled == True
-                    ).order_by(DocumentSegment.position.asc()).all()
-                    for segment in segments:
-                        document = Document(
-                            page_content=segment.content,
-                            metadata={
-                                "doc_id": segment.index_node_id,
-                                "doc_hash": segment.index_node_hash,
-                                "document_id": segment.document_id,
-                                "dataset_id": segment.dataset_id,
-                            }
-                        )
+                    # rebuild this document's entries in the vector index
+                    try:
+                        segments = db.session.query(DocumentSegment).filter(
+                            DocumentSegment.document_id == dataset_document.id,
+                            DocumentSegment.enabled == True
+                        ).order_by(DocumentSegment.position.asc()).all()
+                        if segments:
+                            documents = []
+                            for segment in segments:
+                                document = Document(
+                                    page_content=segment.content,
+                                    metadata={
+                                        "doc_id": segment.index_node_id,
+                                        "doc_hash": segment.index_node_hash,
+                                        "document_id": segment.document_id,
+                                        "dataset_id": segment.dataset_id,
+                                    }
+                                )
 
-                        documents.append(document)
+                                documents.append(document)
+                            # save vector index
+                            index_processor.load(dataset, documents, with_keywords=False)
+                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \
+                            .update({"indexing_status": "completed"}, synchronize_session=False)
+                        db.session.commit()
+                    except Exception as e:
+                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \
+                            .update({"indexing_status": "error", "error": str(e)}, synchronize_session=False)
+                        db.session.commit()
 
-                # save vector index
-                index_processor.load(dataset, documents, with_keywords=False)
 
         end_at = time.perf_counter()
         logging.info(
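
Taken together, the change replaces the single batched index write with a per-document loop, so each document's indexing_status reflects its own outcome: indexing → completed on success, indexing → error (with the message stored) on failure. A condensed sketch of that lifecycle; the names session, documents, and rebuild_vector_index_for are stand-ins assumed for illustration, not code from this diff:

    # Mark the whole batch as in progress first, then settle each row
    # individually so one failure cannot leave the rest stuck in 'indexing'.
    ids = [doc.id for doc in documents]
    session.query(DatasetDocument).filter(DatasetDocument.id.in_(ids)) \
        .update({"indexing_status": "indexing"}, synchronize_session=False)
    session.commit()

    for doc in documents:
        try:
            rebuild_vector_index_for(doc)  # hypothetical stand-in for the segment loop above
            status = {"indexing_status": "completed"}
        except Exception as e:
            status = {"indexing_status": "error", "error": str(e)}
        session.query(DatasetDocument).filter(DatasetDocument.id == doc.id) \
            .update(status, synchronize_session=False)
        session.commit()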