@@ -391,7 +391,7 @@ private MongoChangeStreamCursor<? extends BsonDocument> tryCreateCursor(
391391 } else if (doesNotSupportsStartAfter (e )) {
392392 supportsStartAfter = false ;
393393 return tryCreateCursor (sourceConfig , mongoClient , resumeToken );
394- } else if (resumeTokenNotFound (e )) {
394+ } else if (sourceConfig . tolerateErrors () && resumeTokenNotFound (e )) {
395395 LOGGER .warn (
396396 "Failed to resume change stream: {} {}\n "
397397 + "===================================================================================\n "
@@ -422,6 +422,10 @@ private MongoChangeStreamCursor<? extends BsonDocument> tryCreateCursor(
422422 + "=====================================================================================\n " ,
423423 e .getErrorMessage (),
424424 e .getErrorCode ());
425+ if (resumeTokenNotFound (e )) {
426+ throw new ConnectException (
427+ "ResumeToken not found. Cannot create a change stream cursor" , e );
428+ }
425429 }
426430 return null ;
427431 }
@@ -437,12 +441,8 @@ private boolean invalidatedResumeToken(final MongoCommandException e) {
437441 }
438442
439443 private boolean resumeTokenNotFound (final MongoCommandException e ) {
440- if (!sourceConfig .tolerateErrors ()) {
441- return false ;
442- }
443444 String errorMessage = e .getErrorMessage ().toLowerCase (Locale .ROOT );
444- return sourceConfig .tolerateErrors ()
445- && errorMessage .contains (RESUME_TOKEN )
445+ return errorMessage .contains (RESUME_TOKEN )
446446 && (errorMessage .contains (NOT_FOUND )
447447 || errorMessage .contains (DOES_NOT_EXIST )
448448 || errorMessage .contains (INVALID_RESUME_TOKEN ));
0 commit comments