|
14 | 14 | import com.azure.cosmos.CosmosDiagnostics; |
15 | 15 | import com.azure.cosmos.CosmosDiagnosticsContext; |
16 | 16 | import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; |
| 17 | +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; |
17 | 18 | import com.azure.cosmos.CosmosException; |
18 | 19 | import com.azure.cosmos.CosmosItemSerializer; |
19 | 20 | import com.azure.cosmos.CosmosOperationPolicy; |
@@ -192,6 +193,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization |
192 | 193 |
|
193 | 194 | private static final String DUMMY_SQL_QUERY = "this is dummy and only used in creating " + |
194 | 195 | "ParallelDocumentQueryExecutioncontext, but not used"; |
| 196 | + |
195 | 197 | private final static ObjectMapper mapper = Utils.getSimpleObjectMapper(); |
196 | 198 | private final CosmosItemSerializer defaultCustomSerializer; |
197 | 199 | private final static Logger logger = LoggerFactory.getLogger(RxDocumentClientImpl.class); |
@@ -266,7 +268,8 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization |
266 | 268 | private final boolean sessionCapturingDisabled; |
267 | 269 | private final boolean isRegionScopedSessionCapturingEnabledOnClientOrSystemConfig; |
268 | 270 | private List<CosmosOperationPolicy> operationPolicies; |
269 | | - private AtomicReference<CosmosAsyncClient> cachedCosmosAsyncClientSnapshot; |
| 271 | + private final AtomicReference<CosmosAsyncClient> cachedCosmosAsyncClientSnapshot; |
| 272 | + private final CosmosEndToEndOperationLatencyPolicyConfig ppafEnforcedE2ELatencyPolicyConfigForReads; |
270 | 273 |
|
271 | 274 | public RxDocumentClientImpl(URI serviceEndpoint, |
272 | 275 | String masterKeyOrResourceToken, |
@@ -600,6 +603,10 @@ private RxDocumentClientImpl(URI serviceEndpoint, |
600 | 603 | this.queryPlanCache = new ConcurrentHashMap<>(); |
601 | 604 | this.apiType = apiType; |
602 | 605 | this.clientTelemetryConfig = clientTelemetryConfig; |
| 606 | + this.ppafEnforcedE2ELatencyPolicyConfigForReads = evaluatePpafEnforcedE2eLatencyPolicyCfgForReads( |
| 607 | + this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover, |
| 608 | + this.connectionPolicy |
| 609 | + ); |
603 | 610 | } catch (RuntimeException e) { |
604 | 611 | logger.error("unexpected failure in initializing client.", e); |
605 | 612 | close(); |
@@ -2466,7 +2473,7 @@ private Mono<ResourceResponse<Document>> createDocumentCore( |
2466 | 2473 | crossRegionAvailabilityContextForRxDocumentServiceRequest), |
2467 | 2474 | requestRetryPolicy), |
2468 | 2475 | scopedDiagnosticsFactory |
2469 | | - ), requestReference); |
| 2476 | + ), requestReference, endToEndPolicyConfig); |
2470 | 2477 | } |
2471 | 2478 |
|
2472 | 2479 | private Mono<ResourceResponse<Document>> createDocumentInternal( |
@@ -2576,7 +2583,10 @@ private static <T> Mono<T> getPointOperationResponseMonoWithE2ETimeout( |
2576 | 2583 |
|
2577 | 2584 | private <T> Mono<T> handleCircuitBreakingFeedbackForPointOperation( |
2578 | 2585 | Mono<T> response, |
2579 | | - AtomicReference<RxDocumentServiceRequest> requestReference) { |
| 2586 | + AtomicReference<RxDocumentServiceRequest> requestReference, |
| 2587 | + CosmosEndToEndOperationLatencyPolicyConfig effectiveEndToEndPolicyConfig) { |
| 2588 | + |
| 2589 | + applyEndToEndLatencyPolicyCfgToRequestContext(requestReference.get(), effectiveEndToEndPolicyConfig); |
2580 | 2590 |
|
2581 | 2591 | return response |
2582 | 2592 | .doOnSuccess(ignore -> { |
@@ -2769,6 +2779,23 @@ private static CosmosException getNegativeTimeoutException(CosmosDiagnostics cos |
2769 | 2779 | return exception; |
2770 | 2780 | } |
2771 | 2781 |
|
| 2782 | + private static void applyEndToEndLatencyPolicyCfgToRequestContext(RxDocumentServiceRequest rxDocumentServiceRequest, CosmosEndToEndOperationLatencyPolicyConfig effectiveEndToEndPolicyConfig) { |
| 2783 | + |
| 2784 | + if (rxDocumentServiceRequest == null) { |
| 2785 | + return; |
| 2786 | + } |
| 2787 | + |
| 2788 | + if (rxDocumentServiceRequest.requestContext == null) { |
| 2789 | + return; |
| 2790 | + } |
| 2791 | + |
| 2792 | + if (effectiveEndToEndPolicyConfig == null) { |
| 2793 | + return; |
| 2794 | + } |
| 2795 | + |
| 2796 | + rxDocumentServiceRequest.requestContext.setEndToEndOperationLatencyPolicyConfig(effectiveEndToEndPolicyConfig); |
| 2797 | + } |
| 2798 | + |
2772 | 2799 | @Override |
2773 | 2800 | public Mono<ResourceResponse<Document>> upsertDocument(String collectionLink, Object document, |
2774 | 2801 | RequestOptions options, boolean disableAutomaticIdGeneration) { |
@@ -2831,7 +2858,7 @@ private Mono<ResourceResponse<Document>> upsertDocumentCore( |
2831 | 2858 | requestReference, |
2832 | 2859 | crossRegionAvailabilityContextForRequest), |
2833 | 2860 | finalRetryPolicyInstance), |
2834 | | - scopedDiagnosticsFactory), requestReference); |
| 2861 | + scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); |
2835 | 2862 | } |
2836 | 2863 |
|
2837 | 2864 | private Mono<ResourceResponse<Document>> upsertDocumentInternal( |
@@ -2974,7 +3001,7 @@ private Mono<ResourceResponse<Document>> replaceDocumentCore( |
2974 | 3001 | requestReference, |
2975 | 3002 | crossRegionAvailabilityContextForRequest), |
2976 | 3003 | requestRetryPolicy), |
2977 | | - scopedDiagnosticsFactory), requestReference); |
| 3004 | + scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); |
2978 | 3005 | } |
2979 | 3006 |
|
2980 | 3007 | private Mono<ResourceResponse<Document>> replaceDocumentInternal( |
@@ -3056,7 +3083,7 @@ private Mono<ResourceResponse<Document>> replaceDocumentCore( |
3056 | 3083 | clientContextOverride, |
3057 | 3084 | requestReference, |
3058 | 3085 | crossRegionAvailabilityContextForRequest), |
3059 | | - requestRetryPolicy), requestReference); |
| 3086 | + requestRetryPolicy), requestReference, cosmosEndToEndOperationLatencyPolicyConfig); |
3060 | 3087 | } |
3061 | 3088 |
|
3062 | 3089 | private Mono<ResourceResponse<Document>> replaceDocumentInternal( |
@@ -3232,7 +3259,17 @@ private CosmosEndToEndOperationLatencyPolicyConfig getEffectiveEndToEndOperation |
3232 | 3259 | return null; |
3233 | 3260 | } |
3234 | 3261 |
|
3235 | | - return this.cosmosEndToEndOperationLatencyPolicyConfig; |
| 3262 | + if (this.cosmosEndToEndOperationLatencyPolicyConfig != null) { |
| 3263 | + return this.cosmosEndToEndOperationLatencyPolicyConfig; |
| 3264 | + } |
| 3265 | + |
| 3266 | + // If request options level and client-level e2e latency policy config, |
| 3267 | + // rely on PPAF enforced defaults |
| 3268 | + if (operationType.isReadOnlyOperation()) { |
| 3269 | + return this.ppafEnforcedE2ELatencyPolicyConfigForReads; |
| 3270 | + } |
| 3271 | + |
| 3272 | + return null; |
3236 | 3273 | } |
3237 | 3274 |
|
3238 | 3275 | @Override |
@@ -3295,7 +3332,7 @@ private Mono<ResourceResponse<Document>> patchDocumentCore( |
3295 | 3332 | requestReference, |
3296 | 3333 | crossRegionAvailabilityContextForRequest), |
3297 | 3334 | documentClientRetryPolicy), |
3298 | | - scopedDiagnosticsFactory), requestReference); |
| 3335 | + scopedDiagnosticsFactory), requestReference, cosmosEndToEndOperationLatencyPolicyConfig); |
3299 | 3336 | } |
3300 | 3337 |
|
3301 | 3338 | private Mono<ResourceResponse<Document>> patchDocumentInternal( |
@@ -3504,7 +3541,7 @@ private Mono<ResourceResponse<Document>> deleteDocumentCore( |
3504 | 3541 | requestReference, |
3505 | 3542 | crossRegionAvailabilityContextForRequest), |
3506 | 3543 | requestRetryPolicy), |
3507 | | - scopedDiagnosticsFactory), requestReference); |
| 3544 | + scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); |
3508 | 3545 | } |
3509 | 3546 |
|
3510 | 3547 | private Mono<ResourceResponse<Document>> deleteDocumentInternal( |
@@ -3692,7 +3729,7 @@ private Mono<ResourceResponse<Document>> readDocumentCore( |
3692 | 3729 | crossRegionAvailabilityContextForRequest), |
3693 | 3730 | retryPolicyInstance), |
3694 | 3731 | scopedDiagnosticsFactory |
3695 | | - ), requestReference); |
| 3732 | + ), requestReference, endToEndPolicyConfig); |
3696 | 3733 | } |
3697 | 3734 |
|
3698 | 3735 | private Mono<ResourceResponse<Document>> readDocumentInternal( |
@@ -4883,7 +4920,7 @@ public Mono<CosmosBatchResponse> executeBatchRequest(String collectionLink, |
4883 | 4920 | requestReference), documentClientRetryPolicy), |
4884 | 4921 | scopedDiagnosticsFactory |
4885 | 4922 | ), |
4886 | | - requestReference); |
| 4923 | + requestReference, endToEndPolicyConfig); |
4887 | 4924 | } |
4888 | 4925 |
|
4889 | 4926 | private Mono<StoredProcedureResponse> executeStoredProcedureInternal(String storedProcedureLink, |
@@ -7145,6 +7182,49 @@ private static boolean isNonTransientResultForHedging(int statusCode, int subSta |
7145 | 7182 | return false; |
7146 | 7183 | } |
7147 | 7184 |
|
| 7185 | + private static CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatencyPolicyCfgForReads( |
| 7186 | + GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover, |
| 7187 | + ConnectionPolicy connectionPolicy) { |
| 7188 | + |
| 7189 | + if (!globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled()) { |
| 7190 | + return null; |
| 7191 | + } |
| 7192 | + |
| 7193 | + if (Configs.isReadAvailabilityStrategyEnabledWithPpaf()) { |
| 7194 | + |
| 7195 | + if (connectionPolicy.getConnectionMode() == ConnectionMode.DIRECT) { |
| 7196 | + Duration networkRequestTimeout = connectionPolicy.getTcpNetworkRequestTimeout(); |
| 7197 | + |
| 7198 | + checkNotNull(networkRequestTimeout, "Argument 'networkRequestTimeout' cannot be null!"); |
| 7199 | + |
| 7200 | + Duration overallE2eLatencyTimeout = networkRequestTimeout.plus(Utils.ONE_SECOND); |
| 7201 | + Duration threshold = Utils.min(networkRequestTimeout.dividedBy(2), Utils.ONE_SECOND); |
| 7202 | + Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND); |
| 7203 | + |
| 7204 | + return new CosmosEndToEndOperationLatencyPolicyConfigBuilder(overallE2eLatencyTimeout) |
| 7205 | + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(threshold, thresholdStep)) |
| 7206 | + .build(); |
| 7207 | + } else { |
| 7208 | + |
| 7209 | + Duration httpNetworkRequestTimeout = connectionPolicy.getHttpNetworkRequestTimeout(); |
| 7210 | + |
| 7211 | + checkNotNull(httpNetworkRequestTimeout, "Argument 'httpNetworkRequestTimeout' cannot be null!"); |
| 7212 | + |
| 7213 | + // 6s was chosen to accommodate for control-plane hot path read timeout retries (like QueryPlan / PartitionKeyRange) |
| 7214 | + Duration overallE2eLatencyTimeout = Utils.min(Utils.SIX_SECONDS, httpNetworkRequestTimeout); |
| 7215 | + |
| 7216 | + Duration threshold = Utils.min(overallE2eLatencyTimeout.dividedBy(2), Utils.ONE_SECOND); |
| 7217 | + Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND); |
| 7218 | + |
| 7219 | + return new CosmosEndToEndOperationLatencyPolicyConfigBuilder(overallE2eLatencyTimeout) |
| 7220 | + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(threshold, thresholdStep)) |
| 7221 | + .build(); |
| 7222 | + } |
| 7223 | + } |
| 7224 | + |
| 7225 | + return null; |
| 7226 | + } |
| 7227 | + |
7148 | 7228 | private DiagnosticsClientContext getEffectiveClientContext(DiagnosticsClientContext clientContextOverride) { |
7149 | 7229 | if (clientContextOverride != null) { |
7150 | 7230 | return clientContextOverride; |
@@ -7261,6 +7341,8 @@ private <T> Mono<T> executeFeedOperationWithAvailabilityStrategy( |
7261 | 7341 | this.getEffectiveEndToEndOperationLatencyPolicyConfig( |
7262 | 7342 | req.requestContext.getEndToEndOperationLatencyPolicyConfig(), resourceType, operationType); |
7263 | 7343 |
|
| 7344 | + req.requestContext.setEndToEndOperationLatencyPolicyConfig(endToEndPolicyConfig); |
| 7345 | + |
7264 | 7346 | List<String> initialExcludedRegions = req.requestContext.getExcludeRegions(); |
7265 | 7347 | List<String> orderedApplicableRegionsForSpeculation = this.getApplicableRegionsForSpeculation( |
7266 | 7348 | endToEndPolicyConfig, |
|
0 commit comments