1212import com .azure .cosmos .implementation .FeedOperationState ;
1313import com .azure .cosmos .implementation .ImplementationBridgeHelpers ;
1414import com .azure .cosmos .models .FeedResponse ;
15+ import org .slf4j .Logger ;
16+ import org .slf4j .LoggerFactory ;
1517import reactor .core .publisher .Flux ;
1618
1719import java .time .Duration ;
1820import java .time .Instant ;
21+ import java .util .concurrent .atomic .AtomicInteger ;
1922import java .util .concurrent .atomic .AtomicLong ;
2023import java .util .concurrent .atomic .AtomicReference ;
2124import java .util .function .Consumer ;
3639 * @see FeedResponse
3740 */
3841final class CosmosPagedFluxDefaultImpl <T > extends CosmosPagedFlux <T > {
42+ private static final Logger LOGGER = LoggerFactory .getLogger (CosmosPagedFluxStaticListImpl .class );
3943 private static final ImplementationBridgeHelpers .CosmosDiagnosticsContextHelper .CosmosDiagnosticsContextAccessor ctxAccessor =
4044 ImplementationBridgeHelpers .CosmosDiagnosticsContextHelper .getCosmosDiagnosticsContextAccessor ();
4145
4246 private final Function <CosmosPagedFluxOptions , Flux <FeedResponse <T >>> optionsFluxFunction ;
43- private final Consumer <FeedResponse <T >> feedResponseConsumer ;
44- private final int defaultPageSize ;
47+ private final AtomicReference < Consumer <FeedResponse <T > >> feedResponseConsumer ;
48+ private final AtomicInteger defaultPageSize ;
4549
4650 CosmosPagedFluxDefaultImpl (Function <CosmosPagedFluxOptions , Flux <FeedResponse <T >>> optionsFluxFunction ) {
4751 this (optionsFluxFunction , null , -1 );
@@ -57,8 +61,8 @@ final class CosmosPagedFluxDefaultImpl<T> extends CosmosPagedFlux<T> {
5761 int defaultPageSize ) {
5862 super ();
5963 this .optionsFluxFunction = optionsFluxFunction ;
60- this .feedResponseConsumer = feedResponseConsumer ;
61- this .defaultPageSize = defaultPageSize ;
64+ this .feedResponseConsumer = new AtomicReference <>( feedResponseConsumer ) ;
65+ this .defaultPageSize = new AtomicInteger ( defaultPageSize ) ;
6266 }
6367
6468 /**
@@ -68,18 +72,39 @@ final class CosmosPagedFluxDefaultImpl<T> extends CosmosPagedFlux<T> {
6872 * @return CosmosPagedFlux instance with attached handler
6973 */
7074 public CosmosPagedFlux <T > handle (Consumer <FeedResponse <T >> newFeedResponseConsumer ) {
71- if (this .feedResponseConsumer != null ) {
72- return new CosmosPagedFluxDefaultImpl <>(
73- this .optionsFluxFunction ,
74- this .feedResponseConsumer .andThen (newFeedResponseConsumer ));
75- } else {
76- return new CosmosPagedFluxDefaultImpl <>(this .optionsFluxFunction , newFeedResponseConsumer );
75+ int i = 0 ;
76+ while (true ) {
77+ Consumer <FeedResponse <T >> feedResponseConsumerSnapshot = this .feedResponseConsumer .get ();
78+ i ++;
79+ if (feedResponseConsumerSnapshot != null ) {
80+
81+ if (this .feedResponseConsumer .compareAndSet (
82+ feedResponseConsumerSnapshot , feedResponseConsumerSnapshot .andThen (newFeedResponseConsumer ))) {
83+
84+ break ;
85+ }
86+ } else {
87+ if (this .feedResponseConsumer .compareAndSet (
88+ null ,
89+ newFeedResponseConsumer )) {
90+
91+ break ;
92+ }
93+ }
94+
95+ if (i > 10 ) {
96+ LOGGER .warn ("Highly concurrent calls to CosmosPagedFlux.handle "
97+ + "are not expected and can result in perf regressions. Avoid this by reducing concurrency." );
98+ }
7799 }
100+
101+ return this ;
78102 }
79103
80104 @ Override
81105 CosmosPagedFlux <T > withDefaultPageSize (int pageSize ) {
82- return new CosmosPagedFluxDefaultImpl <>(this .optionsFluxFunction , this .feedResponseConsumer , pageSize );
106+ this .defaultPageSize .set (pageSize );
107+ return this ;
83108 }
84109
85110 @ Override
@@ -113,8 +138,9 @@ public Flux<FeedResponse<T>> byPage(String continuationToken, int preferredPageS
113138 private CosmosPagedFluxOptions createCosmosPagedFluxOptions () {
114139 CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions ();
115140
116- if (this .defaultPageSize > 0 ) {
117- cosmosPagedFluxOptions .setMaxItemCount (this .defaultPageSize );
141+ int defaultPageSizeSnapshot = this .defaultPageSize .get ();
142+ if (defaultPageSizeSnapshot > 0 ) {
143+ cosmosPagedFluxOptions .setMaxItemCount (defaultPageSizeSnapshot );
118144 }
119145
120146 return cosmosPagedFluxOptions ;
@@ -137,7 +163,7 @@ private Flux<FeedResponse<T>> wrapWithTracingIfEnabled(CosmosPagedFluxOptions p
137163 case ON_COMPLETE :
138164 case ON_NEXT :
139165 DiagnosticsProvider .recordFeedResponse (
140- feedResponseConsumer ,
166+ feedResponseConsumer . get () ,
141167 pagedFluxOptions .getFeedOperationState (),
142168 () ->pagedFluxOptions .getSamplingRateSnapshot (),
143169 tracerProvider ,
@@ -169,7 +195,7 @@ private Flux<FeedResponse<T>> wrapWithTracingIfEnabled(CosmosPagedFluxOptions p
169195 case ON_COMPLETE :
170196 if (response != null ) {
171197 DiagnosticsProvider .recordFeedResponse (
172- feedResponseConsumer ,
198+ feedResponseConsumer . get () ,
173199 pagedFluxOptions .getFeedOperationState (),
174200 () ->pagedFluxOptions .getSamplingRateSnapshot (),
175201 tracerProvider ,
@@ -193,7 +219,7 @@ private Flux<FeedResponse<T>> wrapWithTracingIfEnabled(CosmosPagedFluxOptions p
193219 break ;
194220 case ON_NEXT :
195221 DiagnosticsProvider .recordFeedResponse (
196- feedResponseConsumer ,
222+ feedResponseConsumer . get () ,
197223 pagedFluxOptions .getFeedOperationState (),
198224 () ->pagedFluxOptions .getSamplingRateSnapshot (),
199225 tracerProvider ,
0 commit comments