@@ -40,6 +40,7 @@ import java.nio.ByteBuffer
40
40
import java.nio.channels.Channels
41
41
import java.nio.channels.WritableByteChannel
42
42
import java.security.SecureRandom
43
+ import java.time.Duration
43
44
44
45
import static com.mongodb.ClusterFixture.TIMEOUT_DURATION
45
46
import static com.mongodb.client.model.Filters.eq
@@ -433,14 +434,14 @@ class GridFSPublisherSpecification extends FunctionalSpecification {
433
434
434
435
def ' should cleanup when unsubscribing' () {
435
436
given :
436
- def contentSize = 1024 * 1024
437
+ def contentSize = 1024
437
438
def contentBytes = new byte [contentSize]
438
439
new SecureRandom (). nextBytes(contentBytes)
439
440
def options = new GridFSUploadOptions (). chunkSizeBytes(1024 )
440
441
def data = (0 .. 1024 ). collect { ByteBuffer . wrap(contentBytes) }
442
+ def publisher = createPublisher(* data). delayElements(Duration . ofMillis(1000 ))
441
443
def subscriber = new Subscriber<ObjectId > () {
442
444
Subscription subscription
443
- boolean completed = false
444
445
445
446
@Override
446
447
void onSubscribe (final Subscription s ) {
@@ -457,12 +458,11 @@ class GridFSPublisherSpecification extends FunctionalSpecification {
457
458
458
459
@Override
459
460
void onComplete () {
460
- completed = true
461
461
}
462
462
}
463
463
464
464
when :
465
- gridFSBucket. uploadFromPublisher(' myFile' , createPublisher( * data) , options)
465
+ gridFSBucket. uploadFromPublisher(' myFile' , publisher , options)
466
466
.subscribe(subscriber)
467
467
subscriber. subscription. request(1 )
468
468
@@ -474,8 +474,8 @@ class GridFSPublisherSpecification extends FunctionalSpecification {
474
474
subscriber. subscription. cancel()
475
475
476
476
then :
477
- retry(50 ) { subscriber . completed || run(chunksCollection. &countDocuments) == 0 }
478
- subscriber . completed || run(filesCollection. &countDocuments) == 0
477
+ retry(50 ) { run(chunksCollection. &countDocuments) == 0 }
478
+ run(filesCollection. &countDocuments) == 0
479
479
}
480
480
481
481
def retry (Integer times , Closure<Boolean > closure ) {
0 commit comments