123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 |
- import hashlib
- import json
- import logging
- import os
- import threading
- import time
- from collections.abc import Mapping
- from pathlib import Path
- from .python_3x import http_request, makedirs_wrapper
- from .utils import (
- CONFIGURATIONS,
- NAMESPACE_NAME,
- NOTIFICATION_ID,
- get_value_from_dict,
- init_ip,
- no_key_cache_key,
- signature,
- url_encode_wrapper,
- )
- logger = logging.getLogger(__name__)
- class ApolloClient:
- def __init__(
- self,
- config_url,
- app_id,
- cluster="default",
- secret="",
- start_hot_update=True,
- change_listener=None,
- _notification_map=None,
- ):
- # Core routing parameters
- self.config_url = config_url
- self.cluster = cluster
- self.app_id = app_id
- # Non-core parameters
- self.ip = init_ip()
- self.secret = secret
- # Check the parameter variables
- # Private control variables
- self._cycle_time = 5
- self._stopping = False
- self._cache = {}
- self._no_key = {}
- self._hash = {}
- self._pull_timeout = 75
- self._cache_file_path = os.path.expanduser("~") + "/.dify/config/remote-settings/apollo/cache/"
- self._long_poll_thread = None
- self._change_listener = change_listener # "add" "delete" "update"
- if _notification_map is None:
- _notification_map = {"application": -1}
- self._notification_map = _notification_map
- self.last_release_key = None
- # Private startup method
- self._path_checker()
- if start_hot_update:
- self._start_hot_update()
- # start the heartbeat thread
- heartbeat = threading.Thread(target=self._heart_beat)
- heartbeat.daemon = True
- heartbeat.start()
- def get_json_from_net(self, namespace="application"):
- url = "{}/configs/{}/{}/{}?releaseKey={}&ip={}".format(
- self.config_url, self.app_id, self.cluster, namespace, "", self.ip
- )
- try:
- code, body = http_request(url, timeout=3, headers=self._sign_headers(url))
- if code == 200:
- if not body:
- logger.error(f"get_json_from_net load configs failed, body is {body}")
- return None
- data = json.loads(body)
- data = data["configurations"]
- return_data = {CONFIGURATIONS: data}
- return return_data
- else:
- return None
- except Exception:
- logger.exception("an error occurred in get_json_from_net")
- return None
- def get_value(self, key, default_val=None, namespace="application"):
- try:
- # read memory configuration
- namespace_cache = self._cache.get(namespace)
- val = get_value_from_dict(namespace_cache, key)
- if val is not None:
- return val
- no_key = no_key_cache_key(namespace, key)
- if no_key in self._no_key:
- return default_val
- # read the network configuration
- namespace_data = self.get_json_from_net(namespace)
- val = get_value_from_dict(namespace_data, key)
- if val is not None:
- self._update_cache_and_file(namespace_data, namespace)
- return val
- # read the file configuration
- namespace_cache = self._get_local_cache(namespace)
- val = get_value_from_dict(namespace_cache, key)
- if val is not None:
- self._update_cache_and_file(namespace_cache, namespace)
- return val
- # If all of them are not obtained, the default value is returned
- # and the local cache is set to None
- self._set_local_cache_none(namespace, key)
- return default_val
- except Exception:
- logger.exception("get_value has error, [key is %s], [namespace is %s]", key, namespace)
- return default_val
- # Set the key of a namespace to none, and do not set default val
- # to ensure the real-time correctness of the function call.
- # If the user does not have the same default val twice
- # and the default val is used here, there may be a problem.
- def _set_local_cache_none(self, namespace, key):
- no_key = no_key_cache_key(namespace, key)
- self._no_key[no_key] = key
- def _start_hot_update(self):
- self._long_poll_thread = threading.Thread(target=self._listener)
- # When the asynchronous thread is started, the daemon thread will automatically exit
- # when the main thread is launched.
- self._long_poll_thread.daemon = True
- self._long_poll_thread.start()
- def stop(self):
- self._stopping = True
- logger.info("Stopping listener...")
- # Call the set callback function, and if it is abnormal, try it out
- def _call_listener(self, namespace, old_kv, new_kv):
- if self._change_listener is None:
- return
- if old_kv is None:
- old_kv = {}
- if new_kv is None:
- new_kv = {}
- try:
- for key in old_kv:
- new_value = new_kv.get(key)
- old_value = old_kv.get(key)
- if new_value is None:
- # If newValue is empty, it means key, and the value is deleted.
- self._change_listener("delete", namespace, key, old_value)
- continue
- if new_value != old_value:
- self._change_listener("update", namespace, key, new_value)
- continue
- for key in new_kv:
- new_value = new_kv.get(key)
- old_value = old_kv.get(key)
- if old_value is None:
- self._change_listener("add", namespace, key, new_value)
- except BaseException as e:
- logger.warning(str(e))
- def _path_checker(self):
- if not os.path.isdir(self._cache_file_path):
- makedirs_wrapper(self._cache_file_path)
- # update the local cache and file cache
- def _update_cache_and_file(self, namespace_data, namespace="application"):
- # update the local cache
- self._cache[namespace] = namespace_data
- # update the file cache
- new_string = json.dumps(namespace_data)
- new_hash = hashlib.md5(new_string.encode("utf-8")).hexdigest()
- if self._hash.get(namespace) == new_hash:
- pass
- else:
- file_path = Path(self._cache_file_path) / f"{self.app_id}_configuration_{namespace}.txt"
- file_path.write_text(new_string)
- self._hash[namespace] = new_hash
- # get the configuration from the local file
- def _get_local_cache(self, namespace="application"):
- cache_file_path = os.path.join(self._cache_file_path, f"{self.app_id}_configuration_{namespace}.txt")
- if os.path.isfile(cache_file_path):
- with open(cache_file_path) as f:
- result = json.loads(f.readline())
- return result
- return {}
- def _long_poll(self):
- notifications = []
- for key in self._cache:
- namespace_data = self._cache[key]
- notification_id = -1
- if NOTIFICATION_ID in namespace_data:
- notification_id = self._cache[key][NOTIFICATION_ID]
- notifications.append({NAMESPACE_NAME: key, NOTIFICATION_ID: notification_id})
- try:
- # if the length is 0 it is returned directly
- if len(notifications) == 0:
- return
- url = "{}/notifications/v2".format(self.config_url)
- params = {
- "appId": self.app_id,
- "cluster": self.cluster,
- "notifications": json.dumps(notifications, ensure_ascii=False),
- }
- param_str = url_encode_wrapper(params)
- url = url + "?" + param_str
- code, body = http_request(url, self._pull_timeout, headers=self._sign_headers(url))
- http_code = code
- if http_code == 304:
- logger.debug("No change, loop...")
- return
- if http_code == 200:
- if not body:
- logger.error(f"_long_poll load configs failed,body is {body}")
- return
- data = json.loads(body)
- for entry in data:
- namespace = entry[NAMESPACE_NAME]
- n_id = entry[NOTIFICATION_ID]
- logger.info("%s has changes: notificationId=%d", namespace, n_id)
- self._get_net_and_set_local(namespace, n_id, call_change=True)
- return
- else:
- logger.warning("Sleep...")
- except Exception as e:
- logger.warning(str(e))
- def _get_net_and_set_local(self, namespace, n_id, call_change=False):
- namespace_data = self.get_json_from_net(namespace)
- if not namespace_data:
- return
- namespace_data[NOTIFICATION_ID] = n_id
- old_namespace = self._cache.get(namespace)
- self._update_cache_and_file(namespace_data, namespace)
- if self._change_listener is not None and call_change and old_namespace:
- old_kv = old_namespace.get(CONFIGURATIONS)
- new_kv = namespace_data.get(CONFIGURATIONS)
- self._call_listener(namespace, old_kv, new_kv)
- def _listener(self):
- logger.info("start long_poll")
- while not self._stopping:
- self._long_poll()
- time.sleep(self._cycle_time)
- logger.info("stopped, long_poll")
- # add the need for endorsement to the header
- def _sign_headers(self, url: str) -> Mapping[str, str]:
- headers: dict[str, str] = {}
- if self.secret == "":
- return headers
- uri = url[len(self.config_url) : len(url)]
- time_unix_now = str(int(round(time.time() * 1000)))
- headers["Authorization"] = "Apollo " + self.app_id + ":" + signature(time_unix_now, uri, self.secret)
- headers["Timestamp"] = time_unix_now
- return headers
- def _heart_beat(self):
- while not self._stopping:
- for namespace in self._notification_map:
- self._do_heart_beat(namespace)
- time.sleep(60 * 10) # 10分钟
- def _do_heart_beat(self, namespace):
- url = "{}/configs/{}/{}/{}?ip={}".format(self.config_url, self.app_id, self.cluster, namespace, self.ip)
- try:
- code, body = http_request(url, timeout=3, headers=self._sign_headers(url))
- if code == 200:
- if not body:
- logger.error(f"_do_heart_beat load configs failed,body is {body}")
- return None
- data = json.loads(body)
- if self.last_release_key == data["releaseKey"]:
- return None
- self.last_release_key = data["releaseKey"]
- data = data["configurations"]
- self._update_cache_and_file(data, namespace)
- else:
- return None
- except Exception:
- logger.exception("an error occurred in _do_heart_beat")
- return None
- def get_all_dicts(self, namespace):
- namespace_data = self._cache.get(namespace)
- if namespace_data is None:
- net_namespace_data = self.get_json_from_net(namespace)
- if not net_namespace_data:
- return namespace_data
- namespace_data = net_namespace_data.get(CONFIGURATIONS)
- if namespace_data:
- self._update_cache_and_file(namespace_data, namespace)
- return namespace_data
|