notion.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. """Notion reader."""
  2. import json
  3. import logging
  4. import os
  5. from datetime import datetime
  6. from typing import Any, Dict, List, Optional
  7. import requests # type: ignore
  8. from llama_index.readers.base import BaseReader
  9. from llama_index.readers.schema.base import Document
  10. INTEGRATION_TOKEN_NAME = "NOTION_INTEGRATION_TOKEN"
  11. BLOCK_CHILD_URL_TMPL = "https://api.notion.com/v1/blocks/{block_id}/children"
  12. DATABASE_URL_TMPL = "https://api.notion.com/v1/databases/{database_id}/query"
  13. SEARCH_URL = "https://api.notion.com/v1/search"
  14. RETRIEVE_PAGE_URL_TMPL = "https://api.notion.com/v1/pages/{page_id}"
  15. RETRIEVE_DATABASE_URL_TMPL = "https://api.notion.com/v1/databases/{database_id}"
  16. HEADING_TYPE = ['heading_1', 'heading_2', 'heading_3']
  17. logger = logging.getLogger(__name__)
  18. # TODO: Notion DB reader coming soon!
  19. class NotionPageReader(BaseReader):
  20. """Notion Page reader.
  21. Reads a set of Notion pages.
  22. Args:
  23. integration_token (str): Notion integration token.
  24. """
  25. def __init__(self, integration_token: Optional[str] = None) -> None:
  26. """Initialize with parameters."""
  27. if integration_token is None:
  28. integration_token = os.getenv(INTEGRATION_TOKEN_NAME)
  29. if integration_token is None:
  30. raise ValueError(
  31. "Must specify `integration_token` or set environment "
  32. "variable `NOTION_INTEGRATION_TOKEN`."
  33. )
  34. self.token = integration_token
  35. self.headers = {
  36. "Authorization": "Bearer " + self.token,
  37. "Content-Type": "application/json",
  38. "Notion-Version": "2022-06-28",
  39. }
  40. def _read_block(self, block_id: str, num_tabs: int = 0) -> str:
  41. """Read a block."""
  42. done = False
  43. result_lines_arr = []
  44. cur_block_id = block_id
  45. while not done:
  46. block_url = BLOCK_CHILD_URL_TMPL.format(block_id=cur_block_id)
  47. query_dict: Dict[str, Any] = {}
  48. res = requests.request(
  49. "GET", block_url, headers=self.headers, json=query_dict
  50. )
  51. data = res.json()
  52. if 'results' not in data or data["results"] is None:
  53. done = True
  54. break
  55. heading = ''
  56. for result in data["results"]:
  57. result_type = result["type"]
  58. result_obj = result[result_type]
  59. cur_result_text_arr = []
  60. if result_type == 'table':
  61. result_block_id = result["id"]
  62. text = self._read_table_rows(result_block_id)
  63. result_lines_arr.append(text)
  64. else:
  65. if "rich_text" in result_obj:
  66. for rich_text in result_obj["rich_text"]:
  67. # skip if doesn't have text object
  68. if "text" in rich_text:
  69. text = rich_text["text"]["content"]
  70. prefix = "\t" * num_tabs
  71. cur_result_text_arr.append(prefix + text)
  72. if result_type in HEADING_TYPE:
  73. heading = text
  74. result_block_id = result["id"]
  75. has_children = result["has_children"]
  76. block_type = result["type"]
  77. if has_children and block_type != 'child_page':
  78. children_text = self._read_block(
  79. result_block_id, num_tabs=num_tabs + 1
  80. )
  81. cur_result_text_arr.append(children_text)
  82. cur_result_text = "\n".join(cur_result_text_arr)
  83. if result_type in HEADING_TYPE:
  84. result_lines_arr.append(cur_result_text)
  85. else:
  86. result_lines_arr.append(f'{heading}\n{cur_result_text}')
  87. if data["next_cursor"] is None:
  88. done = True
  89. break
  90. else:
  91. cur_block_id = data["next_cursor"]
  92. result_lines = "\n".join(result_lines_arr)
  93. return result_lines
  94. def _read_table_rows(self, block_id: str) -> str:
  95. """Read table rows."""
  96. done = False
  97. result_lines_arr = []
  98. cur_block_id = block_id
  99. while not done:
  100. block_url = BLOCK_CHILD_URL_TMPL.format(block_id=cur_block_id)
  101. query_dict: Dict[str, Any] = {}
  102. res = requests.request(
  103. "GET", block_url, headers=self.headers, json=query_dict
  104. )
  105. data = res.json()
  106. # get table headers text
  107. table_header_cell_texts = []
  108. tabel_header_cells = data["results"][0]['table_row']['cells']
  109. for tabel_header_cell in tabel_header_cells:
  110. if tabel_header_cell:
  111. for table_header_cell_text in tabel_header_cell:
  112. text = table_header_cell_text["text"]["content"]
  113. table_header_cell_texts.append(text)
  114. # get table columns text and format
  115. results = data["results"]
  116. for i in range(len(results)-1):
  117. column_texts = []
  118. tabel_column_cells = data["results"][i+1]['table_row']['cells']
  119. for j in range(len(tabel_column_cells)):
  120. if tabel_column_cells[j]:
  121. for table_column_cell_text in tabel_column_cells[j]:
  122. column_text = table_column_cell_text["text"]["content"]
  123. column_texts.append(f'{table_header_cell_texts[j]}:{column_text}')
  124. cur_result_text = "\n".join(column_texts)
  125. result_lines_arr.append(cur_result_text)
  126. if data["next_cursor"] is None:
  127. done = True
  128. break
  129. else:
  130. cur_block_id = data["next_cursor"]
  131. result_lines = "\n".join(result_lines_arr)
  132. return result_lines
  133. def _read_parent_blocks(self, block_id: str, num_tabs: int = 0) -> List[str]:
  134. """Read a block."""
  135. done = False
  136. result_lines_arr = []
  137. cur_block_id = block_id
  138. while not done:
  139. block_url = BLOCK_CHILD_URL_TMPL.format(block_id=cur_block_id)
  140. query_dict: Dict[str, Any] = {}
  141. res = requests.request(
  142. "GET", block_url, headers=self.headers, json=query_dict
  143. )
  144. data = res.json()
  145. # current block's heading
  146. heading = ''
  147. for result in data["results"]:
  148. result_type = result["type"]
  149. result_obj = result[result_type]
  150. cur_result_text_arr = []
  151. if result_type == 'table':
  152. result_block_id = result["id"]
  153. text = self._read_table_rows(result_block_id)
  154. text += "\n\n"
  155. result_lines_arr.append(text)
  156. else:
  157. if "rich_text" in result_obj:
  158. for rich_text in result_obj["rich_text"]:
  159. # skip if doesn't have text object
  160. if "text" in rich_text:
  161. text = rich_text["text"]["content"]
  162. cur_result_text_arr.append(text)
  163. if result_type in HEADING_TYPE:
  164. heading = text
  165. result_block_id = result["id"]
  166. has_children = result["has_children"]
  167. block_type = result["type"]
  168. if has_children and block_type != 'child_page':
  169. children_text = self._read_block(
  170. result_block_id, num_tabs=num_tabs + 1
  171. )
  172. cur_result_text_arr.append(children_text)
  173. cur_result_text = "\n".join(cur_result_text_arr)
  174. cur_result_text += "\n\n"
  175. if result_type in HEADING_TYPE:
  176. result_lines_arr.append(cur_result_text)
  177. else:
  178. result_lines_arr.append(f'{heading}\n{cur_result_text}')
  179. if data["next_cursor"] is None:
  180. done = True
  181. break
  182. else:
  183. cur_block_id = data["next_cursor"]
  184. return result_lines_arr
  185. def read_page(self, page_id: str) -> str:
  186. """Read a page."""
  187. return self._read_block(page_id)
  188. def read_page_as_documents(self, page_id: str) -> List[str]:
  189. """Read a page as documents."""
  190. return self._read_parent_blocks(page_id)
  191. def query_database_data(
  192. self, database_id: str, query_dict: Dict[str, Any] = {}
  193. ) -> str:
  194. """Get all the pages from a Notion database."""
  195. res = requests.post\
  196. (
  197. DATABASE_URL_TMPL.format(database_id=database_id),
  198. headers=self.headers,
  199. json=query_dict,
  200. )
  201. data = res.json()
  202. database_content_list = []
  203. if 'results' not in data or data["results"] is None:
  204. return ""
  205. for result in data["results"]:
  206. properties = result['properties']
  207. data = {}
  208. for property_name, property_value in properties.items():
  209. type = property_value['type']
  210. if type == 'multi_select':
  211. value = []
  212. multi_select_list = property_value[type]
  213. for multi_select in multi_select_list:
  214. value.append(multi_select['name'])
  215. elif type == 'rich_text' or type == 'title':
  216. if len(property_value[type]) > 0:
  217. value = property_value[type][0]['plain_text']
  218. else:
  219. value = ''
  220. elif type == 'select' or type == 'status':
  221. if property_value[type]:
  222. value = property_value[type]['name']
  223. else:
  224. value = ''
  225. else:
  226. value = property_value[type]
  227. data[property_name] = value
  228. database_content_list.append(json.dumps(data, ensure_ascii=False))
  229. return "\n\n".join(database_content_list)
  230. def query_database(
  231. self, database_id: str, query_dict: Dict[str, Any] = {}
  232. ) -> List[str]:
  233. """Get all the pages from a Notion database."""
  234. res = requests.post\
  235. (
  236. DATABASE_URL_TMPL.format(database_id=database_id),
  237. headers=self.headers,
  238. json=query_dict,
  239. )
  240. data = res.json()
  241. page_ids = []
  242. for result in data["results"]:
  243. page_id = result["id"]
  244. page_ids.append(page_id)
  245. return page_ids
  246. def search(self, query: str) -> List[str]:
  247. """Search Notion page given a text query."""
  248. done = False
  249. next_cursor: Optional[str] = None
  250. page_ids = []
  251. while not done:
  252. query_dict = {
  253. "query": query,
  254. }
  255. if next_cursor is not None:
  256. query_dict["start_cursor"] = next_cursor
  257. res = requests.post(SEARCH_URL, headers=self.headers, json=query_dict)
  258. data = res.json()
  259. for result in data["results"]:
  260. page_id = result["id"]
  261. page_ids.append(page_id)
  262. if data["next_cursor"] is None:
  263. done = True
  264. break
  265. else:
  266. next_cursor = data["next_cursor"]
  267. return page_ids
  268. def load_data(
  269. self, page_ids: List[str] = [], database_id: Optional[str] = None
  270. ) -> List[Document]:
  271. """Load data from the input directory.
  272. Args:
  273. page_ids (List[str]): List of page ids to load.
  274. Returns:
  275. List[Document]: List of documents.
  276. """
  277. if not page_ids and not database_id:
  278. raise ValueError("Must specify either `page_ids` or `database_id`.")
  279. docs = []
  280. if database_id is not None:
  281. # get all the pages in the database
  282. page_ids = self.query_database(database_id)
  283. for page_id in page_ids:
  284. page_text = self.read_page(page_id)
  285. docs.append(Document(page_text))
  286. else:
  287. for page_id in page_ids:
  288. page_text = self.read_page(page_id)
  289. docs.append(Document(page_text))
  290. return docs
  291. def load_data_as_documents(
  292. self, page_ids: List[str] = [], database_id: Optional[str] = None
  293. ) -> List[Document]:
  294. if not page_ids and not database_id:
  295. raise ValueError("Must specify either `page_ids` or `database_id`.")
  296. docs = []
  297. if database_id is not None:
  298. # get all the pages in the database
  299. page_text = self.query_database_data(database_id)
  300. docs.append(Document(page_text))
  301. else:
  302. for page_id in page_ids:
  303. page_text_list = self.read_page_as_documents(page_id)
  304. for page_text in page_text_list:
  305. docs.append(Document(page_text))
  306. return docs
  307. def get_page_last_edited_time(self, page_id: str) -> str:
  308. retrieve_page_url = RETRIEVE_PAGE_URL_TMPL.format(page_id=page_id)
  309. query_dict: Dict[str, Any] = {}
  310. res = requests.request(
  311. "GET", retrieve_page_url, headers=self.headers, json=query_dict
  312. )
  313. data = res.json()
  314. return data["last_edited_time"]
  315. def get_database_last_edited_time(self, database_id: str) -> str:
  316. retrieve_page_url = RETRIEVE_DATABASE_URL_TMPL.format(database_id=database_id)
  317. query_dict: Dict[str, Any] = {}
  318. res = requests.request(
  319. "GET", retrieve_page_url, headers=self.headers, json=query_dict
  320. )
  321. data = res.json()
  322. return data["last_edited_time"]
  323. if __name__ == "__main__":
  324. reader = NotionPageReader()
  325. logger.info(reader.search("What I"))