Skip to content

Commit 0f35bdd

Browse files
authored
Migrating SGW tests related to XATTRS (#294)
* Added a new test. Wrote a function for doc deletion in CBS. Added a delete database similar to put since the older functionality was flaky and a failed test-mid-run was still running into errors in reruns. * Added another new test. Everything works fine, ran it standalone and amongst all the others too. * A 3rd test related to xattrs * fixed mypy errors * addressed Ritesh's comments, using public APIs now for tests focusing on SG Client based verification
1 parent 1948718 commit 0f35bdd

File tree

4 files changed

+934
-22
lines changed

4 files changed

+934
-22
lines changed

client/src/cbltest/api/couchbaseserver.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from couchbase.management.collections import CollectionSpec
2424
from couchbase.management.options import CreatePrimaryQueryIndexOptions
2525
from couchbase.options import ClusterOptions
26+
from couchbase.subdocument import upsert
2627
from opentelemetry.trace import get_tracer
2728

2829
from cbltest.api.error import CblTestError
@@ -292,6 +293,36 @@ def upsert_document(
292293
f"Failed to insert document '{doc_id}' into {bucket}.{scope}.{collection}: {e}"
293294
)
294295

296+
def delete_document(
297+
self,
298+
bucket: str,
299+
doc_id: str,
300+
scope: str = "_default",
301+
collection: str = "_default",
302+
) -> None:
303+
"""
304+
Deletes a document from the specified bucket.scope.collection.
305+
"""
306+
with self.__tracer.start_as_current_span(
307+
"delete_document",
308+
attributes={
309+
"cbl.bucket.name": bucket,
310+
"cbl.scope.name": scope,
311+
"cbl.collection.name": collection,
312+
"cbl.document.id": doc_id,
313+
},
314+
):
315+
try:
316+
bucket_obj = _try_n_times(10, 1, False, self.__cluster.bucket, bucket)
317+
coll = bucket_obj.scope(scope).collection(collection)
318+
coll.remove(doc_id)
319+
except DocumentNotFoundException:
320+
pass
321+
except Exception as e:
322+
raise CblTestError(
323+
f"Failed to delete document '{doc_id}' from {bucket}.{scope}.{collection}: {e}"
324+
)
325+
295326
def get_document(
296327
self,
297328
bucket: str,
@@ -329,6 +360,84 @@ def get_document(
329360
f"Failed to get document '{doc_id}' from {bucket}.{scope}.{collection}: {e}"
330361
)
331362

363+
def upsert_document_xattr(
364+
self,
365+
bucket: str,
366+
doc_id: str,
367+
xattr_key: str,
368+
xattr_value: str,
369+
scope: str = "_default",
370+
collection: str = "_default",
371+
) -> None:
372+
"""
373+
Upserts an xattr on a document using subdocument operations
374+
375+
:param bucket: The bucket containing the document
376+
:param doc_id: The ID of the document to update
377+
:param xattr_key: The xattr key to upsert
378+
:param xattr_value: The value to set for the xattr
379+
:param scope: The scope containing the document (default '_default')
380+
:param collection: The collection containing the document (default '_default')
381+
"""
382+
with self.__tracer.start_as_current_span(
383+
"upsert_document_xattr",
384+
attributes={
385+
"cbl.bucket": bucket,
386+
"cbl.scope": scope,
387+
"cbl.collection": collection,
388+
"cbl.document.id": doc_id,
389+
"cbl.xattr.key": xattr_key,
390+
},
391+
):
392+
try:
393+
col = self.__cluster.bucket(bucket).scope(scope).collection(collection)
394+
col.mutate_in(
395+
doc_id,
396+
[upsert(xattr_key, xattr_value, xattr=True, create_parents=True)],
397+
)
398+
except Exception as e:
399+
raise CblTestError(
400+
f"Failed to upsert xattr '{xattr_key}' on document '{doc_id}' in {bucket}.{scope}.{collection}: {e}"
401+
)
402+
403+
def delete_document_xattr(
404+
self,
405+
bucket: str,
406+
doc_id: str,
407+
xattr_key: str,
408+
scope: str = "_default",
409+
collection: str = "_default",
410+
) -> None:
411+
"""
412+
Deletes an xattr from a document using subdocument operations
413+
414+
:param bucket: The bucket containing the document
415+
:param doc_id: The ID of the document
416+
:param xattr_key: The xattr key to delete
417+
:param scope: The scope containing the document (default '_default')
418+
:param collection: The collection containing the document (default '_default')
419+
"""
420+
with self.__tracer.start_as_current_span(
421+
"delete_document_xattr",
422+
attributes={
423+
"cbl.bucket": bucket,
424+
"cbl.scope": scope,
425+
"cbl.collection": collection,
426+
"cbl.document.id": doc_id,
427+
"cbl.xattr.key": xattr_key,
428+
},
429+
):
430+
try:
431+
from couchbase.subdocument import remove
432+
433+
col = self.__cluster.bucket(bucket).scope(scope).collection(collection)
434+
col.mutate_in(
435+
doc_id,
436+
[remove(xattr_key, xattr=True)],
437+
)
438+
except Exception:
439+
pass
440+
332441
def start_xdcr(self, target: "CouchbaseServer", bucket_name: str) -> None:
333442
"""
334443
Starts an XDCR replication from this cluster to the target cluster

client/src/cbltest/api/syncgateway.py

Lines changed: 157 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -425,11 +425,74 @@ def __init__(
425425
self.__tracer = get_tracer(__name__, VERSION)
426426
self.__secure: bool = secure
427427
self.__hostname: str = url
428+
self.__port: int = port
428429
self.__admin_port: int = admin_port
429430
self.__admin_session: ClientSession = self._create_session(
430431
secure, scheme, url, admin_port, BasicAuth(username, password, "ascii")
431432
)
432433

434+
@property
435+
def hostname(self) -> str:
436+
"""Gets the hostname of the Sync Gateway instance"""
437+
return self.__hostname
438+
439+
@property
440+
def port(self) -> int:
441+
"""Gets the public port of the Sync Gateway instance"""
442+
return self.__port
443+
444+
@property
445+
def admin_port(self) -> int:
446+
"""Gets the admin port of the Sync Gateway instance"""
447+
return self.__admin_port
448+
449+
@property
450+
def secure(self) -> bool:
451+
"""Gets whether the Sync Gateway instance uses TLS"""
452+
return self.__secure
453+
454+
@classmethod
455+
async def create_user_client(
456+
cls,
457+
admin_sg: "SyncGateway",
458+
db_name: str,
459+
username: str,
460+
password: str,
461+
channels: list[str],
462+
) -> "SyncGateway":
463+
"""
464+
Helper method to create a user with channel access and return a user-specific SG client.
465+
466+
This is a convenience method for tests that need to verify user-level access control.
467+
468+
:param admin_sg: The admin SyncGateway instance
469+
:param db_name: The database name
470+
:param username: The username to create
471+
:param password: The password for the user
472+
:param channels: List of channels the user should have access to
473+
:return: A SyncGateway instance authenticated as the user for public API access
474+
"""
475+
# Clean up user if exists from previous run
476+
await admin_sg.delete_user(db_name, username)
477+
478+
# Create user with channel access
479+
await admin_sg.add_user(
480+
db_name,
481+
username,
482+
password=password,
483+
collection_access={"_default": {"_default": {"admin_channels": channels}}},
484+
)
485+
486+
# Return user-specific SG client for public API access
487+
return cls(
488+
admin_sg.hostname,
489+
username,
490+
password,
491+
port=admin_sg.port,
492+
admin_port=admin_sg.admin_port,
493+
secure=admin_sg.secure,
494+
)
495+
433496
def _create_session(
434497
self, secure: bool, scheme: str, url: str, port: int, auth: BasicAuth | None
435498
) -> ClientSession:
@@ -583,31 +646,39 @@ async def database_exists(self, db_name: str) -> bool:
583646
return False
584647
raise
585648

586-
async def delete_database(self, db_name: str) -> None:
587-
"""
588-
Deletes a database from Sync Gateway's configuration.
589-
590-
.. warning:: This will not delete the data from the Couchbase Server bucket.
591-
To delete the data see the
592-
:func:`drop_bucket()<cbltest.api.couchbaseserver.CouchbaseServer.drop_bucket>` function
593-
594-
:param db_name: The name of the Database to delete
595-
"""
649+
async def _delete_database(self, db_name: str, retry_count: int = 0) -> None:
596650
with self.__tracer.start_as_current_span(
597651
"delete_database", attributes={"cbl.database.name": db_name}
598-
):
652+
) as current_span:
599653
try:
600654
await self._send_request("delete", f"/{db_name}")
601655
except CblSyncGatewayBadResponseError as e:
602-
if e.code == 500:
656+
if e.code == 500 and retry_count < 3:
603657
cbl_warning(
604-
f"SGW returned 500 when deleting {db_name}, database may be in transitional state. Ignoring."
658+
f"Sync gateway returned 500 from DELETE database call, retrying ({retry_count + 1})..."
605659
)
606-
elif e.code == 404:
660+
current_span.add_event("SGW returned 500, retry")
661+
import asyncio
662+
663+
await asyncio.sleep(2)
664+
await self._delete_database(db_name, retry_count + 1)
665+
elif e.code == 403:
607666
pass
608667
else:
609668
raise
610669

670+
async def delete_database(self, db_name: str) -> None:
671+
"""
672+
Deletes a database from Sync Gateway's configuration.
673+
674+
.. warning:: This will not delete the data from the Couchbase Server bucket.
675+
To delete the data see the
676+
:func:`drop_bucket()<cbltest.api.couchbaseserver.CouchbaseServer.drop_bucket>` function
677+
678+
:param db_name: The name of the Database to delete
679+
"""
680+
await self._delete_database(db_name, 0)
681+
611682
def create_collection_access_dict(self, input: dict[str, list[str]]) -> dict:
612683
"""
613684
Creates a collection access dictionary in the format that Sync Gateway expects,
@@ -689,6 +760,25 @@ async def add_user(
689760
"put", f"/{db_name}/_user/{name}", JSONDictionary(body)
690761
)
691762

763+
async def delete_user(self, db_name: str, name: str) -> None:
764+
"""
765+
Deletes a user from a Sync Gateway database
766+
767+
:param db_name: The name of the Database
768+
:param name: The username to delete
769+
"""
770+
with self.__tracer.start_as_current_span(
771+
"delete_user", attributes={"cbl.user.name": name}
772+
):
773+
try:
774+
await self._send_request("delete", f"/{db_name}/_user/{name}")
775+
except CblSyncGatewayBadResponseError as e:
776+
if e.code == 404:
777+
# User doesn't exist, that's fine
778+
pass
779+
else:
780+
raise
781+
692782
async def add_role(self, db_name: str, role: str, collection_access: dict) -> None:
693783
"""
694784
Adds the specified role to a Sync Gateway database with the specified collection access
@@ -785,14 +875,19 @@ async def load_dataset(self, db_name: str, path: Path) -> None:
785875
self._analyze_dataset_response(cast(list, resp))
786876

787877
async def get_all_documents(
788-
self, db_name: str, scope: str = "_default", collection: str = "_default"
878+
self,
879+
db_name: str,
880+
scope: str = "_default",
881+
collection: str = "_default",
882+
use_public_api: bool = False,
789883
) -> AllDocumentsResponse:
790884
"""
791885
Gets all the documents in the given collection from Sync Gateway (id and revid)
792886
793887
:param db_name: The name of the Sync Gateway database to query
794888
:param scope: The scope to use when querying Sync Gateway
795889
:param collection: The collection to use when querying Sync Gateway
890+
:param use_public_api: If True, uses public port (4984) with user auth instead of admin port (4985)
796891
"""
797892
with self.__tracer.start_as_current_span(
798893
"get_all_documents",
@@ -802,9 +897,28 @@ async def get_all_documents(
802897
"cbl.collection.name": collection,
803898
},
804899
):
805-
resp = await self._send_request(
806-
"get", f"/{db_name}.{scope}.{collection}/_all_docs"
807-
)
900+
if use_public_api:
901+
# Use public port (4984) - required for regular user access
902+
scheme = "https://" if self.__secure else "http://"
903+
# Create session with user's credentials on public port
904+
async with self._create_session(
905+
self.__secure,
906+
scheme,
907+
self.__hostname,
908+
4984,
909+
self.__admin_session.auth,
910+
) as session:
911+
resp = await self._send_request(
912+
"get",
913+
f"/{db_name}.{scope}.{collection}/_all_docs",
914+
session=session,
915+
)
916+
else:
917+
# Use admin port (4985) - default behavior
918+
resp = await self._send_request(
919+
"get", f"/{db_name}.{scope}.{collection}/_all_docs"
920+
)
921+
808922
assert isinstance(resp, dict)
809923
return AllDocumentsResponse(cast(dict, resp))
810924

@@ -814,6 +928,7 @@ async def get_changes(
814928
scope: str = "_default",
815929
collection: str = "_default",
816930
version_type: str = "rev",
931+
use_public_api: bool = False,
817932
) -> ChangesResponse:
818933
"""
819934
Gets the changes feed from Sync Gateway, including deleted documents
@@ -822,6 +937,7 @@ async def get_changes(
822937
:param scope: The scope to use when querying Sync Gateway
823938
:param collection: The collection to use when querying Sync Gateway
824939
:param version_type: The version type to use ('rev' for revision IDs, 'cv' for version vectors in SGW 4.0+)
940+
:param use_public_api: If True, uses public port (4984) with user auth instead of admin port (4985)
825941
"""
826942
with self.__tracer.start_as_current_span(
827943
"get_changes",
@@ -832,9 +948,29 @@ async def get_changes(
832948
},
833949
):
834950
query_params = f"version_type={version_type}"
835-
resp = await self._send_request(
836-
"get", f"/{db_name}.{scope}.{collection}/_changes?{query_params}"
837-
)
951+
952+
if use_public_api:
953+
# Use public port (4984) - required for regular user access
954+
scheme = "https://" if self.__secure else "http://"
955+
# Create session with user's credentials on public port
956+
async with self._create_session(
957+
self.__secure,
958+
scheme,
959+
self.__hostname,
960+
4984,
961+
self.__admin_session.auth,
962+
) as session:
963+
resp = await self._send_request(
964+
"get",
965+
f"/{db_name}.{scope}.{collection}/_changes?{query_params}",
966+
session=session,
967+
)
968+
else:
969+
# Use admin port (4985) - default behavior
970+
resp = await self._send_request(
971+
"get", f"/{db_name}.{scope}.{collection}/_changes?{query_params}"
972+
)
973+
838974
assert isinstance(resp, dict)
839975
return ChangesResponse(cast(dict, resp))
840976

0 commit comments

Comments
 (0)