Skip to content

Commit f6c31bb

Browse files
committed
Align OutputStreamPublisher's
Update constructors to match and drop unnecessary factory methods. See gh-33592
1 parent 113b430 commit f6c31bb

File tree

6 files changed

+53
-146
lines changed

6 files changed

+53
-146
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ public abstract class DataBufferUtils {
6868

6969
private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;
7070

71-
private static final int DEFAULT_CHUNK_SIZE = 1024;
72-
7371

7472
//---------------------------------------------------------------------
7573
// Reading
@@ -442,48 +440,17 @@ static void closeChannel(@Nullable Channel channel) {
442440
public static Publisher<DataBuffer> outputStreamPublisher(Consumer<OutputStream> outputStreamConsumer,
443441
DataBufferFactory bufferFactory, Executor executor) {
444442

445-
return outputStreamPublisher(outputStreamConsumer, bufferFactory, executor, DEFAULT_CHUNK_SIZE);
443+
return new OutputStreamPublisher(outputStreamConsumer, bufferFactory, executor, null);
446444
}
447445

448446
/**
449-
* Creates a new {@code Publisher<DataBuffer>} based on bytes written to a
450-
* {@code OutputStream}.
451-
* <ul>
452-
* <li>The parameter {@code outputStreamConsumer} is invoked once per
453-
* subscription of the returned {@code Publisher}, when the first
454-
* item is
455-
* {@linkplain Subscription#request(long) requested}.</li>
456-
* <li>{@link OutputStream#write(byte[], int, int) OutputStream.write()}
457-
* invocations made by {@code outputStreamHandler} are buffered until they
458-
* reach or exceed {@code chunkSize}, or when the stream is
459-
* {@linkplain OutputStream#flush() flushed} and then result in a
460-
* {@linkplain Subscriber#onNext(Object) published} item
461-
* if there is {@linkplain Subscription#request(long) demand}.</li>
462-
* <li>If there is <em>no demand</em>, {@code OutputStream.write()} will block
463-
* until there is.</li>
464-
* <li>If the subscription is {@linkplain Subscription#cancel() cancelled},
465-
* {@code OutputStream.write()} will throw a {@code IOException}.</li>
466-
* <li>The subscription is
467-
* {@linkplain Subscriber#onComplete() completed} when
468-
* {@code outputStreamHandler} completes.</li>
469-
* <li>Any exceptions thrown from {@code outputStreamHandler} will
470-
* be dispatched to the {@linkplain Subscriber#onError(Throwable) Subscriber}.
471-
* </ul>
472-
* @param outputStreamConsumer invoked when the first buffer is requested
473-
* @param executor used to invoke the {@code outputStreamHandler}
474-
* @param chunkSize minimum size of the buffer produced by the publisher
475-
* @return a {@code Publisher<DataBuffer>} based on bytes written by
476-
* {@code outputStreamHandler}
447+
* Variant of {@link #outputStreamPublisher(Consumer, DataBufferFactory, Executor)}
448+
* providing control over the chunk sizes to be produced by the publisher.
477449
* @since 6.1
478450
*/
479451
public static Publisher<DataBuffer> outputStreamPublisher(Consumer<OutputStream> outputStreamConsumer,
480452
DataBufferFactory bufferFactory, Executor executor, int chunkSize) {
481453

482-
Assert.notNull(outputStreamConsumer, "OutputStreamConsumer must not be null");
483-
Assert.notNull(bufferFactory, "BufferFactory must not be null");
484-
Assert.notNull(executor, "Executor must not be null");
485-
Assert.isTrue(chunkSize > 0, "Chunk size must be > 0");
486-
487454
return new OutputStreamPublisher(outputStreamConsumer, bufferFactory, executor, chunkSize);
488455
}
489456

spring-core/src/main/java/org/springframework/core/io/buffer/OutputStreamPublisher.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131
import org.reactivestreams.Subscription;
3232

3333
import org.springframework.lang.Nullable;
34+
import org.springframework.util.Assert;
3435

3536
/**
36-
* Bridges between {@link OutputStream} and
37-
* {@link Publisher Publisher&lt;DataBuffer&gt;}.
37+
* Bridges between {@link OutputStream} and {@link Publisher Publisher&lt;DataBuffer&gt;}.
3838
*
3939
* <p>Note that this class has a near duplicate in
4040
* {@link org.springframework.http.client.OutputStreamPublisher}.
@@ -45,6 +45,9 @@
4545
*/
4646
final class OutputStreamPublisher implements Publisher<DataBuffer> {
4747

48+
private static final int DEFAULT_CHUNK_SIZE = 1024;
49+
50+
4851
private final Consumer<OutputStream> outputStreamConsumer;
4952

5053
private final DataBufferFactory bufferFactory;
@@ -54,14 +57,26 @@ final class OutputStreamPublisher implements Publisher<DataBuffer> {
5457
private final int chunkSize;
5558

5659

60+
/**
61+
* Create an instance.
62+
* @param outputStreamConsumer invoked when the first buffer is requested
63+
* @param bufferFactory to create data buffers with
64+
* @param executor used to invoke the {@code outputStreamHandler}
65+
* @param chunkSize the chunk sizes to be produced by the publisher
66+
*/
5767
OutputStreamPublisher(
5868
Consumer<OutputStream> outputStreamConsumer, DataBufferFactory bufferFactory,
59-
Executor executor, int chunkSize) {
69+
Executor executor, @Nullable Integer chunkSize) {
70+
71+
Assert.notNull(outputStreamConsumer, "OutputStreamConsumer must not be null");
72+
Assert.notNull(bufferFactory, "BufferFactory must not be null");
73+
Assert.notNull(executor, "Executor must not be null");
74+
Assert.isTrue(chunkSize == null || chunkSize > 0, "ChunkSize must be larger than 0");
6075

6176
this.outputStreamConsumer = outputStreamConsumer;
6277
this.bufferFactory = bufferFactory;
6378
this.executor = executor;
64-
this.chunkSize = chunkSize;
79+
this.chunkSize = (chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE);
6580
}
6681

6782

spring-web/src/main/java/org/springframework/http/client/JdkClientHttpRequest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,19 +153,18 @@ private HttpRequest buildRequest(HttpHeaders headers, @Nullable Body body) {
153153

154154
private HttpRequest.BodyPublisher bodyPublisher(HttpHeaders headers, @Nullable Body body) {
155155
if (body != null) {
156-
Flow.Publisher<ByteBuffer> outputStreamPublisher = OutputStreamPublisher.create(
157-
outputStream -> body.writeTo(StreamUtils.nonClosing(outputStream)),
158-
BYTE_MAPPER, this.executor);
156+
Flow.Publisher<ByteBuffer> publisher = new OutputStreamPublisher<>(
157+
os -> body.writeTo(StreamUtils.nonClosing(os)), BYTE_MAPPER, this.executor, null);
159158

160159
long contentLength = headers.getContentLength();
161160
if (contentLength > 0) {
162-
return HttpRequest.BodyPublishers.fromPublisher(outputStreamPublisher, contentLength);
161+
return HttpRequest.BodyPublishers.fromPublisher(publisher, contentLength);
163162
}
164163
else if (contentLength == 0) {
165164
return HttpRequest.BodyPublishers.noBody();
166165
}
167166
else {
168-
return HttpRequest.BodyPublishers.fromPublisher(outputStreamPublisher);
167+
return HttpRequest.BodyPublishers.fromPublisher(publisher);
169168
}
170169
}
171170
else {

spring-web/src/main/java/org/springframework/http/client/OutputStreamPublisher.java

Lines changed: 12 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,15 @@
3030
import org.springframework.util.Assert;
3131

3232
/**
33-
* Bridges between {@link OutputStream} and
34-
* {@link Flow.Publisher Flow.Publisher&lt;T&gt;}.
33+
* Bridges between {@link OutputStream} and {@link Flow.Publisher Flow.Publisher&lt;T&gt;}.
3534
*
3635
* <p>Note that this class has a near duplicate in
3736
* {@link org.springframework.core.io.buffer.OutputStreamPublisher}.
3837
*
3938
* @author Oleh Dokuka
4039
* @author Arjen Poutsma
4140
* @since 6.1
42-
* @param <T> the published item type
43-
* @see #create(OutputStreamHandler, ByteMapper, Executor)
41+
* @param <T> the published byte buffer type
4442
*/
4543
final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
4644

@@ -56,98 +54,26 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
5654
private final int chunkSize;
5755

5856

59-
private OutputStreamPublisher(
60-
OutputStreamHandler outputStreamHandler, ByteMapper<T> byteMapper,
61-
Executor executor, int chunkSize) {
62-
63-
this.outputStreamHandler = outputStreamHandler;
64-
this.byteMapper = byteMapper;
65-
this.executor = executor;
66-
this.chunkSize = chunkSize;
67-
}
68-
69-
70-
/**
71-
* Creates a new {@code Publisher<T>} based on bytes written to a
72-
* {@code OutputStream}. The parameter {@code byteMapper} is used to map
73-
* from written bytes to the published type.
74-
* <ul>
75-
* <li>The parameter {@code outputStreamHandler} is invoked once per
76-
* subscription of the returned {@code Publisher}, when the first
77-
* item is
78-
* {@linkplain Flow.Subscription#request(long) requested}.</li>
79-
* <li>{@link OutputStream#write(byte[], int, int) OutputStream.write()}
80-
* invocations made by {@code outputStreamHandler} are buffered until they
81-
* exceed the default chunk size of 1024, and then result in a
82-
* {@linkplain Flow.Subscriber#onNext(Object) published} item
83-
* if there is {@linkplain Flow.Subscription#request(long) demand}.</li>
84-
* <li>If there is <em>no demand</em>, {@code OutputStream.write()} will block
85-
* until there is.</li>
86-
* <li>If the subscription is {@linkplain Flow.Subscription#cancel() cancelled},
87-
* {@code OutputStream.write()} will throw a {@code IOException}.</li>
88-
* <li>The subscription is
89-
* {@linkplain Flow.Subscriber#onComplete() completed} when
90-
* {@code outputStreamHandler} completes.</li>
91-
* <li>Any {@code IOException}s thrown from {@code outputStreamHandler} will
92-
* be dispatched to the {@linkplain Flow.Subscriber#onError(Throwable) Subscriber}.
93-
* </ul>
94-
* @param outputStreamHandler invoked when the first buffer is requested
95-
* @param byteMapper maps written bytes to {@code T}
96-
* @param executor used to invoke the {@code outputStreamHandler}
97-
* @param <T> the publisher type
98-
* @return a {@code Publisher<T>} based on bytes written by
99-
* {@code outputStreamHandler} mapped by {@code byteMapper}
100-
*/
101-
public static <T> Flow.Publisher<T> create(OutputStreamHandler outputStreamHandler, ByteMapper<T> byteMapper,
102-
Executor executor) {
103-
104-
Assert.notNull(outputStreamHandler, "OutputStreamHandler must not be null");
105-
Assert.notNull(byteMapper, "ByteMapper must not be null");
106-
Assert.notNull(executor, "Executor must not be null");
107-
108-
return new OutputStreamPublisher<>(outputStreamHandler, byteMapper, executor, DEFAULT_CHUNK_SIZE);
109-
}
110-
11157
/**
112-
* Creates a new {@code Publisher<T>} based on bytes written to a
113-
* {@code OutputStream}. The parameter {@code byteMapper} is used to map
114-
* from written bytes to the published type.
115-
* <ul>
116-
* <li>The parameter {@code outputStreamHandler} is invoked once per
117-
* subscription of the returned {@code Publisher}, when the first
118-
* item is
119-
* {@linkplain Flow.Subscription#request(long) requested}.</li>
120-
* <li>{@link OutputStream#write(byte[], int, int) OutputStream.write()}
121-
* invocations made by {@code outputStreamHandler} are buffered until they
122-
* exceed {@code chunkSize}, and then result in a
123-
* {@linkplain Flow.Subscriber#onNext(Object) published} item
124-
* if there is {@linkplain Flow.Subscription#request(long) demand}.</li>
125-
* <li>If there is <em>no demand</em>, {@code OutputStream.write()} will block
126-
* until there is.</li>
127-
* <li>If the subscription is {@linkplain Flow.Subscription#cancel() cancelled},
128-
* {@code OutputStream.write()} will throw a {@code IOException}.</li>
129-
* <li>The subscription is
130-
* {@linkplain Flow.Subscriber#onComplete() completed} when
131-
* {@code outputStreamHandler} completes.</li>
132-
* <li>Any {@code IOException}s thrown from {@code outputStreamHandler} will
133-
* be dispatched to the {@linkplain Flow.Subscriber#onError(Throwable) Subscriber}.
134-
* </ul>
58+
* Create an instance.
13559
* @param outputStreamHandler invoked when the first buffer is requested
13660
* @param byteMapper maps written bytes to {@code T}
13761
* @param executor used to invoke the {@code outputStreamHandler}
138-
* @param <T> the publisher type
139-
* @return a {@code Publisher<T>} based on bytes written by
140-
* {@code outputStreamHandler} mapped by {@code byteMapper}
62+
* @param chunkSize the chunk sizes to be produced by the publisher
14163
*/
142-
public static <T> Flow.Publisher<T> create(OutputStreamHandler outputStreamHandler, ByteMapper<T> byteMapper,
143-
Executor executor, int chunkSize) {
64+
OutputStreamPublisher(
65+
OutputStreamHandler outputStreamHandler, ByteMapper<T> byteMapper,
66+
Executor executor, @Nullable Integer chunkSize) {
14467

14568
Assert.notNull(outputStreamHandler, "OutputStreamHandler must not be null");
14669
Assert.notNull(byteMapper, "ByteMapper must not be null");
14770
Assert.notNull(executor, "Executor must not be null");
148-
Assert.isTrue(chunkSize > 0, "ChunkSize must be larger than 0");
71+
Assert.isTrue(chunkSize == null || chunkSize > 0, "ChunkSize must be larger than 0");
14972

150-
return new OutputStreamPublisher<>(outputStreamHandler, byteMapper, executor, chunkSize);
73+
this.outputStreamHandler = outputStreamHandler;
74+
this.byteMapper = byteMapper;
75+
this.executor = executor;
76+
this.chunkSize = (chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE);
15177
}
15278

15379

spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,14 @@ private Publisher<Void> send(HttpHeaders headers, @Nullable Body body,
113113
headers.forEach((key, value) -> reactorRequest.requestHeaders().set(key, value));
114114

115115
if (body != null) {
116+
ByteBufMapper byteMapper = new ByteBufMapper(nettyOutbound.alloc());
116117
AtomicReference<Executor> executor = new AtomicReference<>();
117118

118119
return nettyOutbound
119120
.withConnection(connection -> executor.set(connection.channel().eventLoop()))
120-
.send(FlowAdapters.toPublisher(OutputStreamPublisher.create(
121-
outputStream -> body.writeTo(StreamUtils.nonClosing(outputStream)),
122-
new ByteBufMapper(nettyOutbound.alloc()),
123-
executor.getAndSet(null))));
121+
.send(FlowAdapters.toPublisher(new OutputStreamPublisher<>(
122+
os -> body.writeTo(StreamUtils.nonClosing(os)), byteMapper,
123+
executor.getAndSet(null), null)));
124124
}
125125
else {
126126
return nettyOutbound;

spring-web/src/test/java/org/springframework/http/client/OutputStreamPublisherTests.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,11 @@ public byte[] map(byte[] b, int off, int len) {
6464

6565
@Test
6666
void basic() {
67-
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
67+
Flow.Publisher<byte[]> flowPublisher = new OutputStreamPublisher<>(outputStream -> {
6868
outputStream.write(FOO);
6969
outputStream.write(BAR);
7070
outputStream.write(BAZ);
71-
}, this.byteMapper, this.executor);
71+
}, this.byteMapper, this.executor, null);
7272
Flux<String> flux = toString(flowPublisher);
7373

7474
StepVerifier.create(flux)
@@ -78,14 +78,14 @@ void basic() {
7878

7979
@Test
8080
void flush() {
81-
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
81+
Flow.Publisher<byte[]> flowPublisher = new OutputStreamPublisher<>(outputStream -> {
8282
outputStream.write(FOO);
8383
outputStream.flush();
8484
outputStream.write(BAR);
8585
outputStream.flush();
8686
outputStream.write(BAZ);
8787
outputStream.flush();
88-
}, this.byteMapper, this.executor);
88+
}, this.byteMapper, this.executor, null);
8989
Flux<String> flux = toString(flowPublisher);
9090

9191
StepVerifier.create(flux)
@@ -97,7 +97,7 @@ void flush() {
9797

9898
@Test
9999
void chunkSize() {
100-
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
100+
Flow.Publisher<byte[]> flowPublisher = new OutputStreamPublisher<>(outputStream -> {
101101
outputStream.write(FOO);
102102
outputStream.write(BAR);
103103
outputStream.write(BAZ);
@@ -115,7 +115,7 @@ void chunkSize() {
115115
void cancel() throws InterruptedException {
116116
CountDownLatch latch = new CountDownLatch(1);
117117

118-
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
118+
Flow.Publisher<byte[]> flowPublisher = new OutputStreamPublisher<>(outputStream -> {
119119
assertThatIOException()
120120
.isThrownBy(() -> {
121121
outputStream.write(FOO);
@@ -126,7 +126,7 @@ void cancel() throws InterruptedException {
126126
.withMessage("Subscription has been terminated");
127127
latch.countDown();
128128

129-
}, this.byteMapper, this.executor);
129+
}, this.byteMapper, this.executor, null);
130130
Flux<String> flux = toString(flowPublisher);
131131

132132
StepVerifier.create(flux, 1)
@@ -141,14 +141,14 @@ void cancel() throws InterruptedException {
141141
void closed() throws InterruptedException {
142142
CountDownLatch latch = new CountDownLatch(1);
143143

144-
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
144+
Flow.Publisher<byte[]> flowPublisher = new OutputStreamPublisher<>(outputStream -> {
145145
OutputStreamWriter writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8);
146146
writer.write("foo");
147147
writer.close();
148148
assertThatIOException().isThrownBy(() -> writer.write("bar"))
149149
.withMessage("Stream closed");
150150
latch.countDown();
151-
}, this.byteMapper, this.executor);
151+
}, this.byteMapper, this.executor, null);
152152
Flux<String> flux = toString(flowPublisher);
153153

154154
StepVerifier.create(flux)
@@ -162,7 +162,7 @@ void closed() throws InterruptedException {
162162
void negativeRequestN() throws InterruptedException {
163163
CountDownLatch latch = new CountDownLatch(1);
164164

165-
Flow.Publisher<byte[]> flowPublisher = OutputStreamPublisher.create(outputStream -> {
165+
Flow.Publisher<byte[]> flowPublisher = new OutputStreamPublisher<>(outputStream -> {
166166
try (outputStream) {
167167
outputStream.write(FOO);
168168
outputStream.flush();
@@ -172,7 +172,7 @@ void negativeRequestN() throws InterruptedException {
172172
finally {
173173
latch.countDown();
174174
}
175-
}, this.byteMapper, this.executor);
175+
}, this.byteMapper, this.executor, null);
176176
Flow.Subscription[] subscriptions = new Flow.Subscription[1];
177177
Flux<String> flux = toString(a-> flowPublisher.subscribe(new Flow.Subscriber<>() {
178178
@Override

0 commit comments

Comments
 (0)