data_source.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. import datetime
  2. import json
  3. from cachetools import TTLCache
  4. from flask import request, current_app
  5. from flask_login import login_required, current_user
  6. from flask_restful import Resource, marshal_with, fields, reqparse, marshal
  7. from werkzeug.exceptions import NotFound
  8. from controllers.console import api
  9. from controllers.console.setup import setup_required
  10. from controllers.console.wraps import account_initialization_required
  11. from core.data_source.notion import NotionPageReader
  12. from core.indexing_runner import IndexingRunner
  13. from extensions.ext_database import db
  14. from libs.helper import TimestampField
  15. from libs.oauth_data_source import NotionOAuth
  16. from models.dataset import Document
  17. from models.source import DataSourceBinding
  18. from services.dataset_service import DatasetService, DocumentService
  19. from tasks.document_indexing_sync_task import document_indexing_sync_task
  20. cache = TTLCache(maxsize=None, ttl=30)
  21. FILE_SIZE_LIMIT = 15 * 1024 * 1024 # 15MB
  22. ALLOWED_EXTENSIONS = ['txt', 'markdown', 'md', 'pdf', 'html', 'htm']
  23. PREVIEW_WORDS_LIMIT = 3000
  24. class DataSourceApi(Resource):
  25. integrate_icon_fields = {
  26. 'type': fields.String,
  27. 'url': fields.String,
  28. 'emoji': fields.String
  29. }
  30. integrate_page_fields = {
  31. 'page_name': fields.String,
  32. 'page_id': fields.String,
  33. 'page_icon': fields.Nested(integrate_icon_fields, allow_null=True),
  34. 'parent_id': fields.String,
  35. 'type': fields.String
  36. }
  37. integrate_workspace_fields = {
  38. 'workspace_name': fields.String,
  39. 'workspace_id': fields.String,
  40. 'workspace_icon': fields.String,
  41. 'pages': fields.List(fields.Nested(integrate_page_fields)),
  42. 'total': fields.Integer
  43. }
  44. integrate_fields = {
  45. 'id': fields.String,
  46. 'provider': fields.String,
  47. 'created_at': TimestampField,
  48. 'is_bound': fields.Boolean,
  49. 'disabled': fields.Boolean,
  50. 'link': fields.String,
  51. 'source_info': fields.Nested(integrate_workspace_fields)
  52. }
  53. integrate_list_fields = {
  54. 'data': fields.List(fields.Nested(integrate_fields)),
  55. }
  56. @setup_required
  57. @login_required
  58. @account_initialization_required
  59. @marshal_with(integrate_list_fields)
  60. def get(self):
  61. # get workspace data source integrates
  62. data_source_integrates = db.session.query(DataSourceBinding).filter(
  63. DataSourceBinding.tenant_id == current_user.current_tenant_id,
  64. DataSourceBinding.disabled == False
  65. ).all()
  66. base_url = request.url_root.rstrip('/')
  67. data_source_oauth_base_path = "/console/api/oauth/data-source"
  68. providers = ["notion"]
  69. integrate_data = []
  70. for provider in providers:
  71. # existing_integrate = next((ai for ai in data_source_integrates if ai.provider == provider), None)
  72. existing_integrates = filter(lambda item: item.provider == provider, data_source_integrates)
  73. if existing_integrates:
  74. for existing_integrate in list(existing_integrates):
  75. integrate_data.append({
  76. 'id': existing_integrate.id,
  77. 'provider': provider,
  78. 'created_at': existing_integrate.created_at,
  79. 'is_bound': True,
  80. 'disabled': existing_integrate.disabled,
  81. 'source_info': existing_integrate.source_info,
  82. 'link': f'{base_url}{data_source_oauth_base_path}/{provider}'
  83. })
  84. else:
  85. integrate_data.append({
  86. 'id': None,
  87. 'provider': provider,
  88. 'created_at': None,
  89. 'source_info': None,
  90. 'is_bound': False,
  91. 'disabled': None,
  92. 'link': f'{base_url}{data_source_oauth_base_path}/{provider}'
  93. })
  94. return {'data': integrate_data}, 200
  95. @setup_required
  96. @login_required
  97. @account_initialization_required
  98. def patch(self, binding_id, action):
  99. binding_id = str(binding_id)
  100. action = str(action)
  101. data_source_binding = DataSourceBinding.query.filter_by(
  102. id=binding_id
  103. ).first()
  104. if data_source_binding is None:
  105. raise NotFound('Data source binding not found.')
  106. # enable binding
  107. if action == 'enable':
  108. if data_source_binding.disabled:
  109. data_source_binding.disabled = False
  110. data_source_binding.updated_at = datetime.datetime.utcnow()
  111. db.session.add(data_source_binding)
  112. db.session.commit()
  113. else:
  114. raise ValueError('Data source is not disabled.')
  115. # disable binding
  116. if action == 'disable':
  117. if not data_source_binding.disabled:
  118. data_source_binding.disabled = True
  119. data_source_binding.updated_at = datetime.datetime.utcnow()
  120. db.session.add(data_source_binding)
  121. db.session.commit()
  122. else:
  123. raise ValueError('Data source is disabled.')
  124. return {'result': 'success'}, 200
  125. class DataSourceNotionListApi(Resource):
  126. integrate_icon_fields = {
  127. 'type': fields.String,
  128. 'url': fields.String,
  129. 'emoji': fields.String
  130. }
  131. integrate_page_fields = {
  132. 'page_name': fields.String,
  133. 'page_id': fields.String,
  134. 'page_icon': fields.Nested(integrate_icon_fields, allow_null=True),
  135. 'is_bound': fields.Boolean,
  136. 'parent_id': fields.String,
  137. 'type': fields.String
  138. }
  139. integrate_workspace_fields = {
  140. 'workspace_name': fields.String,
  141. 'workspace_id': fields.String,
  142. 'workspace_icon': fields.String,
  143. 'pages': fields.List(fields.Nested(integrate_page_fields))
  144. }
  145. integrate_notion_info_list_fields = {
  146. 'notion_info': fields.List(fields.Nested(integrate_workspace_fields)),
  147. }
  148. @setup_required
  149. @login_required
  150. @account_initialization_required
  151. @marshal_with(integrate_notion_info_list_fields)
  152. def get(self):
  153. dataset_id = request.args.get('dataset_id', default=None, type=str)
  154. exist_page_ids = []
  155. # import notion in the exist dataset
  156. if dataset_id:
  157. dataset = DatasetService.get_dataset(dataset_id)
  158. if not dataset:
  159. raise NotFound('Dataset not found.')
  160. if dataset.data_source_type != 'notion_import':
  161. raise ValueError('Dataset is not notion type.')
  162. documents = Document.query.filter_by(
  163. dataset_id=dataset_id,
  164. tenant_id=current_user.current_tenant_id,
  165. data_source_type='notion_import',
  166. enabled=True
  167. ).all()
  168. if documents:
  169. for document in documents:
  170. data_source_info = json.loads(document.data_source_info)
  171. exist_page_ids.append(data_source_info['notion_page_id'])
  172. # get all authorized pages
  173. data_source_bindings = DataSourceBinding.query.filter_by(
  174. tenant_id=current_user.current_tenant_id,
  175. provider='notion',
  176. disabled=False
  177. ).all()
  178. if not data_source_bindings:
  179. return {
  180. 'notion_info': []
  181. }, 200
  182. pre_import_info_list = []
  183. for data_source_binding in data_source_bindings:
  184. source_info = data_source_binding.source_info
  185. pages = source_info['pages']
  186. # Filter out already bound pages
  187. for page in pages:
  188. if page['page_id'] in exist_page_ids:
  189. page['is_bound'] = True
  190. else:
  191. page['is_bound'] = False
  192. pre_import_info = {
  193. 'workspace_name': source_info['workspace_name'],
  194. 'workspace_icon': source_info['workspace_icon'],
  195. 'workspace_id': source_info['workspace_id'],
  196. 'pages': pages,
  197. }
  198. pre_import_info_list.append(pre_import_info)
  199. return {
  200. 'notion_info': pre_import_info_list
  201. }, 200
  202. class DataSourceNotionApi(Resource):
  203. @setup_required
  204. @login_required
  205. @account_initialization_required
  206. def get(self, workspace_id, page_id, page_type):
  207. workspace_id = str(workspace_id)
  208. page_id = str(page_id)
  209. data_source_binding = DataSourceBinding.query.filter(
  210. db.and_(
  211. DataSourceBinding.tenant_id == current_user.current_tenant_id,
  212. DataSourceBinding.provider == 'notion',
  213. DataSourceBinding.disabled == False,
  214. DataSourceBinding.source_info['workspace_id'] == f'"{workspace_id}"'
  215. )
  216. ).first()
  217. if not data_source_binding:
  218. raise NotFound('Data source binding not found.')
  219. reader = NotionPageReader(integration_token=data_source_binding.access_token)
  220. if page_type == 'page':
  221. page_content = reader.read_page(page_id)
  222. elif page_type == 'database':
  223. page_content = reader.query_database_data(page_id)
  224. else:
  225. page_content = ""
  226. return {
  227. 'content': page_content
  228. }, 200
  229. @setup_required
  230. @login_required
  231. @account_initialization_required
  232. def post(self):
  233. parser = reqparse.RequestParser()
  234. parser.add_argument('notion_info_list', type=list, required=True, nullable=True, location='json')
  235. parser.add_argument('process_rule', type=dict, required=True, nullable=True, location='json')
  236. args = parser.parse_args()
  237. # validate args
  238. DocumentService.estimate_args_validate(args)
  239. indexing_runner = IndexingRunner()
  240. response = indexing_runner.notion_indexing_estimate(args['notion_info_list'], args['process_rule'])
  241. return response, 200
  242. class DataSourceNotionDatasetSyncApi(Resource):
  243. @setup_required
  244. @login_required
  245. @account_initialization_required
  246. def get(self, dataset_id):
  247. dataset_id_str = str(dataset_id)
  248. dataset = DatasetService.get_dataset(dataset_id_str)
  249. if dataset is None:
  250. raise NotFound("Dataset not found.")
  251. documents = DocumentService.get_document_by_dataset_id(dataset_id_str)
  252. for document in documents:
  253. document_indexing_sync_task.delay(dataset_id_str, document.id)
  254. return 200
  255. class DataSourceNotionDocumentSyncApi(Resource):
  256. @setup_required
  257. @login_required
  258. @account_initialization_required
  259. def get(self, dataset_id, document_id):
  260. dataset_id_str = str(dataset_id)
  261. document_id_str = str(document_id)
  262. dataset = DatasetService.get_dataset(dataset_id_str)
  263. if dataset is None:
  264. raise NotFound("Dataset not found.")
  265. document = DocumentService.get_document(dataset_id_str, document_id_str)
  266. if document is None:
  267. raise NotFound("Document not found.")
  268. document_indexing_sync_task.delay(dataset_id_str, document_id_str)
  269. return 200
  270. api.add_resource(DataSourceApi, '/data-source/integrates', '/data-source/integrates/<uuid:binding_id>/<string:action>')
  271. api.add_resource(DataSourceNotionListApi, '/notion/pre-import/pages')
  272. api.add_resource(DataSourceNotionApi,
  273. '/notion/workspaces/<uuid:workspace_id>/pages/<uuid:page_id>/<string:page_type>/preview',
  274. '/datasets/notion-indexing-estimate')
  275. api.add_resource(DataSourceNotionDatasetSyncApi, '/datasets/<uuid:dataset_id>/notion/sync')
  276. api.add_resource(DataSourceNotionDocumentSyncApi, '/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/notion/sync')