client.py 11 KB


  1. import hashlib
  2. import json
  3. import logging
  4. import os
  5. import threading
  6. import time
  7. from collections.abc import Mapping
  8. from pathlib import Path
  9. from .python_3x import http_request, makedirs_wrapper
  10. from .utils import (
  11. CONFIGURATIONS,
  12. NAMESPACE_NAME,
  13. NOTIFICATION_ID,
  14. get_value_from_dict,
  15. init_ip,
  16. no_key_cache_key,
  17. signature,
  18. url_encode_wrapper,
  19. )
  20. logger = logging.getLogger(__name__)
  21. class ApolloClient:
  22. def __init__(
  23. self,
  24. config_url,
  25. app_id,
  26. cluster="default",
  27. secret="",
  28. start_hot_update=True,
  29. change_listener=None,
  30. _notification_map=None,
  31. ):
  32. # Core routing parameters
  33. self.config_url = config_url
  34. self.cluster = cluster
  35. self.app_id = app_id
  36. # Non-core parameters
  37. self.ip = init_ip()
  38. self.secret = secret
  39. # Check the parameter variables
  40. # Private control variables
  41. self._cycle_time = 5
  42. self._stopping = False
  43. self._cache = {}
  44. self._no_key = {}
  45. self._hash = {}
  46. self._pull_timeout = 75
  47. self._cache_file_path = os.path.expanduser("~") + "/.dify/config/remote-settings/apollo/cache/"
  48. self._long_poll_thread = None
  49. self._change_listener = change_listener # "add" "delete" "update"
  50. if _notification_map is None:
  51. _notification_map = {"application": -1}
  52. self._notification_map = _notification_map
  53. self.last_release_key = None
  54. # Private startup method
  55. self._path_checker()
  56. if start_hot_update:
  57. self._start_hot_update()
  58. # start the heartbeat thread
  59. heartbeat = threading.Thread(target=self._heart_beat)
  60. heartbeat.daemon = True
  61. heartbeat.start()
  62. def get_json_from_net(self, namespace="application"):
  63. url = "{}/configs/{}/{}/{}?releaseKey={}&ip={}".format(
  64. self.config_url, self.app_id, self.cluster, namespace, "", self.ip
  65. )
  66. try:
  67. code, body = http_request(url, timeout=3, headers=self._sign_headers(url))
  68. if code == 200:
  69. if not body:
  70. logger.error(f"get_json_from_net load configs failed, body is {body}")
  71. return None
  72. data = json.loads(body)
  73. data = data["configurations"]
  74. return_data = {CONFIGURATIONS: data}
  75. return return_data
  76. else:
  77. return None
  78. except Exception:
  79. logger.exception("an error occurred in get_json_from_net")
  80. return None
  81. def get_value(self, key, default_val=None, namespace="application"):
  82. try:
  83. # read memory configuration
  84. namespace_cache = self._cache.get(namespace)
  85. val = get_value_from_dict(namespace_cache, key)
  86. if val is not None:
  87. return val
  88. no_key = no_key_cache_key(namespace, key)
  89. if no_key in self._no_key:
  90. return default_val
  91. # read the network configuration
  92. namespace_data = self.get_json_from_net(namespace)
  93. val = get_value_from_dict(namespace_data, key)
  94. if val is not None:
  95. self._update_cache_and_file(namespace_data, namespace)
  96. return val
  97. # read the file configuration
  98. namespace_cache = self._get_local_cache(namespace)
  99. val = get_value_from_dict(namespace_cache, key)
  100. if val is not None:
  101. self._update_cache_and_file(namespace_cache, namespace)
  102. return val
  103. # If all of them are not obtained, the default value is returned
  104. # and the local cache is set to None
  105. self._set_local_cache_none(namespace, key)
  106. return default_val
  107. except Exception:
  108. logger.exception("get_value has error, [key is %s], [namespace is %s]", key, namespace)
  109. return default_val
  110. # Set the key of a namespace to none, and do not set default val
  111. # to ensure the real-time correctness of the function call.
  112. # If the user does not have the same default val twice
  113. # and the default val is used here, there may be a problem.
  114. def _set_local_cache_none(self, namespace, key):
  115. no_key = no_key_cache_key(namespace, key)
  116. self._no_key[no_key] = key
  117. def _start_hot_update(self):
  118. self._long_poll_thread = threading.Thread(target=self._listener)
  119. # When the asynchronous thread is started, the daemon thread will automatically exit
  120. # when the main thread is launched.
  121. self._long_poll_thread.daemon = True
  122. self._long_poll_thread.start()
  123. def stop(self):
  124. self._stopping = True
  125. logger.info("Stopping listener...")
  126. # Call the set callback function, and if it is abnormal, try it out
  127. def _call_listener(self, namespace, old_kv, new_kv):
  128. if self._change_listener is None:
  129. return
  130. if old_kv is None:
  131. old_kv = {}
  132. if new_kv is None:
  133. new_kv = {}
  134. try:
  135. for key in old_kv:
  136. new_value = new_kv.get(key)
  137. old_value = old_kv.get(key)
  138. if new_value is None:
  139. # If newValue is empty, it means key, and the value is deleted.
  140. self._change_listener("delete", namespace, key, old_value)
  141. continue
  142. if new_value != old_value:
  143. self._change_listener("update", namespace, key, new_value)
  144. continue
  145. for key in new_kv:
  146. new_value = new_kv.get(key)
  147. old_value = old_kv.get(key)
  148. if old_value is None:
  149. self._change_listener("add", namespace, key, new_value)
  150. except BaseException as e:
  151. logger.warning(str(e))
  152. def _path_checker(self):
  153. if not os.path.isdir(self._cache_file_path):
  154. makedirs_wrapper(self._cache_file_path)
  155. # update the local cache and file cache
  156. def _update_cache_and_file(self, namespace_data, namespace="application"):
  157. # update the local cache
  158. self._cache[namespace] = namespace_data
  159. # update the file cache
  160. new_string = json.dumps(namespace_data)
  161. new_hash = hashlib.md5(new_string.encode("utf-8")).hexdigest()
  162. if self._hash.get(namespace) == new_hash:
  163. pass
  164. else:
  165. file_path = Path(self._cache_file_path) / f"{self.app_id}_configuration_{namespace}.txt"
  166. file_path.write_text(new_string)
  167. self._hash[namespace] = new_hash
  168. # get the configuration from the local file
  169. def _get_local_cache(self, namespace="application"):
  170. cache_file_path = os.path.join(self._cache_file_path, f"{self.app_id}_configuration_{namespace}.txt")
  171. if os.path.isfile(cache_file_path):
  172. with open(cache_file_path) as f:
  173. result = json.loads(f.readline())
  174. return result
  175. return {}
  176. def _long_poll(self):
  177. notifications = []
  178. for key in self._cache:
  179. namespace_data = self._cache[key]
  180. notification_id = -1
  181. if NOTIFICATION_ID in namespace_data:
  182. notification_id = self._cache[key][NOTIFICATION_ID]
  183. notifications.append({NAMESPACE_NAME: key, NOTIFICATION_ID: notification_id})
  184. try:
  185. # if the length is 0 it is returned directly
  186. if len(notifications) == 0:
  187. return
  188. url = "{}/notifications/v2".format(self.config_url)
  189. params = {
  190. "appId": self.app_id,
  191. "cluster": self.cluster,
  192. "notifications": json.dumps(notifications, ensure_ascii=False),
  193. }
  194. param_str = url_encode_wrapper(params)
  195. url = url + "?" + param_str
  196. code, body = http_request(url, self._pull_timeout, headers=self._sign_headers(url))
  197. http_code = code
  198. if http_code == 304:
  199. logger.debug("No change, loop...")
  200. return
  201. if http_code == 200:
  202. if not body:
  203. logger.error(f"_long_poll load configs failed,body is {body}")
  204. return
  205. data = json.loads(body)
  206. for entry in data:
  207. namespace = entry[NAMESPACE_NAME]
  208. n_id = entry[NOTIFICATION_ID]
  209. logger.info("%s has changes: notificationId=%d", namespace, n_id)
  210. self._get_net_and_set_local(namespace, n_id, call_change=True)
  211. return
  212. else:
  213. logger.warning("Sleep...")
  214. except Exception as e:
  215. logger.warning(str(e))
  216. def _get_net_and_set_local(self, namespace, n_id, call_change=False):
  217. namespace_data = self.get_json_from_net(namespace)
  218. if not namespace_data:
  219. return
  220. namespace_data[NOTIFICATION_ID] = n_id
  221. old_namespace = self._cache.get(namespace)
  222. self._update_cache_and_file(namespace_data, namespace)
  223. if self._change_listener is not None and call_change and old_namespace:
  224. old_kv = old_namespace.get(CONFIGURATIONS)
  225. new_kv = namespace_data.get(CONFIGURATIONS)
  226. self._call_listener(namespace, old_kv, new_kv)
  227. def _listener(self):
  228. logger.info("start long_poll")
  229. while not self._stopping:
  230. self._long_poll()
  231. time.sleep(self._cycle_time)
  232. logger.info("stopped, long_poll")
  233. # add the need for endorsement to the header
  234. def _sign_headers(self, url: str) -> Mapping[str, str]:
  235. headers: dict[str, str] = {}
  236. if self.secret == "":
  237. return headers
  238. uri = url[len(self.config_url) : len(url)]
  239. time_unix_now = str(int(round(time.time() * 1000)))
  240. headers["Authorization"] = "Apollo " + self.app_id + ":" + signature(time_unix_now, uri, self.secret)
  241. headers["Timestamp"] = time_unix_now
  242. return headers
  243. def _heart_beat(self):
  244. while not self._stopping:
  245. for namespace in self._notification_map:
  246. self._do_heart_beat(namespace)
  247. time.sleep(60 * 10) # 10分钟
  248. def _do_heart_beat(self, namespace):
  249. url = "{}/configs/{}/{}/{}?ip={}".format(self.config_url, self.app_id, self.cluster, namespace, self.ip)
  250. try:
  251. code, body = http_request(url, timeout=3, headers=self._sign_headers(url))
  252. if code == 200:
  253. if not body:
  254. logger.error(f"_do_heart_beat load configs failed,body is {body}")
  255. return None
  256. data = json.loads(body)
  257. if self.last_release_key == data["releaseKey"]:
  258. return None
  259. self.last_release_key = data["releaseKey"]
  260. data = data["configurations"]
  261. self._update_cache_and_file(data, namespace)
  262. else:
  263. return None
  264. except Exception:
  265. logger.exception("an error occurred in _do_heart_beat")
  266. return None
  267. def get_all_dicts(self, namespace):
  268. namespace_data = self._cache.get(namespace)
  269. if namespace_data is None:
  270. net_namespace_data = self.get_json_from_net(namespace)
  271. if not net_namespace_data:
  272. return namespace_data
  273. namespace_data = net_namespace_data.get(CONFIGURATIONS)
  274. if namespace_data:
  275. self._update_cache_and_file(namespace_data, namespace)
  276. return namespace_data