Skip to content

Commit f37a2a6

Browse files
nehrao1FabianMeiswinkeljeet1995
authored
Thinclient Store Model (Azure#43707)
* start * push progress * Update ThinClientStoreModel.java * Skeleton for ThinClientStoreModel and RNTBD serialization * Adding serialization skeleton in ThinClientStoreModel * test * fix * todo * pr comments --------- Co-authored-by: Fabian Meiswinkel <[email protected]> Co-authored-by: Abhijeet Mohanty <[email protected]>
1 parent 0afce24 commit f37a2a6

File tree

9 files changed

+370
-95
lines changed

9 files changed

+370
-95
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ public void validateApiType() throws Exception {
274274
ResourceType.Document);
275275

276276
try {
277-
storeModel.performRequest(dsr, HttpMethod.POST).block();
277+
storeModel.performRequest(dsr).block();
278278
fail("Request should fail");
279279
} catch (Exception e) {
280280
//no-op
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.azure.cosmos.implementation;
2+
3+
import com.azure.cosmos.ConsistencyLevel;
4+
import com.azure.cosmos.implementation.circuitBreaker.GlobalPartitionEndpointManagerForCircuitBreaker;
5+
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
6+
import com.azure.cosmos.implementation.http.Http2ConnectionConfig;
7+
import com.azure.cosmos.implementation.http.HttpClient;
8+
import com.azure.cosmos.implementation.http.HttpClientConfig;
9+
import com.azure.cosmos.implementation.http.HttpHeaders;
10+
import com.azure.cosmos.implementation.http.HttpRequest;
11+
import com.azure.cosmos.implementation.http.ReactorNettyClient;
12+
import io.netty.channel.ConnectTimeoutException;
13+
import org.mockito.ArgumentCaptor;
14+
import org.mockito.Mockito;
15+
import org.testng.annotations.Test;
16+
import reactor.core.publisher.Mono;
17+
18+
import java.net.URI;
19+
20+
import static org.assertj.core.api.Assertions.fail;
21+
import static org.mockito.ArgumentMatchers.any;
22+
23+
public class ThinClientStoreModelTest {
24+
@Test(groups = "unit")
25+
public void testThinClientStoreModel() throws Exception {
26+
DiagnosticsClientContext clientContext = Mockito.mock(DiagnosticsClientContext.class);
27+
Mockito.doReturn(new DiagnosticsClientContext.DiagnosticsClientConfig()).when(clientContext).getConfig();
28+
Mockito
29+
.doReturn(ImplementationBridgeHelpers
30+
.CosmosDiagnosticsHelper
31+
.getCosmosDiagnosticsAccessor()
32+
.create(clientContext, 1d))
33+
.when(clientContext).createDiagnostics();
34+
35+
String sdkGlobalSessionToken = "1#100#1=20#2=5#3=30";
36+
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
37+
Mockito.doReturn(sdkGlobalSessionToken).when(sessionContainer).resolveGlobalSessionToken(any());
38+
39+
GlobalEndpointManager globalEndpointManager = Mockito.mock(GlobalEndpointManager.class);
40+
41+
Mockito.doReturn(new URI("https://localhost"))
42+
.when(globalEndpointManager).resolveServiceEndpoint(any());
43+
44+
// mocking with HTTP/1.1 client, just using this test as basic store model validation. e2e request flow
45+
// with HTTP/2 will be tested in future PR once the wiring is all connected
46+
HttpClient httpClient = Mockito.mock(HttpClient.class);
47+
Mockito.when(httpClient.send(any(), any())).thenReturn(Mono.error(new ConnectTimeoutException()));
48+
49+
ThinClientStoreModel storeModel = new ThinClientStoreModel(
50+
clientContext,
51+
sessionContainer,
52+
ConsistencyLevel.SESSION,
53+
new UserAgentContainer(),
54+
globalEndpointManager,
55+
httpClient);
56+
57+
RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName(
58+
clientContext,
59+
OperationType.Read,
60+
"/fakeResourceFullName",
61+
ResourceType.Document);
62+
63+
try {
64+
storeModel.performRequest(dsr).block();
65+
} catch (Exception e) {
66+
//no-op
67+
}
68+
}
69+
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,10 @@ public static class HttpHeaders {
282282

283283
// Priority Level for throttling
284284
public static final String PRIORITY_LEVEL = "x-ms-cosmos-priority-level";
285+
286+
// Thinclient headers
287+
public static final String THINCLIENT_PROXY_OPERATION_TYPE = "x-ms-thinclient-proxy-operation-type";
288+
public static final String THINCLIENT_PROXY_RESOURCE_TYPE = "x-ms-thinclient-proxy-resource-type";
285289
}
286290

287291
public static class A_IMHeaderValues {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.azure.cosmos.implementation.directconnectivity.WFConstants;
1010
import com.azure.cosmos.implementation.faultinjection.FaultInjectionRequestContext;
1111
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
12+
import com.azure.cosmos.implementation.http.HttpTransportSerializer;
1213
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
1314
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
1415
import com.azure.cosmos.implementation.routing.Range;
@@ -29,6 +30,9 @@
2930
import java.util.HashMap;
3031
import java.util.Map;
3132
import java.util.UUID;
33+
import java.util.concurrent.atomic.AtomicReference;
34+
35+
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
3236

3337
/**
3438
* This is core Transport/Connection agnostic request to the Azure Cosmos DB database service.
@@ -89,6 +93,8 @@ public class RxDocumentServiceRequest implements Cloneable {
8993

9094
private volatile boolean hasFeedRangeFilteringBeenApplied = false;
9195

96+
private final AtomicReference<HttpTransportSerializer> httpTransportSerializer = new AtomicReference<>(null);
97+
9298
public boolean isReadOnlyRequest() {
9399
return this.operationType.isReadOnlyOperation();
94100
}
@@ -1233,4 +1239,28 @@ public String getEffectivePartitionKey() {
12331239
public void setEffectivePartitionKey(String effectivePartitionKey) {
12341240
this.effectivePartitionKey = effectivePartitionKey;
12351241
}
1242+
1243+
public void setThinclientHeaders(String operationType, String resourceType) {
1244+
this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_OPERATION_TYPE, operationType);
1245+
this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_RESOURCE_TYPE, resourceType);
1246+
}
1247+
1248+
public RxDocumentServiceRequest setHttpTransportSerializer(HttpTransportSerializer transportSerializer) {
1249+
this.httpTransportSerializer.set(transportSerializer);
1250+
1251+
return this;
1252+
}
1253+
1254+
public HttpTransportSerializer getEffectiveHttpTransportSerializer(
1255+
HttpTransportSerializer defaultTransportSerializer) {
1256+
1257+
checkNotNull(defaultTransportSerializer, "Argument 'defaultTransportSerializer' must not be null.");
1258+
1259+
HttpTransportSerializer snapshot = this.httpTransportSerializer.get();
1260+
if (snapshot != null) {
1261+
return snapshot;
1262+
}
1263+
1264+
return defaultTransportSerializer;
1265+
}
12361266
}

0 commit comments

Comments
 (0)