Bläddra i källkod

ext_redis.py support redis clusters --- Fixes #9538 (#9789)

Signed-off-by: root <root@localhost.localdomain>
Co-authored-by: root <root@localhost.localdomain>
Co-authored-by: Bowen Liang <bowenliang@apache.org>
liuhaoran 5 månader sedan
förälder
incheckning
8ff65abbc6

+ 5 - 0
api/.env.example

@@ -42,6 +42,11 @@ REDIS_SENTINEL_USERNAME=
 REDIS_SENTINEL_PASSWORD=
 REDIS_SENTINEL_SOCKET_TIMEOUT=0.1
 
+# redis Cluster configuration.
+REDIS_USE_CLUSTERS=false
+REDIS_CLUSTERS=
+REDIS_CLUSTERS_PASSWORD=
+
 # PostgreSQL database configuration
 DB_USERNAME=postgres
 DB_PASSWORD=difyai123456

+ 15 - 0
api/configs/middleware/cache/redis_config.py

@@ -68,3 +68,18 @@ class RedisConfig(BaseSettings):
         description="Socket timeout in seconds for Redis Sentinel connections",
         default=0.1,
     )
+
+    REDIS_USE_CLUSTERS: bool = Field(
+        description="Enable Redis Clusters mode for high availability",
+        default=False,
+    )
+
+    REDIS_CLUSTERS: Optional[str] = Field(
+        description="Comma-separated list of Redis Clusters nodes (host:port)",
+        default=None,
+    )
+
+    REDIS_CLUSTERS_PASSWORD: Optional[str] = Field(
+        description="Password for Redis Clusters authentication (if required)",
+        default=None,
+    )

+ 8 - 1
api/extensions/ext_redis.py

@@ -1,11 +1,12 @@
 import redis
+from redis.cluster import ClusterNode, RedisCluster
 from redis.connection import Connection, SSLConnection
 from redis.sentinel import Sentinel
 
 from configs import dify_config
 
 
-class RedisClientWrapper(redis.Redis):
+class RedisClientWrapper:
     """
     A wrapper class for the Redis client that addresses the issue where the global
     `redis_client` variable cannot be updated when a new Redis instance is returned
@@ -71,6 +72,12 @@ def init_app(app):
         )
         master = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **redis_params)
         redis_client.initialize(master)
+    elif dify_config.REDIS_USE_CLUSTERS:
+        nodes = [
+            ClusterNode(host=node.split(":")[0], port=int(node.split.split(":")[1]))
+            for node in dify_config.REDIS_CLUSTERS.split(",")
+        ]
+        redis_client.initialize(RedisCluster(startup_nodes=nodes, password=dify_config.REDIS_CLUSTERS_PASSWORD))
     else:
         redis_params.update(
             {

+ 15 - 9
api/tests/unit_tests/core/test_model_manager.py

@@ -1,10 +1,12 @@
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
 
 import pytest
+import redis
 
 from core.entities.provider_entities import ModelLoadBalancingConfiguration
 from core.model_manager import LBModelManager
 from core.model_runtime.entities.model_entities import ModelType
+from extensions.ext_redis import redis_client
 
 
 @pytest.fixture
@@ -38,6 +40,9 @@ def lb_model_manager():
 
 
 def test_lb_model_manager_fetch_next(mocker, lb_model_manager):
+    # initialize redis client
+    redis_client.initialize(redis.Redis())
+
     assert len(lb_model_manager._load_balancing_configs) == 3
 
     config1 = lb_model_manager._load_balancing_configs[0]
@@ -55,12 +60,13 @@ def test_lb_model_manager_fetch_next(mocker, lb_model_manager):
         start_index += 1
         return start_index
 
-    mocker.patch("redis.Redis.incr", side_effect=incr)
-    mocker.patch("redis.Redis.set", return_value=None)
-    mocker.patch("redis.Redis.expire", return_value=None)
-
-    config = lb_model_manager.fetch_next()
-    assert config == config2
+    with (
+        patch.object(redis_client, "incr", side_effect=incr),
+        patch.object(redis_client, "set", return_value=None),
+        patch.object(redis_client, "expire", return_value=None),
+    ):
+        config = lb_model_manager.fetch_next()
+        assert config == config2
 
-    config = lb_model_manager.fetch_next()
-    assert config == config3
+        config = lb_model_manager.fetch_next()
+        assert config == config3

+ 6 - 0
docker/.env.example

@@ -240,6 +240,12 @@ REDIS_SENTINEL_USERNAME=
 REDIS_SENTINEL_PASSWORD=
 REDIS_SENTINEL_SOCKET_TIMEOUT=0.1
 
+# List of Redis Cluster nodes. If Cluster mode is enabled, provide at least one Cluster IP and port.
+# Format: `<Cluster1_ip>:<Cluster1_port>,<Cluster2_ip>:<Cluster2_port>,<Cluster3_ip>:<Cluster3_port>`
+REDIS_USE_CLUSTERS=false
+REDIS_CLUSTERS=
+REDIS_CLUSTERS_PASSWORD=
+
 # ------------------------------
 # Celery Configuration
 # ------------------------------

+ 3 - 0
docker/docker-compose.yaml

@@ -55,6 +55,9 @@ x-shared-env: &shared-api-worker-env
   REDIS_SENTINEL_USERNAME: ${REDIS_SENTINEL_USERNAME:-}
   REDIS_SENTINEL_PASSWORD: ${REDIS_SENTINEL_PASSWORD:-}
   REDIS_SENTINEL_SOCKET_TIMEOUT: ${REDIS_SENTINEL_SOCKET_TIMEOUT:-0.1}
+  REDIS_CLUSTERS: ${REDIS_CLUSTERS:-}
+  REDIS_USE_CLUSTERS: ${REDIS_USE_CLUSTERS:-false}
+  REDIS_CLUSTERS_PASSWORD: ${REDIS_CLUSTERS_PASSWORD:-}
   ACCESS_TOKEN_EXPIRE_MINUTES: ${ACCESS_TOKEN_EXPIRE_MINUTES:-60}
   CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://:difyai123456@redis:6379/1}
   BROKER_USE_SSL: ${BROKER_USE_SSL:-false}