Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 261 additions & 42 deletions client/src/cbltest/api/syncgateway.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import asyncio
import re
import ssl
from abc import ABC, abstractmethod
from json import dumps, loads
from pathlib import Path
from typing import Any, cast
from urllib.parse import urljoin

from aiohttp import BasicAuth, ClientSession, TCPConnector
from aiohttp import BasicAuth, ClientError, ClientSession, ClientTimeout, TCPConnector
from opentelemetry.trace import get_tracer

from cbltest.api.error import CblSyncGatewayBadResponseError
Expand Down Expand Up @@ -181,32 +182,6 @@ def __init__(self, key: str, id: str, revid: str | None, cv: str | None) -> None
self.__cv = cv


class DatabaseStatusResponse:
"""
A class representing a database status response from Sync Gateway
"""

@property
def db_name(self) -> str:
"""Gets the database name"""
return self.__db_name

@property
def state(self) -> str:
"""Gets the database state ('Online', 'Offline', etc.)"""
return self.__state

@property
def update_seq(self) -> int:
"""Gets the update sequence number"""
return self.__update_seq

def __init__(self, response: dict):
self.__db_name = response.get("db_name", "")
self.__state = response.get("state", "Unknown")
self.__update_seq = response.get("update_seq", 0)


class AllDocumentsResponse:
"""
A class representing an all_docs response from Sync Gateway
Expand Down Expand Up @@ -426,6 +401,32 @@ def parse(self, input: str) -> tuple[str, int]:
return input[0:first_lparen], int(input[first_lparen + 1 : first_semicol])


class DatabaseStatusResponse:
"""
A class representing a database status response from Sync Gateway
"""

@property
def db_name(self) -> str:
"""Gets the database name"""
return self.__db_name

@property
def state(self) -> str:
"""Gets the database state ('Online', 'Offline', etc.)"""
return self.__state

@property
def update_seq(self) -> int:
"""Gets the update sequence number"""
return self.__update_seq

def __init__(self, response: dict):
self.__db_name = response.get("db_name", "")
self.__state = response.get("state", "Unknown")
self.__update_seq = response.get("update_seq", 0)


class _SyncGatewayBase:
"""
Base class for Sync Gateway clients containing common document and database operations.
Expand Down Expand Up @@ -743,8 +744,7 @@ async def get_all_documents(
},
):
resp = await self._send_request(
"get",
f"/{db_name}.{scope}.{collection}/_all_docs",
"get", f"/{db_name}.{scope}.{collection}/_all_docs"
)

assert isinstance(resp, dict)
Expand Down Expand Up @@ -775,8 +775,7 @@ async def get_changes(
):
query_params = f"version_type={version_type}"
resp = await self._send_request(
"get",
f"/{db_name}.{scope}.{collection}/_changes?{query_params}",
"get", f"/{db_name}.{scope}.{collection}/_changes?{query_params}"
)

