ext_redis.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import redis
  2. from redis.cluster import ClusterNode, RedisCluster
  3. from redis.connection import Connection, SSLConnection
  4. from redis.sentinel import Sentinel
  5. from configs import dify_config
  6. from dify_app import DifyApp
  7. class RedisClientWrapper:
  8. """
  9. A wrapper class for the Redis client that addresses the issue where the global
  10. `redis_client` variable cannot be updated when a new Redis instance is returned
  11. by Sentinel.
  12. This class allows for deferred initialization of the Redis client, enabling the
  13. client to be re-initialized with a new instance when necessary. This is particularly
  14. useful in scenarios where the Redis instance may change dynamically, such as during
  15. a failover in a Sentinel-managed Redis setup.
  16. Attributes:
  17. _client (redis.Redis): The actual Redis client instance. It remains None until
  18. initialized with the `initialize` method.
  19. Methods:
  20. initialize(client): Initializes the Redis client if it hasn't been initialized already.
  21. __getattr__(item): Delegates attribute access to the Redis client, raising an error
  22. if the client is not initialized.
  23. """
  24. def __init__(self):
  25. self._client = None
  26. def initialize(self, client):
  27. if self._client is None:
  28. self._client = client
  29. def __getattr__(self, item):
  30. if self._client is None:
  31. raise RuntimeError("Redis client is not initialized. Call init_app first.")
  32. return getattr(self._client, item)
  33. redis_client = RedisClientWrapper()
  34. def init_app(app: DifyApp):
  35. global redis_client
  36. connection_class = Connection
  37. if dify_config.REDIS_USE_SSL:
  38. connection_class = SSLConnection
  39. redis_params = {
  40. "username": dify_config.REDIS_USERNAME,
  41. "password": dify_config.REDIS_PASSWORD,
  42. "db": dify_config.REDIS_DB,
  43. "encoding": "utf-8",
  44. "encoding_errors": "strict",
  45. "decode_responses": False,
  46. }
  47. if dify_config.REDIS_USE_SENTINEL:
  48. sentinel_hosts = [
  49. (node.split(":")[0], int(node.split(":")[1])) for node in dify_config.REDIS_SENTINELS.split(",")
  50. ]
  51. sentinel = Sentinel(
  52. sentinel_hosts,
  53. sentinel_kwargs={
  54. "socket_timeout": dify_config.REDIS_SENTINEL_SOCKET_TIMEOUT,
  55. "username": dify_config.REDIS_SENTINEL_USERNAME,
  56. "password": dify_config.REDIS_SENTINEL_PASSWORD,
  57. },
  58. )
  59. master = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **redis_params)
  60. redis_client.initialize(master)
  61. elif dify_config.REDIS_USE_CLUSTERS:
  62. nodes = [
  63. ClusterNode(host=node.split(":")[0], port=int(node.split.split(":")[1]))
  64. for node in dify_config.REDIS_CLUSTERS.split(",")
  65. ]
  66. redis_client.initialize(RedisCluster(startup_nodes=nodes, password=dify_config.REDIS_CLUSTERS_PASSWORD))
  67. else:
  68. redis_params.update(
  69. {
  70. "host": dify_config.REDIS_HOST,
  71. "port": dify_config.REDIS_PORT,
  72. "connection_class": connection_class,
  73. }
  74. )
  75. pool = redis.ConnectionPool(**redis_params)
  76. redis_client.initialize(redis.Redis(connection_pool=pool))
  77. app.extensions["redis"] = redis_client