Skip to content
Merged
185 changes: 94 additions & 91 deletions client/src/cbltest/api/syncgateway.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import re
import asyncio
import ssl
from abc import ABC, abstractmethod
from json import dumps, loads
from pathlib import Path
from typing import Any, cast
from urllib.parse import urljoin

import paramiko
import requests
from aiohttp import BasicAuth, ClientSession, TCPConnector
from opentelemetry.trace import get_tracer
Expand Down Expand Up @@ -183,6 +182,32 @@ def __init__(self, key: str, id: str, revid: str | None, cv: str | None) -> None
self.__cv = cv


class DatabaseStatusResponse:
    """
    Wraps the JSON body returned by Sync Gateway's GET /{db}/ endpoint,
    exposing the fields the test harness cares about as read-only properties.
    """

    def __init__(self, response: dict):
        # Any key missing from the payload falls back to a neutral default
        # instead of raising, since older server versions may omit fields.
        read = response.get
        self.__db_name = read("db_name", "")
        self.__state = read("state", "Unknown")
        self.__update_seq = read("update_seq", 0)

    @property
    def db_name(self) -> str:
        """The name of the database this status describes"""
        return self.__db_name

    @property
    def state(self) -> str:
        """The database state reported by the server ('Online', 'Offline', etc.)"""
        return self.__state

    @property
    def update_seq(self) -> int:
        """The database's current update sequence number"""
        return self.__update_seq


class AllDocumentsResponse:
"""
A class representing an all_docs response from Sync Gateway
Expand Down Expand Up @@ -629,20 +654,24 @@ async def put_database(self, db_name: str, payload: PutDatabasePayload) -> None:
"""
await self._put_database(db_name, payload, 0)

async def get_database_status(self, db_name: str) -> DatabaseStatusResponse | None:
    """
    Gets the status of a database including its online/offline state.

    :param db_name: The name of the Database
    :return: DatabaseStatusResponse with state, sequences, etc. Returns None if
             the database doesn't exist (server responds 403 or 404)
    :raises CblSyncGatewayBadResponseError: For any error status other than 403/404
    """
    with self.__tracer.start_as_current_span(
        "get_database_status", attributes={"cbl.database.name": db_name}
    ):
        try:
            resp = await self._send_request("get", f"/{db_name}/")
        except CblSyncGatewayBadResponseError as e:
            # 403/404 both indicate the database is not configured at all;
            # anything else is a genuine failure the caller should see.
            if e.code in (403, 404):
                return None
            raise

        assert isinstance(resp, dict)
        return DatabaseStatusResponse(cast(dict, resp))

async def _delete_database(self, db_name: str, retry_count: int = 0) -> None:
with self.__tracer.start_as_current_span(
Expand All @@ -656,8 +685,6 @@ async def _delete_database(self, db_name: str, retry_count: int = 0) -> None:
f"Sync gateway returned 500 from DELETE database call, retrying ({retry_count + 1})..."
)
current_span.add_event("SGW returned 500, retry")
import asyncio

await asyncio.sleep(2)
await self._delete_database(db_name, retry_count + 1)
elif e.code == 403:
Expand Down Expand Up @@ -1287,95 +1314,71 @@ async def get_document_revision_public(
) as session:
return await self._send_request("GET", path, params=params, session=session)

async def start_sgcollect_via_api(
    self,
    redact_level: str | None = None,
    redact_salt: str | None = None,
    output_dir: str | None = None,
) -> dict:
    """
    Starts SGCollect using the REST API endpoint

    :param redact_level: Redaction level ('none', 'partial', 'full')
    :param redact_salt: Custom salt for redaction hashing
    :param output_dir: Output directory on the remote server
    :return: Response dict with status
    """
    with self.__tracer.start_as_current_span(
        "start_sgcollect_via_api",
        attributes={
            "redact.level": redact_level or "none",
        },
    ):
        # Upload of the collected archive is never requested by the test
        # harness, so it is always sent as False.
        body: dict[str, Any] = {"upload": False}

        # Only include the optional knobs the caller actually supplied so
        # the server applies its own defaults for the rest.
        if redact_level is not None:
            body["redact_level"] = redact_level
        if redact_salt is not None:
            body["redact_salt"] = redact_salt
        if output_dir is not None:
            body["output_dir"] = output_dir

        resp = await self._send_request(
            "post",
            "/_sgcollect_info",
            JSONDictionary(body),
        )
        assert isinstance(resp, dict)
        return cast(dict, resp)
