# indexing_runner.py

import asyncio
import concurrent
import datetime
import json
import logging
import re
import threading
import time
import uuid
from multiprocessing import Process
from typing import Optional, List, cast

import openai
from billiard.pool import Pool
from flask import current_app, Flask
from flask_login import current_user
from gevent.threadpool import ThreadPoolExecutor
from langchain.embeddings import OpenAIEmbeddings
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter, TextSplitter

from core.data_loader.file_extractor import FileExtractor
from core.data_loader.loader.notion import NotionLoader
from core.docstore.dataset_docstore import DatesetDocumentStore
from core.embedding.cached_embedding import CacheEmbedding
from core.generator.llm_generator import LLMGenerator
from core.index.index import IndexBuilder
from core.index.keyword_table_index.keyword_table_index import KeywordTableIndex, KeywordTableConfig
from core.index.vector_index.vector_index import VectorIndex
from core.llm.error import ProviderTokenNotInitError
from core.llm.llm_builder import LLMBuilder
from core.llm.streamable_open_ai import StreamableOpenAI
from core.spiltter.fixed_text_splitter import FixedRecursiveCharacterTextSplitter
from core.llm.token_calculator import TokenCalculator
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from libs import helper
from models.dataset import Document as DatasetDocument
from models.dataset import Dataset, DocumentSegment, DatasetProcessRule
from models.model import UploadFile
from models.source import DataSourceBinding


class IndexingRunner:

    def __init__(self, embedding_model_name: str = "text-embedding-ada-002"):
        self.storage = storage
        self.embedding_model_name = embedding_model_name

    def run(self, dataset_documents: List[DatasetDocument]):
        """Run the indexing process."""
        for dataset_document in dataset_documents:
            try:
                # get dataset
                dataset = Dataset.query.filter_by(
                    id=dataset_document.dataset_id
                ).first()

                if not dataset:
                    raise ValueError("no dataset found")

                # load file
                text_docs = self._load_data(dataset_document)

                # get the process rule
                processing_rule = db.session.query(DatasetProcessRule). \
                    filter(DatasetProcessRule.id == dataset_document.dataset_process_rule_id). \
                    first()

                # get splitter
                splitter = self._get_splitter(processing_rule)

                # split to documents
                documents = self._step_split(
                    text_docs=text_docs,
                    splitter=splitter,
                    dataset=dataset,
                    dataset_document=dataset_document,
                    processing_rule=processing_rule
                )

                # new_documents = []
                # for document in documents:
                #     response = LLMGenerator.generate_qa_document(dataset.tenant_id, document.page_content)
                #     document_qa_list = self.format_split_text(response)
                #     for result in document_qa_list:
                #         document = Document(page_content=result['question'], metadata={'source': result['answer']})
                #         new_documents.append(document)

                # build index
                self._build_index(
                    dataset=dataset,
                    dataset_document=dataset_document,
                    documents=documents
                )
            except DocumentIsPausedException:
                raise DocumentIsPausedException('Document paused, document id: {}'.format(dataset_document.id))
            except ProviderTokenNotInitError as e:
                dataset_document.indexing_status = 'error'
                dataset_document.error = str(e.description)
                dataset_document.stopped_at = datetime.datetime.utcnow()
                db.session.commit()
            except Exception as e:
                logging.exception("consume document failed")
                dataset_document.indexing_status = 'error'
                dataset_document.error = str(e)
                dataset_document.stopped_at = datetime.datetime.utcnow()
                db.session.commit()

    def format_split_text(self, text):
        regex = r"Q\d+:\s*(.*?)\s*A\d+:\s*([\s\S]*?)(?=Q|$)"
        matches = re.findall(regex, text, re.MULTILINE)
        result = []
        for match in matches:
            q = match[0]
            a = match[1]
            if q and a:
                result.append({
                    "question": q,
                    "answer": re.sub(r"\n\s*", "\n", a.strip())
                })

        return result

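    # Illustrative example (not in the original file) of the LLM output layout
    # that format_split_text above expects; only the "Q<n>:" / "A<n>:" framing
    # is implied by the regex, the wording here is an assumption:
    #
    #   Q1: What does the IndexingRunner do?
    #   A1: It splits documents into segments and builds vector and
    #       keyword indexes for them.
    #   Q2: ...
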
    def run_in_splitting_status(self, dataset_document: DatasetDocument):
        """Run the indexing process when the index_status is splitting."""
        try:
            # get dataset
            dataset = Dataset.query.filter_by(
                id=dataset_document.dataset_id
            ).first()

            if not dataset:
                raise ValueError("no dataset found")

            # get exist document_segment list and delete
            document_segments = DocumentSegment.query.filter_by(
                dataset_id=dataset.id,
                document_id=dataset_document.id
            ).all()

            # delete segments one by one; session.delete() takes a single
            # mapped instance, not a list
            for document_segment in document_segments:
                db.session.delete(document_segment)
            db.session.commit()

            # load file
            text_docs = self._load_data(dataset_document)

            # get the process rule
            processing_rule = db.session.query(DatasetProcessRule). \
                filter(DatasetProcessRule.id == dataset_document.dataset_process_rule_id). \
                first()

            # get splitter
            splitter = self._get_splitter(processing_rule)

            # split to documents
            documents = self._step_split(
                text_docs=text_docs,
                splitter=splitter,
                dataset=dataset,
                dataset_document=dataset_document,
                processing_rule=processing_rule
            )

            # build index
            self._build_index(
                dataset=dataset,
                dataset_document=dataset_document,
                documents=documents
            )
        except DocumentIsPausedException:
            raise DocumentIsPausedException('Document paused, document id: {}'.format(dataset_document.id))
        except ProviderTokenNotInitError as e:
            dataset_document.indexing_status = 'error'
            dataset_document.error = str(e.description)
            dataset_document.stopped_at = datetime.datetime.utcnow()
            db.session.commit()
        except Exception as e:
            logging.exception("consume document failed")
            dataset_document.indexing_status = 'error'
            dataset_document.error = str(e)
            dataset_document.stopped_at = datetime.datetime.utcnow()
            db.session.commit()

    def run_in_indexing_status(self, dataset_document: DatasetDocument):
        """Run the indexing process when the index_status is indexing."""
        try:
            # get dataset
            dataset = Dataset.query.filter_by(
                id=dataset_document.dataset_id
            ).first()

            if not dataset:
                raise ValueError("no dataset found")

            # get exist document_segment list
            document_segments = DocumentSegment.query.filter_by(
                dataset_id=dataset.id,
                document_id=dataset_document.id
            ).all()

            documents = []
            if document_segments:
                for document_segment in document_segments:
                    # transform segment to node
                    if document_segment.status != "completed":
                        document = Document(
                            page_content=document_segment.content,
                            metadata={
                                "doc_id": document_segment.index_node_id,
                                "doc_hash": document_segment.index_node_hash,
                                "document_id": document_segment.document_id,
                                "dataset_id": document_segment.dataset_id,
                            }
                        )
                        documents.append(document)

            # build index
            self._build_index(
                dataset=dataset,
                dataset_document=dataset_document,
                documents=documents
            )
        except DocumentIsPausedException:
            raise DocumentIsPausedException('Document paused, document id: {}'.format(dataset_document.id))
        except ProviderTokenNotInitError as e:
            dataset_document.indexing_status = 'error'
            dataset_document.error = str(e.description)
            dataset_document.stopped_at = datetime.datetime.utcnow()
            db.session.commit()
        except Exception as e:
            logging.exception("consume document failed")
            dataset_document.indexing_status = 'error'
            dataset_document.error = str(e)
            dataset_document.stopped_at = datetime.datetime.utcnow()
            db.session.commit()

    def file_indexing_estimate(self, file_details: List[UploadFile], tmp_processing_rule: dict,
                               doc_form: str = None) -> dict:
        """
        Estimate the indexing for the document.
        """
        tokens = 0
        preview_texts = []
        total_segments = 0
        for file_detail in file_details:
            # load data from file
            text_docs = FileExtractor.load(file_detail)

            processing_rule = DatasetProcessRule(
                mode=tmp_processing_rule["mode"],
                rules=json.dumps(tmp_processing_rule["rules"])
            )

            # get splitter
            splitter = self._get_splitter(processing_rule)

            # split to documents
            documents = self._split_to_documents_for_estimate(
                text_docs=text_docs,
                splitter=splitter,
                processing_rule=processing_rule
            )

            total_segments += len(documents)
            for document in documents:
                if len(preview_texts) < 5:
                    preview_texts.append(document.page_content)

                tokens += TokenCalculator.get_num_tokens(self.embedding_model_name,
                                                         self.filter_string(document.page_content))

        if doc_form and doc_form == 'qa_model':
            if len(preview_texts) > 0:
                # qa model document
                llm: StreamableOpenAI = LLMBuilder.to_llm(
                    tenant_id=current_user.current_tenant_id,
                    model_name='gpt-3.5-turbo',
                    max_tokens=2000
                )
                response = LLMGenerator.generate_qa_document_sync(llm, preview_texts[0])
                document_qa_list = self.format_split_text(response)
                return {
                    "total_segments": total_segments * 20,
                    "tokens": total_segments * 2000,
                    "total_price": '{:f}'.format(
                        TokenCalculator.get_token_price('gpt-3.5-turbo', total_segments * 2000, 'completion')),
                    "currency": TokenCalculator.get_currency(self.embedding_model_name),
                    "qa_preview": document_qa_list,
                    "preview": preview_texts
                }

        return {
            "total_segments": total_segments,
            "tokens": tokens,
            "total_price": '{:f}'.format(TokenCalculator.get_token_price(self.embedding_model_name, tokens)),
            "currency": TokenCalculator.get_currency(self.embedding_model_name),
            "preview": preview_texts
        }

    def notion_indexing_estimate(self, notion_info_list: list, tmp_processing_rule: dict,
                                 doc_form: str = None) -> dict:
        """
        Estimate the indexing for the document.
        """
        # load data from notion
        tokens = 0
        preview_texts = []
        total_segments = 0
        for notion_info in notion_info_list:
            workspace_id = notion_info['workspace_id']
            data_source_binding = DataSourceBinding.query.filter(
                db.and_(
                    DataSourceBinding.tenant_id == current_user.current_tenant_id,
                    DataSourceBinding.provider == 'notion',
                    DataSourceBinding.disabled == False,
                    DataSourceBinding.source_info['workspace_id'] == f'"{workspace_id}"'
                )
            ).first()
            if not data_source_binding:
                raise ValueError('Data source binding not found.')

            for page in notion_info['pages']:
                loader = NotionLoader(
                    notion_access_token=data_source_binding.access_token,
                    notion_workspace_id=workspace_id,
                    notion_obj_id=page['page_id'],
                    notion_page_type=page['type']
                )
                documents = loader.load()

                processing_rule = DatasetProcessRule(
                    mode=tmp_processing_rule["mode"],
                    rules=json.dumps(tmp_processing_rule["rules"])
                )

                # get splitter
                splitter = self._get_splitter(processing_rule)

                # split to documents
                documents = self._split_to_documents_for_estimate(
                    text_docs=documents,
                    splitter=splitter,
                    processing_rule=processing_rule
                )

                total_segments += len(documents)
                for document in documents:
                    if len(preview_texts) < 5:
                        preview_texts.append(document.page_content)

                    tokens += TokenCalculator.get_num_tokens(self.embedding_model_name, document.page_content)

        if doc_form and doc_form == 'qa_model':
            if len(preview_texts) > 0:
                # qa model document
                llm: StreamableOpenAI = LLMBuilder.to_llm(
                    tenant_id=current_user.current_tenant_id,
                    model_name='gpt-3.5-turbo',
                    max_tokens=2000
                )
                response = LLMGenerator.generate_qa_document_sync(llm, preview_texts[0])
                document_qa_list = self.format_split_text(response)
                return {
                    "total_segments": total_segments * 20,
                    "tokens": total_segments * 2000,
                    "total_price": '{:f}'.format(
                        TokenCalculator.get_token_price('gpt-3.5-turbo', total_segments * 2000, 'completion')),
                    "currency": TokenCalculator.get_currency(self.embedding_model_name),
                    "qa_preview": document_qa_list,
                    "preview": preview_texts
                }

        return {
            "total_segments": total_segments,
            "tokens": tokens,
            "total_price": '{:f}'.format(TokenCalculator.get_token_price(self.embedding_model_name, tokens)),
            "currency": TokenCalculator.get_currency(self.embedding_model_name),
            "preview": preview_texts
        }

    def _load_data(self, dataset_document: DatasetDocument) -> List[Document]:
        # load file
        if dataset_document.data_source_type not in ["upload_file", "notion_import"]:
            return []

        data_source_info = dataset_document.data_source_info_dict
        text_docs = []
        if dataset_document.data_source_type == 'upload_file':
            if not data_source_info or 'upload_file_id' not in data_source_info:
                raise ValueError("no upload file found")

            file_detail = db.session.query(UploadFile). \
                filter(UploadFile.id == data_source_info['upload_file_id']). \
                one_or_none()

            text_docs = FileExtractor.load(file_detail)
        elif dataset_document.data_source_type == 'notion_import':
            loader = NotionLoader.from_document(dataset_document)
            text_docs = loader.load()

        # update document status to splitting
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="splitting",
            extra_update_params={
                DatasetDocument.word_count: sum([len(text_doc.page_content) for text_doc in text_docs]),
                DatasetDocument.parsing_completed_at: datetime.datetime.utcnow()
            }
        )

        # replace doc id to document model id
        text_docs = cast(List[Document], text_docs)
        for text_doc in text_docs:
            # remove invalid symbol
            text_doc.page_content = self.filter_string(text_doc.page_content)
            text_doc.metadata['document_id'] = dataset_document.id
            text_doc.metadata['dataset_id'] = dataset_document.dataset_id

        return text_docs

    def filter_string(self, text):
        # replace '<|' / '|>' sequences with plain angle brackets
        text = re.sub(r'<\|', '<', text)
        text = re.sub(r'\|>', '>', text)
        # strip control characters (except tab, newline, carriage return)
        # and characters in the \x80-\xFF range
        text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F\x80-\xFF]', '', text)
        return text

    def _get_splitter(self, processing_rule: DatasetProcessRule) -> TextSplitter:
        """
        Get the NodeParser object according to the processing rule.
        """
        if processing_rule.mode == "custom":
            # The user-defined segmentation rule
            rules = json.loads(processing_rule.rules)
            segmentation = rules["segmentation"]
            if segmentation["max_tokens"] < 50 or segmentation["max_tokens"] > 1000:
                raise ValueError("Custom segment length should be between 50 and 1000.")

            separator = segmentation["separator"]
            if separator:
                separator = separator.replace('\\n', '\n')

            character_splitter = FixedRecursiveCharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=segmentation["max_tokens"],
                chunk_overlap=0,
                fixed_separator=separator,
                separators=["\n\n", "。", ".", " ", ""]
            )
        else:
            # Automatic segmentation
            character_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=DatasetProcessRule.AUTOMATIC_RULES['segmentation']['max_tokens'],
                chunk_overlap=0,
                separators=["\n\n", "。", ".", " ", ""]
            )

        return character_splitter

    def _step_split(self, text_docs: List[Document], splitter: TextSplitter,
                    dataset: Dataset, dataset_document: DatasetDocument, processing_rule: DatasetProcessRule) \
            -> List[Document]:
        """
        Split the text documents into documents and save them to the document segment.
        """
        documents = self._split_to_documents(
            text_docs=text_docs,
            splitter=splitter,
            processing_rule=processing_rule,
            tenant_id=dataset.tenant_id,
            document_form=dataset_document.doc_form
        )

        # save node to document segment
        doc_store = DatesetDocumentStore(
            dataset=dataset,
            user_id=dataset_document.created_by,
            embedding_model_name=self.embedding_model_name,
            document_id=dataset_document.id
        )

        # add document segments
        doc_store.add_documents(documents)

        # update document status to indexing
        cur_time = datetime.datetime.utcnow()
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="indexing",
            extra_update_params={
                DatasetDocument.cleaning_completed_at: cur_time,
                DatasetDocument.splitting_completed_at: cur_time,
            }
        )

        # update segment status to indexing
        self._update_segments_by_document(
            dataset_document_id=dataset_document.id,
            update_params={
                DocumentSegment.status: "indexing",
                DocumentSegment.indexing_at: datetime.datetime.utcnow()
            }
        )

        return documents

    def _split_to_documents(self, text_docs: List[Document], splitter: TextSplitter,
                            processing_rule: DatasetProcessRule, tenant_id: str,
                            document_form: str) -> List[Document]:
        """
        Split the text documents into nodes.
        """
        all_documents = []
        for text_doc in text_docs:
            # document clean
            document_text = self._document_clean(text_doc.page_content, processing_rule)
            text_doc.page_content = document_text

            # parse document to nodes
            documents = splitter.split_documents([text_doc])

            split_documents = []
            llm: StreamableOpenAI = LLMBuilder.to_llm(
                tenant_id=tenant_id,
                model_name='gpt-3.5-turbo',
                max_tokens=2000
            )
            self.format_document(llm, documents, split_documents, document_form)
            all_documents.extend(split_documents)

        return all_documents

    def format_document(self, llm: StreamableOpenAI, documents: List[Document], split_documents: List,
                        document_form: str):
        for document_node in documents:
            format_documents = []
            if document_node.page_content is None or not document_node.page_content.strip():
                # skip empty chunks instead of aborting the remaining documents
                continue

            if document_form == 'text_model':
                # text model document
                doc_id = str(uuid.uuid4())
                hash = helper.generate_text_hash(document_node.page_content)

                document_node.metadata['doc_id'] = doc_id
                document_node.metadata['doc_hash'] = hash

                format_documents.append(document_node)
            elif document_form == 'qa_model':
                try:
                    # qa model document
                    response = LLMGenerator.generate_qa_document_sync(llm, document_node.page_content)
                    document_qa_list = self.format_split_text(response)
                    qa_documents = []
                    for result in document_qa_list:
                        qa_document = Document(page_content=result['question'],
                                               metadata=document_node.metadata.copy())
                        doc_id = str(uuid.uuid4())
                        hash = helper.generate_text_hash(result['question'])
                        qa_document.metadata['answer'] = result['answer']
                        qa_document.metadata['doc_id'] = doc_id
                        qa_document.metadata['doc_hash'] = hash
                        qa_documents.append(qa_document)
                    format_documents.extend(qa_documents)
                except Exception:
                    continue

            split_documents.extend(format_documents)

    def _split_to_documents_for_estimate(self, text_docs: List[Document], splitter: TextSplitter,
                                         processing_rule: DatasetProcessRule) -> List[Document]:
        """
        Split the text documents into nodes.
        """
        all_documents = []
        for text_doc in text_docs:
            # document clean
            document_text = self._document_clean(text_doc.page_content, processing_rule)
            text_doc.page_content = document_text

            # parse document to nodes
            documents = splitter.split_documents([text_doc])

            split_documents = []
            for document in documents:
                if document.page_content is None or not document.page_content.strip():
                    continue
                doc_id = str(uuid.uuid4())
                hash = helper.generate_text_hash(document.page_content)

                document.metadata['doc_id'] = doc_id
                document.metadata['doc_hash'] = hash

                split_documents.append(document)

            all_documents.extend(split_documents)

        return all_documents

    def _document_clean(self, text: str, processing_rule: DatasetProcessRule) -> str:
        """
        Clean the document text according to the processing rules.
        """
        if processing_rule.mode == "automatic":
            rules = DatasetProcessRule.AUTOMATIC_RULES
        else:
            rules = json.loads(processing_rule.rules) if processing_rule.rules else {}

        if 'pre_processing_rules' in rules:
            pre_processing_rules = rules["pre_processing_rules"]
            for pre_processing_rule in pre_processing_rules:
                if pre_processing_rule["id"] == "remove_extra_spaces" and pre_processing_rule["enabled"] is True:
                    # Remove extra spaces
                    pattern = r'\n{3,}'
                    text = re.sub(pattern, '\n\n', text)
                    pattern = r'[\t\f\r\x20\u00a0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]{2,}'
                    text = re.sub(pattern, ' ', text)
                elif pre_processing_rule["id"] == "remove_urls_emails" and pre_processing_rule["enabled"] is True:
                    # Remove email
                    pattern = r'([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)'
                    text = re.sub(pattern, '', text)

                    # Remove URL
                    pattern = r'https?://[^\s]+'
                    text = re.sub(pattern, '', text)

        return text

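    # Illustrative shape (not in the original file) of the processing-rule JSON
    # consumed by _get_splitter and _document_clean above; the keys come from
    # the code, the concrete values are assumptions:
    #
    # {
    #     "segmentation": {"separator": "\\n", "max_tokens": 500},
    #     "pre_processing_rules": [
    #         {"id": "remove_extra_spaces", "enabled": true},
    #         {"id": "remove_urls_emails", "enabled": false}
    #     ]
    # }
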
    def format_split_text(self, text):
        regex = r"Q\d+:\s*(.*?)\s*A\d+:\s*([\s\S]*?)(?=Q|$)"  # regex matching the Q and A pairs
        matches = re.findall(regex, text, re.MULTILINE)  # get all matches
        result = []  # store the final results
        for match in matches:
            q = match[0]
            a = match[1]
            if q and a:
                # add the pair to the results only when both Q and A are present
                result.append({
                    "question": q,
                    "answer": re.sub(r"\n\s*", "\n", a.strip())
                })
        return result

    def _build_index(self, dataset: Dataset, dataset_document: DatasetDocument, documents: List[Document]) -> None:
        """
        Build the index for the document.
        """
        vector_index = IndexBuilder.get_index(dataset, 'high_quality')
        keyword_table_index = IndexBuilder.get_index(dataset, 'economy')

        # chunk nodes by chunk size
        indexing_start_at = time.perf_counter()
        tokens = 0
        chunk_size = 100
        for i in range(0, len(documents), chunk_size):
            # check document is paused
            self._check_document_paused_status(dataset_document.id)

            chunk_documents = documents[i:i + chunk_size]

            tokens += sum(
                TokenCalculator.get_num_tokens(self.embedding_model_name, document.page_content)
                for document in chunk_documents
            )

            # save vector index
            if vector_index:
                vector_index.add_texts(chunk_documents)

            # save keyword index
            keyword_table_index.add_texts(chunk_documents)

            document_ids = [document.metadata['doc_id'] for document in chunk_documents]
            db.session.query(DocumentSegment).filter(
                DocumentSegment.document_id == dataset_document.id,
                DocumentSegment.index_node_id.in_(document_ids),
                DocumentSegment.status == "indexing"
            ).update({
                DocumentSegment.status: "completed",
                DocumentSegment.completed_at: datetime.datetime.utcnow()
            })

            db.session.commit()

        indexing_end_at = time.perf_counter()

        # update document status to completed
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="completed",
            extra_update_params={
                DatasetDocument.tokens: tokens,
                DatasetDocument.completed_at: datetime.datetime.utcnow(),
                DatasetDocument.indexing_latency: indexing_end_at - indexing_start_at,
            }
        )

    def _check_document_paused_status(self, document_id: str):
        indexing_cache_key = 'document_{}_is_paused'.format(document_id)
        result = redis_client.get(indexing_cache_key)
        if result:
            raise DocumentIsPausedException()

    def _update_document_index_status(self, document_id: str, after_indexing_status: str,
                                      extra_update_params: Optional[dict] = None) -> None:
        """
        Update the document indexing status.
        """
        count = DatasetDocument.query.filter_by(id=document_id, is_paused=True).count()
        if count > 0:
            raise DocumentIsPausedException()

        update_params = {
            DatasetDocument.indexing_status: after_indexing_status
        }

        if extra_update_params:
            update_params.update(extra_update_params)

        DatasetDocument.query.filter_by(id=document_id).update(update_params)
        db.session.commit()

    def _update_segments_by_document(self, dataset_document_id: str, update_params: dict) -> None:
        """
        Update the document segment by document id.
        """
        DocumentSegment.query.filter_by(document_id=dataset_document_id).update(update_params)
        db.session.commit()


class DocumentIsPausedException(Exception):
    pass
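

# Usage sketch (illustrative, not part of the original module), assuming the
# DatasetDocument rows already exist with a pending indexing status; the
# function name and query below are assumptions for illustration:
#
# def index_dataset_documents(dataset_document_ids: List[str]) -> None:
#     dataset_documents = db.session.query(DatasetDocument).filter(
#         DatasetDocument.id.in_(dataset_document_ids)
#     ).all()
#
#     IndexingRunner().run(dataset_documents)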