Skip to content

Commit 042dfda

Browse files
authored
Session consistency improvement with bloom filter approach (Azure#38003)
* Adding tests. * Adding tests. * Adding PK-scoped session token map. * Refactoring. * Addind client-level options for session consistency. * Adding client-level options for session consistency. * Adding client-level options for session consistency. * Added read my writes test. * Added PartitionKeyMetadata. * Added SessionTokenMetadata. * Fixing bugs. * Added a registry for session tokens. * Added a registry for session tokens. * Refactorings. * Adding LRU-based eviction. * Refactorings. * Shade Guava BloomFilter and its dependencies. * Adding bloom filter based PK tracking. * Adding bloom filter based PK tracking. * Fixing custom type for bloom filter key. * Refactoring. * Refactoring. * Refactoring. * Refactoring. * Fixing SessionContainerTest.java. * Reverting changes. * Refactoring and bug fixes. * Refactoring and bug fixes. * Refactoring and bug fixes. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Code clean up. * Code clean up. * Code clean up. * Code clean up. * Code clean up. * Code clean up. * Code clean up. * Fixing bugs. * Fixing bugs. * Fixing bugs. * Fixing bugs. * Refactoring. * Refactoring. * Fixing ConsistencyTests*. * Refactoring tests. * Store regionId to regionName mappings. * Store regionId to regionName mappings. * Store regionId to regionName mappings. * Modified tests. * Fixing CI pipeline. * Refactoring and tests. * Session scoping changes. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Refactoring. * Refactoring. * Revert "Refactoring." This reverts commit 7f236fd. * Refactoring. * Refactoring. * Adding tests. * Wiring PartitionKeyDefinition into query-based request targeted to single logical partition. * PR clean up. * Fixing CI pipeline. * Fixing CI pipeline. * Fixing CI pipeline. * Run live test and CI with RegionScopedSessionContainer enabled. * Run live test and CI with RegionScopedSessionContainer enabled. * Fixing tests. * Revert RegionScopedSessionContainer injection. * Adding comments. * Adding comments. * Force use of region-scoped session container for benchmarks. * Change expected insertion type to `long` from `int`. * Attempt to use cached effective partition key. * Fix bottleneck. * Attempt at fixing tests. * Reacting to review comments. * Adding tests around bulk, readMany, batch and change feed (pull-model). * Adding tests around patch. * Adding tests. * Adding tests. * Logging changes. * Fixing faulty merge. * Fixing tests. * Added region-scoped session container capability in benchmarks. * Force RegionScopedSessionContainer usage for live tests. * Revert RegionScopedSessionContainer usage for live tests. * Fixing tests. * Improving diagnostics to captured RegionScopedSessionContainer related metadata. * Wire up effective partition key string resolution from upstream classes. * Wire up effective partition key string resolution from upstream classes. * Code clean up. * Improve javadoc. * Updated CHANGELOG.md. * Fixing merge. * Updated CHANGELOG.md. * Modify logger level. * Reacting to review comments. * Reacting to review comments. * Added diagnostic tracking of session token record and resolution flows. * Remove unneeded static methods. * Added additional null checks. * Fixing tests. * Adding code comments. * Adding license headers. * Reacting to review comments. * Fixing tests. * Reacting to review comments. * Reacting to review comments. * Reacting to review comments. * Reacting to review comments. * Reacting to review comments. * Reacting to review comments. * Fixing tests. * Force RegionScopedSessionContainer for live tests. * Reacting to review comments. * Fixing diagnostics. * Adding tests. * Reacting to review comments. * Reacting to review comments. * Reacting to review comments. * Reacting to review comments. * Test changes. * Test changes.
1 parent 85eeca7 commit 042dfda

File tree

86 files changed

+14785
-194
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+14785
-194
lines changed

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.azure.cosmos.DirectConnectionConfig;
1717
import com.azure.cosmos.GatewayConnectionConfig;
1818
import com.azure.cosmos.implementation.HttpConstants;
19+
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
1920
import com.azure.cosmos.models.CosmosClientTelemetryConfig;
2021
import com.azure.cosmos.models.CosmosContainerIdentity;
2122
import com.azure.cosmos.models.CosmosMicrometerMetricsOptions;
@@ -56,6 +57,10 @@
5657
import java.util.concurrent.atomic.AtomicLong;
5758

5859
abstract class AsyncBenchmark<T> {
60+
61+
private static final ImplementationBridgeHelpers.CosmosClientBuilderHelper.CosmosClientBuilderAccessor clientBuilderAccessor
62+
= ImplementationBridgeHelpers.CosmosClientBuilderHelper.getCosmosClientBuilderAccessor();
63+
5964
private final MetricRegistry metricsRegistry = new MetricRegistry();
6065
private final ScheduledReporter reporter;
6166

@@ -91,6 +96,9 @@ abstract class AsyncBenchmark<T> {
9196
.userAgentSuffix(configuration.getApplicationName())
9297
.contentResponseOnWriteEnabled(cfg.isContentResponseOnWriteEnabled());
9398

99+
clientBuilderAccessor
100+
.setRegionScopedSessionCapturingEnabled(cosmosClientBuilder, cfg.isRegionScopedSessionContainerEnabled());
101+
94102
CosmosClientTelemetryConfig telemetryConfig = new CosmosClientTelemetryConfig()
95103
.sendClientTelemetryToService(cfg.isClientTelemetryEnabled())
96104
.diagnosticsThresholds(

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Configuration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ public class Configuration {
134134
@Parameter(names = "-aggressiveWarmupDuration", description = "The duration for which proactive connections are aggressively established", converter = DurationConverter.class)
135135
private Duration aggressiveWarmupDuration = Duration.ZERO;
136136

137+
@Parameter(names = "-isRegionScopedSessionContainerEnabled", description = "A flag to denote whether region scoped session container is enabled")
138+
private String isRegionScopedSessionContainerEnabled = String.valueOf(false);
139+
137140
@Parameter(names = "-operation", description = "Type of Workload:\n"
138141
+ "\tReadThroughput- run a READ workload that prints only throughput *\n"
139142
+ "\tReadThroughputWithMultipleClients - run a READ workload that prints throughput and latency for multiple client read.*\n"
@@ -632,6 +635,10 @@ public String getResultUploadContainer() {
632635
return Strings.emptyToNull(resultUploadContainer);
633636
}
634637

638+
public boolean isRegionScopedSessionContainerEnabled() {
639+
return Boolean.parseBoolean(isRegionScopedSessionContainerEnabled);
640+
}
641+
635642
public void tryGetValuesFromSystem() {
636643
serviceEndpoint = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("SERVICE_END_POINT")),
637644
serviceEndpoint);

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.azure.cosmos.DirectConnectionConfig;
1616
import com.azure.cosmos.GatewayConnectionConfig;
1717
import com.azure.cosmos.implementation.HttpConstants;
18+
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
1819
import com.azure.cosmos.models.CosmosClientTelemetryConfig;
1920
import com.azure.cosmos.models.CosmosItemResponse;
2021
import com.azure.cosmos.models.ThroughputProperties;
@@ -50,6 +51,10 @@
5051
import java.util.stream.Collectors;
5152

5253
abstract class SyncBenchmark<T> {
54+
55+
private static final ImplementationBridgeHelpers.CosmosClientBuilderHelper.CosmosClientBuilderAccessor clientBuilderAccessor
56+
= ImplementationBridgeHelpers.CosmosClientBuilderHelper.getCosmosClientBuilderAccessor();
57+
5358
private final MetricRegistry metricsRegistry = new MetricRegistry();
5459
private final ScheduledReporter reporter;
5560

@@ -118,6 +123,9 @@ public T apply(T o, Throwable throwable) {
118123
.contentResponseOnWriteEnabled(cfg.isContentResponseOnWriteEnabled())
119124
.clientTelemetryEnabled(cfg.isClientTelemetryEnabled());
120125

126+
clientBuilderAccessor
127+
.setRegionScopedSessionCapturingEnabled(cosmosClientBuilder, cfg.isRegionScopedSessionContainerEnabled());
128+
121129
if (cfg.getConnectionMode().equals(ConnectionMode.DIRECT)) {
122130
cosmosClientBuilder = cosmosClientBuilder.directMode(DirectConnectionConfig.getDefaultConfig());
123131
} else {

sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionServerErrorResultInternal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public CosmosException getInjectedServerError(RxDocumentServiceRequest request)
117117

118118
case READ_SESSION_NOT_AVAILABLE:
119119

120-
final String badSessionToken = "1:1#1#1=1#1=1";
120+
final String badSessionToken = partitionKeyRangeId + ":" + "1#1#1=1#1=1";
121121

122122
responseHeaders.put(WFConstants.BackendHeaders.SUB_STATUS,
123123
Integer.toString(HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE));

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosClientBuilderTest.java

Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@
33
package com.azure.cosmos;
44

55
import com.azure.cosmos.implementation.ApiType;
6+
import com.azure.cosmos.implementation.ISessionContainer;
67
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
8+
import com.azure.cosmos.implementation.RegionScopedSessionContainer;
79
import com.azure.cosmos.implementation.RxDocumentClientImpl;
810
import com.azure.cosmos.implementation.SessionContainer;
911
import com.azure.cosmos.implementation.TestConfigurations;
1012
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
1113
import com.azure.cosmos.models.CosmosClientTelemetryConfig;
1214
import org.testng.SkipException;
15+
import org.testng.annotations.DataProvider;
1316
import org.testng.annotations.Test;
1417

1518
import java.net.URISyntaxException;
@@ -20,6 +23,11 @@
2023
public class CosmosClientBuilderTest {
2124
String hostName = "https://sample-account.documents.azure.com:443/";
2225

26+
@DataProvider(name = "regionScopedSessionContainerConfigs")
27+
public Object[] regionScopedSessionContainerConfigs() {
28+
return new Object[] {false, true};
29+
}
30+
2331
@Test(groups = "unit")
2432
public void validateBadPreferredRegions1() {
2533
try {
@@ -192,22 +200,72 @@ public void validateApiTypePresent() {
192200
assertThat(ReflectionUtils.getApiType(documentClient)).isEqualTo(apiType);
193201
}
194202

195-
@Test(groups = "emulator")
196-
public void validateSessionTokenCapturingForAccountDefaultConsistency() {
197-
CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
198-
.endpoint(TestConfigurations.HOST)
199-
.key(TestConfigurations.MASTER_KEY)
200-
.userAgentSuffix("custom-direct-client");
203+
@Test(groups = "emulator", dataProvider = "regionScopedSessionContainerConfigs")
204+
public void validateSessionTokenCapturingForAccountDefaultConsistency(boolean shouldRegionScopedSessionContainerEnabled) {
201205

202-
CosmosAsyncClient client = cosmosClientBuilder.buildAsyncClient();
203-
RxDocumentClientImpl documentClient =
204-
(RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(client);
206+
try {
207+
208+
if (shouldRegionScopedSessionContainerEnabled) {
209+
System.setProperty("COSMOS.SESSION_CAPTURING_TYPE", "REGION_SCOPED");
210+
}
211+
212+
CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
213+
.endpoint(TestConfigurations.HOST)
214+
.key(TestConfigurations.MASTER_KEY)
215+
.userAgentSuffix("custom-direct-client");
216+
217+
CosmosAsyncClient client = cosmosClientBuilder.buildAsyncClient();
218+
RxDocumentClientImpl documentClient =
219+
(RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(client);
220+
221+
if (documentClient.getDefaultConsistencyLevelOfAccount() != ConsistencyLevel.SESSION) {
222+
throw new SkipException("This test is only applicable when default account-level consistency is Session.");
223+
}
205224

206-
if (documentClient.getDefaultConsistencyLevelOfAccount() != ConsistencyLevel.SESSION) {
207-
throw new SkipException("This test is only applicable when default account-level consistency is Session.");
225+
ISessionContainer sessionContainer = documentClient.getSession();
226+
227+
if (System.getProperty("COSMOS.SESSION_CAPTURING_TYPE") != null && System.getProperty("COSMOS.SESSION_CAPTURING_TYPE").equals("REGION_SCOPED")) {
228+
assertThat(sessionContainer instanceof RegionScopedSessionContainer).isTrue();
229+
} else {
230+
assertThat(sessionContainer instanceof SessionContainer).isTrue();
231+
}
232+
233+
assertThat(sessionContainer.getDisableSessionCapturing()).isEqualTo(false);
234+
} finally {
235+
System.clearProperty("COSMOS.SESSION_CAPTURING_TYPE");
208236
}
237+
}
238+
239+
// set env variable to COSMOS.SESSION_CAPTURING_TYPE to REGION_SCOPED to test all possible assertions
240+
@Test(groups = "unit", enabled = false)
241+
public void validateSessionTokenCapturingForAccountDefaultConsistencyWithEnvVariable() {
209242

210-
SessionContainer sessionContainer = (SessionContainer)documentClient.getSession();
211-
assertThat(sessionContainer.getDisableSessionCapturing()).isEqualTo(false);
243+
try {
244+
245+
CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
246+
.endpoint(TestConfigurations.HOST)
247+
.key(TestConfigurations.MASTER_KEY)
248+
.userAgentSuffix("custom-direct-client");
249+
250+
CosmosAsyncClient client = cosmosClientBuilder.buildAsyncClient();
251+
RxDocumentClientImpl documentClient =
252+
(RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(client);
253+
254+
if (documentClient.getDefaultConsistencyLevelOfAccount() != ConsistencyLevel.SESSION) {
255+
throw new SkipException("This test is only applicable when default account-level consistency is Session.");
256+
}
257+
258+
ISessionContainer sessionContainer = documentClient.getSession();
259+
260+
if (System.getenv("COSMOS.SESSION_CAPTURING_TYPE") != null && System.getenv("COSMOS.SESSION_CAPTURING_TYPE").equals("REGION_SCOPED")) {
261+
assertThat(sessionContainer instanceof RegionScopedSessionContainer).isTrue();
262+
} else {
263+
assertThat(sessionContainer instanceof SessionContainer).isTrue();
264+
}
265+
266+
assertThat(sessionContainer.getDisableSessionCapturing()).isEqualTo(false);
267+
} finally {
268+
System.clearProperty("COSMOS.SESSION_CAPTURING_TYPE");
269+
}
212270
}
213271
}

0 commit comments

Comments
 (0)