Skip to content

Commit 0b7dcd1

Browse files
Allowing customers to wrap CosmosAsyncContainer (Azure#43724)
* Initial draft * Added unit test * Update CosmosClientBuilderTest.java * Update CosmosClientBuilderTest.java * Adding simple extensibility for CosmosPagedFlux * Refactoring the refactoring to avoid LINT errors * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java Co-authored-by: Tomas Varon <[email protected]> * Reacting to code review feedback * Revert "Reacting to code review feedback" This reverts commit d54c91b. * Reapply "Reacting to code review feedback" This reverts commit 4f280d0. * Update CosmosAsyncClient.java --------- Co-authored-by: Tomas Varon <[email protected]>
1 parent a0bc04e commit 0b7dcd1

File tree

11 files changed

+759
-267
lines changed

11 files changed

+759
-267
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosClientBuilderTest.java

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,27 @@
1010
import com.azure.cosmos.implementation.SessionContainer;
1111
import com.azure.cosmos.implementation.TestConfigurations;
1212
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
13+
import com.azure.cosmos.models.CosmosQueryRequestOptions;
14+
import com.azure.cosmos.models.SqlParameter;
15+
import com.azure.cosmos.models.SqlQuerySpec;
16+
import com.azure.cosmos.util.CosmosPagedFlux;
17+
import com.fasterxml.jackson.databind.node.ObjectNode;
1318
import org.testng.SkipException;
1419
import org.testng.annotations.DataProvider;
1520
import org.testng.annotations.Test;
1621

1722
import java.net.URISyntaxException;
23+
import java.util.ArrayList;
1824
import java.util.Arrays;
25+
import java.util.Comparator;
26+
import java.util.List;
27+
import java.util.Objects;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.ConcurrentMap;
30+
import java.util.stream.Collectors;
1931

2032
import static org.assertj.core.api.Assertions.assertThat;
33+
import static org.assertj.core.api.Assertions.fail;
2134

2235
public class CosmosClientBuilderTest {
2336
String hostName = "https://sample-account.documents.azure.com:443/";
@@ -149,4 +162,210 @@ public void validateSessionTokenCapturingForAccountDefaultConsistencyWithEnvVari
149162
System.clearProperty("COSMOS.SESSION_CAPTURING_TYPE");
150163
}
151164
}
165+
166+
@Test(groups = "emulator")
167+
public void validateContainerCreationInterceptor() {
168+
CosmosClient clientWithoutInterceptor = new CosmosClientBuilder()
169+
.endpoint(TestConfigurations.HOST)
170+
.key(TestConfigurations.MASTER_KEY)
171+
.userAgentSuffix("noInterceptor")
172+
.buildClient();
173+
174+
ConcurrentMap<CacheKey, List<?>> queryCache = new ConcurrentHashMap<>();
175+
176+
CosmosClient clientWithInterceptor = new CosmosClientBuilder()
177+
.endpoint(TestConfigurations.HOST)
178+
.key(TestConfigurations.MASTER_KEY)
179+
.userAgentSuffix("withInterceptor")
180+
.containerCreationInterceptor(originalContainer -> new CacheAndValidateQueriesContainer(originalContainer, queryCache))
181+
.buildClient();
182+
183+
CosmosAsyncClient asyncClientWithInterceptor = new CosmosClientBuilder()
184+
.endpoint(TestConfigurations.HOST)
185+
.key(TestConfigurations.MASTER_KEY)
186+
.userAgentSuffix("withInterceptor")
187+
.containerCreationInterceptor(originalContainer -> new CacheAndValidateQueriesContainer(originalContainer, queryCache))
188+
.buildAsyncClient();
189+
190+
CosmosContainer normalContainer = clientWithoutInterceptor
191+
.getDatabase("TestDB")
192+
.getContainer("TestContainer");
193+
assertThat(normalContainer).isNotNull();
194+
assertThat(normalContainer.getClass()).isEqualTo(CosmosContainer.class);
195+
assertThat(normalContainer.asyncContainer.getClass()).isEqualTo(CosmosAsyncContainer.class);
196+
197+
CosmosContainer customSyncContainer = clientWithInterceptor
198+
.getDatabase("TestDB")
199+
.getContainer("TestContainer");
200+
assertThat(customSyncContainer).isNotNull();
201+
assertThat(customSyncContainer.getClass()).isEqualTo(CosmosContainer.class);
202+
assertThat(customSyncContainer.asyncContainer.getClass()).isEqualTo(CacheAndValidateQueriesContainer.class);
203+
204+
CosmosAsyncContainer customAsyncContainer = asyncClientWithInterceptor
205+
.getDatabase("TestDB")
206+
.getContainer("TestContainer");
207+
assertThat(customAsyncContainer).isNotNull();
208+
assertThat(customAsyncContainer.getClass()).isEqualTo(CacheAndValidateQueriesContainer.class);
209+
210+
try {
211+
customSyncContainer.queryItems("SELECT * from c", null, ObjectNode.class);
212+
fail("Unparameterized query should throw");
213+
} catch (IllegalStateException expectedError) {}
214+
215+
try {
216+
customAsyncContainer.queryItems("SELECT * from c", null, ObjectNode.class);
217+
fail("Unparameterized query should throw");
218+
} catch (IllegalStateException expectedError) {}
219+
220+
try {
221+
customAsyncContainer.queryItems("SELECT * from c", ObjectNode.class);
222+
fail("Unparameterized query should throw");
223+
} catch (IllegalStateException expectedError) {}
224+
225+
SqlQuerySpec querySpec = new SqlQuerySpec().setQueryText("SELECT * from c");
226+
assertThat(queryCache).size().isEqualTo(0);
227+
228+
try {
229+
List<ObjectNode> items = customSyncContainer
230+
.queryItems(querySpec, null, ObjectNode.class)
231+
.stream().collect(Collectors.toList());
232+
fail("Not yet cached - the query above should always throw");
233+
} catch (CosmosException cosmosException) {
234+
// Container does not exist - when not cached should fail
235+
assertThat(cosmosException.getStatusCode()).isEqualTo(404);
236+
assertThat(cosmosException.getSubStatusCode()).isEqualTo(1003);
237+
}
238+
239+
queryCache.putIfAbsent(new CacheKey(ObjectNode.class.getCanonicalName(), querySpec), new ArrayList<>());
240+
assertThat(queryCache).size().isEqualTo(1);
241+
242+
// Validate that CacheKey equality check works
243+
queryCache.putIfAbsent(new CacheKey(ObjectNode.class.getCanonicalName(), querySpec), new ArrayList<>());
244+
assertThat(queryCache).size().isEqualTo(1);
245+
246+
// Validate that form cache the results can be served
247+
List<ObjectNode> items = customSyncContainer
248+
.queryItems(querySpec, null, ObjectNode.class)
249+
.stream().collect(Collectors.toList());
250+
251+
querySpec = new SqlQuerySpec().setQueryText("SELECT * from c");
252+
CosmosPagedFlux<ObjectNode> cachedPagedFlux = customAsyncContainer
253+
.queryItems(querySpec, null, ObjectNode.class);
254+
assertThat(cachedPagedFlux.getClass().getName()).startsWith("com.azure.cosmos.util.CosmosPagedFluxStaticListImpl");
255+
256+
// Validate that uncached query form async Container also fails with 404 due to non-existing Container
257+
querySpec = new SqlQuerySpec().setQueryText("SELECT * from r");
258+
try {
259+
CosmosPagedFlux<ObjectNode> uncachedPagedFlux = customAsyncContainer
260+
.queryItems(querySpec, null, ObjectNode.class);
261+
} catch (CosmosException cosmosException) {
262+
assertThat(cosmosException.getStatusCode()).isEqualTo(404);
263+
assertThat(cosmosException.getSubStatusCode()).isEqualTo(1003);
264+
}
265+
}
266+
267+
private static class CacheKey {
268+
private final String className;
269+
private final String queryText;
270+
271+
private final List<SqlParameter> parameters;
272+
public CacheKey(String className, SqlQuerySpec querySpec) {
273+
this.className = className;
274+
this.queryText = querySpec.getQueryText();
275+
List<SqlParameter> tempParameters = querySpec.getParameters();
276+
if (tempParameters != null) {
277+
tempParameters.sort(Comparator.comparing(SqlParameter::getName));
278+
this.parameters = tempParameters;
279+
} else {
280+
this.parameters = new ArrayList<>();
281+
}
282+
}
283+
284+
@Override
285+
public int hashCode() {
286+
Object[] temp = new Object[2 + this.parameters.size()];
287+
temp[0] = this.className;
288+
temp[1] = this.queryText;
289+
for (int i = 0; i < this.parameters.size(); i++) {
290+
temp[2 + i] = this.parameters.get(i).getValue(Object.class);
291+
}
292+
293+
return Objects.hash(temp);
294+
}
295+
296+
@Override
297+
public boolean equals(Object obj) {
298+
if (obj == null) {
299+
return false;
300+
}
301+
302+
if (!(obj instanceof CacheKey)) {
303+
return false;
304+
}
305+
306+
CacheKey other = (CacheKey)obj;
307+
if (!this.className.equals(other.className)) {
308+
return false;
309+
}
310+
311+
if (!this.queryText.equals(other.queryText)) {
312+
return false;
313+
}
314+
315+
if (this.parameters.size() != other.parameters.size()) {
316+
return false;
317+
}
318+
319+
for (int i = 0; i < this.parameters.size(); i++) {
320+
if (!this.parameters.get(i).getName().equals(other.parameters.get(i).getName())) {
321+
return false;
322+
}
323+
324+
if (!this.parameters.get(i).getValue(Object.class).equals(other.parameters.get(i).getValue(Object.class))) {
325+
return false;
326+
}
327+
}
328+
329+
return true;
330+
}
331+
}
332+
333+
private static class CacheAndValidateQueriesContainer extends CosmosAsyncContainer {
334+
private final ConcurrentMap<CacheKey, List<?>> queryCache;
335+
336+
protected CacheAndValidateQueriesContainer(
337+
CosmosAsyncContainer toBeWrappedContainer,
338+
ConcurrentMap<CacheKey, List<?>> queryCache) {
339+
340+
super(toBeWrappedContainer);
341+
this.queryCache = queryCache;
342+
}
343+
344+
@Override
345+
public <T> CosmosPagedFlux<T> queryItems(String query, CosmosQueryRequestOptions options, Class<T> classType) {
346+
throw new IllegalStateException("No unparameterized queries allowed. Use parameterized query instead.");
347+
}
348+
349+
@Override
350+
public <T> CosmosPagedFlux<T> queryItems(SqlQuerySpec querySpec, Class<T> classType) {
351+
return this.queryItems(querySpec, null, classType);
352+
}
353+
354+
@Override
355+
public <T> CosmosPagedFlux<T> queryItems(String query, Class<T> classType) {
356+
throw new IllegalStateException("No unparameterized queries allowed. Use parameterized query instead.");
357+
}
358+
359+
@Override
360+
public <T> CosmosPagedFlux<T> queryItems(SqlQuerySpec querySpec, CosmosQueryRequestOptions options, Class<T> classType) {
361+
CacheKey key = new CacheKey(classType.getCanonicalName(), querySpec);
362+
List<?> cachedResult = this.queryCache.get(key);
363+
if (cachedResult != null) {
364+
return CosmosPagedFlux.createFromList((List<T>)cachedResult, false);
365+
}
366+
367+
return super
368+
.queryItems(querySpec, options, classType);
369+
}
370+
}
152371
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.EnumSet;
6060
import java.util.List;
6161
import java.util.Objects;
62+
import java.util.function.Function;
6263
import java.util.stream.Collectors;
6364

