1
1
/*
2
- * Copyright 2002-2017 the original author or authors.
2
+ * Copyright 2002-2018 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
34
34
import org .springframework .util .ClassUtils ;
35
35
import org .springframework .util .ReflectionUtils ;
36
36
37
- import static org .springframework .core .ReactiveTypeDescriptor .multiValue ;
38
- import static org .springframework .core .ReactiveTypeDescriptor .noValue ;
39
- import static org .springframework .core .ReactiveTypeDescriptor .singleOptionalValue ;
40
- import static org .springframework .core .ReactiveTypeDescriptor .singleRequiredValue ;
41
-
42
37
/**
43
- * A registry of adapters to adapt a Reactive Streams {@link Publisher} to/from
38
+ * A registry of adapters to adapt Reactive Streams {@link Publisher} to/from
44
39
* various async/reactive types such as {@code CompletableFuture}, RxJava
45
40
* {@code Observable}, and others.
46
41
*
@@ -64,6 +59,7 @@ public class ReactiveAdapterRegistry {
64
59
65
60
/**
66
61
* Create a registry and auto-register default adapters.
62
+ * @see #getSharedInstance()
67
63
*/
68
64
public ReactiveAdapterRegistry () {
69
65
@@ -168,49 +164,54 @@ public ReactiveAdapter getAdapter(@Nullable Class<?> reactiveType, @Nullable Obj
168
164
/**
169
165
* Return a shared default {@code ReactiveAdapterRegistry} instance, lazily
170
166
* building it once needed.
167
+ *
171
168
* <p><b>NOTE:</b> We highly recommend passing a long-lived, pre-configured
172
169
* {@code ReactiveAdapterRegistry} instance for customization purposes.
173
170
* This accessor is only meant as a fallback for code paths that want to
174
171
* fall back on a default instance if one isn't provided.
172
+ *
175
173
* @return the shared {@code ReactiveAdapterRegistry} instance (never {@code null})
176
174
* @since 5.0.2
177
175
*/
178
176
public static ReactiveAdapterRegistry getSharedInstance () {
179
- ReactiveAdapterRegistry ar = sharedInstance ;
180
- if (ar == null ) {
177
+ ReactiveAdapterRegistry registry = sharedInstance ;
178
+ if (registry == null ) {
181
179
synchronized (ReactiveAdapterRegistry .class ) {
182
- ar = sharedInstance ;
183
- if (ar == null ) {
184
- ar = new ReactiveAdapterRegistry ();
185
- sharedInstance = ar ;
180
+ registry = sharedInstance ;
181
+ if (registry == null ) {
182
+ registry = new ReactiveAdapterRegistry ();
183
+ sharedInstance = registry ;
186
184
}
187
185
}
188
186
}
189
- return ar ;
187
+ return registry ;
190
188
}
191
189
192
190
193
191
private static class ReactorRegistrar {
194
192
195
193
void registerAdapters (ReactiveAdapterRegistry registry ) {
196
- // Flux and Mono ahead of Publisher...
194
+
195
+ // Register Flux and Mono before Publisher...
197
196
198
197
registry .registerReactiveType (
199
- singleOptionalValue (Mono .class , Mono ::empty ),
198
+ ReactiveTypeDescriptor . singleOptionalValue (Mono .class , Mono ::empty ),
200
199
source -> (Mono <?>) source ,
201
200
Mono ::from
202
201
);
203
202
204
- registry .registerReactiveType (multiValue (Flux .class , Flux ::empty ),
203
+ registry .registerReactiveType (
204
+ ReactiveTypeDescriptor .multiValue (Flux .class , Flux ::empty ),
205
205
source -> (Flux <?>) source ,
206
206
Flux ::from );
207
207
208
- registry .registerReactiveType (multiValue (Publisher .class , Flux ::empty ),
208
+ registry .registerReactiveType (
209
+ ReactiveTypeDescriptor .multiValue (Publisher .class , Flux ::empty ),
209
210
source -> (Publisher <?>) source ,
210
211
source -> source );
211
212
212
213
registry .registerReactiveType (
213
- singleOptionalValue (CompletableFuture .class , () -> {
214
+ ReactiveTypeDescriptor . singleOptionalValue (CompletableFuture .class , () -> {
214
215
CompletableFuture <?> empty = new CompletableFuture <>();
215
216
empty .complete (null );
216
217
return empty ;
@@ -226,17 +227,17 @@ private static class RxJava1Registrar {
226
227
227
228
void registerAdapters (ReactiveAdapterRegistry registry ) {
228
229
registry .registerReactiveType (
229
- multiValue (rx .Observable .class , rx .Observable ::empty ),
230
+ ReactiveTypeDescriptor . multiValue (rx .Observable .class , rx .Observable ::empty ),
230
231
source -> RxReactiveStreams .toPublisher ((rx .Observable <?>) source ),
231
232
RxReactiveStreams ::toObservable
232
233
);
233
234
registry .registerReactiveType (
234
- singleRequiredValue (rx .Single .class ),
235
+ ReactiveTypeDescriptor . singleRequiredValue (rx .Single .class ),
235
236
source -> RxReactiveStreams .toPublisher ((rx .Single <?>) source ),
236
237
RxReactiveStreams ::toSingle
237
238
);
238
239
registry .registerReactiveType (
239
- noValue (rx .Completable .class , rx .Completable ::complete ),
240
+ ReactiveTypeDescriptor . noValue (rx .Completable .class , rx .Completable ::complete ),
240
241
source -> RxReactiveStreams .toPublisher ((rx .Completable ) source ),
241
242
RxReactiveStreams ::toCompletable
242
243
);
@@ -248,27 +249,27 @@ private static class RxJava2Registrar {
248
249
249
250
void registerAdapters (ReactiveAdapterRegistry registry ) {
250
251
registry .registerReactiveType (
251
- multiValue (io .reactivex .Flowable .class , io .reactivex .Flowable ::empty ),
252
+ ReactiveTypeDescriptor . multiValue (io .reactivex .Flowable .class , io .reactivex .Flowable ::empty ),
252
253
source -> (io .reactivex .Flowable <?>) source ,
253
254
Flowable ::fromPublisher
254
255
);
255
256
registry .registerReactiveType (
256
- multiValue (io .reactivex .Observable .class , io .reactivex .Observable ::empty ),
257
+ ReactiveTypeDescriptor . multiValue (io .reactivex .Observable .class , io .reactivex .Observable ::empty ),
257
258
source -> ((io .reactivex .Observable <?>) source ).toFlowable (BackpressureStrategy .BUFFER ),
258
259
source -> io .reactivex .Flowable .fromPublisher (source ).toObservable ()
259
260
);
260
261
registry .registerReactiveType (
261
- singleRequiredValue (io .reactivex .Single .class ),
262
+ ReactiveTypeDescriptor . singleRequiredValue (io .reactivex .Single .class ),
262
263
source -> ((io .reactivex .Single <?>) source ).toFlowable (),
263
264
source -> io .reactivex .Flowable .fromPublisher (source ).toObservable ().singleElement ().toSingle ()
264
265
);
265
266
registry .registerReactiveType (
266
- singleOptionalValue (io .reactivex .Maybe .class , io .reactivex .Maybe ::empty ),
267
+ ReactiveTypeDescriptor . singleOptionalValue (io .reactivex .Maybe .class , io .reactivex .Maybe ::empty ),
267
268
source -> ((io .reactivex .Maybe <?>) source ).toFlowable (),
268
269
source -> io .reactivex .Flowable .fromPublisher (source ).toObservable ().singleElement ()
269
270
);
270
271
registry .registerReactiveType (
271
- noValue (io .reactivex .Completable .class , io .reactivex .Completable ::complete ),
272
+ ReactiveTypeDescriptor . noValue (io .reactivex .Completable .class , io .reactivex .Completable ::complete ),
272
273
source -> ((io .reactivex .Completable ) source ).toFlowable (),
273
274
source -> io .reactivex .Flowable .fromPublisher (source ).toObservable ().ignoreElements ()
274
275
);
@@ -278,15 +279,15 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
278
279
279
280
private static class ReactorJdkFlowAdapterRegistrar {
280
281
281
- // TODO: remove reflection when build requires JDK 9+
282
282
void registerAdapter (ReactiveAdapterRegistry registry ) throws Exception {
283
+ // TODO: remove reflection when build requires JDK 9+
283
284
Class <?> type = ClassUtils .forName ("java.util.concurrent.Flow.Publisher" , getClass ().getClassLoader ());
284
285
Method toFluxMethod = getMethod ("flowPublisherToFlux" , type );
285
286
Method toFlowMethod = getMethod ("publisherToFlowPublisher" , Publisher .class );
286
287
Object emptyFlow = ReflectionUtils .invokeMethod (toFlowMethod , null , Flux .empty ());
287
288
288
289
registry .registerReactiveType (
289
- multiValue (type , () -> emptyFlow ),
290
+ ReactiveTypeDescriptor . multiValue (type , () -> emptyFlow ),
290
291
source -> (Publisher <?>) ReflectionUtils .invokeMethod (toFluxMethod , null , source ),
291
292
publisher -> ReflectionUtils .invokeMethod (toFlowMethod , null , publisher )
292
293
);
@@ -299,9 +300,10 @@ private static Method getMethod(String name, Class<?> argumentType) throws NoSuc
299
300
300
301
301
302
/**
302
- * Extension of ReactiveAdapter that wraps adapted (raw) Publisher's as
303
- * {@link Flux} or {@link Mono} depending on the underlying reactive type's
304
- * stream semantics.
303
+ * ReactiveAdapter variant that wraps adapted Publishers as {@link Flux} or
304
+ * {@link Mono} depending on {@link ReactiveTypeDescriptor#isMultiValue()}.
305
+ * This is important in places where only the stream and stream element type
306
+ * information is available like encoders and decoders.
305
307
*/
306
308
private static class ReactorAdapter extends ReactiveAdapter {
307
309
0 commit comments