Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/verify_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ source venv/bin/activate
pip install mypy
pip install pytest
pip install types-requests
pip install types-Deprecated
pip install ./client
echo "Checking tests files..."
python -m mypy tests --exclude=venv --ignore-missing-imports
Expand Down
125 changes: 108 additions & 17 deletions client/src/cbltest/api/syncgateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
from cbltest.assertions import _assert_not_null
from cbltest.httplog import get_next_writer
from cbltest.jsonhelper import _get_typed_required
from cbltest.logging import cbl_warning
from cbltest.logging import cbl_warning, cbl_info
from cbltest.version import VERSION
from cbltest.utils import assert_not_null

from deprecated import deprecated


class _CollectionMap(JSONSerializable):
Expand Down Expand Up @@ -172,12 +175,38 @@ class DocumentUpdateEntry(JSONSerializable):
For creating a new document, set revid to None.
"""

@property
@deprecated("Only should be used until 4.0 SGW gets close to GA")
def id(self) -> str:
"""
Gets the ID of the entry (NOTE: Will go away after 4.0 SGW gets close to GA)
"""
return cast(str, self.__body["_id"])

@property
@deprecated("Only should be used until 4.0 SGW gets close to GA")
def rev(self) -> Optional[str]:
"""
Gets the rev ID of the entry (NOTE: Will go away after 4.0 SGW gets close to GA)
"""
if not "_rev" in self.__body:
return None

return cast(str, self.__body["_rev"])

def __init__(self, id: str, revid: Optional[str], body: dict):
self.__body = body.copy()
self.__body["_id"] = id
if revid:
self.__body["_rev"] = revid

@deprecated("Only should be used until 4.0 SGW gets close to GA")
def swap_rev(self, revid: str) -> None:
"""
Changes the revid to the provided one (NOTE: Will go away after 4.0 SGW gets close to GA)
"""
self.__body["_rev"] = revid

def to_json(self) -> Any:
return self.__body

Expand All @@ -193,29 +222,47 @@ def id(self) -> str:
return self.__id

@property
def revid(self) -> str:
def revid(self) -> Optional[str]:
"""Gets the revision ID of the document"""
return self.__rev

@property
def cv(self) -> Optional[str]:
"""Gets the CV of the document"""
return self.__cv

@property
def body(self) -> dict:
"""Gets the body of the document"""
return self.__body

@property
def revision(self) -> str:
"""Gets either the CV (preferred) or revid of the document"""
if self.__cv is not None:
return self.__cv

assert self.__rev is not None
return self.__rev

def __init__(self, body: dict) -> None:
if "error" in body:
raise ValueError("Trying to create remote document from error response")

self.__body = body.copy()
self.__id = cast(str, body["_id"])
self.__rev = cast(str, body["_rev"])
self.__rev = cast(str, body["_rev"]) if "_rev" in body else None
self.__cv = cast(str, body["_cv"]) if "_cv" in body else None
del self.__body["_id"]
del self.__body["_rev"]
if self.__cv is not None:
del self.__body["_cv"]

def to_json(self) -> Any:
ret_val = self.__body.copy()
ret_val["_id"] = self.__id
ret_val["_rev"] = self.__rev
ret_val["_cv"] = self.__cv
return ret_val


Expand Down Expand Up @@ -339,6 +386,19 @@ def replication_url(self, db_name: str):
"""
_assert_not_null(db_name, nameof(db_name))
return urljoin(self.__replication_url, db_name)

async def _put_database(self, db_name: str, payload: PutDatabasePayload, retry_count: int = 0) -> None:
with self.__tracer.start_as_current_span("put_database",
attributes={"cbl.database.name": db_name}) as current_span:
try:
await self._send_request("put", f"/{db_name}/", payload)
except CblSyncGatewayBadResponseError as e:
if e.code == 500 and retry_count < 10:
cbl_warning(f"Sync gateway returned 500 from PUT database call, retrying ({retry_count + 1})...")
current_span.add_event("SGW returned 500, retry")
await self._put_database(db_name, payload, retry_count + 1)
else:
raise

