File tree Expand file tree Collapse file tree 2 files changed +40
-1
lines changed
driver-core/src/main/com/mongodb/operation
driver-sync/src/test/functional/com/mongodb/client Expand file tree Collapse file tree 2 files changed +40
-1
lines changed Original file line number Diff line number Diff line change 19
19
import com .mongodb .MongoChangeStreamException ;
20
20
import com .mongodb .MongoCursorNotFoundException ;
21
21
import com .mongodb .MongoException ;
22
+ import com .mongodb .MongoInterruptedException ;
22
23
import com .mongodb .MongoNotPrimaryException ;
23
24
import com .mongodb .MongoSocketException ;
24
25
@@ -32,7 +33,7 @@ final class ChangeStreamBatchCursorHelper {
32
33
private static final List <String > NONRESUMABLE_CHANGE_STREAM_ERROR_LABELS = asList ("NonResumableChangeStreamError" );
33
34
34
35
static boolean isRetryableError (final Throwable t ) {
35
- if (!(t instanceof MongoException ) || t instanceof MongoChangeStreamException ) {
36
+ if (!(t instanceof MongoException ) || t instanceof MongoChangeStreamException || t instanceof MongoInterruptedException ) {
36
37
return false ;
37
38
} else if (t instanceof MongoNotPrimaryException || t instanceof MongoCursorNotFoundException
38
39
|| t instanceof MongoSocketException ) {
Original file line number Diff line number Diff line change 19
19
import com .mongodb .MongoChangeStreamException ;
20
20
import com .mongodb .MongoCommandException ;
21
21
import com .mongodb .MongoException ;
22
+ import com .mongodb .MongoInterruptedException ;
22
23
import com .mongodb .MongoQueryException ;
23
24
import com .mongodb .client .internal .MongoChangeStreamCursorImpl ;
24
25
import com .mongodb .client .model .Aggregates ;
@@ -59,6 +60,43 @@ public void setUp() {
59
60
collection .insertOne (Document .parse ("{ _id : 0 }" ));
60
61
}
61
62
63
+ class ChangeStreamWatcher implements Runnable {
64
+ private volatile boolean interruptedExceptionOccurred = false ;
65
+ private final MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor ;
66
+
67
+ ChangeStreamWatcher (final MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor ) {
68
+ this .cursor = cursor ;
69
+ }
70
+
71
+ @ Override
72
+ public void run () {
73
+ try {
74
+ cursor .next ();
75
+ } catch (final MongoInterruptedException e ) {
76
+ interruptedExceptionOccurred = true ;
77
+ } finally {
78
+ cursor .close ();
79
+ }
80
+ }
81
+
82
+ public boolean getInterruptedExceptionOccurred () {
83
+ return interruptedExceptionOccurred ;
84
+ }
85
+ }
86
+
87
+ //
88
+ // Test that MongoInterruptedException is not retryable so that a thread can be interrupted.
89
+ //
90
+ @ Test
91
+ public void testThreadInterrupted () throws InterruptedException {
92
+ final ChangeStreamWatcher watcher = new ChangeStreamWatcher (collection .watch ().cursor ());
93
+ final Thread t = new Thread (watcher );
94
+ t .start ();
95
+ t .interrupt ();
96
+ t .join ();
97
+ assertTrue (watcher .getInterruptedExceptionOccurred ());
98
+ }
99
+
62
100
//
63
101
// Test that the ChangeStream continuously tracks the last seen resumeToken.
64
102
//
You can’t perform that action at this time.
0 commit comments