24
24
import java .util .concurrent .atomic .AtomicLong ;
25
25
import java .util .concurrent .atomic .AtomicReference ;
26
26
import java .util .concurrent .locks .LockSupport ;
27
- import java .util .function .Consumer ;
28
27
29
28
import org .reactivestreams .Publisher ;
30
29
import org .reactivestreams .Subscriber ;
36
35
/**
37
36
* Bridges between {@link OutputStream} and {@link Publisher Publisher<DataBuffer>}.
38
37
*
38
+ * <p>When there is demand on the Reactive Streams subscription, any write to
39
+ * the OutputStream is mapped to a buffer and published to the subscriber.
40
+ * If there is no demand, writes block until demand materializes.
41
+ * If the subscription is cancelled, further writes raise {@code IOException}.
42
+ *
39
43
* <p>Note that this class has a near duplicate in
40
44
* {@link org.springframework.http.client.OutputStreamPublisher}.
41
45
*
42
46
* @author Oleh Dokuka
43
47
* @author Arjen Poutsma
44
48
* @since 6.1
49
+ * @param <T> the published byte buffer type
45
50
*/
46
- final class OutputStreamPublisher implements Publisher <DataBuffer > {
51
+ final class OutputStreamPublisher < T > implements Publisher <T > {
47
52
48
53
private static final int DEFAULT_CHUNK_SIZE = 1024 ;
49
54
50
55
51
- private final Consumer < OutputStream > outputStreamConsumer ;
56
+ private final OutputStreamHandler outputStreamHandler ;
52
57
53
- private final DataBufferFactory bufferFactory ;
58
+ private final ByteMapper < T > byteMapper ;
54
59
55
60
private final Executor executor ;
56
61
@@ -59,50 +64,74 @@ final class OutputStreamPublisher implements Publisher<DataBuffer> {
59
64
60
65
/**
61
66
* Create an instance.
62
- * @param outputStreamConsumer invoked when the first buffer is requested
63
- * @param bufferFactory to create data buffers with
67
+ * @param outputStreamHandler invoked when the first buffer is requested
68
+ * @param byteMapper maps written bytes to {@code T}
64
69
* @param executor used to invoke the {@code outputStreamHandler}
65
70
* @param chunkSize the chunk sizes to be produced by the publisher
66
71
*/
67
72
OutputStreamPublisher (
68
- Consumer < OutputStream > outputStreamConsumer , DataBufferFactory bufferFactory ,
73
+ OutputStreamHandler outputStreamHandler , ByteMapper < T > byteMapper ,
69
74
Executor executor , @ Nullable Integer chunkSize ) {
70
75
71
- Assert .notNull (outputStreamConsumer , "OutputStreamConsumer must not be null" );
72
- Assert .notNull (bufferFactory , "BufferFactory must not be null" );
76
+ Assert .notNull (outputStreamHandler , "OutputStreamHandler must not be null" );
77
+ Assert .notNull (byteMapper , "ByteMapper must not be null" );
73
78
Assert .notNull (executor , "Executor must not be null" );
74
79
Assert .isTrue (chunkSize == null || chunkSize > 0 , "ChunkSize must be larger than 0" );
75
80
76
- this .outputStreamConsumer = outputStreamConsumer ;
77
- this .bufferFactory = bufferFactory ;
81
+ this .outputStreamHandler = outputStreamHandler ;
82
+ this .byteMapper = byteMapper ;
78
83
this .executor = executor ;
79
84
this .chunkSize = (chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE );
80
85
}
81
86
82
87
83
88
@ Override
84
- public void subscribe (Subscriber <? super DataBuffer > subscriber ) {
89
+ public void subscribe (Subscriber <? super T > subscriber ) {
85
90
// We don't use Assert.notNull(), because a NullPointerException is required
86
91
// for Reactive Streams compliance.
87
92
Objects .requireNonNull (subscriber , "Subscriber must not be null" );
88
93
89
- OutputStreamSubscription subscription = new OutputStreamSubscription (
90
- subscriber , this .outputStreamConsumer , this .bufferFactory , this .chunkSize );
94
+ OutputStreamSubscription < T > subscription = new OutputStreamSubscription <> (
95
+ subscriber , this .outputStreamHandler , this .byteMapper , this .chunkSize );
91
96
92
97
subscriber .onSubscribe (subscription );
93
98
this .executor .execute (subscription ::invokeHandler );
94
99
}
95
100
96
101
97
- private static final class OutputStreamSubscription extends OutputStream implements Subscription {
102
+ /**
103
+ * Contract to provide callback access to the {@link OutputStream}.
104
+ */
105
+ @ FunctionalInterface
106
+ public interface OutputStreamHandler {
107
+
108
+ void handle (OutputStream outputStream ) throws Exception ;
109
+
110
+ }
111
+
112
+
113
+ /**
114
+ * Maps from bytes to byte buffers.
115
+ * @param <T> the type of byte buffer to map to
116
+ */
117
+ public interface ByteMapper <T > {
118
+
119
+ T map (int b );
120
+
121
+ T map (byte [] b , int off , int len );
122
+
123
+ }
124
+
125
+
126
+ private static final class OutputStreamSubscription <T > extends OutputStream implements Subscription {
98
127
99
128
private static final Object READY = new Object ();
100
129
101
- private final Subscriber <? super DataBuffer > actual ;
130
+ private final Subscriber <? super T > actual ;
102
131
103
- private final Consumer < OutputStream > outputStreamHandler ;
132
+ private final OutputStreamHandler outputStreamHandler ;
104
133
105
- private final DataBufferFactory bufferFactory ;
134
+ private final ByteMapper < T > byteMapper ;
106
135
107
136
private final int chunkSize ;
108
137
@@ -116,24 +145,20 @@ private static final class OutputStreamSubscription extends OutputStream impleme
116
145
private long produced ;
117
146
118
147
OutputStreamSubscription (
119
- Subscriber <? super DataBuffer > actual , Consumer < OutputStream > outputStreamConsumer ,
120
- DataBufferFactory bufferFactory , int chunkSize ) {
148
+ Subscriber <? super T > actual , OutputStreamHandler outputStreamHandler ,
149
+ ByteMapper < T > byteMapper , int chunkSize ) {
121
150
122
151
this .actual = actual ;
123
- this .outputStreamHandler = outputStreamConsumer ;
124
- this .bufferFactory = bufferFactory ;
152
+ this .outputStreamHandler = outputStreamHandler ;
153
+ this .byteMapper = byteMapper ;
125
154
this .chunkSize = chunkSize ;
126
155
}
127
156
128
157
@ Override
129
158
public void write (int b ) throws IOException {
130
159
checkDemandAndAwaitIfNeeded ();
131
-
132
- DataBuffer next = this .bufferFactory .allocateBuffer (1 );
133
- next .write ((byte ) b );
134
-
160
+ T next = this .byteMapper .map (b );
135
161
this .actual .onNext (next );
136
-
137
162
this .produced ++;
138
163
}
139
164
@@ -145,12 +170,8 @@ public void write(byte[] b) throws IOException {
145
170
@ Override
146
171
public void write (byte [] b , int off , int len ) throws IOException {
147
172
checkDemandAndAwaitIfNeeded ();
148
-
149
- DataBuffer next = this .bufferFactory .allocateBuffer (len );
150
- next .write (b , off , len );
151
-
173
+ T next = this .byteMapper .map (b , off , len );
152
174
this .actual .onNext (next );
153
-
154
175
this .produced ++;
155
176
}
156
177
@@ -190,7 +211,7 @@ private void invokeHandler() {
190
211
// use BufferedOutputStream, so that written bytes are buffered
191
212
// before publishing as byte buffer
192
213
try (OutputStream outputStream = new BufferedOutputStream (this , this .chunkSize )) {
193
- this .outputStreamHandler .accept (outputStream );
214
+ this .outputStreamHandler .handle (outputStream );
194
215
}
195
216
catch (Exception ex ) {
196
217
long previousState = tryTerminate ();
0 commit comments