fix: same chunk insert deadlock (#12502)

Co-authored-by: huangzhuo <huangzhuo1@xiaomi.com>
huangzhuo1949 3 months ago
parent commit e84bf35e2a
1 changed file with 14 additions and 4 deletions

+ 14 - 4
api/core/indexing_runner.py

@@ -530,7 +530,6 @@ class IndexingRunner:
         # chunk nodes by chunk size
         indexing_start_at = time.perf_counter()
         tokens = 0
-        chunk_size = 10
         if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX:
             # create keyword index
             create_keyword_thread = threading.Thread(
@@ -539,11 +538,22 @@ class IndexingRunner:
             )
             create_keyword_thread.start()
 
+        max_workers = 10
         if dataset.indexing_technique == "high_quality":
-            with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
+            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                 futures = []
-                for i in range(0, len(documents), chunk_size):
-                    chunk_documents = documents[i : i + chunk_size]
+
+                # Distribute documents into multiple groups based on the hash values of page_content
+                # This prevents multiple threads from processing the same document,
+                # thereby avoiding potential database insertion deadlocks
+                document_groups: list[list[Document]] = [[] for _ in range(max_workers)]
+                for document in documents:
+                    hash = helper.generate_text_hash(document.page_content)
+                    group_index = int(hash, 16) % max_workers
+                    document_groups[group_index].append(document)
+                for chunk_documents in document_groups:
+                    if len(chunk_documents) == 0:
+                        continue
                     futures.append(
                         executor.submit(
                             self._process_chunk,
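
For context, the diff can be read as a hash-based bucketing scheme: documents whose page_content hashes to the same value always land in the same group, so any given chunk is only ever handled by one worker thread and two threads can never insert the same row concurrently. Below is a minimal standalone sketch of that idea, assuming a simplified Document class and hypothetical stand-ins (text_hash, process_chunk) for the repository's helper.generate_text_hash and IndexingRunner._process_chunk.

import concurrent.futures
import hashlib
from dataclasses import dataclass


@dataclass
class Document:
    page_content: str  # stand-in for the real Document model


def text_hash(text: str) -> str:
    # stand-in for helper.generate_text_hash; any stable hex digest works here
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def process_chunk(chunk_documents: list[Document]) -> int:
    # stand-in for IndexingRunner._process_chunk; just reports the group size
    return len(chunk_documents)


def index_documents(documents: list[Document], max_workers: int = 10) -> None:
    # Bucket documents by the hash of their content: identical chunks always
    # fall into the same group, so a single worker owns all copies of a chunk.
    groups: list[list[Document]] = [[] for _ in range(max_workers)]
    for document in documents:
        group_index = int(text_hash(document.page_content), 16) % max_workers
        groups[group_index].append(document)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_chunk, group) for group in groups if group]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # re-raise any worker exception


if __name__ == "__main__":
    docs = [Document(page_content=f"chunk {i % 3}") for i in range(10)]
    index_documents(docs, max_workers=4)

Compared with the previous fixed-size slicing (chunk_size = 10), this grouping trades perfectly even batches for the guarantee that duplicate chunks never reach two executor workers at once, which is what removes the insert deadlock.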