diff --git a/client/src/cbltest/api/syncgateway.py b/client/src/cbltest/api/syncgateway.py index 08e1660a..5e969252 100644 --- a/client/src/cbltest/api/syncgateway.py +++ b/client/src/cbltest/api/syncgateway.py @@ -1,4 +1,4 @@ -import re +import asyncio import ssl from abc import ABC, abstractmethod from json import dumps, loads @@ -6,7 +6,6 @@ from typing import Any, cast from urllib.parse import urljoin -import paramiko import requests from aiohttp import BasicAuth, ClientSession, TCPConnector from opentelemetry.trace import get_tracer @@ -183,6 +182,32 @@ def __init__(self, key: str, id: str, revid: str | None, cv: str | None) -> None self.__cv = cv +class DatabaseStatusResponse: + """ + A class representing a database status response from Sync Gateway + """ + + @property + def db_name(self) -> str: + """Gets the database name""" + return self.__db_name + + @property + def state(self) -> str: + """Gets the database state ('Online', 'Offline', etc.)""" + return self.__state + + @property + def update_seq(self) -> int: + """Gets the update sequence number""" + return self.__update_seq + + def __init__(self, response: dict): + self.__db_name = response.get("db_name", "") + self.__state = response.get("state", "Unknown") + self.__update_seq = response.get("update_seq", 0) + + class AllDocumentsResponse: """ A class representing an all_docs response from Sync Gateway @@ -629,20 +654,24 @@ async def put_database(self, db_name: str, payload: PutDatabasePayload) -> None: """ await self._put_database(db_name, payload, 0) - async def database_exists(self, db_name: str) -> bool: + async def get_database_status(self, db_name: str) -> DatabaseStatusResponse | None: """ - Checks if a database exists in Sync Gateway's configuration. + Gets the status of a database including its online/offline state. 
- :param db_name: The name of the Database to check - :return: True if the database exists, False otherwise + :param db_name: The name of the Database + :return: DatabaseStatusResponse with state, sequences, etc. Returns None if database doesn't exist (404/403) """ - try: - await self._send_request("get", f"/{db_name}") - return True - except CblSyncGatewayBadResponseError as e: - if e.code == 403: # Database does not exist - return False - raise + with self.__tracer.start_as_current_span( + "get_database_status", attributes={"cbl.database.name": db_name} + ): + try: + resp = await self._send_request("get", f"/{db_name}/") + assert isinstance(resp, dict) + return DatabaseStatusResponse(cast(dict, resp)) + except CblSyncGatewayBadResponseError as e: + if e.code in [403, 404]: # Database doesn't exist + return None + raise async def _delete_database(self, db_name: str, retry_count: int = 0) -> None: with self.__tracer.start_as_current_span( @@ -656,8 +685,6 @@ async def _delete_database(self, db_name: str, retry_count: int = 0) -> None: f"Sync gateway returned 500 from DELETE database call, retrying ({retry_count + 1})..." 
) current_span.add_event("SGW returned 500, retry") - import asyncio - await asyncio.sleep(2) await self._delete_database(db_name, retry_count + 1) elif e.code == 403: @@ -1286,96 +1313,3 @@ async def get_document_revision_public( self.__secure, scheme, self.__hostname, 4984, auth ) as session: return await self._send_request("GET", path, params=params, session=session) - - async def fetch_log_file( - self, - log_type: str, - ssh_key_path: str, - ssh_username: str = "ec2-user", - ) -> str: - """ - Fetches a log file from the remote Sync Gateway server via SSH - - :param log_type: The type of log to fetch (e.g., 'debug', 'info', 'error', 'warn') - :param ssh_key_path: Path to SSH private key for authentication - :param ssh_username: SSH username (default: ec2-user) - :return: Contents of the log file as a string - """ - # Get log directory from SG configuration - server_config = await self._send_request("GET", "/_config") - log_dir = server_config.get("logging", {}).get( - "log_file_path", "/home/ec2-user/log" - ) - remote_log_path = f"{log_dir}/sg_{log_type}.log" - - with self.__tracer.start_as_current_span( - "fetch_log_file", - attributes={ - "log.type": log_type, - "remote.path": remote_log_path, - "ssh.username": ssh_username, - }, - ): - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - - # Load private key - private_key = paramiko.Ed25519Key.from_private_key_file(ssh_key_path) - - # Connect to the remote server - ssh.connect( - self.__hostname, - username=ssh_username, - pkey=private_key, - ) - - # Read the log file - sftp = ssh.open_sftp() - try: - with sftp.open(remote_log_path, "r") as remote_file: - log_contents = remote_file.read().decode("utf-8") - finally: - sftp.close() - ssh.close() - - return log_contents - - -def scan_logs_for_untagged_sensitive_data( - log_content: str, - sensitive_patterns: list[str], -) -> list[str]: - """ - Scans log content for sensitive data that is NOT wrapped in ... 
tags - - :param log_content: The log file content as a string - :param sensitive_patterns: List of sensitive strings to look for (e.g., doc IDs, usernames) - :return: List of violations found (sensitive data without tags) - """ - violations = [] - for pattern in sensitive_patterns: - # Escape special regex characters in the pattern - escaped_pattern = re.escape(pattern) - for match in re.finditer(escaped_pattern, log_content): - start_pos = match.start() - end_pos = match.end() - - # Check if this occurrence is within ... tags - # Look backwards for and forwards for - before_text = log_content[max(0, start_pos - 100) : start_pos] - after_text = log_content[end_pos : min(len(log_content), end_pos + 100)] - - # Check if there's an opening before and closing after - has_opening_tag = "" in before_text and before_text.rfind( - "" - ) > before_text.rfind("") - has_closing_tag = "" in after_text - - if not (has_opening_tag and has_closing_tag): - context_start = max(0, start_pos - 50) - context_end = min(len(log_content), end_pos + 50) - context = log_content[context_start:context_end] - violations.append( - f"Untagged '{pattern}' at position {start_pos}: ...{context}..." - ) - return violations diff --git a/spec/tests/QE/test_db_online_offline.md b/spec/tests/QE/test_db_online_offline.md new file mode 100644 index 00000000..4964f55e --- /dev/null +++ b/spec/tests/QE/test_db_online_offline.md @@ -0,0 +1,28 @@ +# Database Online/Offline Tests + +## test_db_offline_on_bucket_deletion + +Test that database goes offline when its bucket is deleted. + +This test verifies that when the Couchbase Server bucket backing a Sync Gateway database is deleted, the database properly enters offline state and all REST API endpoints return 403. + +1. Create bucket and default collection +2. Configure Sync Gateway database endpoint +3. Create user 'vipul' with access to ['ABC'] +4. Create 10 docs via Sync Gateway +5. Verify database is online - REST endpoints work +6. 
Delete bucket to sever connection +7. Verify database is offline - REST endpoints return 403 + +## test_multiple_dbs_bucket_deletion + +Test that deleting specific buckets causes only those databases to go offline. + +This test creates 4 databases with unique buckets, deletes 2 buckets, and verifies that only those 2 databases go offline while the other 2 remain online. + +1. Create buckets and configure databases +2. Create 10 docs via Sync Gateway +3. Verify all databases are online +4. Delete buckets for db1 and db3 +5. Verify db2 and db4 remain online +6. Verify db1 and db3 are offline (return 403) diff --git a/spec/tests/QE/test_log_redaction.md index d71b9a25..f36e76c0 100644 --- a/spec/tests/QE/test_log_redaction.md +++ b/spec/tests/QE/test_log_redaction.md @@ -10,7 +10,27 @@ This test verifies that NO document IDs or usernames appear in logs WITHOUT `<ud>` tags in redacted files +3. sync_gateway.log contains correct system information (hostname) + +**Prerequisites**: Sync Gateway bootstrap.json must be configured with `redaction_level: "partial"` in the logging section. + +1. Create bucket and default collection +2. Configure Sync Gateway +3. Create user 'vipul' with access to ['logging'] +4. Create 10 docs via Sync Gateway +5. Start SGCollect via REST API and wait for it to complete +6. Download and extract SGCollect redacted zip +7. Verify redacted zip marks sensitive data with `<ud>` tags +8. 
Verify content of sync_gateway.log diff --git a/tests/QE/test_db_online_offline.py b/tests/QE/test_db_online_offline.py new file mode 100644 index 00000000..34469b00 --- /dev/null +++ b/tests/QE/test_db_online_offline.py @@ -0,0 +1,226 @@ +import asyncio +from json.decoder import JSONDecodeError +from pathlib import Path +from typing import Any + +import pytest +from cbltest import CBLPyTest +from cbltest.api.cbltestclass import CBLTestClass +from cbltest.api.error import CblSyncGatewayBadResponseError +from cbltest.api.syncgateway import DocumentUpdateEntry, PutDatabasePayload, SyncGateway + + +@pytest.mark.sgw +@pytest.mark.min_sync_gateways(1) +@pytest.mark.min_couchbase_servers(1) +class TestDbOnlineOffline(CBLTestClass): + async def scan_rest_endpoints( + self, + sg: SyncGateway, + db_name: str, + expected_online: bool, + num_docs: int = 10, + ) -> tuple[int, int]: + """ + Scans multiple REST endpoints to verify database online/offline state. + """ + endpoints_tested = 0 + errors_403 = 0 + + test_operations = [ + ( + "GET /{db}/_all_docs", + lambda: sg.get_all_documents(db_name, "_default", "_default"), + ), + ( + "GET /{db}/_changes", + lambda: sg.get_changes(db_name, "_default", "_default"), + ), + ( + "GET /{db}/{doc}", + lambda: sg.get_document(db_name, "doc_0", "_default", "_default"), + ), + ( + "POST /{db}/_bulk_docs", + lambda: sg.update_documents( + db_name, + [DocumentUpdateEntry("test_doc", None, {"foo": "bar"})], + "_default", + "_default", + ), + ), + ] + + for endpoint_name, test_func in test_operations: + try: + await test_func() + endpoints_tested += 1 + except (CblSyncGatewayBadResponseError, JSONDecodeError) as e: + endpoints_tested += 1 + if isinstance(e, CblSyncGatewayBadResponseError): + if e.code in [403, 503]: + errors_403 += 1 + elif e.code != 404: # 404 is OK + raise e + elif isinstance(e, JSONDecodeError): + errors_403 += 1 + + return (endpoints_tested, errors_403) + + @pytest.mark.asyncio(loop_scope="session") + async def 
test_db_offline_on_bucket_deletion( + self, cblpytest: CBLPyTest, dataset_path: Path + ) -> None: + sg = cblpytest.sync_gateways[0] + cbs = cblpytest.couchbase_servers[0] + num_docs = 10 + sg_db = "db" + bucket_name = "data-bucket" + channels = ["ABC"] + username = "vipul" + password = "pass" + + self.mark_test_step("Create bucket and default collection") + cbs.drop_bucket(bucket_name) + cbs.create_bucket(bucket_name) + + self.mark_test_step("Configure Sync Gateway database endpoint") + db_config = { + "bucket": bucket_name, + "index": {"num_replicas": 0}, + "scopes": {"_default": {"collections": {"_default": {}}}}, + } + db_payload = PutDatabasePayload(db_config) + db_status = await sg.get_database_status(sg_db) + if db_status is not None: + await sg.delete_database(sg_db) + await sg.put_database(sg_db, db_payload) + + self.mark_test_step(f"Create user '{username}' with access to {channels}") + sg_user = await sg.create_user_client(sg, sg_db, username, password, channels) + + self.mark_test_step(f"Create {num_docs} docs via Sync Gateway") + sg_docs: list[DocumentUpdateEntry] = [] + for i in range(num_docs): + sg_docs.append( + DocumentUpdateEntry( + f"doc_{i}", + None, + body={"type": "test_doc", "index": i, "channels": channels}, + ) + ) + await sg.update_documents(sg_db, sg_docs, "_default", "_default") + + self.mark_test_step("Verify database is online - REST endpoints work") + endpoints_tested, errors_403 = await self.scan_rest_endpoints( + sg, sg_db, expected_online=True + ) + assert errors_403 == 0, ( + f"DB is online but {errors_403}/{endpoints_tested} endpoints returned 403" + ) + + self.mark_test_step("Delete bucket to sever connection") + cbs.drop_bucket(bucket_name) + db_status = await sg.get_database_status(sg_db) + while db_status is not None and db_status.state == "Online": + db_status = await sg.get_database_status(sg_db) + await asyncio.sleep(10) + + self.mark_test_step("Verify database is offline - REST endpoints return 403") + endpoints_tested, 
errors_403 = await self.scan_rest_endpoints( + sg, sg_db, expected_online=False + ) + assert endpoints_tested > 0, "No endpoints were tested" + assert errors_403 == endpoints_tested, ( + f"DB is offline but only {errors_403}/{endpoints_tested} endpoints returned 403" + ) + + await sg_user.close() + + @pytest.mark.asyncio(loop_scope="session") + async def test_multiple_dbs_bucket_deletion( + self, cblpytest: CBLPyTest, dataset_path: Path + ) -> None: + sg = cblpytest.sync_gateways[0] + cbs = cblpytest.couchbase_servers[0] + num_docs = 10 + + db_configs: list[list[Any]] = [ + ["db1", "data-bucket-1", "ABC", "vipul", None], + ["db2", "data-bucket-2", "CBS", "lupiv", None], + ["db3", "data-bucket-3", "ABC", "vipul", None], + ["db4", "data-bucket-4", "CBS", "lupiv", None], + ] + + self.mark_test_step("Create buckets and configure databases") + for i, [db_name, bucket_name, channel, username, _] in enumerate(db_configs): + cbs.drop_bucket(bucket_name) + cbs.create_bucket(bucket_name) + + db_config = { + "bucket": bucket_name, + "index": {"num_replicas": 0}, + "scopes": {"_default": {"collections": {"_default": {}}}}, + } + db_payload = PutDatabasePayload(db_config) + db_status = await sg.get_database_status(db_name) + if db_status is not None: + await sg.delete_database(db_name) + await sg.put_database(db_name, db_payload) + db_configs[i][4] = await sg.create_user_client( + sg, db_name, username, "pass", [channel] + ) + + self.mark_test_step(f"Create {num_docs} docs via Sync Gateway") + sg_docs: list[DocumentUpdateEntry] = [] + for j in range(num_docs): + sg_docs.append( + DocumentUpdateEntry( + f"doc_{j}", + None, + body={"db": db_name, "index": j, "channels": [channel]}, + ) + ) + await sg.update_documents(db_name, sg_docs, "_default", "_default") + + self.mark_test_step("Verify all databases are online") + for [db_name, _, _, _, _] in db_configs: + status = await sg.get_database_status(db_name) + assert status is not None, f"{db_name} database doesn't exist" + assert 
status.state == "Online", ( + f"{db_name} should be online, but state is: {status.state}" + ) + + self.mark_test_step( + "Delete buckets for db1 and db3 and wait for databases to go offline" + ) + cbs.drop_bucket("data-bucket-1") + cbs.drop_bucket("data-bucket-3") + for db_name in ["db1", "db3"]: + db_status = await sg.get_database_status(db_name) + while db_status is not None and db_status.state == "Online": + db_status = await sg.get_database_status(db_name) + await asyncio.sleep(10) + + self.mark_test_step("Verify db2 and db4 remain online") + for db_name in ["db2", "db4"]: + endpoints_tested, errors_403 = await self.scan_rest_endpoints( + sg, db_name, expected_online=True + ) + assert errors_403 == 0, ( + f"{db_name} should be online but got {errors_403} 403 errors" + ) + + self.mark_test_step("Verify db1 and db3 are offline (return 403)") + for db_name in ["db1", "db3"]: + endpoints_tested, errors_403 = await self.scan_rest_endpoints( + sg, db_name, expected_online=False + ) + assert errors_403 == endpoints_tested, ( + f"{db_name}: Expected all {endpoints_tested} endpoints to return 403, got {errors_403}" + ) + + for [db_name, bucket_name, _, _, user_client] in db_configs: + await user_client.close() + await sg.delete_database(db_name) + cbs.drop_bucket(bucket_name) diff --git a/tests/QE/test_log_redaction.py b/tests/QE/test_log_redaction.py deleted file mode 100644 index 0aa2cfcf..00000000 --- a/tests/QE/test_log_redaction.py +++ /dev/null @@ -1,96 +0,0 @@ -import os -from pathlib import Path - -import pytest -from cbltest import CBLPyTest -from cbltest.api.cbltestclass import CBLTestClass -from cbltest.api.syncgateway import ( - DocumentUpdateEntry, - PutDatabasePayload, - scan_logs_for_untagged_sensitive_data, -) - - -@pytest.mark.sgw -@pytest.mark.min_test_servers(0) -@pytest.mark.min_sync_gateways(1) -@pytest.mark.min_couchbase_servers(1) -class TestLogRedaction(CBLTestClass): - @pytest.mark.asyncio(loop_scope="session") - async def 
test_log_redaction_partial( - self, cblpytest: CBLPyTest, dataset_path: Path - ) -> None: - sg = cblpytest.sync_gateways[0] - cbs = cblpytest.couchbase_servers[0] - num_docs = 10 - sg_db = "db" - bucket_name = "data-bucket" - channels = ["log-redaction"] - username = "vipul" - password = "password" - ssh_key_path = os.environ.get( - "SSH_KEY_PATH", os.path.expanduser("~/.ssh/jborden.pem") - ) - - self.mark_test_step("Create bucket and default collection") - cbs.drop_bucket(bucket_name) - cbs.create_bucket(bucket_name) - - self.mark_test_step("Configure Sync Gateway with log redaction enabled") - db_config = { - "bucket": bucket_name, - "index": {"num_replicas": 0}, - "scopes": {"_default": {"collections": {"_default": {}}}}, - } - db_payload = PutDatabasePayload(db_config) - if await sg.database_exists(sg_db): - await sg.delete_database(sg_db) - await sg.put_database(sg_db, db_payload) - - self.mark_test_step(f"Create user '{username}' with access to channels") - sg_user = await sg.create_user_client(sg, sg_db, username, password, channels) - - self.mark_test_step(f"Create {num_docs} docs via Sync Gateway") - sg_docs: list[DocumentUpdateEntry] = [] - sg_doc_ids: list[str] = [] - for i in range(num_docs): - doc_id = f"sg_doc_{i}" - sg_doc_ids.append(doc_id) - sg_docs.append( - DocumentUpdateEntry( - doc_id, - None, - body={ - "type": "test_doc", - "index": i, - "channels": channels, - }, - ) - ) - await sg.update_documents(sg_db, sg_docs, "_default", "_default") - - self.mark_test_step("Verify docs were created (public API)") - all_docs = await sg_user.get_all_documents( - sg_db, "_default", "_default", use_public_api=True - ) - assert len(all_docs.rows) == num_docs, ( - f"Expected {num_docs} docs, got {len(all_docs.rows)}" - ) - - self.mark_test_step("Fetch and scan SG logs for redaction violations") - try: - log_contents = await sg.fetch_log_file("debug", ssh_key_path) - except Exception as e: - raise Exception(f"Could not fetch log file: {e}") - 
sensitive_patterns = sg_doc_ids + [username] - violations = scan_logs_for_untagged_sensitive_data( - log_contents, sensitive_patterns - ) - assert len(violations) == 0, ( - f"Found {len(violations)} log redaction violations: Showing first 10:\n" - + "\n".join(violations[:10]) - ) - - await sg_user.close() - await sg.delete_database(sg_db) - cbs.drop_bucket(bucket_name) diff --git a/tests/QE/test_xattrs.py b/tests/QE/test_xattrs.py index d03c24c4..9ecc40af 100644 --- a/tests/QE/test_xattrs.py +++ b/tests/QE/test_xattrs.py @@ -12,7 +12,6 @@ @pytest.mark.sgw -@pytest.mark.min_test_servers(0) @pytest.mark.min_sync_gateways(1) @pytest.mark.min_couchbase_servers(1) class TestXattrs(CBLTestClass): @@ -43,7 +42,8 @@ async def test_offline_processing_of_external_updates( "scopes": {"_default": {"collections": {"_default": {}}}}, } db_payload = PutDatabasePayload(db_config) - if await sg.database_exists(sg_db): + db_status = await sg.get_database_status(sg_db) + if db_status is not None: await sg.delete_database(sg_db) await sg.put_database(sg_db, db_payload) @@ -202,7 +202,8 @@ async def test_purge(self, cblpytest: CBLPyTest, dataset_path: Path) -> None: "scopes": {"_default": {"collections": {"_default": {}}}}, } db_payload = PutDatabasePayload(db_config) - if await sg.database_exists(sg_db): + db_status = await sg.get_database_status(sg_db) + if db_status is not None: await sg.delete_database(sg_db) await sg.put_database(sg_db, db_payload) @@ -399,7 +400,8 @@ async def test_sg_sdk_interop_unique_docs( "scopes": {"_default": {"collections": {"_default": {}}}}, } db_payload = PutDatabasePayload(db_config) - if await sg.database_exists(sg_db): + db_status = await sg.get_database_status(sg_db) + if db_status is not None: await sg.delete_database(sg_db) await sg.put_database(sg_db, db_payload) @@ -572,7 +574,8 @@ async def test_sg_sdk_interop_shared_docs( "scopes": {"_default": {"collections": {"_default": {}}}}, } db_payload = PutDatabasePayload(db_config) - if await 
sg.database_exists(sg_db): + db_status = await sg.get_database_status(sg_db) + if db_status is not None: await sg.delete_database(sg_db) await sg.put_database(sg_db, db_payload) @@ -832,7 +835,8 @@ async def test_sync_xattrs_update_concurrently( self.mark_test_step( "Configure Sync Gateway with custom sync function using xattrs" ) - if await sg.database_exists(sg_db): + db_status = await sg.get_database_status(sg_db) + if db_status is not None: await sg.delete_database(sg_db) # Custom sync function that reads channel from user xattr via meta parameter