test_couchbase.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. import subprocess
  2. import time
  3. from core.rag.datasource.vdb.couchbase.couchbase_vector import CouchbaseConfig, CouchbaseVector
  4. from tests.integration_tests.vdb.test_vector_store import (
  5. AbstractVectorTest,
  6. get_example_text,
  7. setup_mock_redis,
  8. )
  9. def wait_for_healthy_container(service_name="couchbase-server", timeout=300):
  10. start_time = time.time()
  11. while time.time() - start_time < timeout:
  12. result = subprocess.run(
  13. ["docker", "inspect", "--format", "{{.State.Health.Status}}", service_name], capture_output=True, text=True
  14. )
  15. if result.stdout.strip() == "healthy":
  16. print(f"{service_name} is healthy!")
  17. return True
  18. else:
  19. print(f"Waiting for {service_name} to be healthy...")
  20. time.sleep(10)
  21. raise TimeoutError(f"{service_name} did not become healthy in time")
  22. class CouchbaseTest(AbstractVectorTest):
  23. def __init__(self):
  24. super().__init__()
  25. self.vector = CouchbaseVector(
  26. collection_name=self.collection_name,
  27. config=CouchbaseConfig(
  28. connection_string="couchbase://127.0.0.1",
  29. user="Administrator",
  30. password="password",
  31. bucket_name="Embeddings",
  32. scope_name="_default",
  33. ),
  34. )
  35. def search_by_vector(self):
  36. # brief sleep to ensure document is indexed
  37. time.sleep(5)
  38. hits_by_vector = self.vector.search_by_vector(query_vector=self.example_embedding)
  39. assert len(hits_by_vector) == 1
  40. def test_couchbase(setup_mock_redis):
  41. wait_for_healthy_container("couchbase-server", timeout=60)
  42. CouchbaseTest().run_all_tests()