async def wait_for_sgcollect_to_complete(
    self, max_attempts: int = 60, wait_time: int = 5
) -> None:
    """
    Waits for SGCollect to complete by polling its status endpoint.

    :param max_attempts: Maximum number of status polls before giving up
    :param wait_time: Seconds to wait between polls
    :raises Exception: If SGCollect has not stopped after max_attempts polls
    """
    # Initialized up front so the failure message below never hits a
    # NameError when max_attempts <= 0 (the original left it unbound).
    status_resp: dict = {}
    for attempt in range(max_attempts):
        status_resp = await self.get_sgcollect_status()
        if status_resp.get("status") in ["stopped", "completed"]:
            return
        # Skip the sleep after the final poll; we are about to give up anyway.
        if attempt + 1 < max_attempts:
            await asyncio.sleep(wait_time)
    raise Exception(
        f"SGCollect did not complete after {max_attempts * wait_time} seconds.\n"
        f"Status: {status_resp.get('status')}.\n"
        f"Error: {status_resp.get('error')}"
    )

async def get_sgcollect_status(self) -> dict:
    """
    Gets the current status of SGCollect operation

    :return: Response dict with status ('stopped' or 'running')
    """
    with self.__tracer.start_as_current_span("get_sgcollect_status"):
        resp = await self._send_request("get", "/_sgcollect_info")
        assert isinstance(resp, dict)
        return cast(dict, resp)
28 changes: 28 additions & 0 deletions spec/tests/QE/test_db_online_offline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Database Online/Offline Tests

## test_db_offline_on_bucket_deletion

Test that database goes offline when its bucket is deleted.

This test verifies that when the Couchbase Server bucket backing a Sync Gateway database is deleted, the database properly enters offline state and all REST API endpoints return 403 (Forbidden), consistent with the verification steps below.

1. Create bucket and default collection
2. Configure Sync Gateway database endpoint
3. Create user 'vipul' with access to ['ABC']
4. Create 10 docs via Sync Gateway
5. Verify database is online - REST endpoints work
6. Delete bucket to sever connection
7. Verify database is offline - REST endpoints return 403

## test_multiple_dbs_bucket_deletion

Test that deleting specific buckets causes only those databases to go offline.

This test creates 4 databases with unique buckets, deletes 2 buckets, and verifies that only those 2 databases go offline while the other 2 remain online.

1. Create buckets and configure databases
2. Create 10 docs via Sync Gateway
3. Verify all databases are online
4. Delete buckets for db1 and db3
5. Verify db2 and db4 remain online
6. Verify db1 and db3 are offline (return 403)
26 changes: 23 additions & 3 deletions spec/tests/QE/test_log_redaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,27 @@ This test verifies that NO document IDs or usernames appear in logs WITHOUT `<ud

1. Create bucket and default collection
2. Configure Sync Gateway with log redaction enabled
3. Create user 'autotest' with access to channels
4. Create 10 docs via Sync Gateway with xattrs
5. Verify docs were created
3. Create user 'vipul' with access to channels
4. Create 10 docs via Sync Gateway
5. Verify docs were created (public API)
6. Fetch and scan SG logs for redaction violations

## test_sgcollect_redacted_files_and_contents

Test SGCollect REST API for redacted files and log file contents (Combined test).

This comprehensive test uses the `/_sgcollect_info` REST API to trigger SGCollect with partial redaction, then verifies:
1. All expected log files are present in the zip
2. Sensitive data is marked with `<ud>` tags in redacted files
3. sync_gateway.log contains correct system information (hostname)

**Prerequisites**: Sync Gateway bootstrap.json must be configured with `redaction_level: "partial"` in the logging section.

1. Create bucket and default collection
2. Configure Sync Gateway
3. Create user 'vipul' with access to ['logging']
4. Create 10 docs via Sync Gateway
5. Start SGCollect via REST API and wait for it to complete
6. Download and extract SGCollect redacted zip
7. Verify redacted zip marks sensitive data with <ud> tags
8. Verify content of sync_gateway.log
Loading
Loading