|
| 1 | +import os |
| 2 | +from datetime import timedelta |
| 3 | +from time import sleep |
| 4 | +from typing import Optional |
| 5 | + |
| 6 | +import requests |
| 7 | +from requests.auth import HTTPBasicAuth |
| 8 | + |
| 9 | +from couchbase.auth import PasswordAuthenticator |
| 10 | +from couchbase.cluster import Cluster |
| 11 | +from couchbase.options import ClusterOptions, ClusterTimeoutOptions, TLSVerifyMode |
| 12 | +from testcontainers.core.generic import DbContainer |
| 13 | +from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs |
| 14 | + |
| 15 | + |
| 16 | +# noinspection HttpUrlsUsage,SpellCheckingInspection |
| 17 | +class CouchbaseContainer(DbContainer): |
| 18 | + """ |
| 19 | + Couchbase database container. |
| 20 | +
|
| 21 | + Example: |
| 22 | + The example spins up a Couchbase database and connects to it using |
| 23 | + the `Couchbase Python Client`. |
| 24 | +
|
| 25 | + .. doctest:: |
| 26 | +
|
| 27 | + >>> from couchbase.auth import PasswordAuthenticator |
| 28 | + >>> from couchbase.cluster import Cluster |
| 29 | + >>> from testcontainers.couchbase import CouchbaseContainer |
| 30 | +
|
| 31 | + >>> with CouchbaseContainer("couchbase:latest") as couchbase: |
| 32 | + ... cluster = couchbase.client() |
| 33 | + ... # Use the cluster for various operations |
| 34 | +
|
| 35 | + This creates a single-node Couchbase database container with the default bucket, scope, and collection. |
| 36 | +
|
| 37 | + If you would like to pass custom values for the image, cluster_port, username, password, bucket, scope, and collection, you can use: |
| 38 | + username = "administrator" |
| 39 | + password = "password" |
| 40 | + bucket_name = "mybucket" |
| 41 | + scope_name = "myscope" |
| 42 | + collection_name = "mycollection" |
| 43 | + image = "couchbase:latest" |
| 44 | + cluster_port = 8091 |
| 45 | +
|
| 46 | + with CouchbaseContainer(image=image, cluster_port=cluster_port, username=username, password=password, bucket=bucket_name, scope=scope_name, |
| 47 | + collection=collection_name) as couchbase_container: |
| 48 | + cluster = couchbase_container.client() |
| 49 | + collection = cluster.bucket(bucket_name=bucket_name).scope(name=scope_name).collection(name=collection_name) |
| 50 | + key = uuid.uuid4().hex |
| 51 | + value = "world" |
| 52 | + doc = { |
| 53 | + "hello": value, |
| 54 | + } |
| 55 | + collection.upsert(key=key, value=doc) |
| 56 | + returned_doc = collection.get(key=key) |
| 57 | + print(returned_doc.value['hello']) |
| 58 | +
|
| 59 | + # Output: world |
| 60 | + """ |
| 61 | + |
| 62 | + def __init__( |
| 63 | + self, |
| 64 | + image: str = "couchbase:latest", |
| 65 | + cluster_port: Optional[int] = 8091, |
| 66 | + username: Optional[str] = None, |
| 67 | + password: Optional[str] = None, |
| 68 | + bucket: Optional[str] = None, |
| 69 | + scope: Optional[str] = None, |
| 70 | + collection: Optional[str] = None, |
| 71 | + **kwargs, |
| 72 | + ) -> None: |
| 73 | + super().__init__(image=image, **kwargs) |
| 74 | + self._username = username or os.environ.get("COUCHBASE_USERNAME", "Administrator") |
| 75 | + self._password = password or os.environ.get("COUCHBASE_PASSWORD", "password") |
| 76 | + self._bucket = bucket or os.environ.get("COUCHBASE_BUCKET", "default") |
| 77 | + self._scope = scope or os.environ.get("COUCHBASE_SCOPE", "default") |
| 78 | + self._collection = collection or os.environ.get("COUCHBASE_COLLECTION", "default") |
| 79 | + self._cluster_port = cluster_port |
| 80 | + |
| 81 | + ports = [ |
| 82 | + cluster_port, |
| 83 | + 8092, |
| 84 | + 8093, |
| 85 | + 8094, |
| 86 | + 8095, |
| 87 | + 8096, |
| 88 | + 8097, |
| 89 | + 9123, |
| 90 | + 11207, |
| 91 | + 11210, |
| 92 | + 11280, |
| 93 | + 18091, |
| 94 | + 18092, |
| 95 | + 18093, |
| 96 | + 18094, |
| 97 | + 18095, |
| 98 | + 18096, |
| 99 | + 18097, |
| 100 | + ] |
| 101 | + |
| 102 | + for port in ports: |
| 103 | + self.with_exposed_ports(port) |
| 104 | + self.with_bind_ports(port, port) |
| 105 | + |
| 106 | + @wait_container_is_ready() |
| 107 | + def _connect(self): |
| 108 | + wait_for_logs(self, "and logs available in") |
| 109 | + while True: |
| 110 | + sleep(1) |
| 111 | + try: |
| 112 | + url = f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self._cluster_port)}/settings/web" |
| 113 | + response = requests.get(url) |
| 114 | + if 200 <= response.status_code < 300: |
| 115 | + break |
| 116 | + else: |
| 117 | + pass |
| 118 | + except requests.exceptions.ConnectionError: |
| 119 | + pass |
| 120 | + |
| 121 | + def _configure(self) -> None: |
| 122 | + self.with_env("COUCHBASE_USERNAME", self._username) |
| 123 | + self.with_env("COUCHBASE_PASSWORD", self._password) |
| 124 | + self.with_env("COUCHBASE_BUCKET", self._bucket) |
| 125 | + |
| 126 | + def start(self) -> "CouchbaseContainer": |
| 127 | + self._configure() |
| 128 | + super().start() |
| 129 | + self._connect() |
| 130 | + self.set_admin_credentials() |
| 131 | + self._create_bucket() |
| 132 | + self._create_scope() |
| 133 | + self._create_collection() |
| 134 | + return self |
| 135 | + |
| 136 | + def set_admin_credentials(self): |
| 137 | + url = f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self._cluster_port)}/settings/web" |
| 138 | + data = {"username": self._username, "password": self._password, "port": "SAME"} |
| 139 | + response = requests.post(url, data=data) |
| 140 | + if 200 <= response.status_code < 300: |
| 141 | + return |
| 142 | + else: |
| 143 | + raise RuntimeError(response.text) |
| 144 | + |
| 145 | + def _create_bucket(self) -> None: |
| 146 | + url = f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self._cluster_port)}/pools/default/buckets" |
| 147 | + data = {"name": self._bucket, "bucketType": "couchbase", "ramQuotaMB": 256} |
| 148 | + response = requests.post(url, data=data, auth=HTTPBasicAuth(self._username, self._password)) |
| 149 | + if 200 <= response.status_code < 300: |
| 150 | + return |
| 151 | + else: |
| 152 | + raise RuntimeError(response.text) |
| 153 | + |
| 154 | + def _create_scope(self): |
| 155 | + url = f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self._cluster_port)}/pools/default/buckets/{self._bucket}/scopes" |
| 156 | + data = {"name": self._scope} |
| 157 | + response = requests.post(url, data=data, auth=HTTPBasicAuth(self._username, self._password)) |
| 158 | + if 200 <= response.status_code < 300: |
| 159 | + return |
| 160 | + else: |
| 161 | + raise RuntimeError(response.text) |
| 162 | + |
| 163 | + def _create_collection(self): |
| 164 | + url = f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self._cluster_port)}/pools/default/buckets/{self._bucket}/scopes/{self._scope}/collections" |
| 165 | + data = {"name": self._collection, "maxTTL": 3600, "history": str(False).lower()} |
| 166 | + response = requests.post(url, data=data, auth=HTTPBasicAuth(self._username, self._password)) |
| 167 | + if 200 <= response.status_code < 300: |
| 168 | + return |
| 169 | + else: |
| 170 | + raise RuntimeError(response.text) |
| 171 | + |
| 172 | + def get_connection_url(self) -> str: |
| 173 | + return f"couchbases://{self.get_container_host_ip()}" |
| 174 | + |
| 175 | + def client(self, cluster_options: ClusterOptions = None): |
| 176 | + auth = PasswordAuthenticator(self._username, self._password) |
| 177 | + if cluster_options is None: |
| 178 | + cluster_options = ClusterOptions( |
| 179 | + auth, |
| 180 | + timeout_options=ClusterTimeoutOptions(kv_timeout=timedelta(seconds=10)), |
| 181 | + enable_tcp_keep_alive=True, |
| 182 | + tls_verify=TLSVerifyMode.NONE, |
| 183 | + ) |
| 184 | + cluster = Cluster(self.get_connection_url(), cluster_options) |
| 185 | + cluster.wait_until_ready(timedelta(seconds=15)) |
| 186 | + return cluster |
0 commit comments