async def put_database(self, db_name: str, payload: PutDatabasePayload) -> None:
"""
Expand All @@ -347,17 +407,7 @@ async def put_database(self, db_name: str, payload: PutDatabasePayload) -> None:
:param db_name: The name of the DB to create
:param payload: The options for the DB to create
"""
with self.__tracer.start_as_current_span("put_database",
attributes={"cbl.database.name": db_name}) as current_span:
try:
await self._send_request("put", f"/{db_name}", payload)
except CblSyncGatewayBadResponseError as e:
if e.code == 500:
cbl_warning("Sync gateway returned 500 from PUT database call, retrying...")
current_span.add_event("SGW returned 500, retry")
await self.put_database(db_name, payload)
else:
raise
await self._put_database(db_name, payload, 0)

async def delete_database(self, db_name: str) -> None:
"""
Expand All @@ -370,7 +420,7 @@ async def delete_database(self, db_name: str) -> None:
:param db_name: The name of the Database to delete
"""
with self.__tracer.start_as_current_span("delete_database", attributes={"cbl.database.name": db_name}):
await self._send_request("delete", f"/{db_name}")
await self._send_request("delete", f"/{db_name}/")

def create_collection_access_dict(self, input: Dict[str, List[str]]) -> dict:
"""
Expand Down Expand Up @@ -527,6 +577,28 @@ async def get_all_documents(self, db_name: str, scope: str = "_default",
resp = await self._send_request("get", f"/{db_name}.{scope}.{collection}/_all_docs?show_cv=true")
assert isinstance(resp, dict)
return AllDocumentsResponse(cast(dict, resp))

@deprecated("Only should be used until 4.0 SGW gets close to GA")
async def _rewrite_rev_ids(self, db_name: str, updates: List[DocumentUpdateEntry],
scope: str, collection: str) -> None:
all_docs_body = list(u.id for u in updates if u.rev is not None)
all_docs_response = await self._send_request("post", f"/{db_name}.{scope}.{collection}/_all_docs",
JSONDictionary({"keys": all_docs_body}))

if not isinstance(all_docs_response, dict):
raise ValueError("Inappropriate response from sync gateway _all_docs (not JSON dict)")

rows = cast(dict, all_docs_response)["rows"]
if not isinstance(rows, list):
raise ValueError("Inappropriate response from sync gateway _all_docs (rows not a list)")

for r in cast(list, rows):
next_id = r["id"]
found = assert_not_null(next((u for u in updates if u.id == next_id), None),
f"Unable to find {next_id} in updates!")
new_rev_id = r["value"]["rev"]
cbl_info(f"For document {found.id}: Swapping revid from {found.rev} to {new_rev_id}")
found.swap_rev(new_rev_id)

async def update_documents(self, db_name: str, updates: List[DocumentUpdateEntry],
scope: str = "_default", collection: str = "_default") -> None:
Expand All @@ -541,12 +613,26 @@ async def update_documents(self, db_name: str, updates: List[DocumentUpdateEntry
with self.__tracer.start_as_current_span("update_documents", attributes={"cbl.database.name": db_name,
"cbl.scope.name": scope,
"cbl.collection.name": collection}):

await self._rewrite_rev_ids(db_name, updates, scope, collection)



body = {
"docs": list(u.to_json() for u in updates)
}

await self._send_request("post", f"/{db_name}.{scope}.{collection}/_bulk_docs",
JSONDictionary(body))

@deprecated("Only should be used until 4.0 SGW gets close to GA")
async def _replaced_revid(self, doc_id: str, revid: str, db_name: str, scope: str, collection: str) -> str:
response = await self._send_request("get", f"/{db_name}.{scope}.{collection}/{doc_id}?show_cv=true")
assert isinstance(response, dict)
response_dict = cast(dict, response)
assert revid == response_dict["_cv"] or revid == response_dict["_rev"]
return cast(dict, response)["_rev"]


async def delete_document(self, doc_id: str, revid: str, db_name: str, scope: str = "_default",
collection: str = "_default") -> None:
Expand All @@ -563,8 +649,13 @@ async def delete_document(self, doc_id: str, revid: str, db_name: str, scope: st
"cbl.scope.name": scope,
"cbl.collection.name": collection,
"cbl.document.id": doc_id}):
if "@" in revid:
new_rev_id = await self._replaced_revid(doc_id, revid, db_name, scope, collection)
else:
new_rev_id = revid

await self._send_request("delete", f"/{db_name}.{scope}.{collection}/{doc_id}",
params={"rev": revid})
params={"rev": new_rev_id})

async def purge_document(self, doc_id: str, db_name: str, scope: str = "_default",
collection: str = "_default") -> None:
Expand Down Expand Up @@ -601,7 +692,7 @@ async def get_document(self, db_name: str, doc_id: str, scope: str = "_default",
"cbl.scope.name": scope,
"cbl.collection.name": collection,
"cbl.document.id": doc_id}):
response = await self._send_request("get", f"/{db_name}.{scope}.{collection}/{doc_id}")
response = await self._send_request("get", f"/{db_name}.{scope}.{collection}/{doc_id}?show_cv=true")
if not isinstance(response, dict):
raise ValueError("Inappropriate response from sync gateway get /doc (not JSON)")

Expand Down
6 changes: 5 additions & 1 deletion client/src/cbltest/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
from typing import Any, Callable, Dict, TypeVar, Type, Union
from typing import Any, Callable, Dict, TypeVar, Type, Union, Optional, cast

from .api.error import CblTimeoutError

Expand Down Expand Up @@ -27,3 +27,7 @@ def _try_n_times(num_times: int,
print(f"Trying {func.__name__} failed (reason='{e}')")

raise CblTimeoutError(f"Failed to call {func.__name__} after {num_times} attempts!")

def assert_not_null(input: Optional[T], msg: str) -> T:
assert input is not None, msg
return cast(T, input)
10 changes: 5 additions & 5 deletions environment/sg/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ SHELL ["/bin/bash", "-c"]
RUN apt -yqq update
RUN apt -yqq install curl systemctl
RUN mkdir -p /opt/sg
COPY cert /opt/sg/cert
COPY config /opt/sg/config
COPY start-sgw.sh /opt/sg

ARG SG_DEB
RUN [ -z "$SG_DEB" ] && echo "SG_DEB is required" && exit 1 || true
Expand All @@ -20,9 +17,12 @@ RUN ARCHITECTURE="$(dpkg --print-architecture)" && \
exit 1; \
fi

WORKDIR /opt/sg
RUN dpkg -i ./couchbase-sync-gateway.deb
RUN dpkg -i /opt/sg/couchbase-sync-gateway.deb
COPY cert /opt/sg/cert
COPY config /opt/sg/config
COPY start-sgw.sh /opt/sg

WORKDIR /opt/sg
EXPOSE 4984
EXPOSE 4985

Expand Down
8 changes: 8 additions & 0 deletions environment/sg/config/bootstrap-nonssl.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
"enabled": true,
"log_level": "info",
"log_keys": ["*"]
},
"log_file_path": "/opt/sg/log",
"debug": {
"enabled": true,
"rotation": {
"max_size": 512,
"rotated_logs_size_limit": 1024
}
}
}
}
8 changes: 8 additions & 0 deletions environment/sg/config/bootstrap.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@
"enabled": true,
"log_level": "info",
"log_keys": ["*"]
},
"log_file_path": "/opt/sg/log",
"debug": {
"enabled": true,
"rotation": {
"max_size": 512,
"rotated_logs_size_limit": 1024
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public static Task NewSessionHandler(int version, JsonDocument body, HttpListene
}

Log.Logger = new LoggerConfiguration()
.MinimumLevel.Verbose()
.WriteTo.Logger(Original)
.WriteTo.LogSlurp(newSessionBody.logging.url, newSessionBody.id, newSessionBody.logging.tag)
.CreateLogger();
Expand Down
2 changes: 1 addition & 1 deletion servers/dotnet/testserver/MauiProgram.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static MauiApp CreateMauiApp()

LogFilePath = $"{Path.GetTempFileName()}.txt";
var logConfig = new LoggerConfiguration()
.MinimumLevel.Debug()
.MinimumLevel.Verbose()
.WriteTo.File(LogFilePath)
.WriteTo.Console(restrictedToMinimumLevel: LogEventLevel.Warning);

Expand Down
2 changes: 2 additions & 0 deletions servers/dotnet/testserver/testserver.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
<UseInterpreter Condition="$(TargetFramework.Contains('-ios'))">true</UseInterpreter>
<RuntimeIdentifiers Condition="$([MSBuild]::GetTargetPlatformIdentifier('$(TargetFramework)')) == 'android'">android-arm64;android-arm</RuntimeIdentifiers>
<RuntimeIdentifier Condition="$([MSBuild]::GetTargetPlatformIdentifier('$(TargetFramework)')) == 'maccatalyst'">maccatalyst-x64</RuntimeIdentifier>

<WindowsAppSDKSelfContained>true</WindowsAppSDKSelfContained>
</PropertyGroup>

<ItemGroup>
Expand Down
7 changes: 5 additions & 2 deletions tests/test_basic_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from cbltest.api.replicator_types import ReplicatorBasicAuthenticator, ReplicatorDocumentFlags
from cbltest.api.syncgateway import DocumentUpdateEntry
from cbltest.api.test_functions import compare_local_and_remote
from cbltest.utils import assert_not_null


class TestBasicReplication(CBLTestClass):
Expand Down Expand Up @@ -325,7 +326,8 @@ async def test_continuous_pull(self, cblpytest: CBLPyTest, dataset_path: Path):
hotels_all_docs = await cblpytest.sync_gateways[0].get_all_documents("travel", "travel", "hotels")
for doc in hotels_all_docs.rows:
if doc.id == "hotel_400" or doc.id == "hotel_500":
await cblpytest.sync_gateways[0].delete_document(doc.id, doc.revision, "travel", "travel", "hotels")
revid = assert_not_null(doc.revid, f"Missing revid on {doc.id}")
await cblpytest.sync_gateways[0].delete_document(doc.id, revid, "travel", "travel", "hotels")

self.mark_test_step("Wait until receiving all document replication events")
await replicator.wait_for_all_doc_events({
Expand Down Expand Up @@ -459,7 +461,8 @@ async def test_continuous_push_and_pull(self, cblpytest: CBLPyTest, dataset_path
hotels_all_docs = await cblpytest.sync_gateways[0].get_all_documents("travel", "travel", "hotels")
for doc in hotels_all_docs.rows:
if doc.id == "hotel_400" or doc.id == "hotel_500":
await cblpytest.sync_gateways[0].delete_document(doc.id, doc.revision, "travel", "travel", "hotels")
revid = assert_not_null(doc.revid, f"Missing revid on {doc.id}")
await cblpytest.sync_gateways[0].delete_document(doc.id, revid, "travel", "travel", "hotels")

self.mark_test_step("Wait until receiving all document replication events")
await replicator.wait_for_all_doc_events({
Expand Down
4 changes: 3 additions & 1 deletion tests/test_replication_behavior.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pathlib import Path
from cbltest import CBLPyTest
from cbltest.utils import assert_not_null
from cbltest.api.cloud import CouchbaseCloud
from cbltest.api.replicator import Replicator
from cbltest.api.replicator_types import ReplicatorCollectionEntry, ReplicatorType, \
Expand All @@ -19,7 +20,8 @@ async def test_pull_empty_database_active_only(self, cblpytest: CBLPyTest, datas
for row in all_docs.rows:
name_number = int(row.id[-3:])
if name_number <= 150:
await cblpytest.sync_gateways[0].delete_document(row.id, row.revision, "names")
revid = assert_not_null(row.revid, f"Missing revid on {row.id}")
await cblpytest.sync_gateways[0].delete_document(row.id, revid, "names")

self.mark_test_step("Reset local database, and load `empty` dataset")
dbs = await cblpytest.test_servers[0].create_and_reset_db(["db1"])
Expand Down
Loading
Loading