66import com .azure .cosmos .CosmosAsyncClient ;
77import com .azure .cosmos .CosmosAsyncContainer ;
88import com .azure .cosmos .CosmosClientBuilder ;
9+ import com .azure .cosmos .CosmosDiagnostics ;
10+ import com .azure .cosmos .CosmosDiagnosticsContext ;
11+ import com .azure .cosmos .CosmosDiagnosticsRequestInfo ;
912import com .azure .cosmos .FlakyTestRetryAnalyzer ;
13+ import com .azure .cosmos .models .CosmosBatch ;
14+ import com .azure .cosmos .models .CosmosBatchResponse ;
15+ import com .azure .cosmos .models .CosmosBulkItemResponse ;
16+ import com .azure .cosmos .models .CosmosBulkOperationResponse ;
17+ import com .azure .cosmos .models .CosmosBulkOperations ;
1018import com .azure .cosmos .models .CosmosQueryRequestOptions ;
1119import com .azure .cosmos .models .PartitionKey ;
1220import com .azure .cosmos .models .SqlQuerySpec ;
1927import com .azure .cosmos .models .CosmosPatchOperations ;
2028import com .fasterxml .jackson .databind .ObjectMapper ;
2129import com .fasterxml .jackson .databind .node .ObjectNode ;
30+ import org .slf4j .Logger ;
31+ import org .slf4j .LoggerFactory ;
2232import org .testng .annotations .Test ;
33+ import reactor .core .publisher .Flux ;
2334
2435import java .util .Arrays ;
36+ import java .util .Collection ;
37+ import java .util .List ;
2538import java .util .UUID ;
2639
2740import static org .assertj .core .api .AssertionsForClassTypes .assertThat ;
41+ import static org .assertj .core .api .Fail .fail ;
2842
2943// End to end sanity tests for basic thin client functionality.
3044public class ThinClientE2ETest {
45+ private static final Logger logger = LoggerFactory .getLogger (ThinClientE2ETest .class );
46+ private static final String thinClientEndpointIndicator = ":10250/" ;
47+
3148 @ Test (groups = {"thinclient" }, retryAnalyzer = FlakyTestRetryAnalyzer .class )
3249 public void testThinClientQuery () {
3350 CosmosAsyncClient client = null ;
3451 try {
3552 // If running locally, uncomment these lines
36- //System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
37- //System.setProperty("COSMOS.HTTP2_ENABLED", "true");
53+ // System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
54+ // System.setProperty("COSMOS.HTTP2_ENABLED", "true");
3855
3956 client = new CosmosClientBuilder ()
4057 .endpoint (TestConfigurations .HOST )
@@ -67,14 +84,131 @@ public void testThinClientQuery() {
6784 ObjectNode docFromResponse = response .getResults ().get (0 );
6885 assertThat (docFromResponse .get (partitionKeyName ).textValue ()).isEqualTo (idValue );
6986 assertThat (docFromResponse .get (idName ).textValue ()).isEqualTo (idValue );
87+ assertThinClientEndpointUsed (response .getCosmosDiagnostics ());
88+
89+ } finally {
90+ if (client != null ) {
91+ client .close ();
92+ }
93+ }
94+ }
95+
96+ @ Test (groups = {"thinclient" }, retryAnalyzer = FlakyTestRetryAnalyzer .class )
97+ public void testThinClientBulk () {
98+ CosmosAsyncClient client = null ;
99+ try {
100+ // If running locally, uncomment these lines
101+ // System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
102+ // System.setProperty("COSMOS.HTTP2_ENABLED", "true");
103+
104+ client = new CosmosClientBuilder ()
105+ .endpoint (TestConfigurations .HOST )
106+ .key (TestConfigurations .MASTER_KEY )
107+ .gatewayMode ()
108+ .consistencyLevel (ConsistencyLevel .EVENTUAL )
109+ .buildAsyncClient ();
110+
111+ CosmosAsyncContainer container = client .getDatabase ("db1" ).getContainer ("c2" );
112+ String idName = "id" ;
113+ String partitionKeyName = "partitionKey" ;
114+ ObjectMapper mapper = new ObjectMapper ();
115+ ObjectNode doc = mapper .createObjectNode ();
116+ String idValue = UUID .randomUUID ().toString ();
117+ doc .put (idName , idValue );
118+ doc .put (partitionKeyName , idValue );
119+
120+ Flux <CosmosBulkOperationResponse <Object >> responsesFlux = container .executeBulkOperations (Flux .just (
121+ CosmosBulkOperations .getCreateItemOperation (doc , new PartitionKey (idValue ))
122+ ));
123+
124+ List <CosmosBulkOperationResponse <Object >> responses = responsesFlux .collectList ().block ();
125+
126+ assertThat (responses .size ()).isEqualTo (1 );
127+ assertThat (responses .get (0 ).getException ()).isNull ();
128+ CosmosBulkItemResponse bulkResponse = responses .get (0 ).getResponse ();
129+ assertThat (bulkResponse .isSuccessStatusCode ()).isEqualTo (true );
130+ assertThinClientEndpointUsed (bulkResponse .getCosmosDiagnostics ());
131+ } finally {
132+ if (client != null ) {
133+ client .close ();
134+ }
135+ }
136+ }
137+
138+ @ Test (groups = {"thinclient" }, retryAnalyzer = FlakyTestRetryAnalyzer .class )
139+ public void testThinClientBatch () {
140+ CosmosAsyncClient client = null ;
141+ try {
142+ // If running locally, uncomment these lines
143+ // System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
144+ // System.setProperty("COSMOS.HTTP2_ENABLED", "true");
145+
146+ client = new CosmosClientBuilder ()
147+ .endpoint (TestConfigurations .HOST )
148+ .key (TestConfigurations .MASTER_KEY )
149+ .gatewayMode ()
150+ .consistencyLevel (ConsistencyLevel .SESSION )
151+ .buildAsyncClient ();
70152
153+ CosmosAsyncContainer container = client .getDatabase ("db1" ).getContainer ("c2" );
154+ String idName = "id" ;
155+ String partitionKeyName = "partitionKey" ;
156+ ObjectMapper mapper = new ObjectMapper ();
157+ String pkValue = UUID .randomUUID ().toString ();
158+ ObjectNode doc1 = mapper .createObjectNode ();
159+ String idValue1 = UUID .randomUUID ().toString ();
160+ doc1 .put (idName , idValue1 );
161+ doc1 .put (partitionKeyName , pkValue );
162+
163+ ObjectNode doc2 = mapper .createObjectNode ();
164+ String idValue2 = UUID .randomUUID ().toString ();
165+ doc2 .put (idName , idValue2 );
166+ doc2 .put (partitionKeyName , pkValue );
167+
168+ CosmosBatch batch = CosmosBatch .createCosmosBatch (new PartitionKey (pkValue ));
169+ batch .createItemOperation (doc1 );
170+ batch .createItemOperation (doc2 );
171+
172+ CosmosBatchResponse response = container
173+ .executeCosmosBatch (batch )
174+ .block ();
175+
176+ assertThat (response .getStatusCode ()).isEqualTo (200 );
177+ assertThinClientEndpointUsed (response .getDiagnostics ());
71178 } finally {
72179 if (client != null ) {
73180 client .close ();
74181 }
75182 }
76183 }
77184
185+ private static void assertThinClientEndpointUsed (CosmosDiagnostics diagnostics ) {
186+ assertThat (diagnostics ).isNotNull ();
187+
188+ CosmosDiagnosticsContext ctx = diagnostics .getDiagnosticsContext ();
189+ assertThat (ctx ).isNotNull ();
190+
191+ Collection <CosmosDiagnosticsRequestInfo > requests = ctx .getRequestInfo ();
192+ assertThat (requests ).isNotNull ();
193+ assertThat (requests .size ()).isPositive ();
194+
195+ for (CosmosDiagnosticsRequestInfo requestInfo : requests ) {
196+ logger .info (
197+ "Endpoint: {}, RequestType: {}, Partition: {}/{}, ActivityId: {}" ,
198+ requestInfo .getEndpoint (),
199+ requestInfo .getRequestType (),
200+ requestInfo .getPartitionId (),
201+ requestInfo .getPartitionKeyRangeId (),
202+ requestInfo .getActivityId ());
203+ if (requestInfo .getEndpoint ().contains (thinClientEndpointIndicator )) {
204+ return ;
205+ }
206+ }
207+
208+ fail ("No request targeting thin client proxy endpoint." );
209+ }
210+
211+
78212 @ Test (groups = {"thinclient" }, retryAnalyzer = FlakyTestRetryAnalyzer .class )
79213 public void testThinClientDocumentPointOperations () {
80214 CosmosAsyncClient client = null ;
@@ -113,11 +247,13 @@ public void testThinClientDocumentPointOperations() {
113247 CosmosItemResponse <ObjectNode > createResponse = container .createItem (doc ).block ();
114248 assertThat (createResponse .getStatusCode ()).isEqualTo (201 );
115249 assertThat (createResponse .getRequestCharge ()).isGreaterThan (0.0 );
250+ assertThinClientEndpointUsed (createResponse .getDiagnostics ());
116251
117252 // read
118253 CosmosItemResponse <ObjectNode > readResponse = container .readItem (idValue , new PartitionKey (idValue ), ObjectNode .class ).block ();
119254 assertThat (readResponse .getStatusCode ()).isEqualTo (200 );
120255 assertThat (readResponse .getRequestCharge ()).isGreaterThan (0.0 );
256+ assertThinClientEndpointUsed (readResponse .getDiagnostics ());
121257
122258 ObjectNode doc2 = mapper .createObjectNode ();
123259 String idValue2 = UUID .randomUUID ().toString ();
@@ -128,11 +264,14 @@ public void testThinClientDocumentPointOperations() {
128264 CosmosItemResponse <ObjectNode > replaceResponse = container .replaceItem (doc2 , idValue , new PartitionKey (idValue )).block ();
129265 assertThat (replaceResponse .getStatusCode ()).isEqualTo (200 );
130266 assertThat (replaceResponse .getRequestCharge ()).isGreaterThan (0.0 );
267+ assertThinClientEndpointUsed (replaceResponse .getDiagnostics ());
268+
131269 CosmosItemResponse <ObjectNode > readAfterReplaceResponse = container .readItem (idValue2 , new PartitionKey (idValue ), ObjectNode .class ).block ();
132270 assertThat (readAfterReplaceResponse .getStatusCode ()).isEqualTo (200 );
133271 ObjectNode replacedItemFromRead = readAfterReplaceResponse .getItem ();
134272 assertThat (replacedItemFromRead .get (idName ).asText ()).isEqualTo (idValue2 );
135273 assertThat (replacedItemFromRead .get (partitionKeyName ).asText ()).isEqualTo (idValue );
274+ assertThinClientEndpointUsed (readAfterReplaceResponse .getDiagnostics ());
136275
137276 ObjectNode doc3 = mapper .createObjectNode ();
138277 doc3 .put (idName , idValue2 );
@@ -143,11 +282,14 @@ public void testThinClientDocumentPointOperations() {
143282 CosmosItemResponse <ObjectNode > upsertResponse = container .upsertItem (doc3 , new PartitionKey (idValue ), new CosmosItemRequestOptions ()).block ();
144283 assertThat (upsertResponse .getStatusCode ()).isEqualTo (200 );
145284 assertThat (upsertResponse .getRequestCharge ()).isGreaterThan (0.0 );
285+ assertThinClientEndpointUsed (upsertResponse .getDiagnostics ());
286+
146287 CosmosItemResponse <ObjectNode > readAfterUpsertResponse = container .readItem (idValue2 , new PartitionKey (idValue ), ObjectNode .class ).block ();
147288 ObjectNode upsertedItemFromRead = readAfterUpsertResponse .getItem ();
148289 assertThat (upsertedItemFromRead .get (idName ).asText ()).isEqualTo (idValue2 );
149290 assertThat (upsertedItemFromRead .get (partitionKeyName ).asText ()).isEqualTo (idValue );
150291 assertThat (upsertedItemFromRead .get ("newField" ).asText ()).isEqualTo ("newValue" );
292+ assertThinClientEndpointUsed (readAfterUpsertResponse .getDiagnostics ());
151293
152294 // patch
153295 CosmosPatchOperations patchOperations = CosmosPatchOperations .create ();
@@ -156,17 +298,21 @@ public void testThinClientDocumentPointOperations() {
156298 CosmosItemResponse <ObjectNode > patchResponse = container .patchItem (idValue2 , new PartitionKey (idValue ), patchOperations , ObjectNode .class ).block ();
157299 assertThat (patchResponse .getStatusCode ()).isEqualTo (200 );
158300 assertThat (patchResponse .getRequestCharge ()).isGreaterThan (0.0 );
301+ assertThinClientEndpointUsed (patchResponse .getDiagnostics ());
302+
159303 CosmosItemResponse <ObjectNode > readAfterPatchResponse = container .readItem (idValue2 , new PartitionKey (idValue ), ObjectNode .class ).block ();
160304 ObjectNode patchedItemFromRead = readAfterPatchResponse .getItem ();
161305 assertThat (patchedItemFromRead .get (idName ).asText ()).isEqualTo (idValue2 );
162306 assertThat (patchedItemFromRead .get (partitionKeyName ).asText ()).isEqualTo (idValue );
163307 assertThat (patchedItemFromRead .get ("newField" ).asText ()).isEqualTo ("patchedNewField" );
164308 assertThat (patchedItemFromRead .get ("anotherNewField" ).asText ()).isEqualTo ("anotherNewValue" );
309+ assertThinClientEndpointUsed (readAfterPatchResponse .getDiagnostics ());
165310
166311 // delete
167312 CosmosItemResponse <Object > deleteResponse = container .deleteItem (idValue2 , new PartitionKey (idValue )).block ();
168313 assertThat (deleteResponse .getStatusCode ()).isEqualTo (204 );
169314 assertThat (deleteResponse .getRequestCharge ()).isGreaterThan (0.0 );
315+ assertThinClientEndpointUsed (deleteResponse .getDiagnostics ());
170316 } finally {
171317 if (client != null ) {
172318 client .close ();
0 commit comments