@@ -632,6 +632,7 @@ public async static Task SetChangeStreamResumeTokenAsync(Log log, MongoClient cl
632632 int retryCount = 0 ;
633633 bool isSucessful = false ;
634634 bool resetCS = unit . ResetChangeStream ;
635+ bool useServerLevel = false ;
635636
636637 while ( ! isSucessful && retryCount < 10 )
637638 {
@@ -642,7 +643,7 @@ public async static Task SetChangeStreamResumeTokenAsync(Log log, MongoClient cl
642643 var collection = database . GetCollection < BsonDocument > ( unit . CollectionName ) ;
643644
644645 // Determine if we should use server-level or collection-level processing
645- bool useServerLevel = job . ChangeStreamLevel == ChangeStreamLevel . Server ;
646+ useServerLevel = job . ChangeStreamLevel == ChangeStreamLevel . Server ;
646647
647648 // Initialize with safe defaults; will be overridden below
648649 var options = new ChangeStreamOptions { BatchSize = 500 , FullDocument = ChangeStreamFullDocumentOption . UpdateLookup } ;
@@ -696,24 +697,55 @@ public async static Task SetChangeStreamResumeTokenAsync(Log log, MongoClient cl
696697 } ;
697698 }
698699
699- //new way to get resume token
700- //On MongoDB 4.0+, the WatchChangeStreamAsync method opens a change stream and waits for changes.
701- //On MongoDB 3.6, TailOplogAsync opens a tailable cursor on the oplog, filtering on namespace and timestamp to detect new operations.
702- if ( ! string . IsNullOrEmpty ( job . SourceServerVersion ) && job . SourceServerVersion . StartsWith ( "3" ) )
703- {
704- if ( ! await TailOplogAsync ( client , unit . DatabaseName , unit . CollectionName , unit , cts ) && ! resetCS )
700+ //new way to get resume token
701+ //On MongoDB 4.0+, the WatchChangeStreamAsync method opens a change stream and waits for changes.
702+ //On MongoDB 3.6, TailOplogAsync opens a tailable cursor on the oplog, filtering on namespace and timestamp to detect new operations.
703+ if ( ! string . IsNullOrEmpty ( job . SourceServerVersion ) && job . SourceServerVersion . StartsWith ( "3" ) )
705704 {
706- //if failed to tail oplog, fallback to watching change stream infinelty. Should be called async only
707- await WatchChangeStreamUntilChangeAsync ( log , client , jobList , job , unit , collection , options , resetCS , - 1 , cts , useServerLevel ) ;
705+ if ( ! await TailOplogAsync ( client , unit . DatabaseName , unit . CollectionName , unit , cts ) && ! resetCS )
706+ {
707+ //if failed to tail oplog, fallback to watching change stream infinelty. Should be called async only
708+ await WatchChangeStreamUntilChangeAsync ( log , client , jobList , job , unit , collection , options , resetCS , - 1 , cts , useServerLevel ) ; //don't await
709+ }
710+ }
711+ else
712+ {
713+ //await WatchChangeStreamUntilChangeAsync(log, client, jobList, job, unit, collection, options, resetCS, seconds, cts, useServerLevel);
714+ //end of new way to get resume token
715+
716+ var settings = client . Settings . Clone ( ) ;
717+ var localClient = new MongoClient ( settings ) ;
718+
719+ var result = await MongoSafeTaskExecutor . ExecuteAsync (
720+ operation : async ( ct ) =>
721+ {
722+ // forward the cancellation token to your watch function
723+ return await WatchChangeStreamUntilChangeAsync (
724+ log ,
725+ localClient ,
726+ jobList ,
727+ job ,
728+ unit ,
729+ collection ,
730+ options ,
731+ resetCS ,
732+ seconds ,
733+ ct ,
734+ useServerLevel
735+ ) ;
736+ } ,
737+ timeoutSeconds : seconds + 10 , // choose your timeout
738+ operationName : $ "WatchChangeStreamUntilChangeAsync({ collection . CollectionNamespace . CollectionName } )",
739+ logAction : msg => log . WriteLine ( msg , LogType . Debug ) ,
740+ externalToken : cts ,
741+ clientToKill : localClient // <-- VERY IMPORTANT: kill this client if watch hangs
742+ ) ;
743+
708744 }
709- }
710- else
711- await WatchChangeStreamUntilChangeAsync ( log , client , jobList , job , unit , collection , options , resetCS , seconds , cts , useServerLevel ) ;
712- //end of new way to get resume token
713745
714746 isSucessful = true ;
715747 }
716- catch ( OperationCanceledException )
748+ catch ( Exception ex ) when ( ex is OperationCanceledException || ex is TimeoutException )
717749 {
718750 // Distinguish between timeout (no activity) vs manual cancellation
719751 if ( resetCS && unit . ResetChangeStream )
@@ -764,12 +796,17 @@ public async static Task SetChangeStreamResumeTokenAsync(Log log, MongoClient cl
764796 finally
765797 {
766798 jobList . Save ( ) ;
799+ if ( useServerLevel )
800+ log . WriteLine ( $ "Exiting Server-level SetChangeStreamResumeTokenAsync for job { job . Id } ", LogType . Info ) ;
801+ else
802+ log . WriteLine ( $ "Exiting Collection-level SetChangeStreamResumeToken for { unit . DatabaseName } .{ unit . CollectionName } ", LogType . Debug ) ;
767803 }
804+
768805 }
769806 return ;
770807 }
771808
772- private static async Task WatchChangeStreamUntilChangeAsync ( Log log , MongoClient client , JobList jobList , MigrationJob job , MigrationUnit unit , IMongoCollection < BsonDocument > collection , ChangeStreamOptions options , bool resetCS , int seconds , CancellationToken manualCts , bool useServerLevel = false )
809+ private static async Task < bool > WatchChangeStreamUntilChangeAsync ( Log log , MongoClient client , JobList jobList , MigrationJob job , MigrationUnit unit , IMongoCollection < BsonDocument > collection , ChangeStreamOptions options , bool resetCS , int seconds , CancellationToken manualCts , bool useServerLevel = false )
773810 {
774811 var pipeline = new BsonDocument [ ] { } ;
775812 if ( job . JobType == JobType . RUOptimizedCopy )
@@ -810,13 +847,13 @@ private static async Task WatchChangeStreamUntilChangeAsync(Log log, MongoClient
810847 if ( useServerLevel )
811848 {
812849 // Server-level change stream
813- log . WriteLine ( $ "Setting up server-level change stream resume token for job { job . Id } ") ;
850+ log . WriteLine ( $ "Setting up server-level change stream resume token for job { job . Id } ", LogType . Debug ) ;
814851 cursor = await client . WatchAsync < ChangeStreamDocument < BsonDocument > > ( pipeline , options , linkedCts . Token ) ;
815852 }
816853 else
817854 {
818855 // Collection-level change stream
819- log . WriteLine ( $ "Setting up collection-level change stream resume token for { unit . DatabaseName } .{ unit . CollectionName } ") ;
856+ log . WriteLine ( $ "Setting up collection-level change stream resume token for { unit . DatabaseName } .{ unit . CollectionName } ", LogType . Debug ) ;
820857 cursor = await collection . WatchAsync < ChangeStreamDocument < BsonDocument > > ( pipeline , options , linkedCts . Token ) ;
821858 }
822859
@@ -839,9 +876,9 @@ private static async Task WatchChangeStreamUntilChangeAsync(Log log, MongoClient
839876 unit . OriginalResumeToken = resumeTokenJson ;
840877
841878 }
842- return ;
879+ return true ;
843880 }
844- return ;
881+ return true ;
845882 }
846883
847884 // Iterate until cancellation or first change detected
@@ -859,7 +896,7 @@ private static async Task WatchChangeStreamUntilChangeAsync(Log log, MongoClient
859896 //if bulk load is complete, no point in continuing to watch
860897 // Use the local resetCS variable captured at method start, not unit.ResetChangeStream
861898 if ( ( unit . RestoreComplete || job . IsSimulatedRun ) && unit . DumpComplete && ! resetCS )
862- return ;
899+ return true ;
863900
864901
865902 // Handle server-level vs collection-level resume token storage
@@ -881,7 +918,7 @@ private static async Task WatchChangeStreamUntilChangeAsync(Log log, MongoClient
881918
882919 log . WriteLine ( $ "Server-level resume token set for job { job . Id } with collection key { job . ResumeCollectionKey } ") ;
883920 // Exit immediately after first change detected
884- return ;
921+ return true ;
885922 }
886923 }
887924 else
@@ -892,16 +929,19 @@ private static async Task WatchChangeStreamUntilChangeAsync(Log log, MongoClient
892929 log . WriteLine ( $ "Collection-level resume token set for { unit . DatabaseName } .{ unit . CollectionName } ") ;
893930
894931 // Exit immediately after first change detected
895- return ;
932+ return true ;
896933 }
897934
898935 }
899936 }
900-
937+
938+ return false ;
901939 }
902940 catch ( OperationCanceledException )
903941 {
904942 // Cancellation requested - exit quietly
943+ log . WriteLine ( $ "Collection-level resume token request for { unit . DatabaseName } .{ unit . CollectionName } was canceled.", LogType . Debug ) ;
944+ return false ;
905945 }
906946 }
907947 }
0 commit comments