348 changes: 348 additions & 0 deletions client/src/cbltest/api/syncgateway.py
@@ -1,7 +1,9 @@
import asyncio
import re
import ssl
from abc import ABC, abstractmethod
from json import dumps, loads
from json.decoder import JSONDecodeError
from pathlib import Path
from typing import Any, cast
from urllib.parse import urljoin
@@ -1340,6 +1342,352 @@ async def fetch_log_file(

return log_contents

async def run_sgcollect_info(
self,
ssh_key_path: str,
output_name: str = "sgcollect",
ssh_username: str = "ec2-user",
) -> str:
"""
Runs sgcollect_info on the remote SG server to create a diagnostic zip

:param ssh_key_path: Path to SSH private key for authentication
:param output_name: Name for the output zip file (without extension)
:param ssh_username: SSH username (default: ec2-user)
:return: Full path to the generated zip file on remote server
"""
with self.__tracer.start_as_current_span(
"run_sgcollect_info",
attributes={
"output.name": output_name,
"ssh.username": ssh_username,
},
):
ssh = self._ssh_connect(ssh_key_path, ssh_username)

# Run sgcollect_info
output_dir = "/tmp/sgcollect_output"
ssh.exec_command(f"rm -rf {output_dir}")
stdin, stdout, stderr = ssh.exec_command(
"which sgcollect_info || find /opt/couchbase-sync-gateway -name sgcollect_info 2>/dev/null | head -1"
)
sgcollect_path = stdout.read().decode("utf-8").strip()
if not sgcollect_path:
ssh.close()
raise Exception(
"sgcollect_info not found on remote server. Is Sync Gateway installed?"
)

server_config = await self._send_request("GET", "/_config")
redaction_level = server_config.get("logging", {}).get(
"redaction_level", "none"
)

scheme = "https" if self.__secure else "http"
sg_url = f"{scheme}://{self.__hostname}:{self.__admin_port}"
sgcollect_cmd = (
f"mkdir -p {output_dir} && cd {output_dir} && "
f"SG_PASSWORD=password {sgcollect_path} "
f"--sync-gateway-url={sg_url} "
f"--sync-gateway-username=admin "
f"--log-redaction-level={redaction_level} "
f"{output_name}.zip"
)
stdin, stdout, stderr = ssh.exec_command(sgcollect_cmd)
exit_status = stdout.channel.recv_exit_status()

stderr_output = stderr.read().decode("utf-8")
if exit_status != 0:
ssh.close()
raise Exception(
f"sgcollect_info failed with exit code {exit_status}: {stderr_output}"
)

# Check for redacted zip file
stdin, stdout, stderr = ssh.exec_command(
f"test -f {output_dir}/{output_name}-redacted.zip && echo 'redacted' || echo 'none'"
)
has_redacted = stdout.read().decode("utf-8").strip() == "redacted"
ssh.close()

# Return the redacted zip produced by sgcollect_info
if has_redacted:
return f"{output_dir}/{output_name}-redacted.zip"
else:
raise Exception(
f"No redacted zip found in {output_dir} after sgcollect_info"
)

async def start_sgcollect_via_api(
self,
redact_level: str | None = None,
redact_salt: str | None = None,
output_dir: str | None = None,
) -> dict:
"""
Starts SGCollect using the REST API endpoint

:param redact_level: Redaction level ('none', 'partial', 'full')
:param redact_salt: Custom salt for redaction hashing
:param output_dir: Output directory on the remote server
:return: Response dict with status
"""
with self.__tracer.start_as_current_span(
"start_sgcollect_via_api",
attributes={
"redact.level": redact_level or "none",
},
):
body: dict[str, Any] = {"upload": False}

if redact_level is not None:
body["redact_level"] = redact_level
if redact_salt is not None:
body["redact_salt"] = redact_salt
if output_dir is not None:
body["output_dir"] = output_dir

resp = await self._send_request(
"post",
"/_sgcollect_info",
JSONDictionary(body),
)

assert isinstance(resp, dict)
return cast(dict, resp)

async def wait_for_sgcollect_to_complete(
self, max_attempts: int = 60, wait_time: int = 5
) -> None:
"""
Waits for SGCollect to complete

:param max_attempts: Maximum number of attempts to wait for SGCollect to complete
:param wait_time: Time to wait between attempts
"""
status_resp: dict = {}
for _ in range(max_attempts):
status_resp = await self.get_sgcollect_status()
if status_resp.get("status") in ["stopped", "completed"]:
return
await asyncio.sleep(wait_time)
raise Exception(
f"SGCollect did not complete after {max_attempts * wait_time} seconds.\n"
f"Status: {status_resp.get('status')}.\n"
f"Error: {status_resp.get('error')}"
)

async def get_sgcollect_status(self) -> dict:
"""
Gets the current status of SGCollect operation

:return: Response dict with status ('stopped' or 'running')
"""
with self.__tracer.start_as_current_span("get_sgcollect_status"):
resp = await self._send_request("get", "/_sgcollect_info")
assert isinstance(resp, dict)
return cast(dict, resp)
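
# Illustrative usage (a sketch, not part of this diff): drive a collection run
# end-to-end through the REST endpoints above. `sg` is an assumed instance of
# this class with an admin connection already configured.
#
#     await sg.start_sgcollect_via_api(redact_level="partial")
#     await sg.wait_for_sgcollect_to_complete(max_attempts=60, wait_time=5)
#     status = await sg.get_sgcollect_status()
#     assert status.get("status") in ("stopped", "completed")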

def _ssh_connect(
self,
ssh_key_path: str,
ssh_username: str = "ec2-user",
) -> paramiko.SSHClient:
"""
Helper to create SSH connection to remote SG server

:param ssh_key_path: Path to SSH private key
:param ssh_username: SSH username (default: ec2-user)
:return: Connected SSH client (caller must close)
"""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
private_key = paramiko.Ed25519Key.from_private_key_file(ssh_key_path)
ssh.connect(self.__hostname, username=ssh_username, pkey=private_key)
return ssh

def ssh_exec_command(
self,
command: str,
ssh_key_path: str,
ssh_username: str = "ec2-user",
) -> str:
"""
Executes a command on remote SG server via SSH and returns stdout

:param command: Command to execute
:param ssh_key_path: Path to SSH private key
:param ssh_username: SSH username (default: ec2-user)
:return: Command stdout as string
"""
ssh = self._ssh_connect(ssh_key_path, ssh_username)
try:
stdin, stdout, stderr = ssh.exec_command(command)
return stdout.read().decode("utf-8").strip()
finally:
ssh.close()

def download_file(
self,
remote_path: str,
local_path: str,
ssh_key_path: str,
ssh_username: str = "ec2-user",
) -> None:
"""
Downloads a file from the remote SG server via SSH

:param remote_path: Path to file on remote server
:param local_path: Path where file should be saved locally
:param ssh_key_path: Path to SSH private key for authentication
:param ssh_username: SSH username (default: ec2-user)
"""
ssh = self._ssh_connect(ssh_key_path, ssh_username)
sftp = ssh.open_sftp()
try:
sftp.get(remote_path, local_path)
finally:
sftp.close()
ssh.close()
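
# Illustrative usage (a sketch, not part of this diff): collect diagnostics over
# SSH and pull the resulting zip locally. The key path and file names below are
# assumptions.
#
#     remote_zip = await sg.run_sgcollect_info("/path/to/id_ed25519", "sgcollect")
#     sg.download_file(remote_zip, "./sgcollect-redacted.zip", "/path/to/id_ed25519")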

async def wait_for_database_to_be_offline(self, db_name: str) -> None:
"""
Waits for a database to be offline

:param db_name: Database name to wait for
"""
max_attempts = 6
wait_time = 5
for _ in range(max_attempts):
try:
await self._send_request("get", f"/{db_name}")
except CblSyncGatewayBadResponseError as e:
# A 403 from the admin API indicates the database is offline
if e.code == 403:
return
except Exception:
# Treat any other failure as "not offline yet" and keep polling
pass
await asyncio.sleep(wait_time)
raise Exception(
f"Database {db_name} did not go offline after {max_attempts * wait_time} seconds."
)

async def scan_rest_endpoints(
self,
db_name: str,
expected_online: bool,
num_docs: int = 10,
) -> tuple[int, int]:
"""
Scans multiple REST endpoints to verify database online/offline state.

:param db_name: Database name to test
:param expected_online: True if DB should be online, False if offline (currently unused)
:param num_docs: Number of docs available for doc operations (currently unused)
:return: Tuple of (endpoints_tested, errors_403)
"""
endpoints_tested = 0
errors_403 = 0

# List of endpoints to test
test_operations = [
(
"GET /{db}/_all_docs",
lambda: self.get_all_documents(db_name, "_default", "_default"),
),
(
"GET /{db}/_changes",
lambda: self.get_changes(db_name, "_default", "_default"),
),
(
"GET /{db}/{doc}",
lambda: self.get_document(
db_name, f"{db_name}_doc_0", "_default", "_default"
),
),
(
"POST /{db}/_bulk_docs",
lambda: self.update_documents(
db_name,
[DocumentUpdateEntry("test_doc", None, {"foo": "bar"})],
"_default",
"_default",
),
),
]
for endpoint_name, test_func in test_operations:
endpoints_tested += 1
try:
await test_func()
except CblSyncGatewayBadResponseError as e:
if e.code == 403:
errors_403 += 1
else:
raise
except JSONDecodeError:
# A non-JSON error body is treated the same as a 403 (offline) response
errors_403 += 1
return (endpoints_tested, errors_403)


def verify_sensitive_data_is_hashed(
log_content: str,
sensitive_patterns: list[str],
) -> tuple[bool, list[str]]:
"""
Verifies that sensitive data appears only in hashed form (not plaintext) in logs.

This checks that:
1. Sensitive data never appears as plaintext outside <ud> tags
2. Any sensitive-data reference inside <ud> tags is a HASHED value, not the original

:param log_content: The log file content as a string
:param sensitive_patterns: List of sensitive strings to check (e.g., usernames, passwords)
:return: Tuple of (is_hashed, violations). is_hashed=True if no plaintext found.
"""
violations = []

for pattern in sensitive_patterns:
# Look for ANY plaintext occurrences (inside or outside tags)
escaped_pattern = re.escape(pattern)
for match in re.finditer(escaped_pattern, log_content):
start_pos = match.start()
end_pos = match.end()

# Check if this plaintext is inside <ud> tags
before_text = log_content[max(0, start_pos - 100) : start_pos]
after_text = log_content[end_pos : min(len(log_content), end_pos + 100)]

has_opening_tag = "<ud>" in before_text and before_text.rfind(
"<ud>"
) > before_text.rfind("</ud>")
has_closing_tag = "</ud>" in after_text

# If we find the ACTUAL plaintext value anywhere (even inside tags), it's NOT hashed!
# In properly redacted logs, we should NEVER see the original value, only hashes
context = log_content[
max(0, start_pos - 50) : min(len(log_content), end_pos + 50)
]

if has_opening_tag and has_closing_tag:
violations.append(
f"Found plaintext '{pattern}' INSIDE <ud> tags (should be hashed): ...{context}..."
)
else:
violations.append(
f"Found plaintext '{pattern}' OUTSIDE <ud> tags: ...{context}..."
)

return (len(violations) == 0, violations)
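
# Illustrative usage (a sketch, not part of this diff): scan fetched log contents
# for plaintext leaks of known-sensitive values. `log_text` is an assumed string
# holding SG log contents (e.g. from fetch_log_file above).
#
#     ok, violations = verify_sensitive_data_is_hashed(log_text, ["vipul", "password"])
#     assert ok, "\n".join(violations)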


def scan_logs_for_untagged_sensitive_data(
log_content: str,
28 changes: 28 additions & 0 deletions spec/tests/QE/test_db_online_offline.md
@@ -0,0 +1,28 @@
# Database Online/Offline Tests

## test_db_offline_on_bucket_deletion

Test that database goes offline when its bucket is deleted.

This test verifies that when the Couchbase Server bucket backing a Sync Gateway database is deleted, the database properly enters the offline state and all REST API endpoints return 403 errors.

1. Create bucket and default collection
2. Configure Sync Gateway database endpoint
3. Create user 'vipul' with access to ['ABC']
4. Create 10 docs via Sync Gateway
5. Verify database is online - REST endpoints work
6. Delete bucket to sever connection
7. Verify database is offline - REST endpoints return 403 (see the sketch below)
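
A minimal sketch of how step 7 could be driven with the test client's `scan_rest_endpoints` helper (the `sg` fixture name is an assumption):

```python
# Every scanned endpoint should fail with a 403 once the bucket is gone.
tested, errors = await sg.scan_rest_endpoints("db", expected_online=False)
assert errors == tested, f"expected all {tested} endpoints to return 403, got {errors}"
```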

## test_multiple_dbs_bucket_deletion

Test that deleting specific buckets causes only those databases to go offline.

This test creates 4 databases with unique buckets, deletes 2 buckets, and verifies that only those 2 databases go offline while the other 2 remain online.

1. Create buckets and configure databases
2. Create 10 docs via Sync Gateway
3. Verify all databases are online
4. Delete buckets for db1 and db3
5. Verify db2 and db4 remain online
6. Verify db1 and db3 are offline (return 403)