assert isinstance(resp, dict)
Expand Down Expand Up @@ -1012,8 +1011,7 @@ async def get_document(
},
):
response = await self._send_request(
"get",
f"/{db_name}.{scope}.{collection}/{doc_id}",
"get", f"/{db_name}.{scope}.{collection}/{doc_id}"
)
if not isinstance(response, dict):
raise ValueError(
Expand Down Expand Up @@ -1098,6 +1096,236 @@ async def get_document_revision_public(
) as session:
return await self._send_request("GET", path, params=params, session=session)

async def _caddy_http_request(
self,
url: str,
operation: str,
timeout: int = 30,
) -> tuple[int, bytes]:
"""
Internal helper to make HTTP requests to Caddy server.

:param url: Full Caddy URL to request
:param operation: Description of operation (for error messages)
:param timeout: Request timeout in seconds
:return: Tuple of (status_code, content as bytes)
:raises FileNotFoundError: If resource returns 404
:raises Exception: For other HTTP or network errors
"""
try:
async with ClientSession() as session:
async with session.get(
url, timeout=ClientTimeout(total=timeout)
) as response:
if response.status == 404:
raise FileNotFoundError(f"{operation} not found at {url}")
elif response.status != 200:
error_text = await response.text()
raise Exception(
f"{operation} failed: HTTP {response.status} - {error_text}"
)

# Return content as bytes
content = await response.read()
return response.status, content

except ClientError as e:
raise Exception(f"Network error during {operation}: {e}") from e

async def fetch_log_file_via_caddy(
self,
log_type: str,
caddy_port: int = 20000,
) -> str:
"""
Fetches a log file from the remote Sync Gateway server via Caddy HTTP server

:param log_type: Type of log file to fetch (e.g., 'debug', 'info', 'warn', 'error')
:param caddy_port: Port where Caddy is serving files (default: 20000)
:return: Content of the log file as a string
:raises FileNotFoundError: If the log file doesn't exist
:raises Exception: For other HTTP errors
"""
log_filename = f"sg_{log_type}.log"
caddy_url = f"http://{self.hostname}:{caddy_port}/{log_filename}"

with self._tracer.start_as_current_span(
"fetch_log_file_via_caddy",
attributes={
"cbl.log.type": log_type,
"cbl.log.filename": log_filename,
"cbl.caddy.url": caddy_url,
},
):
_, content = await self._caddy_http_request(
caddy_url, f"Fetch {log_filename}", timeout=30
)
log_content = content.decode("utf-8")
cbl_info(f"Successfully fetched {log_filename} ({len(log_content)} bytes)")
return log_content

async def download_file_via_caddy(
self,
remote_filename: str,
local_path: str,
caddy_port: int = 20000,
) -> None:
"""
Downloads a file from the remote server via Caddy HTTP server

:param remote_filename: Name of the file on the remote server (e.g., 'sgcollectinfo-xxx-redacted.zip')
:param local_path: Local path where the file should be saved
:param caddy_port: Port where Caddy is serving files (default: 20000)
:raises FileNotFoundError: If the file doesn't exist
:raises Exception: For other HTTP errors
"""
caddy_url = f"http://{self.hostname}:{caddy_port}/{remote_filename}"

with self._tracer.start_as_current_span(
"download_file_via_caddy",
attributes={
"cbl.remote.filename": remote_filename,
"cbl.local.path": local_path,
"cbl.caddy.url": caddy_url,
},
):
_, content = await self._caddy_http_request(
caddy_url, f"Download {remote_filename}", timeout=120
)

# Ensure local directory exists and write file
local_file_path = Path(local_path)
local_file_path.parent.mkdir(parents=True, exist_ok=True)
local_file_path.write_bytes(content)

cbl_info(
f"Successfully downloaded {remote_filename} to {local_path} ({len(content)} bytes)"
)

async def list_files_via_caddy(
self,
caddy_port: int = 20000,
pattern: str | None = None,
) -> list[str]:
"""
Lists files available in the Caddy-served directory (requires 'browse' enabled in Caddyfile)

:param caddy_port: Port where Caddy is serving files (default: 20000)
:param pattern: Optional regex pattern to filter filenames (e.g., 'sgcollect_info.*redacted.zip')
:return: List of filenames available in the directory
:raises Exception: If directory browsing is not enabled or request fails
"""
caddy_url = f"http://{self.hostname}:{caddy_port}/"

with self._tracer.start_as_current_span(
"list_files_via_caddy",
attributes={
"cbl.caddy.url": caddy_url,
"cbl.pattern": pattern or "all",
},
):
try:
_, content = await self._caddy_http_request(
caddy_url, "List directory", timeout=30
)
except FileNotFoundError:
raise Exception(
"Directory browsing endpoint not found. "
"Ensure Caddy is configured with 'file_server browse'"
)

html_content = content.decode("utf-8")

# Extract filenames from HTML anchor tags
# Caddy browse uses: <a href="./filename.ext">
href_pattern = re.compile(r'<a\s+href="\.?/([^"]+)"')
files = []

for match in href_pattern.finditer(html_content):
filename = match.group(1)
# Skip parent directory link and query parameters
if filename and filename != "../" and not filename.startswith("?"):
files.append(filename)

# Filter by pattern if provided
if pattern:
regex = re.compile(pattern)
files = [f for f in files if regex.search(f)]

cbl_info(
f"Found {len(files)} files via Caddy browse"
+ (f" (filtered by '{pattern}')" if pattern else "")
)
return files

async def start_sgcollect_via_api(
self,
redact_level: str | None = None,
redact_salt: str | None = None,
output_dir: str | None = None,
) -> dict:
"""
Starts SGCollect using the REST API endpoint

:param redact_level: Redaction level ('none', 'partial', 'full')
:param redact_salt: Custom salt for redaction hashing
:param output_dir: Output directory on the remote server
:return: Response dict with status
"""
with self._tracer.start_as_current_span(
"start_sgcollect_via_api",
attributes={
"redact.level": redact_level or "none",
},
):
body: dict[str, Any] = {"upload": False}
if redact_level is not None:
body["redact_level"] = redact_level
if redact_salt is not None:
body["redact_salt"] = redact_salt
if output_dir is not None:
body["output_dir"] = output_dir

resp = await self._send_request(
"post",
"/_sgcollect_info",
JSONDictionary(body),
)
assert isinstance(resp, dict)
return cast(dict, resp)

async def get_sgcollect_status(self) -> dict:
"""
Gets the current status of SGCollect operation

:return: Response dict with status ('stopped' or 'running')
"""
with self._tracer.start_as_current_span("get_sgcollect_status"):
resp = await self._send_request("get", "/_sgcollect_info")
assert isinstance(resp, dict)
return cast(dict, resp)

async def wait_for_sgcollect_to_complete(
self, max_attempts: int = 60, wait_time: int = 5
) -> None:
"""
Waits for SGCollect to complete

:param max_attempts: Maximum number of attempts to wait for SGCollect to complete
:param wait_time: Time to wait between attempts
"""
for _ in range(max_attempts):
status_resp = await self.get_sgcollect_status()
if status_resp.get("status") in ["stopped", "completed"]:
return
await asyncio.sleep(wait_time)

raise Exception(
f"SGCollect did not complete after {max_attempts * wait_time} seconds.\n"
f"Status: {status_resp.get('status')}.\n"
f"Error: {status_resp.get('error')}"
)


class SyncGateway(_SyncGatewayBase):
"""
Expand Down Expand Up @@ -1304,15 +1532,6 @@ class SyncGatewayUserClient(_SyncGatewayBase):

This class inherits common operations from _SyncGatewayBase and does NOT
include admin methods (user management, roles, etc.).

Use SyncGateway.create_user_client() to create instances with proper user credentials
and channel access.

Example:
admin_sg = SyncGateway("localhost", "admin", "password")
user_sg = await admin_sg.create_user_client("db", "alice", "pass", ["channel1"])
# user_sg automatically uses port 4984 for all API calls
docs = await user_sg.get_all_documents("db")
"""

def __init__(
Expand Down
2 changes: 2 additions & 0 deletions environment/aws/.terraform.lock.hcl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion environment/aws/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ resource "aws_instance" "sync_gateway" {
associate_public_ip_address = true

root_block_device {
volume_size = 10 # 10 GiB
volume_size = 30 # 30 GiB (minimum required by AMI)
volume_type = "gp2"
}

Expand Down
2 changes: 1 addition & 1 deletion environment/aws/sgw_setup/Caddyfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
:20000
bind 0.0.0.0
root * /home/ec2-user/log
file_server
file_server browse
Loading
Loading