6465
import static com.azure.core.util.FluxUtil.withContext;
@@ -88,6 +89,9 @@ public final class CosmosAsyncClient implements Closeable {
8889
.CosmosClientTelemetryConfigHelper
8990
.getCosmosClientTelemetryConfigAccessor();
9091

92+
private final static Function<CosmosAsyncContainer, CosmosAsyncContainer> DEFAULT_CONTAINER_FACTORY =
93+
(originalContainer) -> originalContainer;
94+
9195
private final AsyncDocumentClient asyncDocumentClient;
9296
private final String serviceEndpoint;
9397
private final ConnectionPolicy connectionPolicy;
@@ -103,6 +107,7 @@ public final class CosmosAsyncClient implements Closeable {
103107
private final WriteRetryPolicy nonIdempotentWriteRetryPolicy;
104108
private final List<CosmosOperationPolicy> requestPolicies;
105109
private final CosmosItemSerializer defaultCustomSerializer;
110+
private final java.util.function.Function<CosmosAsyncContainer, CosmosAsyncContainer> containerFactory;
106111

107112
CosmosAsyncClient(CosmosClientBuilder builder) {
108113
// Async Cosmos client wrapper
@@ -121,6 +126,11 @@ public final class CosmosAsyncClient implements Closeable {
121126
this.nonIdempotentWriteRetryPolicy = builder.getNonIdempotentWriteRetryPolicy();
122127
this.requestPolicies = builder.getOperationPolicies();
123128
this.defaultCustomSerializer = builder.getCustomItemSerializer();
129+
if (builder.containerCreationInterceptor() != null) {
130+
this.containerFactory = builder.containerCreationInterceptor();
131+
} else {
132+
this.containerFactory = DEFAULT_CONTAINER_FACTORY;
133+
}
124134
CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig = builder.getEndToEndOperationConfig();
125135
SessionRetryOptions sessionRetryOptions = builder.getSessionRetryOptions();
126136

@@ -788,6 +798,10 @@ String getUserAgent() {
788798
return this.asyncDocumentClient.getUserAgent();
789799
}
790800

801+
java.util.function.Function<CosmosAsyncContainer, CosmosAsyncContainer> getContainerCreationInterceptor() {
802+
return this.containerFactory;
803+
}
804+
791805
///////////////////////////////////////////////////////////////////////////////////////////
792806
// the following helper/accessor only helps to access this class outside of this package.//
793807
///////////////////////////////////////////////////////////////////////////////////////////

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ public class CosmosAsyncContainer {
146146
private CosmosAsyncScripts scripts;
147147
private IFaultInjectorProvider faultInjectorProvider;
148148

149+
protected CosmosAsyncContainer(CosmosAsyncContainer toBeWrappedContainer) {
150+
this(toBeWrappedContainer.getId(), toBeWrappedContainer.getDatabase());
151+
}
152+
149153
CosmosAsyncContainer(String id, CosmosAsyncDatabase database) {
150154
this.id = id;
151155
this.database = database;

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -815,7 +815,16 @@ public CosmosPagedFlux<CosmosContainerProperties> queryContainers(SqlQuerySpec q
815815
* @return Cosmos Container
816816
*/
817817
public CosmosAsyncContainer getContainer(String id) {
818-
return new CosmosAsyncContainer(id, this);
818+
CosmosAsyncContainer asyncContainer = this
819+
.client
820+
.getContainerCreationInterceptor()
821+
.apply(new CosmosAsyncContainer(id, this));
822+
823+
if (asyncContainer == null) {
824+
throw new IllegalStateException(
825+
"The implementation of the custom container creation interceptor must not return null.");
826+
}
827+
return asyncContainer;
819828
}
820829

821830
/**

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientBuilder.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.Locale;
3838
import java.util.Objects;
3939
import java.util.Set;
40+
import java.util.function.Function;
4041
import java.util.function.Supplier;
4142

4243
import static com.azure.cosmos.implementation.ImplementationBridgeHelpers.CosmosClientBuilderHelper;
@@ -149,6 +150,8 @@ public class CosmosClientBuilder implements
149150
private boolean isRegionScopedSessionCapturingEnabled = false;
150151
private boolean serverCertValidationDisabled = false;
151152

153+
private Function<CosmosAsyncContainer, CosmosAsyncContainer> containerFactory = null;
154+
152155
/**
153156
* Instantiates a new Cosmos client builder.
154157
*/
@@ -172,6 +175,28 @@ CosmosClientMetadataCachesSnapshot metadataCaches() {
172175
return this.state;
173176
}
174177

178+
/**
179+
* Gets the container creation interceptor.
180+
* @return the function that should be invoked to allow wrapping containers - or null if no interceptor is defined.
181+
*/
182+
Function<CosmosAsyncContainer, CosmosAsyncContainer> containerCreationInterceptor() {
183+
return this.containerFactory;
184+
}
185+
186+
/**
187+
* Sets a function that allows intercepting container creation - for example to wrap the original
188+
* CosmosAsyncContainer in an extended custom class to add diagnostics, custom validations or behavior.
189+
* @param factory - the factory method allowing to wrap the original container in a custom class.
190+
* @return current {@link CosmosClientBuilder}
191+
*/
192+
public CosmosClientBuilder containerCreationInterceptor(
193+
Function<CosmosAsyncContainer, CosmosAsyncContainer> factory) {
194+
195+
this.containerFactory = factory;
196+
197+
return this;
198+
}
199+
175200
/**
176201
* Sets a {@code boolean} flag to reduce the frequency of retries when the client
177202
* strives to meet Session Consistency guarantees for operations

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,6 +1095,10 @@ <T> FeedResponse<T> createFeedResponse(RxDocumentServiceResponse response,
10951095
CosmosItemSerializer itemSerializer,
10961096
Class<T> cls);
10971097

1098+
<T> FeedResponse<T> createNonServiceFeedResponse(List<T> items,
1099+
boolean isChangeFeed,
1100+
boolean isNoChanges);
1101+
10981102
<T> FeedResponse<T> createChangeFeedResponse(RxDocumentServiceResponse response,
10991103
CosmosItemSerializer itemSerializer,
11001104
Class<T> cls);

0 commit comments

Comments
 (0)