Skip to content

Commit 95d07f0

Browse files
committed
GridFS fix source onComplete logic
Ensure that onComplete of the source publisher can trigger the completion steps of the upload publisher JAVA-3587
1 parent 055d2d7 commit 95d07f0

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/GridFSUploadPublisherImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ public void onSubscribe(final Subscription s) {
7979

8080
@Override
8181
public void onNext(final ByteBuffer byteBuffer) {
82+
synchronized (GridFSUploadSubscription.this) {
83+
currentAction = Action.IN_PROGRESS;
84+
}
8285
Publishers.publish((Block<SingleResultCallback<Integer>>) callback -> gridFSUploadStream.write(byteBuffer, callback))
8386
.subscribe(new GridFSUploadStreamSubscriber(byteBuffer));
8487
}
@@ -95,8 +98,9 @@ public void onError(final Throwable t) {
9598
public void onComplete() {
9699
synchronized (GridFSUploadSubscription.this) {
97100
hasCompleted = true;
98-
if (currentAction == Action.WAITING) {
101+
if (currentAction == Action.REQUESTING_MORE) {
99102
currentAction = Action.COMPLETE;
103+
tryProcess();
100104
}
101105
}
102106
}
@@ -180,9 +184,9 @@ private void tryProcess() {
180184
if (sourceSubscription == null) {
181185
nextStep = NextStep.SUBSCRIBE;
182186
} else {
183-
nextStep = NextStep.WRITE;
187+
nextStep = NextStep.REQUEST_MORE;
184188
}
185-
currentAction = Action.IN_PROGRESS;
189+
currentAction = Action.REQUESTING_MORE;
186190
break;
187191
case COMPLETE:
188192
nextStep = NextStep.COMPLETE;
@@ -192,6 +196,7 @@ private void tryProcess() {
192196
nextStep = NextStep.TERMINATE;
193197
currentAction = Action.FINISHED;
194198
break;
199+
case REQUESTING_MORE:
195200
case IN_PROGRESS:
196201
case FINISHED:
197202
default:
@@ -204,7 +209,7 @@ private void tryProcess() {
204209
case SUBSCRIBE:
205210
source.subscribe(sourceSubscriber);
206211
break;
207-
case WRITE:
212+
case REQUEST_MORE:
208213
synchronized (this) {
209214
sourceSubscription.request(1);
210215
}
@@ -309,6 +314,7 @@ public void onComplete() {
309314

310315
enum Action {
311316
WAITING,
317+
REQUESTING_MORE,
312318
IN_PROGRESS,
313319
TERMINATE,
314320
COMPLETE,
@@ -317,7 +323,7 @@ enum Action {
317323

318324
enum NextStep {
319325
SUBSCRIBE,
320-
WRITE,
326+
REQUEST_MORE,
321327
COMPLETE,
322328
TERMINATE,
323329
DO_NOTHING

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/gridfs/GridFSPublisherSpecification.groovy

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ class GridFSPublisherSpecification extends FunctionalSpecification {
119119
given:
120120
def contentSize = 1024 * 500
121121
def chunkSize = 10
122-
def contentBytes = new byte[contentSize];
123-
new SecureRandom().nextBytes(contentBytes);
122+
def contentBytes = new byte[contentSize]
123+
new SecureRandom().nextBytes(contentBytes)
124124
def options = new GridFSUploadOptions().chunkSizeBytes(chunkSize)
125125

126126
when:
@@ -167,6 +167,32 @@ class GridFSPublisherSpecification extends FunctionalSpecification {
167167
ByteBuffer.wrap(contentBytes)])
168168
}
169169

170+
def 'should upload from the source publisher when it contains multiple parts and the total size is smaller than chunksize'() {
171+
given:
172+
def contentBytes = singleChunkString.getBytes()
173+
174+
when:
175+
def publisher = gridFSBucket.uploadFromPublisher('myFile',
176+
createPublisher(ByteBuffer.wrap(contentBytes), ByteBuffer.wrap(contentBytes)))
177+
178+
def subscriber = new ObservableSubscriber()
179+
publisher.subscribe(subscriber)
180+
def fileId = subscriber.await(1, 60, SECONDS).getReceived().get(0)
181+
182+
then:
183+
run(filesCollection.&countDocuments) == 1
184+
run(chunksCollection.&countDocuments) == 1
185+
186+
when:
187+
subscriber = new ObservableSubscriber()
188+
publisher = gridFSBucket.downloadToPublisher(fileId as ObjectId)
189+
publisher.subscribe(subscriber)
190+
def data = subscriber.await(1, 30, SECONDS).getReceived().get(0)
191+
192+
then:
193+
data.array() == concatByteBuffers([ByteBuffer.wrap(contentBytes), ByteBuffer.wrap(contentBytes)])
194+
}
195+
170196
def 'should round trip with data larger than the internal bufferSize'() {
171197
given:
172198
def contentSize = 1024 * 1024 * 5

0 commit comments

Comments
 (0)