File tree Expand file tree Collapse file tree 1 file changed +22
-23
lines changed
driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal Expand file tree Collapse file tree 1 file changed +22
-23
lines changed Original file line number Diff line number Diff line change @@ -87,29 +87,28 @@ private void recurseCursor(){
87
87
batchCursor .setBatchSize (calculateBatchSize (sink .requestedFromDownstream ()));
88
88
Mono .from (batchCursor .next (() -> sink .isCancelled ()))
89
89
.doOnCancel (this ::closeCursor )
90
- .doOnError ((e ) -> {
91
- try {
92
- closeCursor ();
93
- } finally {
94
- sink .error (e );
95
- }
96
- })
97
- .doOnSuccess (results -> {
98
- if (!results .isEmpty ()) {
99
- results
100
- .stream ()
101
- .filter (Objects ::nonNull )
102
- .forEach (sink ::next );
103
- calculateDemand (-results .size ());
104
- }
105
- if (batchCursor .isClosed ()) {
106
- sink .complete ();
107
- } else {
108
- inProgress .set (false );
109
- recurseCursor ();
110
- }
111
- })
112
- .subscribe ();
90
+ .subscribe (results -> {
91
+ if (!results .isEmpty ()) {
92
+ results
93
+ .stream ()
94
+ .filter (Objects ::nonNull )
95
+ .forEach (sink ::next );
96
+ calculateDemand (-results .size ());
97
+ }
98
+ if (batchCursor .isClosed ()) {
99
+ sink .complete ();
100
+ } else {
101
+ inProgress .set (false );
102
+ recurseCursor ();
103
+ }
104
+ },
105
+ e -> {
106
+ try {
107
+ closeCursor ();
108
+ } finally {
109
+ sink .error (e );
110
+ }
111
+ });
113
112
}
114
113
}
115
114
}
You can’t perform that action at this time.
0 commit comments