@@ -2,6 +2,7 @@ mod utils;
22
33use couchbase_lite:: * ;
44use std:: path:: Path ;
5+ use std:: sync:: { Arc , Mutex } ;
56use utils:: * ;
67
78#[ allow( deprecated) ]
@@ -72,9 +73,20 @@ fn main() {
7273 let session_token = get_session ( "test_user" ) ;
7374 reporter. log ( & format ! ( "Sync gateway session token: {session_token}\n " ) ) ;
7475
76+ // Track replication events
77+ let repl_events = Arc :: new ( Mutex :: new ( Vec :: < String > :: new ( ) ) ) ;
78+ let repl_events_clone = repl_events. clone ( ) ;
79+
7580 // Setup replicator with auto-purge ENABLED
7681 let mut repl = setup_replicator ( db_cblite. clone ( ) , session_token. clone ( ) )
77- . add_document_listener ( Box :: new ( doc_listener) ) ;
82+ . add_document_listener ( Box :: new ( move |dir, docs| {
83+ let mut events = repl_events_clone. lock ( ) . unwrap ( ) ;
84+ for doc in docs {
85+ let event = format ! ( "{:?}: {} (flags={})" , dir, doc. id, doc. flags) ;
86+ println ! ( " 📡 {}" , event) ;
87+ events. push ( event) ;
88+ }
89+ } ) ) ;
7890
7991 repl. start ( false ) ;
8092 std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 3 ) ) ;
@@ -106,24 +118,24 @@ fn main() {
106118 reporter. log ( "✓ doc1 created and replicated to central\n " ) ;
107119
108120 // STOP replication
109- reporter. log ( "Stopping replication..." ) ;
121+ reporter. log ( "STEP 1b: Stopping replication..." ) ;
110122 repl. stop ( None ) ;
111123 std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 2 ) ) ;
112124 reporter. log ( "✓ Replication stopped\n " ) ;
113125
114126 // STEP 2: Delete doc1 from CENTRAL only (doc remains in cblite)
115- reporter. log ( "STEP 2: Deleting doc1 from CENTRAL only (simulating central deletion) ..." ) ;
127+ reporter. log ( "STEP 2: Deleting doc1 from CENTRAL only..." ) ;
116128 let deletion_success = delete_doc_from_central ( "doc1" ) ;
117129
118130 if !deletion_success {
119- reporter. log ( "⚠ Failed to delete document from central - test may not be valid" ) ;
131+ reporter. log ( "⚠ Failed to delete document from central - test may not be valid\n " ) ;
120132 } else {
121133 std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 3 ) ) ;
122134 reporter. log ( "✓ doc1 deleted from central (tombstone created in central)\n " ) ;
123135 }
124136
125137 // Verify doc still exists in cblite
126- reporter. log ( "Verifying doc1 still exists in local cblite..." ) ;
138+ reporter. log ( "STEP 2b: Verifying doc1 still exists in local cblite..." ) ;
127139 if get_doc ( & db_cblite, "doc1" ) . is_ok ( ) {
128140 reporter. log ( "✓ doc1 still present in cblite (as expected)\n " ) ;
129141 } else {
@@ -140,10 +152,10 @@ fn main() {
140152 ] ,
141153 ) ;
142154
143- // STEP 3-7 : Wait for purge interval + compact
155+ // STEP 3: Wait for purge interval + compact
144156 reporter. log ( "STEP 3: Waiting 65 minutes for central tombstone to be eligible for purge..." ) ;
145- reporter. log ( "This allows the document's updatedAt to become > 1 hour old." ) ;
146- reporter. log ( "Progress updates every 5 minutes:\n " ) ;
157+ reporter. log ( " This allows the document's updatedAt to become > 1 hour old." ) ;
158+ reporter. log ( " Progress updates every 5 minutes:\n " ) ;
147159
148160 let start_time = std:: time:: Instant :: now ( ) ;
149161 for minute in 1 ..=65 {
@@ -162,22 +174,35 @@ fn main() {
162174 }
163175 reporter. log ( "✓ Wait complete (65 minutes elapsed)\n " ) ;
164176
165- // Compact CBS and SGW
177+ // STEP 4: Compact CBS
166178 reporter. log ( "STEP 4: Compacting CBS bucket..." ) ;
167179 compact_cbs_bucket ( ) ;
168180 std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 5 ) ) ;
169181 reporter. log ( "✓ CBS compaction triggered\n " ) ;
170182
183+ // STEP 5: Compact SGW
171184 reporter. log ( "STEP 5: Compacting SGW database..." ) ;
172185 compact_sgw_database ( ) ;
173186 std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 5 ) ) ;
174187 reporter. log ( "✓ SGW compaction complete\n " ) ;
175188
176- // STEP 8 : Verify tombstone purged from central
189+ // STEP 6 : Verify tombstone purged from central
177190 reporter. log ( "STEP 6: Checking if central tombstone was purged..." ) ;
178- check_doc_in_cbs ( "doc1" ) ;
179191 let state6 = get_sync_xattr ( "doc1" ) ;
180192 let purged = state6. is_none ( ) || state6. as_ref ( ) . and_then ( |s| s. get ( "flags" ) ) . is_none ( ) ;
193+
194+ if purged {
195+ reporter. log ( " ✓ Central tombstone successfully purged (xattr absent)\n " ) ;
196+ } else {
197+ if let Some ( ref xattr) = state6 {
198+ let flags = xattr. get ( "flags" ) . and_then ( |f| f. as_i64 ( ) ) . unwrap_or ( 0 ) ;
199+ reporter. log ( & format ! (
200+ " ⚠ Central tombstone still present (flags: {})\n " ,
201+ flags
202+ ) ) ;
203+ }
204+ }
205+
181206 reporter. checkpoint (
182207 "STEP_6_TOMBSTONE_CHECK" ,
183208 state6,
@@ -187,47 +212,104 @@ fn main() {
187212 vec ! [ "Central tombstone still present (unexpected)" . to_string( ) ]
188213 } ,
189214 ) ;
190- reporter. log ( "" ) ;
191215
192- // STEP 9: Restart replication with RESET CHECKPOINT
193- reporter. log ( "STEP 7: Restarting replication with RESET CHECKPOINT..." ) ;
216+ // STEP 7: Prepare for replication reset - Touch document to force push
217+ reporter. log ( "STEP 7: Preparing document for replication reset..." ) ;
218+ reporter. log ( " Touching doc1 to ensure it will be pushed during reset checkpoint..." ) ;
219+
220+ // Modify document slightly to trigger a change
221+ {
222+ let mut doc = get_doc ( & db_cblite, "doc1" ) . unwrap ( ) ;
223+ let mut props = doc. mutable_properties ( ) ;
224+ props. at ( "_resurrection_test" ) . put_bool ( true ) ;
225+ db_cblite. save_document ( & mut doc) . unwrap ( ) ;
226+ reporter. log ( " ✓ Document modified to trigger replication\n " ) ;
227+ }
228+
229+ // STEP 8: Restart replication with RESET CHECKPOINT
230+ reporter. log ( "STEP 8: Restarting replication with RESET CHECKPOINT..." ) ;
194231 reporter. log ( " This simulates a fresh sync where cblite will push doc1 back to central." ) ;
195232 reporter. log ( & format ! (
196233 " doc1's updatedAt ({}) is now > 1 hour old" ,
197234 doc_created_at. to_rfc3339( )
198235 ) ) ;
199236 reporter. log ( " Sync function should route it to 'soft_deleted' channel.\n " ) ;
200237
238+ // Clear previous replication events
239+ {
240+ let mut events = repl_events. lock ( ) . unwrap ( ) ;
241+ events. clear ( ) ;
242+ }
243+
201244 // Recreate replicator with reset flag
202- let mut repl_reset = setup_replicator ( db_cblite. clone ( ) , session_token)
203- . add_document_listener ( Box :: new ( doc_listener) ) ;
245+ let repl_events_clone2 = repl_events. clone ( ) ;
246+ let mut repl_reset = setup_replicator ( db_cblite. clone ( ) , session_token) . add_document_listener (
247+ Box :: new ( move |dir, docs| {
248+ let mut events = repl_events_clone2. lock ( ) . unwrap ( ) ;
249+ for doc in docs {
250+ let event = format ! ( "{:?}: {} (flags={})" , dir, doc. id, doc. flags) ;
251+ println ! ( " 📡 {}" , event) ;
252+ events. push ( event) ;
253+ }
254+ } ) ,
255+ ) ;
204256
205257 repl_reset. start ( true ) ; // true = reset checkpoint
206- std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 10 ) ) ;
207258
259+ // Wait longer for replication to complete
260+ reporter. log ( " Waiting 30 seconds for replication to process..." ) ;
261+ std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 30 ) ) ;
208262 reporter. log ( "✓ Replication restarted with reset checkpoint\n " ) ;
209263
210- // STEP 10: Verify auto-purge in cblite (non-blocking)
211- reporter. log ( "STEP 8: Checking if doc1 was auto-purged from cblite..." ) ;
264+ // Log replication events
265+ {
266+ let events = repl_events. lock ( ) . unwrap ( ) ;
267+ if !events. is_empty ( ) {
268+ reporter. log ( & format ! (
269+ " Replication events captured: {} events" ,
270+ events. len( )
271+ ) ) ;
272+ for event in events. iter ( ) {
273+ reporter. log ( & format ! ( " - {}" , event) ) ;
274+ }
275+ reporter. log ( "" ) ;
276+ } else {
277+ reporter
278+ . log ( " ⚠ No replication events captured (document may not have been pushed)\n " ) ;
279+ }
280+ }
281+
282+ // STEP 9: Verify auto-purge in cblite (non-blocking)
283+ reporter. log ( "STEP 9: Checking if doc1 was auto-purged from cblite..." ) ;
212284 reporter. log ( " doc1 should be auto-purged because it was routed to 'soft_deleted' channel" ) ;
213285 reporter. log ( " (user only has access to 'channel1')\n " ) ;
214286
215287 std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 5 ) ) ;
216288
217289 match get_doc ( & db_cblite, "doc1" ) {
218290 Ok ( _) => {
219- reporter. log ( "⚠ doc1 STILL IN cblite (auto-purge may not have triggered yet)" ) ;
291+ reporter. log ( " ⚠ doc1 STILL IN cblite (auto-purge may not have triggered yet)" ) ;
220292 reporter. log ( " This is not blocking - continuing test...\n " ) ;
221293 }
222294 Err ( _) => {
223- reporter. log ( "✓ doc1 AUTO-PURGED from cblite (as expected)\n " ) ;
295+ reporter. log ( " ✓ doc1 AUTO-PURGED from cblite (as expected)\n " ) ;
224296 }
225297 }
226298
227- // Check if doc exists in central with soft_deleted routing
228- reporter. log ( "STEP 9: Checking if doc1 exists in central..." ) ;
299+ // STEP 10: Check if doc exists in central with soft_deleted routing
300+ reporter. log ( "STEP 10: Checking if doc1 exists in central..." ) ;
301+ reporter. log ( " Querying SGW admin API..." ) ;
229302 let doc_in_central = check_doc_exists_in_central ( "doc1" ) ;
230303
304+ if doc_in_central {
305+ reporter. log ( " ✓ Document found in central (resurrection successful)" ) ;
306+ } else {
307+ reporter. log ( " ⚠ Document NOT found in central" ) ;
308+ reporter. log ( " This means the document was not pushed during replication reset" ) ;
309+ reporter. log ( " This is unexpected but continuing test..." ) ;
310+ }
311+ reporter. log ( "" ) ;
312+
231313 let state9 = get_sync_xattr ( "doc1" ) ;
232314 let notes9 = if doc_in_central {
233315 vec ! [
@@ -236,62 +318,69 @@ fn main() {
236318 "TTL set to 5 minutes" . to_string( ) ,
237319 ]
238320 } else {
239- vec ! [ "Document NOT found in central (unexpected at this stage)" . to_string( ) ]
321+ vec ! [
322+ "Document NOT found in central (unexpected at this stage)" . to_string( ) ,
323+ "Document may not have been pushed during replication reset" . to_string( ) ,
324+ ]
240325 } ;
241326 reporter. checkpoint ( "STEP_9_AFTER_RESURRECTION" , state9. clone ( ) , notes9) ;
242327
243- // Check channel routing in xattr
328+ // STEP 9b: Check channel routing in xattr
244329 if let Some ( ref xattr) = state9 {
245330 if let Some ( channels) = xattr. get ( "channels" ) . and_then ( |c| c. as_object ( ) ) {
246- reporter. log ( "\n Channel routing:" ) ;
331+ reporter. log ( " Channel routing in CBS :" ) ;
247332 for ( channel_name, _) in channels {
248333 reporter. log ( & format ! ( " - {}" , channel_name) ) ;
249334 }
250335
251336 if channels. contains_key ( "soft_deleted" ) {
252- reporter. log ( "\n ✓ Document correctly routed to 'soft_deleted' channel" ) ;
337+ reporter. log ( " ✓ Document correctly routed to 'soft_deleted' channel\n " ) ;
253338 } else {
254- reporter. log ( "\n ⚠ Document NOT in 'soft_deleted' channel (unexpected)" ) ;
339+ reporter. log ( " ⚠ Document NOT in 'soft_deleted' channel (unexpected)\n " ) ;
255340 }
256341 }
342+ } else if doc_in_central {
343+ reporter. log ( " ⚠ Could not retrieve _sync xattr to verify channel routing\n " ) ;
257344 }
258- reporter. log ( "" ) ;
259345
260- // STEP 11-12 : Wait for TTL expiry (5 minutes) + compact
261- reporter. log ( "STEP 10 : Waiting 6 minutes for TTL expiry (5 min TTL + margin)..." ) ;
346+ // STEP 11: Wait for TTL expiry (5 minutes) + compact
347+ reporter. log ( "STEP 11 : Waiting 6 minutes for TTL expiry (5 min TTL + margin)..." ) ;
262348 for minute in 1 ..=6 {
263349 reporter. log ( & format ! ( " [{minute}/6] Waiting..." ) ) ;
264350 std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 60 ) ) ;
265351 }
266352 reporter. log ( "✓ Wait complete\n " ) ;
267353
268- reporter. log ( "STEP 11: Compacting CBS bucket (to trigger TTL purge)..." ) ;
354+ // STEP 12: Compact CBS
355+ reporter. log ( "STEP 12: Compacting CBS bucket (to trigger TTL purge)..." ) ;
269356 compact_cbs_bucket ( ) ;
270357 std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 5 ) ) ;
271358 reporter. log ( "✓ CBS compaction triggered\n " ) ;
272359
273- reporter. log ( "STEP 12: Compacting SGW database..." ) ;
360+ // STEP 13: Compact SGW
361+ reporter. log ( "STEP 13: Compacting SGW database..." ) ;
274362 compact_sgw_database ( ) ;
275363 std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 5 ) ) ;
276364 reporter. log ( "✓ SGW compaction complete\n " ) ;
277365
278- // STEP 13: Verify doc purged from central (TTL expired)
279- reporter. log ( "STEP 13: Checking if doc1 was purged from central (TTL expired)..." ) ;
366+ // STEP 14: Verify doc purged from central (TTL expired)
367+ reporter. log ( "STEP 14: Checking if doc1 was purged from central (TTL expired)..." ) ;
368+ reporter. log ( " Querying SGW admin API..." ) ;
280369 let still_in_central = check_doc_exists_in_central ( "doc1" ) ;
281370
282- let state13 = get_sync_xattr ( "doc1" ) ;
283- let notes13 = if still_in_central {
371+ if !still_in_central {
372+ reporter. log ( " ✓ doc1 PURGED from central (TTL expiry successful)\n " ) ;
373+ } else {
374+ reporter. log ( " ⚠ doc1 STILL in central (TTL purge may need more time)\n " ) ;
375+ }
376+
377+ let state14 = get_sync_xattr ( "doc1" ) ;
378+ let notes14 = if still_in_central {
284379 vec ! [ "Document STILL in central (TTL may not have expired yet)" . to_string( ) ]
285380 } else {
286381 vec ! [ "Document successfully purged from central after TTL expiry" . to_string( ) ]
287382 } ;
288- reporter. checkpoint ( "STEP_13_AFTER_TTL_PURGE" , state13, notes13) ;
289-
290- if !still_in_central {
291- reporter. log ( "✓ doc1 PURGED from central (TTL expiry successful)\n " ) ;
292- } else {
293- reporter. log ( "⚠ doc1 STILL in central (TTL purge may need more time)\n " ) ;
294- }
383+ reporter. checkpoint ( "STEP_14_AFTER_TTL_PURGE" , state14, notes14) ;
295384
296385 repl_reset. stop ( None ) ;
297386
@@ -301,6 +390,12 @@ fn main() {
301390 start_time. elapsed( ) . as_secs( ) / 60
302391 ) ) ;
303392
393+ // Log final replication events summary
394+ {
395+ let events = repl_events. lock ( ) . unwrap ( ) ;
396+ reporter. log ( & format ! ( "\n Total replication events: {}" , events. len( ) ) ) ;
397+ }
398+
304399 reporter. log ( "\n === SUMMARY ===" ) ;
305400 reporter. log ( "✓ Document resurrection scenario tested" ) ;
306401 reporter. log ( "✓ Sync function soft_delete logic validated" ) ;
@@ -331,11 +426,6 @@ fn create_doc_with_updated_at(
331426 )
332427 . unwrap ( ) ;
333428 db_cblite. save_document ( & mut doc) . unwrap ( ) ;
334-
335- println ! (
336- " Created doc {id} with updatedAt: {}" ,
337- updated_at. to_rfc3339( )
338- ) ;
339429}
340430
341431#[ allow( deprecated) ]
@@ -373,19 +463,3 @@ fn setup_replicator(db_cblite: Database, session_token: String) -> Replicator {
373463 let repl_context = ReplicationConfigurationContext :: default ( ) ;
374464 Replicator :: new ( repl_conf, Box :: new ( repl_context) ) . unwrap ( )
375465}
376-
377- fn doc_listener ( direction : Direction , documents : Vec < ReplicatedDocument > ) {
378- println ! ( "=== Document(s) replicated ===" ) ;
379- println ! ( "Direction: {direction:?}" ) ;
380- for document in documents {
381- println ! ( "Document: {document:?}" ) ;
382- if document. flags == 1 {
383- println ! ( " ⚠ flags=1 - Document recognized as deleted/tombstone" ) ;
384- } else if document. flags == 0 {
385- println ! ( " ✓ flags=0 - Document treated as new" ) ;
386- } else if document. flags == 2 {
387- println ! ( " 🗑️ flags=2 - Document auto-purged (AccessRemoved)" ) ;
388- }
389- }
390- println ! ( "===\n " ) ;
391- }
0 commit comments