1414from cbltest .assertions import _assert_not_null
1515from cbltest .httplog import get_next_writer
1616from cbltest .jsonhelper import _get_typed_required
17- from cbltest .logging import cbl_warning
17+ from cbltest .logging import cbl_warning , cbl_info
1818from cbltest .version import VERSION
19+ from cbltest .utils import assert_not_null
20+
21+ from deprecated import deprecated
1922
2023
2124class _CollectionMap (JSONSerializable ):
@@ -172,12 +175,38 @@ class DocumentUpdateEntry(JSONSerializable):
172175 For creating a new document, set revid to None.
173176 """
174177
178+ @property
179+ @deprecated ("Only should be used until 4.0 SGW gets close to GA" )
180+ def id (self ) -> str :
181+ """
182+ Gets the ID of the entry (NOTE: Will go away after 4.0 SGW gets close to GA)
183+ """
184+ return cast (str , self .__body ["_id" ])
185+
186+ @property
187+ @deprecated ("Only should be used until 4.0 SGW gets close to GA" )
188+ def rev (self ) -> Optional [str ]:
189+ """
190+ Gets the rev ID of the entry (NOTE: Will go away after 4.0 SGW gets close to GA)
191+ """
192+ if not "_rev" in self .__body :
193+ return None
194+
195+ return cast (str , self .__body ["_rev" ])
196+
175197 def __init__ (self , id : str , revid : Optional [str ], body : dict ):
176198 self .__body = body .copy ()
177199 self .__body ["_id" ] = id
178200 if revid :
179201 self .__body ["_rev" ] = revid
180202
203+ @deprecated ("Only should be used until 4.0 SGW gets close to GA" )
204+ def swap_rev (self , revid : str ) -> None :
205+ """
206+ Changes the revid to the provided one (NOTE: Will go away after 4.0 SGW gets close to GA)
207+ """
208+ self .__body ["_rev" ] = revid
209+
181210 def to_json (self ) -> Any :
182211 return self .__body
183212
@@ -193,29 +222,47 @@ def id(self) -> str:
193222 return self .__id
194223
195224 @property
196- def revid (self ) -> str :
225+ def revid (self ) -> Optional [ str ] :
197226 """Gets the revision ID of the document"""
198227 return self .__rev
228+
229+ @property
230+ def cv (self ) -> Optional [str ]:
231+ """Gets the CV of the document"""
232+ return self .__cv
199233
200234 @property
201235 def body (self ) -> dict :
202236 """Gets the body of the document"""
203237 return self .__body
238+
239+ @property
240+ def revision (self ) -> str :
241+ """Gets either the CV (preferred) or revid of the document"""
242+ if self .__cv is not None :
243+ return self .__cv
244+
245+ assert self .__rev is not None
246+ return self .__rev
204247
205248 def __init__ (self , body : dict ) -> None :
206249 if "error" in body :
207250 raise ValueError ("Trying to create remote document from error response" )
208251
209252 self .__body = body .copy ()
210253 self .__id = cast (str , body ["_id" ])
211- self .__rev = cast (str , body ["_rev" ])
254+ self .__rev = cast (str , body ["_rev" ]) if "_rev" in body else None
255+ self .__cv = cast (str , body ["_cv" ]) if "_cv" in body else None
212256 del self .__body ["_id" ]
213257 del self .__body ["_rev" ]
258+ if self .__cv is not None :
259+ del self .__body ["_cv" ]
214260
215261 def to_json (self ) -> Any :
216262 ret_val = self .__body .copy ()
217263 ret_val ["_id" ] = self .__id
218264 ret_val ["_rev" ] = self .__rev
265+ ret_val ["_cv" ] = self .__cv
219266 return ret_val
220267
221268
@@ -339,6 +386,19 @@ def replication_url(self, db_name: str):
339386 """
340387 _assert_not_null (db_name , nameof (db_name ))
341388 return urljoin (self .__replication_url , db_name )
389+
390+ async def _put_database (self , db_name : str , payload : PutDatabasePayload , retry_count : int = 0 ) -> None :
391+ with self .__tracer .start_as_current_span ("put_database" ,
392+ attributes = {"cbl.database.name" : db_name }) as current_span :
393+ try :
394+ await self ._send_request ("put" , f"/{ db_name } /" , payload )
395+ except CblSyncGatewayBadResponseError as e :
396+ if e .code == 500 and retry_count < 10 :
397+ cbl_warning (f"Sync gateway returned 500 from PUT database call, retrying ({ retry_count + 1 } )..." )
398+ current_span .add_event ("SGW returned 500, retry" )
399+ await self ._put_database (db_name , payload , retry_count + 1 )
400+ else :
401+ raise
342402
343403 async def put_database (self , db_name : str , payload : PutDatabasePayload ) -> None :
344404 """
@@ -347,17 +407,7 @@ async def put_database(self, db_name: str, payload: PutDatabasePayload) -> None:
347407 :param db_name: The name of the DB to create
348408 :param payload: The options for the DB to create
349409 """
350- with self .__tracer .start_as_current_span ("put_database" ,
351- attributes = {"cbl.database.name" : db_name }) as current_span :
352- try :
353- await self ._send_request ("put" , f"/{ db_name } " , payload )
354- except CblSyncGatewayBadResponseError as e :
355- if e .code == 500 :
356- cbl_warning ("Sync gateway returned 500 from PUT database call, retrying..." )
357- current_span .add_event ("SGW returned 500, retry" )
358- await self .put_database (db_name , payload )
359- else :
360- raise
410+ await self ._put_database (db_name , payload , 0 )
361411
362412 async def delete_database (self , db_name : str ) -> None :
363413 """
@@ -370,7 +420,7 @@ async def delete_database(self, db_name: str) -> None:
370420 :param db_name: The name of the Database to delete
371421 """
372422 with self .__tracer .start_as_current_span ("delete_database" , attributes = {"cbl.database.name" : db_name }):
373- await self ._send_request ("delete" , f"/{ db_name } " )
423+ await self ._send_request ("delete" , f"/{ db_name } / " )
374424
375425 def create_collection_access_dict (self , input : Dict [str , List [str ]]) -> dict :
376426 """
@@ -527,6 +577,28 @@ async def get_all_documents(self, db_name: str, scope: str = "_default",
527577 resp = await self ._send_request ("get" , f"/{ db_name } .{ scope } .{ collection } /_all_docs?show_cv=true" )
528578 assert isinstance (resp , dict )
529579 return AllDocumentsResponse (cast (dict , resp ))
580+
581+ @deprecated ("Only should be used until 4.0 SGW gets close to GA" )
582+ async def _rewrite_rev_ids (self , db_name : str , updates : List [DocumentUpdateEntry ],
583+ scope : str , collection : str ) -> None :
584+ all_docs_body = list (u .id for u in updates if u .rev is not None )
585+ all_docs_response = await self ._send_request ("post" , f"/{ db_name } .{ scope } .{ collection } /_all_docs" ,
586+ JSONDictionary ({"keys" : all_docs_body }))
587+
588+ if not isinstance (all_docs_response , dict ):
589+ raise ValueError ("Inappropriate response from sync gateway _all_docs (not JSON dict)" )
590+
591+ rows = cast (dict , all_docs_response )["rows" ]
592+ if not isinstance (rows , list ):
593+ raise ValueError ("Inappropriate response from sync gateway _all_docs (rows not a list)" )
594+
595+ for r in cast (list , rows ):
596+ next_id = r ["id" ]
597+ found = assert_not_null (next ((u for u in updates if u .id == next_id ), None ),
598+ f"Unable to find { next_id } in updates!" )
599+ new_rev_id = r ["value" ]["rev" ]
600+ cbl_info (f"For document { found .id } : Swapping revid from { found .rev } to { new_rev_id } " )
601+ found .swap_rev (new_rev_id )
530602
531603 async def update_documents (self , db_name : str , updates : List [DocumentUpdateEntry ],
532604 scope : str = "_default" , collection : str = "_default" ) -> None :
@@ -541,12 +613,26 @@ async def update_documents(self, db_name: str, updates: List[DocumentUpdateEntry
541613 with self .__tracer .start_as_current_span ("update_documents" , attributes = {"cbl.database.name" : db_name ,
542614 "cbl.scope.name" : scope ,
543615 "cbl.collection.name" : collection }):
616+
617+ await self ._rewrite_rev_ids (db_name , updates , scope , collection )
618+
619+
620+
544621 body = {
545622 "docs" : list (u .to_json () for u in updates )
546623 }
547624
548625 await self ._send_request ("post" , f"/{ db_name } .{ scope } .{ collection } /_bulk_docs" ,
549626 JSONDictionary (body ))
627+
628+ @deprecated ("Only should be used until 4.0 SGW gets close to GA" )
629+ async def _replaced_revid (self , doc_id : str , revid : str , db_name : str , scope : str , collection : str ) -> str :
630+ response = await self ._send_request ("get" , f"/{ db_name } .{ scope } .{ collection } /{ doc_id } ?show_cv=true" )
631+ assert isinstance (response , dict )
632+ response_dict = cast (dict , response )
633+ assert revid == response_dict ["_cv" ] or revid == response_dict ["_rev" ]
634+ return cast (dict , response )["_rev" ]
635+
550636
551637 async def delete_document (self , doc_id : str , revid : str , db_name : str , scope : str = "_default" ,
552638 collection : str = "_default" ) -> None :
@@ -563,8 +649,13 @@ async def delete_document(self, doc_id: str, revid: str, db_name: str, scope: st
563649 "cbl.scope.name" : scope ,
564650 "cbl.collection.name" : collection ,
565651 "cbl.document.id" : doc_id }):
652+ if "@" in revid :
653+ new_rev_id = await self ._replaced_revid (doc_id , revid , db_name , scope , collection )
654+ else :
655+ new_rev_id = revid
656+
566657 await self ._send_request ("delete" , f"/{ db_name } .{ scope } .{ collection } /{ doc_id } " ,
567- params = {"rev" : revid })
658+ params = {"rev" : new_rev_id })
568659
569660 async def purge_document (self , doc_id : str , db_name : str , scope : str = "_default" ,
570661 collection : str = "_default" ) -> None :
@@ -601,7 +692,7 @@ async def get_document(self, db_name: str, doc_id: str, scope: str = "_default",
601692 "cbl.scope.name" : scope ,
602693 "cbl.collection.name" : collection ,
603694 "cbl.document.id" : doc_id }):
604- response = await self ._send_request ("get" , f"/{ db_name } .{ scope } .{ collection } /{ doc_id } " )
695+ response = await self ._send_request ("get" , f"/{ db_name } .{ scope } .{ collection } /{ doc_id } ?show_cv=true " )
605696 if not isinstance (response , dict ):
606697 raise ValueError ("Inappropriate response from sync gateway get /doc (not JSON)" )
607698
0 commit comments