Skip to content
Merged
296 changes: 231 additions & 65 deletions client/src/cbltest/api/syncgateway.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import re
import asyncio
import ssl
from abc import ABC, abstractmethod
from json import dumps, loads
from json.decoder import JSONDecodeError
from pathlib import Path
from typing import Any, cast
from urllib.parse import urljoin
Expand Down Expand Up @@ -183,6 +184,32 @@ def __init__(self, key: str, id: str, revid: str | None, cv: str | None) -> None
self.__cv = cv


class DatabaseStatusResponse:
    """
    A class representing a database status response from Sync Gateway
    """

    def __init__(self, response: dict):
        # Pull the interesting fields out of the raw GET /<db>/ body,
        # falling back to neutral defaults for any that are absent.
        self.__db_name = response.get("db_name", "")
        self.__state = response.get("state", "Unknown")
        self.__update_seq = response.get("update_seq", 0)

    @property
    def db_name(self) -> str:
        """Gets the database name"""
        return self.__db_name

    @property
    def state(self) -> str:
        """Gets the database state ('Online', 'Offline', etc.)"""
        return self.__state

    @property
    def update_seq(self) -> int:
        """Gets the update sequence number"""
        return self.__update_seq


class AllDocumentsResponse:
"""
A class representing an all_docs response from Sync Gateway
Expand Down Expand Up @@ -629,20 +656,24 @@ async def put_database(self, db_name: str, payload: PutDatabasePayload) -> None:
"""
await self._put_database(db_name, payload, 0)

async def get_database_status(self, db_name: str) -> DatabaseStatusResponse | None:
    """
    Gets the status of a database including its online/offline state.

    :param db_name: The name of the Database
    :return: DatabaseStatusResponse with state, sequences, etc. Returns
             None if the database doesn't exist (Sync Gateway answers
             403 or 404 here depending on configuration)
    """
    with self.__tracer.start_as_current_span(
        "get_database_status", attributes={"cbl.database.name": db_name}
    ):
        try:
            resp = await self._send_request("get", f"/{db_name}/")
            assert isinstance(resp, dict)
            return DatabaseStatusResponse(cast(dict, resp))
        except CblSyncGatewayBadResponseError as e:
            # 403/404 both mean the database isn't configured; any
            # other status is a real error and is propagated.
            if e.code in (403, 404):
                return None
            raise

async def _delete_database(self, db_name: str, retry_count: int = 0) -> None:
with self.__tracer.start_as_current_span(
Expand All @@ -656,8 +687,6 @@ async def _delete_database(self, db_name: str, retry_count: int = 0) -> None:
f"Sync gateway returned 500 from DELETE database call, retrying ({retry_count + 1})..."
)
current_span.add_event("SGW returned 500, retry")
import asyncio

await asyncio.sleep(2)
await self._delete_database(db_name, retry_count + 1)
elif e.code == 403:
Expand Down Expand Up @@ -1316,20 +1345,7 @@ async def fetch_log_file(
"ssh.username": ssh_username,
},
):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# Load private key
private_key = paramiko.Ed25519Key.from_private_key_file(ssh_key_path)

# Connect to the remote server
ssh.connect(
self.__hostname,
username=ssh_username,
pkey=private_key,
)

