Skip to content

Commit 9c48d63

Browse files
committed
Release cached item in ChannelSendOperator
1. If the write Subscriber cancels with the item cached, release it. 2. If the write Publisher emits an error while the item is cached, when the write Subscriber subscribes, release the cached item and emit the error signal. Closes gh-22720
1 parent de2a01e commit 9c48d63

File tree

2 files changed

+113
-14
lines changed

2 files changed

+113
-14
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,8 @@
2828
import reactor.core.publisher.Operators;
2929
import reactor.util.context.Context;
3030

31+
import org.springframework.core.io.buffer.DataBuffer;
32+
import org.springframework.core.io.buffer.DataBufferUtils;
3133
import org.springframework.lang.Nullable;
3234
import org.springframework.util.Assert;
3335

@@ -279,13 +281,20 @@ public void request(long n) {
279281
}
280282

281283
private boolean emitCachedSignals() {
282-
if (this.item != null) {
283-
requiredWriteSubscriber().onNext(this.item);
284-
}
285284
if (this.error != null) {
286-
requiredWriteSubscriber().onError(this.error);
285+
try {
286+
requiredWriteSubscriber().onError(this.error);
287+
}
288+
finally {
289+
releaseCachedItem();
290+
}
287291
return true;
288292
}
293+
T item = this.item;
294+
this.item = null;
295+
if (item != null) {
296+
requiredWriteSubscriber().onNext(item);
297+
}
289298
if (this.completed) {
290299
requiredWriteSubscriber().onComplete();
291300
return true;
@@ -298,7 +307,22 @@ public void cancel() {
298307
Subscription s = this.subscription;
299308
if (s != null) {
300309
this.subscription = null;
301-
s.cancel();
310+
try {
311+
s.cancel();
312+
}
313+
finally {
314+
releaseCachedItem();
315+
}
316+
}
317+
}
318+
319+
private void releaseCachedItem() {
320+
synchronized (this) {
321+
Object item = this.item;
322+
if (item instanceof DataBuffer) {
323+
DataBufferUtils.release((DataBuffer) item);
324+
}
325+
this.item = null;
302326
}
303327
}
304328

spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,25 +16,29 @@
1616

1717
package org.springframework.http.server.reactive;
1818

19+
import java.nio.charset.StandardCharsets;
1920
import java.util.ArrayList;
2021
import java.util.Arrays;
2122
import java.util.List;
2223
import java.util.concurrent.Executors;
2324
import java.util.concurrent.TimeUnit;
2425

26+
import io.netty.buffer.ByteBufAllocator;
2527
import org.junit.Before;
2628
import org.junit.Test;
2729
import org.reactivestreams.Publisher;
2830
import org.reactivestreams.Subscriber;
2931
import org.reactivestreams.Subscription;
32+
import reactor.core.publisher.BaseSubscriber;
3033
import reactor.core.publisher.Flux;
3134
import reactor.core.publisher.Mono;
3235
import reactor.core.publisher.Signal;
3336

34-
import static org.junit.Assert.assertEquals;
35-
import static org.junit.Assert.assertNotNull;
36-
import static org.junit.Assert.assertSame;
37-
import static org.junit.Assert.assertTrue;
37+
import org.springframework.core.io.buffer.DataBuffer;
38+
import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;
39+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
40+
41+
import static org.junit.Assert.*;
3842

3943
/**
4044
* @author Rossen Stoyanchev
@@ -50,9 +54,6 @@ public void setUp() throws Exception {
5054
this.writer = new OneByOneAsyncWriter();
5155
}
5256

53-
private <T> Mono<Void> sendOperator(Publisher<String> source){
54-
return new ChannelSendOperator<>(source, writer::send);
55-
}
5657

5758
@Test
5859
public void errorBeforeFirstItem() throws Exception {
@@ -130,6 +131,66 @@ public void errorAfterMultipleItems() throws Exception {
130131
assertSame(error, this.writer.error);
131132
}
132133

134+
@Test // gh-22720
135+
public void cancelWhileItemCached() {
136+
NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
137+
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
138+
139+
ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
140+
Mono.fromCallable(() -> {
141+
DataBuffer dataBuffer = bufferFactory.allocateBuffer();
142+
dataBuffer.write("foo", StandardCharsets.UTF_8);
143+
return dataBuffer;
144+
}),
145+
publisher -> {
146+
ZeroDemandSubscriber subscriber = new ZeroDemandSubscriber();
147+
publisher.subscribe(subscriber);
148+
return Mono.never();
149+
});
150+
151+
BaseSubscriber<Void> subscriber = new BaseSubscriber<Void>() {};
152+
operator.subscribe(subscriber);
153+
subscriber.cancel();
154+
155+
bufferFactory.checkForLeaks();
156+
}
157+
158+
@Test // gh-22720
159+
public void errorWhileItemCached() {
160+
NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
161+
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
162+
ZeroDemandSubscriber writeSubscriber = new ZeroDemandSubscriber();
163+
164+
ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
165+
Flux.create(sink -> {
166+
DataBuffer dataBuffer = bufferFactory.allocateBuffer();
167+
dataBuffer.write("foo", StandardCharsets.UTF_8);
168+
sink.next(dataBuffer);
169+
sink.error(new IllegalStateException("err"));
170+
}),
171+
publisher -> {
172+
publisher.subscribe(writeSubscriber);
173+
return Mono.never();
174+
});
175+
176+
177+
operator.subscribe(new BaseSubscriber<Void>() {});
178+
try {
179+
writeSubscriber.signalDemand(1); // Let cached signals ("foo" and error) be published..
180+
}
181+
catch (Throwable ex) {
182+
assertNotNull(ex.getCause());
183+
assertEquals("err", ex.getCause().getMessage());
184+
}
185+
186+
bufferFactory.checkForLeaks();
187+
}
188+
189+
190+
private <T> Mono<Void> sendOperator(Publisher<String> source){
191+
return new ChannelSendOperator<>(source, writer::send);
192+
}
193+
133194

134195
private static class OneByOneAsyncWriter {
135196

@@ -182,4 +243,18 @@ public void onComplete() {
182243
}
183244
}
184245

246+
247+
private static class ZeroDemandSubscriber extends BaseSubscriber<DataBuffer> {
248+
249+
250+
@Override
251+
protected void hookOnSubscribe(Subscription subscription) {
252+
// Just subscribe without requesting
253+
}
254+
255+
public void signalDemand(long demand) {
256+
upstream().request(demand);
257+
}
258+
}
259+
185260
}

0 commit comments

Comments
 (0)