Browse Source

Feat/add thread control (#675)

Jyong 1 year ago
parent
commit
9eaae770a6
1 changed file with 13 additions and 19 deletions
  1. 13 19
      api/core/indexing_runner.py

+ 13 - 19
api/core/indexing_runner.py

@@ -1,4 +1,3 @@
-import asyncio
 import concurrent
 import datetime
 import json
@@ -8,25 +7,17 @@ import threading
 import time
 import uuid
 from concurrent.futures import ThreadPoolExecutor
-from multiprocessing import Process
 from typing import Optional, List, cast
 
-import openai
-from billiard.pool import Pool
-from flask import current_app, Flask
 from flask_login import current_user
-from langchain.embeddings import OpenAIEmbeddings
 from langchain.schema import Document
 from langchain.text_splitter import RecursiveCharacterTextSplitter, TextSplitter
 
 from core.data_loader.file_extractor import FileExtractor
 from core.data_loader.loader.notion import NotionLoader
 from core.docstore.dataset_docstore import DatesetDocumentStore
-from core.embedding.cached_embedding import CacheEmbedding
 from core.generator.llm_generator import LLMGenerator
 from core.index.index import IndexBuilder
-from core.index.keyword_table_index.keyword_table_index import KeywordTableIndex, KeywordTableConfig
-from core.index.vector_index.vector_index import VectorIndex
 from core.llm.error import ProviderTokenNotInitError
 from core.llm.llm_builder import LLMBuilder
 from core.llm.streamable_open_ai import StreamableOpenAI
@@ -516,20 +507,23 @@ class IndexingRunner:
                 model_name='gpt-3.5-turbo',
                 max_tokens=2000
             )
-            threads = []
-            for doc in documents:
-                document_format_thread = threading.Thread(target=self.format_document, kwargs={
-                    'llm': llm, 'document_node': doc, 'split_documents': split_documents, 'document_form': document_form})
-                threads.append(document_format_thread)
-                document_format_thread.start()
-            for thread in threads:
-                thread.join()
+            for i in range(0, len(documents), 10):
+                threads = []
+                sub_documents = documents[i:i + 10]
+                for doc in sub_documents:
+                    document_format_thread = threading.Thread(target=self.format_document, kwargs={
+                        'llm': llm, 'document_node': doc, 'split_documents': split_documents,
+                        'document_form': document_form})
+                    threads.append(document_format_thread)
+                    document_format_thread.start()
+                for thread in threads:
+                    thread.join()
+
             all_documents.extend(split_documents)
 
         return all_documents
 
-    def format_document(self, llm: StreamableOpenAI, document_node, split_documents: List, document_form: str):
-        print(document_node.page_content)
+    def format_document(self, llm: StreamableOpenAI, document_node, split_documents, document_form: str):
         format_documents = []
         if document_node.page_content is None or not document_node.page_content.strip():
             return format_documents