client.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. import hashlib
  2. import json
  3. import logging
  4. import os
  5. import threading
  6. import time
  7. from collections.abc import Mapping
  8. from pathlib import Path
  9. from .python_3x import http_request, makedirs_wrapper
  10. from .utils import (
  11. CONFIGURATIONS,
  12. NAMESPACE_NAME,
  13. NOTIFICATION_ID,
  14. get_value_from_dict,
  15. init_ip,
  16. no_key_cache_key,
  17. signature,
  18. url_encode_wrapper,
  19. )
  20. logger = logging.getLogger(__name__)
  21. class ApolloClient:
  22. def __init__(
  23. self,
  24. config_url,
  25. app_id,
  26. cluster="default",
  27. secret="",
  28. start_hot_update=True,
  29. change_listener=None,
  30. _notification_map=None,
  31. ):
  32. # Core routing parameters
  33. self.config_url = config_url
  34. self.cluster = cluster
  35. self.app_id = app_id
  36. # Non-core parameters
  37. self.ip = init_ip()
  38. self.secret = secret
  39. # Check the parameter variables
  40. # Private control variables
  41. self._cycle_time = 5
  42. self._stopping = False
  43. self._cache = {}
  44. self._no_key = {}
  45. self._hash = {}
  46. self._pull_timeout = 75
  47. self._cache_file_path = os.path.expanduser("~") + "/.dify/config/remote-settings/apollo/cache/"
  48. self._long_poll_thread = None
  49. self._change_listener = change_listener # "add" "delete" "update"
  50. if _notification_map is None:
  51. _notification_map = {"application": -1}
  52. self._notification_map = _notification_map
  53. self.last_release_key = None
  54. # Private startup method
  55. self._path_checker()
  56. if start_hot_update:
  57. self._start_hot_update()
  58. # start the heartbeat thread
  59. heartbeat = threading.Thread(target=self._heart_beat)
  60. heartbeat.daemon = True
  61. heartbeat.start()
  62. def get_json_from_net(self, namespace="application"):
  63. url = "{}/configs/{}/{}/{}?releaseKey={}&ip={}".format(
  64. self.config_url, self.app_id, self.cluster, namespace, "", self.ip
  65. )
  66. try:
  67. code, body = http_request(url, timeout=3, headers=self._sign_headers(url))
  68. if code == 200:
  69. if not body:
  70. logger.error(f"get_json_from_net load configs failed, body is {body}")
  71. return None
  72. data = json.loads(body)
  73. data = data["configurations"]
  74. return_data = {CONFIGURATIONS: data}
  75. return return_data
  76. else:
  77. return None
  78. except Exception:
  79. logger.exception("an error occurred in get_json_from_net")
  80. return None
  81. def get_value(self, key, default_val=None, namespace="application"):
  82. try:
  83. # read memory configuration
  84. namespace_cache = self._cache.get(namespace)
  85. val = get_value_from_dict(namespace_cache, key)
  86. if val is not None:
  87. return val
  88. no_key = no_key_cache_key(namespace, key)
  89. if no_key in self._no_key:
  90. return default_val
  91. # read the network configuration
  92. namespace_data = self.get_json_from_net(namespace)
  93. val = get_value_from_dict(namespace_data, key)
  94. if val is not None:
  95. self._update_cache_and_file(namespace_data, namespace)
  96. return val
  97. # read the file configuration
  98. namespace_cache = self._get_local_cache(namespace)
  99. val = get_value_from_dict(namespace_cache, key)
  100. if val is not None:
  101. self._update_cache_and_file(namespace_cache, namespace)
  102. return val
  103. # If all of them are not obtained, the default value is returned
  104. # and the local cache is set to None
  105. self._set_local_cache_none(namespace, key)
  106. return default_val
  107. except Exception:
  108. logger.exception("get_value has error, [key is %s], [namespace is %s]", key, namespace)
  109. return default_val
  110. # Set the key of a namespace to none, and do not set default val
  111. # to ensure the real-time correctness of the function call.
  112. # If the user does not have the same default val twice
  113. # and the default val is used here, there may be a problem.
  114. def _set_local_cache_none(self, namespace, key):
  115. no_key = no_key_cache_key(namespace, key)
  116. self._no_key[no_key] = key
  117. def _start_hot_update(self):
  118. self._long_poll_thread = threading.Thread(target=self._listener)
  119. # When the asynchronous thread is started, the daemon thread will automatically exit
  120. # when the main thread is launched.
  121. self._long_poll_thread.daemon = True
  122. self._long_poll_thread.start()
  123. def stop(self):
  124. self._stopping = True
  125. logger.info("Stopping listener...")
  126. # Call the set callback function, and if it is abnormal, try it out
  127. def _call_listener(self, namespace, old_kv, new_kv):
  128. if self._change_listener is None:
  129. return
  130. if old_kv is None:
  131. old_kv = {}
  132. if new_kv is None:
  133. new_kv = {}
  134. try:
  135. for key in old_kv:
  136. new_value = new_kv.get(key)
  137. old_value = old_kv.get(key)
  138. if new_value is None:
  139. # If newValue is empty, it means key, and the value is deleted.
  140. self._change_listener("delete", namespace, key, old_value)
  141. continue
  142. if new_value != old_value:
  143. self._change_listener("update", namespace, key, new_value)
  144. continue
  145. for key in new_kv:
  146. new_value = new_kv.get(key)
  147. old_value = old_kv.get(key)
  148. if old_value is None:
  149. self._change_listener("add", namespace, key, new_value)
  150. except BaseException as e:
  151. logger.warning(str(e))
  152. def _path_checker(self):
  153. if not os.path.isdir(self._cache_file_path):
  154. makedirs_wrapper(self._cache_file_path)
  155. # update the local cache and file cache
  156. def _update_cache_and_file(self, namespace_data, namespace="application"):
  157. # update the local cache
  158. self._cache[namespace] = namespace_data
  159. # update the file cache
  160. new_string = json.dumps(namespace_data)
  161. new_hash = hashlib.md5(new_string.encode("utf-8")).hexdigest()
  162. if self._hash.get(namespace) == new_hash:
  163. pass
  164. else:
  165. file_path = Path(self._cache_file_path) / f"{self.app_id}_configuration_{namespace}.txt"
  166. file_path.write_text(new_string)
  167. self._hash[namespace] = new_hash
  168. # get the configuration from the local file
  169. def _get_local_cache(self, namespace="application"):
  170. cache_file_path = os.path.join(self._cache_file_path, f"{self.app_id}_configuration_{namespace}.txt")
  171. if os.path.isfile(cache_file_path):
  172. with open(cache_file_path) as f:
  173. result = json.loads(f.readline())
  174. return result
  175. return {}
  176. def _long_poll(self):
  177. notifications = []
  178. for key in self._cache:
  179. namespace_data = self._cache[key]
  180. notification_id = -1
  181. if NOTIFICATION_ID in namespace_data:
  182. notification_id = self._cache[key][NOTIFICATION_ID]
  183. notifications.append({NAMESPACE_NAME: key, NOTIFICATION_ID: notification_id})
  184. try:
  185. # if the length is 0 it is returned directly
  186. if len(notifications) == 0:
  187. return
  188. url = "{}/notifications/v2".format(self.config_url)
  189. params = {
  190. "appId": self.app_id,
  191. "cluster": self.cluster,
  192. "notifications": json.dumps(notifications, ensure_ascii=False),
  193. }
  194. param_str = url_encode_wrapper(params)
  195. url = url + "?" + param_str
  196. code, body = http_request(url, self._pull_timeout, headers=self._sign_headers(url))
  197. http_code = code
  198. if http_code == 304:
  199. logger.debug("No change, loop...")
  200. return
  201. if http_code == 200:
  202. if not body:
  203. logger.error(f"_long_poll load configs failed,body is {body}")
  204. return
  205. data = json.loads(body)
  206. for entry in data:
  207. namespace = entry[NAMESPACE_NAME]
  208. n_id = entry[NOTIFICATION_ID]
  209. logger.info("%s has changes: notificationId=%d", namespace, n_id)
  210. self._get_net_and_set_local(namespace, n_id, call_change=True)
  211. return
  212. else:
  213. logger.warning("Sleep...")
  214. except Exception as e:
  215. logger.warning(str(e))
  216. def _get_net_and_set_local(self, namespace, n_id, call_change=False):
  217. namespace_data = self.get_json_from_net(namespace)
  218. if not namespace_data:
  219. return
  220. namespace_data[NOTIFICATION_ID] = n_id
  221. old_namespace = self._cache.get(namespace)
  222. self._update_cache_and_file(namespace_data, namespace)
  223. if self._change_listener is not None and call_change and old_namespace:
  224. old_kv = old_namespace.get(CONFIGURATIONS)
  225. new_kv = namespace_data.get(CONFIGURATIONS)
  226. self._call_listener(namespace, old_kv, new_kv)
  227. def _listener(self):
  228. logger.info("start long_poll")
  229. while not self._stopping:
  230. self._long_poll()
  231. time.sleep(self._cycle_time)
  232. logger.info("stopped, long_poll")
  233. # add the need for endorsement to the header
  234. def _sign_headers(self, url: str) -> Mapping[str, str]:
  235. headers: dict[str, str] = {}
  236. if self.secret == "":
  237. return headers
  238. uri = url[len(self.config_url) : len(url)]
  239. time_unix_now = str(int(round(time.time() * 1000)))
  240. headers["Authorization"] = "Apollo " + self.app_id + ":" + signature(time_unix_now, uri, self.secret)
  241. headers["Timestamp"] = time_unix_now
  242. return headers
  243. def _heart_beat(self):
  244. while not self._stopping:
  245. for namespace in self._notification_map:
  246. self._do_heart_beat(namespace)
  247. time.sleep(60 * 10) # 10分钟
  248. def _do_heart_beat(self, namespace):
  249. url = "{}/configs/{}/{}/{}?ip={}".format(self.config_url, self.app_id, self.cluster, namespace, self.ip)
  250. try:
  251. code, body = http_request(url, timeout=3, headers=self._sign_headers(url))
  252. if code == 200:
  253. if not body:
  254. logger.error(f"_do_heart_beat load configs failed,body is {body}")
  255. return None
  256. data = json.loads(body)
  257. if self.last_release_key == data["releaseKey"]:
  258. return None
  259. self.last_release_key = data["releaseKey"]
  260. data = data["configurations"]
  261. self._update_cache_and_file(data, namespace)
  262. else:
  263. return None
  264. except Exception:
  265. logger.exception("an error occurred in _do_heart_beat")
  266. return None
  267. def get_all_dicts(self, namespace):
  268. namespace_data = self._cache.get(namespace)
  269. if namespace_data is None:
  270. net_namespace_data = self.get_json_from_net(namespace)
  271. if not net_namespace_data:
  272. return namespace_data
  273. namespace_data = net_namespace_data.get(CONFIGURATIONS)
  274. if namespace_data:
  275. self._update_cache_and_file(namespace_data, namespace)
  276. return namespace_data