"""Notion reader."""
import json
import logging
import os
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional

import requests  # type: ignore

from llama_index.readers.base import BaseReader
from llama_index.readers.schema.base import Document
  10. INTEGRATION_TOKEN_NAME = "NOTION_INTEGRATION_TOKEN"
  11. BLOCK_CHILD_URL_TMPL = "https://api.notion.com/v1/blocks/{block_id}/children"
  12. DATABASE_URL_TMPL = "https://api.notion.com/v1/databases/{database_id}/query"
  13. SEARCH_URL = "https://api.notion.com/v1/search"
  14. RETRIEVE_PAGE_URL_TMPL = "https://api.notion.com/v1/pages/{page_id}"
  15. RETRIEVE_DATABASE_URL_TMPL = "https://api.notion.com/v1/databases/{database_id}"
  16. HEADING_TYPE = ['heading_1', 'heading_2', 'heading_3']
  17. logger = logging.getLogger(__name__)
  18. # TODO: Notion DB reader coming soon!
  19. class NotionPageReader(BaseReader):
  20. """Notion Page reader.
  21. Reads a set of Notion pages.
  22. Args:
  23. integration_token (str): Notion integration token.
  24. """
  25. def __init__(self, integration_token: Optional[str] = None) -> None:
  26. """Initialize with parameters."""
  27. if integration_token is None:
  28. integration_token = os.getenv(INTEGRATION_TOKEN_NAME)
  29. if integration_token is None:
  30. raise ValueError(
  31. "Must specify `integration_token` or set environment "
  32. "variable `NOTION_INTEGRATION_TOKEN`."
  33. )
  34. self.token = integration_token
  35. self.headers = {
  36. "Authorization": "Bearer " + self.token,
  37. "Content-Type": "application/json",
  38. "Notion-Version": "2022-06-28",
  39. }
  40. def _read_block(self, block_id: str, num_tabs: int = 0) -> str:
  41. """Read a block."""
  42. done = False
  43. result_lines_arr = []
  44. cur_block_id = block_id
  45. while not done:
  46. block_url = BLOCK_CHILD_URL_TMPL.format(block_id=cur_block_id)
  47. query_dict: Dict[str, Any] = {}
  48. res = requests.request(
  49. "GET", block_url, headers=self.headers, json=query_dict
  50. )
  51. data = res.json()
  52. if 'results' not in data or data["results"] is None:
  53. done = True
  54. break
  55. heading = ''
  56. for result in data["results"]:
  57. result_type = result["type"]
  58. result_obj = result[result_type]
  59. cur_result_text_arr = []
  60. if result_type == 'table':
  61. result_block_id = result["id"]
  62. text = self._read_table_rows(result_block_id)
  63. result_lines_arr.append(text)
  64. else:
  65. if "rich_text" in result_obj:
  66. for rich_text in result_obj["rich_text"]:
  67. # skip if doesn't have text object
  68. if "text" in rich_text:
  69. text = rich_text["text"]["content"]
  70. prefix = "\t" * num_tabs
  71. cur_result_text_arr.append(prefix + text)
  72. if result_type in HEADING_TYPE:
  73. heading = text
  74. result_block_id = result["id"]
  75. has_children = result["has_children"]
  76. if has_children:
  77. children_text = self._read_block(
  78. result_block_id, num_tabs=num_tabs + 1
  79. )
  80. cur_result_text_arr.append(children_text)
  81. cur_result_text = "\n".join(cur_result_text_arr)
  82. if result_type in HEADING_TYPE:
  83. result_lines_arr.append(cur_result_text)
  84. else:
  85. result_lines_arr.append(f'{heading}\n{cur_result_text}')
  86. if data["next_cursor"] is None:
  87. done = True
  88. break
  89. else:
  90. cur_block_id = data["next_cursor"]
  91. result_lines = "\n".join(result_lines_arr)
  92. return result_lines
  93. def _read_table_rows(self, block_id: str) -> str:
  94. """Read table rows."""
  95. done = False
  96. result_lines_arr = []
  97. cur_block_id = block_id
  98. while not done:
  99. block_url = BLOCK_CHILD_URL_TMPL.format(block_id=cur_block_id)
  100. query_dict: Dict[str, Any] = {}
  101. res = requests.request(
  102. "GET", block_url, headers=self.headers, json=query_dict
  103. )
  104. data = res.json()
  105. # get table headers text
  106. table_header_cell_texts = []
  107. tabel_header_cells = data["results"][0]['table_row']['cells']
  108. for tabel_header_cell in tabel_header_cells:
  109. if tabel_header_cell:
  110. for table_header_cell_text in tabel_header_cell:
  111. text = table_header_cell_text["text"]["content"]
  112. table_header_cell_texts.append(text)
  113. # get table columns text and format
  114. results = data["results"]
  115. for i in range(len(results)-1):
  116. column_texts = []
  117. tabel_column_cells = data["results"][i+1]['table_row']['cells']
  118. for j in range(len(tabel_column_cells)):
  119. if tabel_column_cells[j]:
  120. for table_column_cell_text in tabel_column_cells[j]:
  121. column_text = table_column_cell_text["text"]["content"]
  122. column_texts.append(f'{table_header_cell_texts[j]}:{column_text}')
  123. cur_result_text = "\n".join(column_texts)
  124. result_lines_arr.append(cur_result_text)
  125. if data["next_cursor"] is None:
  126. done = True
  127. break
  128. else:
  129. cur_block_id = data["next_cursor"]
  130. result_lines = "\n".join(result_lines_arr)
  131. return result_lines
  132. def _read_parent_blocks(self, block_id: str, num_tabs: int = 0) -> List[str]:
  133. """Read a block."""
  134. done = False
  135. result_lines_arr = []
  136. cur_block_id = block_id
  137. while not done:
  138. block_url = BLOCK_CHILD_URL_TMPL.format(block_id=cur_block_id)
  139. query_dict: Dict[str, Any] = {}
  140. res = requests.request(
  141. "GET", block_url, headers=self.headers, json=query_dict
  142. )
  143. data = res.json()
  144. # current block's heading
  145. heading = ''
  146. for result in data["results"]:
  147. result_type = result["type"]
  148. result_obj = result[result_type]
  149. cur_result_text_arr = []
  150. if result_type == 'table':
  151. result_block_id = result["id"]
  152. text = self._read_table_rows(result_block_id)
  153. text += "\n\n"
  154. result_lines_arr.append(text)
  155. else:
  156. if "rich_text" in result_obj:
  157. for rich_text in result_obj["rich_text"]:
  158. # skip if doesn't have text object
  159. if "text" in rich_text:
  160. text = rich_text["text"]["content"]
  161. cur_result_text_arr.append(text)
  162. if result_type in HEADING_TYPE:
  163. heading = text
  164. result_block_id = result["id"]
  165. has_children = result["has_children"]
  166. if has_children:
  167. children_text = self._read_block(
  168. result_block_id, num_tabs=num_tabs + 1
  169. )
  170. cur_result_text_arr.append(children_text)
  171. cur_result_text = "\n".join(cur_result_text_arr)
  172. cur_result_text += "\n\n"
  173. if result_type in HEADING_TYPE:
  174. result_lines_arr.append(cur_result_text)
  175. else:
  176. result_lines_arr.append(f'{heading}\n{cur_result_text}')
  177. if data["next_cursor"] is None:
  178. done = True
  179. break
  180. else:
  181. cur_block_id = data["next_cursor"]
  182. return result_lines_arr
  183. def read_page(self, page_id: str) -> str:
  184. """Read a page."""
  185. return self._read_block(page_id)
  186. def read_page_as_documents(self, page_id: str) -> List[str]:
  187. """Read a page as documents."""
  188. return self._read_parent_blocks(page_id)
  189. def query_database_data(
  190. self, database_id: str, query_dict: Dict[str, Any] = {}
  191. ) -> str:
  192. """Get all the pages from a Notion database."""
  193. res = requests.post\
  194. (
  195. DATABASE_URL_TMPL.format(database_id=database_id),
  196. headers=self.headers,
  197. json=query_dict,
  198. )
  199. data = res.json()
  200. database_content_list = []
  201. if 'results' not in data or data["results"] is None:
  202. return ""
  203. for result in data["results"]:
  204. properties = result['properties']
  205. data = {}
  206. for property_name, property_value in properties.items():
  207. type = property_value['type']
  208. if type == 'multi_select':
  209. value = []
  210. multi_select_list = property_value[type]
  211. for multi_select in multi_select_list:
  212. value.append(multi_select['name'])
  213. elif type == 'rich_text' or type == 'title':
  214. if len(property_value[type]) > 0:
  215. value = property_value[type][0]['plain_text']
  216. else:
  217. value = ''
  218. elif type == 'select' or type == 'status':
  219. if property_value[type]:
  220. value = property_value[type]['name']
  221. else:
  222. value = ''
  223. else:
  224. value = property_value[type]
  225. data[property_name] = value
  226. database_content_list.append(json.dumps(data))
  227. return "\n\n".join(database_content_list)
  228. def query_database(
  229. self, database_id: str, query_dict: Dict[str, Any] = {}
  230. ) -> List[str]:
  231. """Get all the pages from a Notion database."""
  232. res = requests.post\
  233. (
  234. DATABASE_URL_TMPL.format(database_id=database_id),
  235. headers=self.headers,
  236. json=query_dict,
  237. )
  238. data = res.json()
  239. page_ids = []
  240. for result in data["results"]:
  241. page_id = result["id"]
  242. page_ids.append(page_id)
  243. return page_ids
  244. def search(self, query: str) -> List[str]:
  245. """Search Notion page given a text query."""
  246. done = False
  247. next_cursor: Optional[str] = None
  248. page_ids = []
  249. while not done:
  250. query_dict = {
  251. "query": query,
  252. }
  253. if next_cursor is not None:
  254. query_dict["start_cursor"] = next_cursor
  255. res = requests.post(SEARCH_URL, headers=self.headers, json=query_dict)
  256. data = res.json()
  257. for result in data["results"]:
  258. page_id = result["id"]
  259. page_ids.append(page_id)
  260. if data["next_cursor"] is None:
  261. done = True
  262. break
  263. else:
  264. next_cursor = data["next_cursor"]
  265. return page_ids
  266. def load_data(
  267. self, page_ids: List[str] = [], database_id: Optional[str] = None
  268. ) -> List[Document]:
  269. """Load data from the input directory.
  270. Args:
  271. page_ids (List[str]): List of page ids to load.
  272. Returns:
  273. List[Document]: List of documents.
  274. """
  275. if not page_ids and not database_id:
  276. raise ValueError("Must specify either `page_ids` or `database_id`.")
  277. docs = []
  278. if database_id is not None:
  279. # get all the pages in the database
  280. page_ids = self.query_database(database_id)
  281. for page_id in page_ids:
  282. page_text = self.read_page(page_id)
  283. docs.append(Document(page_text))
  284. else:
  285. for page_id in page_ids:
  286. page_text = self.read_page(page_id)
  287. docs.append(Document(page_text))
  288. return docs
  289. def load_data_as_documents(
  290. self, page_ids: List[str] = [], database_id: Optional[str] = None
  291. ) -> List[Document]:
  292. if not page_ids and not database_id:
  293. raise ValueError("Must specify either `page_ids` or `database_id`.")
  294. docs = []
  295. if database_id is not None:
  296. # get all the pages in the database
  297. page_text = self.query_database_data(database_id)
  298. docs.append(Document(page_text))
  299. else:
  300. for page_id in page_ids:
  301. page_text_list = self.read_page_as_documents(page_id)
  302. for page_text in page_text_list:
  303. docs.append(Document(page_text))
  304. return docs
  305. def get_page_last_edited_time(self, page_id: str) -> str:
  306. retrieve_page_url = RETRIEVE_PAGE_URL_TMPL.format(page_id=page_id)
  307. query_dict: Dict[str, Any] = {}
  308. res = requests.request(
  309. "GET", retrieve_page_url, headers=self.headers, json=query_dict
  310. )
  311. data = res.json()
  312. return data["last_edited_time"]
  313. def get_database_last_edited_time(self, database_id: str) -> str:
  314. retrieve_page_url = RETRIEVE_DATABASE_URL_TMPL.format(database_id=database_id)
  315. query_dict: Dict[str, Any] = {}
  316. res = requests.request(
  317. "GET", retrieve_page_url, headers=self.headers, json=query_dict
  318. )
  319. data = res.json()
  320. return data["last_edited_time"]
  321. if __name__ == "__main__":
  322. reader = NotionPageReader()
  323. logger.info(reader.search("What I"))