Jelajahi Sumber

Feat/clean vector dataset (#605)

Jyong 1 tahun lalu
induk
melakukan
026f0bfce9
3 mengubah file dengan 69 tambahan dan 6 penghapusan
  1. 66 4
      api/commands.py
  2. 3 1
      api/config.py
  3. 0 1
      api/controllers/console/datasets/datasets.py

+ 66 - 4
api/commands.py

@@ -2,6 +2,7 @@ import datetime
 import logging
 import random
 import string
+import time
 
 import click
 from flask import current_app
@@ -13,7 +14,7 @@ from libs.helper import email as email_validate
 from extensions.ext_database import db
 from libs.rsa import generate_key_pair
 from models.account import InvitationCode, Tenant
-from models.dataset import Dataset
+from models.dataset import Dataset, DatasetQuery, Document, DocumentSegment
 from models.model import Account
 import secrets
 import base64
@@ -172,7 +173,7 @@ def recreate_all_dataset_indexes():
     page = 1
     while True:
         try:
-            datasets = db.session.query(Dataset).filter(Dataset.indexing_technique == 'high_quality')\
+            datasets = db.session.query(Dataset).filter(Dataset.indexing_technique == 'high_quality') \
                 .order_by(Dataset.created_at.desc()).paginate(page=page, per_page=50)
         except NotFound:
             break
@@ -188,12 +189,70 @@ def recreate_all_dataset_indexes():
                 else:
                     click.echo('passed.')
             except Exception as e:
-                click.echo(click.style('Recreate dataset index error: {} {}'.format(e.__class__.__name__, str(e)), fg='red'))
+                click.echo(
+                    click.style('Recreate dataset index error: {} {}'.format(e.__class__.__name__, str(e)), fg='red'))
                 continue
 
     click.echo(click.style('Congratulations! Recreate {} dataset indexes.'.format(recreate_count), fg='green'))
 
 
@click.command('clean-unused-dataset-indexes', help='Clean unused dataset indexes.')
def clean_unused_dataset_indexes():
    """Disable documents and delete indexes for datasets that look unused.

    A dataset is treated as unused when all of the following hold:
      * it was created more than CLEAN_DAY_SETTING days ago,
      * it received no DatasetQuery rows inside that window, and
      * none of its completed, enabled, non-archived documents were
        updated inside that window.
    For each such dataset, every document row is flagged enabled=False and
    both the high-quality (vector) and economy (keyword) indexes are
    deleted. Errors on a single dataset are logged and do not stop the run.
    """
    click.echo(click.style('Start clean unused dataset indexes.', fg='green'))
    clean_days = int(current_app.config.get('CLEAN_DAY_SETTING'))
    start_at = time.perf_counter()
    # Anything created/updated/queried after this cutoff counts as "recent".
    cutoff = datetime.datetime.now() - datetime.timedelta(days=clean_days)
    page = 1
    while True:
        try:
            datasets = db.session.query(Dataset).filter(Dataset.created_at < cutoff) \
                .order_by(Dataset.created_at.desc()).paginate(page=page, per_page=50)
        except NotFound:
            break
        page += 1
        for dataset in datasets:
            # Guard clause: skip datasets queried within the retention window.
            # count() avoids materializing full rows just to test existence.
            recent_query_count = db.session.query(DatasetQuery).filter(
                DatasetQuery.created_at > cutoff,
                DatasetQuery.dataset_id == dataset.id
            ).count()
            if recent_query_count > 0:
                continue
            # Guard clause: skip datasets whose active documents changed recently.
            recent_document_count = db.session.query(Document).filter(
                Document.dataset_id == dataset.id,
                Document.indexing_status == 'completed',
                Document.enabled == True,
                Document.archived == False,
                Document.updated_at > cutoff
            ).count()
            if recent_document_count > 0:
                continue
            try:
                # Only act when the dataset still has active, indexed documents;
                # otherwise there is nothing to disable or de-index.
                active_document_count = db.session.query(Document).filter(
                    Document.dataset_id == dataset.id,
                    Document.indexing_status == 'completed',
                    Document.enabled == True,
                    Document.archived == False,
                ).count()
                if active_document_count > 0:
                    Document.query.filter_by(dataset_id=dataset.id).update({
                        Document.enabled: False
                    })
                    db.session.commit()
                    # Remove both index flavors; guard each against a None
                    # builder result so a missing index cannot raise.
                    vector_index = IndexBuilder.get_index(dataset, 'high_quality')
                    kw_index = IndexBuilder.get_index(dataset, 'economy')
                    if vector_index:
                        vector_index.delete()
                    if kw_index:
                        kw_index.delete()
            except Exception as e:
                click.echo(
                    click.style('clean dataset index error: {} {}'.format(e.__class__.__name__, str(e)),
                                fg='red'))
    end_at = time.perf_counter()
    click.echo(click.style('Cleaned unused dataset from db success latency: {}'.format(end_at - start_at), fg='green'))
+
+
 @click.command('sync-anthropic-hosted-providers', help='Sync anthropic hosted providers.')
 def sync_anthropic_hosted_providers():
     click.echo(click.style('Start sync anthropic hosted providers.', fg='green'))
@@ -218,7 +277,9 @@ def sync_anthropic_hosted_providers():
                 )
                 count += 1
             except Exception as e:
-                click.echo(click.style('Sync tenant anthropic hosted provider error: {} {}'.format(e.__class__.__name__, str(e)), fg='red'))
+                click.echo(click.style(
+                    'Sync tenant anthropic hosted provider error: {} {}'.format(e.__class__.__name__, str(e)),
+                    fg='red'))
                 continue
 
     click.echo(click.style('Congratulations! Synced {} anthropic hosted providers.'.format(count), fg='green'))
@@ -231,3 +292,4 @@ def register_commands(app):
     app.cli.add_command(reset_encrypt_key_pair)
     app.cli.add_command(recreate_all_dataset_indexes)
     app.cli.add_command(sync_anthropic_hosted_providers)
+    app.cli.add_command(clean_unused_dataset_indexes)

+ 3 - 1
api/config.py

@@ -53,7 +53,8 @@ DEFAULTS = {
     'DEFAULT_LLM_PROVIDER': 'openai',
     'OPENAI_HOSTED_QUOTA_LIMIT': 200,
     'ANTHROPIC_HOSTED_QUOTA_LIMIT': 1000,
-    'TENANT_DOCUMENT_COUNT': 100
+    'TENANT_DOCUMENT_COUNT': 100,
+    'CLEAN_DAY_SETTING': 30
 }
 
 
@@ -215,6 +216,7 @@ class Config:
         self.NOTION_INTEGRATION_TOKEN = get_env('NOTION_INTEGRATION_TOKEN')
 
         self.TENANT_DOCUMENT_COUNT = get_env('TENANT_DOCUMENT_COUNT')
+        self.CLEAN_DAY_SETTING = get_env('CLEAN_DAY_SETTING')
 
 
 class CloudEditionConfig(Config):

+ 0 - 1
api/controllers/console/datasets/datasets.py

@@ -3,7 +3,6 @@ from flask import request
 from flask_login import login_required, current_user
 from flask_restful import Resource, reqparse, fields, marshal, marshal_with
 from werkzeug.exceptions import NotFound, Forbidden
-
 import services
 from controllers.console import api
 from controllers.console.datasets.error import DatasetNameDuplicateError