# Read the log file
ssh = self._ssh_connect(ssh_key_path, ssh_username)
sftp = ssh.open_sftp()
try:
with sftp.open(remote_log_path, "r") as remote_file:
Expand All @@ -1340,42 +1356,192 @@ async def fetch_log_file(

return log_contents

async def start_sgcollect_via_api(
    self,
    redact_level: str | None = None,
    redact_salt: str | None = None,
    output_dir: str | None = None,
) -> dict:
    """
    Starts SGCollect using the REST API endpoint

    :param redact_level: Redaction level ('none', 'partial', 'full')
    :param redact_salt: Custom salt for redaction hashing
    :param output_dir: Output directory on the remote server
    :return: Response dict with status
    """
    with self.__tracer.start_as_current_span(
        "start_sgcollect_via_api",
        attributes={
            "redact.level": redact_level or "none",
        },
    ):
        # Never upload the collected logs to Couchbase support from tests.
        body: dict[str, Any] = {"upload": False}

        # Only forward options the caller actually supplied so Sync
        # Gateway applies its own defaults for the rest.
        if redact_level is not None:
            body["redact_level"] = redact_level
        if redact_salt is not None:
            body["redact_salt"] = redact_salt
        if output_dir is not None:
            body["output_dir"] = output_dir

        resp = await self._send_request(
            "post",
            "/_sgcollect_info",
            JSONDictionary(body),
        )

        assert isinstance(resp, dict)
        return cast(dict, resp)

async def wait_for_sgcollect_to_complete(
    self, max_attempts: int = 60, wait_time: int = 5
) -> None:
    """
    Waits for SGCollect to complete

    :param max_attempts: Maximum number of attempts to wait for SGCollect to complete
    :param wait_time: Time to wait between attempts (seconds)
    :raises Exception: if SGCollect has not reported 'stopped' or
        'completed' after max_attempts polls
    """
    # Pre-bind so the error message below is well defined even when
    # max_attempts <= 0 (previously this path raised NameError because
    # status_resp was never assigned).
    status_resp: dict = {}
    for _ in range(max_attempts):
        status_resp = await self.get_sgcollect_status()
        if status_resp.get("status") in ["stopped", "completed"]:
            return
        await asyncio.sleep(wait_time)
    raise Exception(
        f"SGCollect did not complete after {max_attempts * wait_time} seconds.\n"
        f"Status: {status_resp.get('status')}.\n"
        f"Error: {status_resp.get('error')}"
    )

async def get_sgcollect_status(self) -> dict:
    """
    Gets the current status of SGCollect operation

    :return: Response dict with status ('stopped' or 'running')
    """
    with self.__tracer.start_as_current_span("get_sgcollect_status"):
        response = await self._send_request("get", "/_sgcollect_info")
        assert isinstance(response, dict)
        return cast(dict, response)

def _ssh_connect(
    self,
    ssh_key_path: str,
    ssh_username: str = "ec2-user",
) -> paramiko.SSHClient:
    """
    Helper to create SSH connection to remote SG server

    :param ssh_key_path: Path to SSH private key (Ed25519 format)
    :param ssh_username: SSH username (default: ec2-user)
    :return: Connected SSH client (caller must close)
    """
    client = paramiko.SSHClient()
    # Auto-accept unknown host keys; the target test hosts are ephemeral.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    key = paramiko.Ed25519Key.from_private_key_file(ssh_key_path)
    client.connect(self.__hostname, username=ssh_username, pkey=key)
    return client

def ssh_exec_command(
    self,
    command: str,
    ssh_key_path: str,
    ssh_username: str = "ec2-user",
) -> str:
    """
    Executes a command on remote SG server via SSH and returns stdout

    NOTE: stderr and the remote exit status are deliberately ignored;
    a failing command simply yields whatever it wrote to stdout.

    :param command: Command to execute
    :param ssh_key_path: Path to SSH private key
    :param ssh_username: SSH username (default: ec2-user)
    :return: Command stdout as a string, stripped of surrounding whitespace
    """
    ssh = self._ssh_connect(ssh_key_path, ssh_username)
    try:
        # The stdin and stderr channels are unused.
        _, stdout, _ = ssh.exec_command(command)
        return stdout.read().decode("utf-8").strip()
    finally:
        ssh.close()

def download_file(
    self,
    remote_path: str,
    local_path: str,
    ssh_key_path: str,
    ssh_username: str = "ec2-user",
) -> None:
    """
    Downloads a file from the remote SG server via SSH

    :param remote_path: Path to file on remote server
    :param local_path: Path where file should be saved locally
    :param ssh_key_path: Path to SSH private key for authentication
    :param ssh_username: SSH username (default: ec2-user)
    """
    ssh = self._ssh_connect(ssh_key_path, ssh_username)
    try:
        # Nested try/finally so the SSH client is closed even when
        # open_sftp() itself raises (previously it leaked in that case).
        sftp = ssh.open_sftp()
        try:
            sftp.get(remote_path, local_path)
        finally:
            sftp.close()
    finally:
        ssh.close()

async def scan_rest_endpoints(
    self,
    db_name: str,
    expected_online: bool,
    num_docs: int = 10,
) -> tuple[int, int]:
    """
    Scans multiple REST endpoints to verify database online/offline state.

    :param db_name: Database name to test
    :param expected_online: True if DB should be online, False if offline
        (currently informational only — callers compare the returned counts)
    :param num_docs: Number of docs (for testing doc operations; currently
        unused by the probes below)
    :return: Tuple of (endpoints_tested, errors_403) where errors_403 is
        the count of 403/503 responses or unparseable (non-JSON) bodies
    """
    endpoints_tested = 0
    errors_403 = 0

    # One representative operation per REST endpoint family.
    test_operations = [
        (
            "GET /{db}/_all_docs",
            lambda: self.get_all_documents(db_name, "_default", "_default"),
        ),
        (
            "GET /{db}/_changes",
            lambda: self.get_changes(db_name, "_default", "_default"),
        ),
        (
            "GET /{db}/{doc}",
            lambda: self.get_document(db_name, "doc_0", "_default", "_default"),
        ),
        (
            "POST /{db}/_bulk_docs",
            lambda: self.update_documents(
                db_name,
                [DocumentUpdateEntry("test_doc", None, {"foo": "bar"})],
                "_default",
                "_default",
            ),
        ),
    ]
    for _endpoint_name, test_func in test_operations:
        try:
            await test_func()
            endpoints_tested += 1
        except JSONDecodeError:
            # Offline databases may answer with non-JSON bodies.
            endpoints_tested += 1
            errors_403 += 1
        except CblSyncGatewayBadResponseError as e:
            endpoints_tested += 1
            if e.code not in (403, 503):
                # Unexpected HTTP failure — surface it to the caller.
                raise
            errors_403 += 1
    return (endpoints_tested, errors_403)
28 changes: 28 additions & 0 deletions spec/tests/QE/test_db_online_offline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Database Online/Offline Tests

## test_db_offline_on_bucket_deletion

Test that database goes offline when its bucket is deleted.

This test verifies that when the Couchbase Server bucket backing a Sync Gateway database is deleted, the database properly enters offline state and all REST API endpoints return 503 (Service Unavailable).

1. Create bucket and default collection
2. Configure Sync Gateway database endpoint
3. Create user 'vipul' with access to ['ABC']
4. Create 10 docs via Sync Gateway
5. Verify database is online - REST endpoints work
6. Delete bucket to sever connection
7. Verify database is offline - REST endpoints return 503 (Service Unavailable)

## test_multiple_dbs_bucket_deletion

Test that deleting specific buckets causes only those databases to go offline.

This test creates 4 databases with unique buckets, deletes 2 buckets, and verifies that only those 2 databases go offline while the other 2 remain online.

1. Create buckets and configure databases
2. Create 10 docs via Sync Gateway
3. Verify all databases are online
4. Delete buckets for db1 and db3
5. Verify db2 and db4 remain online
6. Verify db1 and db3 are offline (return 503)
26 changes: 23 additions & 3 deletions spec/tests/QE/test_log_redaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,27 @@ This test verifies that NO document IDs or usernames appear in logs WITHOUT `<ud

1. Create bucket and default collection
2. Configure Sync Gateway with log redaction enabled
3. Create user 'autotest' with access to channels
4. Create 10 docs via Sync Gateway with xattrs
5. Verify docs were created
3. Create user 'vipul' with access to channels
4. Create 10 docs via Sync Gateway
5. Verify docs were created (public API)
6. Fetch and scan SG logs for redaction violations

## test_sgcollect_redacted_files_and_contents

Test SGCollect REST API for redacted files and log file contents (Combined test).

This comprehensive test uses the `/_sgcollect_info` REST API to trigger SGCollect with partial redaction, then verifies:
1. All expected log files are present in the zip
2. Sensitive data is marked with `<ud>` tags in redacted files
3. sync_gateway.log contains correct system information (hostname)

**Prerequisites**: Sync Gateway bootstrap.json must be configured with `redaction_level: "partial"` in the logging section.

1. Create bucket and default collection
2. Configure Sync Gateway
3. Create user 'vipul' with access to ['logging']
4. Create 10 docs via Sync Gateway
5. Start SGCollect via REST API and wait for it to complete
6. Download and extract SGCollect redacted zip
7. Verify redacted zip marks sensitive data with <ud> tags
8. Verify content of sync_gateway.log
Loading
Loading