24
24
import java .util .function .Function ;
25
25
26
26
import org .reactivestreams .Publisher ;
27
- import org .reactivestreams .Subscription ;
28
27
import reactor .core .publisher .Flux ;
29
28
import reactor .core .publisher .Mono ;
30
29
import reactor .core .publisher .MonoProcessor ;
@@ -115,10 +114,8 @@ public Info(WiretapClientHttpRequest request, WiretapClientHttpResponse response
115
114
116
115
117
116
public ExchangeResult createExchangeResult (Duration timeout , @ Nullable String uriTemplate ) {
118
- return new ExchangeResult (this .request , this .response ,
119
- Mono .defer (() -> this .request .getRecorder ().getContent ()),
120
- Mono .defer (() -> this .response .getRecorder ().getContent ()),
121
- timeout , uriTemplate );
117
+ return new ExchangeResult (this .request , this .response , this .request .getRecorder ().getContent (),
118
+ this .response .getRecorder ().getContent (), timeout , uriTemplate );
122
119
}
123
120
}
124
121
@@ -137,11 +134,11 @@ final static class WiretapRecorder {
137
134
@ Nullable
138
135
private final Flux <? extends Publisher <? extends DataBuffer >> publisherNested ;
139
136
140
- private final DataBuffer buffer ;
137
+ private final DataBuffer buffer = bufferFactory . allocateBuffer () ;
141
138
142
- private final MonoProcessor <byte []> content ;
139
+ private final MonoProcessor <byte []> content = MonoProcessor . create () ;
143
140
144
- private volatile boolean subscriberRegistered ;
141
+ private boolean hasContentConsumer ;
145
142
146
143
147
144
public WiretapRecorder (@ Nullable Publisher <? extends DataBuffer > publisher ,
@@ -153,22 +150,23 @@ public WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
153
150
154
151
this .publisher = publisher != null ?
155
152
Flux .from (publisher )
156
- .doOnSubscribe (this :: handleOnSubscribe )
157
- .doOnNext (this :: handleOnNext )
153
+ .doOnSubscribe (s -> this . hasContentConsumer = true )
154
+ .doOnNext (this . buffer :: write )
158
155
.doOnError (this ::handleOnError )
159
156
.doOnCancel (this ::handleOnComplete )
160
157
.doOnComplete (this ::handleOnComplete ) : null ;
161
158
162
159
this .publisherNested = publisherNested != null ?
163
160
Flux .from (publisherNested )
164
- .doOnSubscribe (this :: handleOnSubscribe )
165
- .map (p -> Flux .from (p ).doOnNext (this :: handleOnNext ).doOnError (this ::handleOnError ))
161
+ .doOnSubscribe (s -> this . hasContentConsumer = true )
162
+ .map (p -> Flux .from (p ).doOnNext (this . buffer :: write ).doOnError (this ::handleOnError ))
166
163
.doOnError (this ::handleOnError )
167
164
.doOnCancel (this ::handleOnComplete )
168
165
.doOnComplete (this ::handleOnComplete ) : null ;
169
166
170
- this .buffer = bufferFactory .allocateBuffer ();
171
- this .content = MonoProcessor .create ();
167
+ if (publisher == null && publisherNested == null ) {
168
+ this .content .onComplete ();
169
+ }
172
170
}
173
171
174
172
@@ -183,39 +181,26 @@ public Publisher<? extends Publisher<? extends DataBuffer>> getNestedPublisherTo
183
181
}
184
182
185
183
public Mono <byte []> getContent () {
186
- // No publisher (e.g. request#setComplete)
187
- if (this .publisher == null && this .publisherNested == null ) {
188
- return Mono .empty ();
189
- }
190
- if (this .content .isTerminated ()) {
191
- return this .content ;
192
- }
193
- if (this .subscriberRegistered ) {
194
- return Mono .error (new IllegalStateException (
195
- "Subscriber registered but content is not yet fully consumed." ));
196
- }
197
- else {
198
- // No subscriber, e.g.:
199
- // - mock server request body never consumed (error before read)
200
- // - FluxExchangeResult#getResponseBodyContent called
201
- (this .publisher != null ? this .publisher : this .publisherNested )
202
- .onErrorMap (ex -> new IllegalStateException (
203
- "Content was not been consumed and " +
204
- "an error was raised on attempt to produce it:" , ex ))
205
- .subscribe ();
184
+ return Mono .defer (() -> {
185
+ if (this .content .isTerminated ()) {
186
+ return this .content ;
187
+ }
188
+ if (!this .hasContentConsumer ) {
189
+ // Couple of possible cases:
190
+ // 1. Mock server never consumed request body (e.g. error before read)
191
+ // 2. FluxExchangeResult: getResponseBodyContent called before getResponseBody
192
+ //noinspection ConstantConditions
193
+ (this .publisher != null ? this .publisher : this .publisherNested )
194
+ .onErrorMap (ex -> new IllegalStateException (
195
+ "Content was not been consumed and " +
196
+ "an error was raised on attempt to produce it:" , ex ))
197
+ .subscribe ();
198
+ }
206
199
return this .content ;
207
- }
200
+ });
208
201
}
209
202
210
203
211
- private void handleOnSubscribe (Subscription subscription ) {
212
- this .subscriberRegistered = true ;
213
- }
214
-
215
- private void handleOnNext (DataBuffer nextBuffer ) {
216
- this .buffer .write (nextBuffer );
217
- }
218
-
219
204
private void handleOnError (Throwable ex ) {
220
205
if (!this .content .isTerminated ()) {
221
206
this .content .onError (ex );
0 commit comments