Skip to content

Commit 7248f7e

Browse files
Exposing MaxBufferedItemCount to the Spring SDK. (Azure#30921)
* Exposing MaxBufferedItemCount to the Spring SDK. * Updating the changelog. * Update application.properties * Update sdk/cosmos/azure-spring-data-cosmos/CHANGELOG.md Co-authored-by: Kushagra Thapar <[email protected]> * Update README.md * Update README.md * Update CosmosConfig.java * Update application.properties Co-authored-by: Kushagra Thapar <[email protected]>
1 parent 35961ec commit 7248f7e

File tree

15 files changed

+149
-2
lines changed

15 files changed

+149
-2
lines changed

sdk/cosmos/azure-spring-data-cosmos-test/src/test/java/com/azure/spring/data/cosmos/core/CosmosTemplateIT.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,6 +1115,22 @@ public void queryWithMaxDegreeOfParallelism() throws ClassNotFoundException {
11151115
assertEquals((int) ReflectionTestUtils.getField(maxDegreeOfParallelismCosmosTemplate, "maxDegreeOfParallelism"), 20);
11161116
}
11171117

1118+
@Test
1119+
public void queryWithMaxBufferedItemCount() throws ClassNotFoundException {
1120+
final CosmosConfig config = CosmosConfig.builder()
1121+
.maxBufferedItemCount(500)
1122+
.build();
1123+
final CosmosTemplate maxBufferedItemCountCosmosTemplate = createCosmosTemplate(config, TestConstants.DB_NAME);
1124+
1125+
final Criteria criteria = Criteria.getInstance(CriteriaType.IS_EQUAL, "firstName",
1126+
Collections.singletonList(TEST_PERSON.getFirstName()), Part.IgnoreCaseType.NEVER);
1127+
final CosmosQuery query = new CosmosQuery(criteria);
1128+
1129+
final long count = maxBufferedItemCountCosmosTemplate.count(query, containerName);
1130+
1131+
assertEquals((int) ReflectionTestUtils.getField(maxBufferedItemCountCosmosTemplate, "maxBufferedItemCount"), 500);
1132+
}
1133+
11181134
@Test
11191135
public void queryDatabaseWithQueryMerticsEnabled() throws ClassNotFoundException {
11201136
final CosmosConfig config = CosmosConfig.builder()

sdk/cosmos/azure-spring-data-cosmos-test/src/test/java/com/azure/spring/data/cosmos/core/ReactiveCosmosTemplateIT.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,26 @@ public void queryWithMaxDegreeOfParallelism() throws ClassNotFoundException {
595595
assertEquals((int) ReflectionTestUtils.getField(maxDegreeOfParallelismCosmosTemplate, "maxDegreeOfParallelism"), 20);
596596
}
597597

598+
@Test
599+
public void queryWithMaxBufferedItemCount() throws ClassNotFoundException {
600+
final CosmosConfig config = CosmosConfig.builder()
601+
.maxBufferedItemCount(500)
602+
.build();
603+
final ReactiveCosmosTemplate maxBufferedItemCountCosmosTemplate = createReactiveCosmosTemplate(config, TestConstants.DB_NAME);
604+
605+
final AuditableEntity entity = new AuditableEntity();
606+
entity.setId(UUID.randomUUID().toString());
607+
608+
auditableRepository.save(entity);
609+
610+
Criteria equals = Criteria.getInstance(CriteriaType.IS_EQUAL, "id", Collections.singletonList(entity.getId()), Part.IgnoreCaseType.NEVER);
611+
final SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(new CosmosQuery(equals));
612+
final Flux<AuditableEntity> flux = maxBufferedItemCountCosmosTemplate.runQuery(sqlQuerySpec, AuditableEntity.class, AuditableEntity.class);
613+
614+
StepVerifier.create(flux).expectNextCount(1).verifyComplete();
615+
assertEquals((int) ReflectionTestUtils.getField(maxBufferedItemCountCosmosTemplate, "maxBufferedItemCount"), 500);
616+
}
617+
598618
@Test
599619
public void queryWithQueryMerticsEnabled() throws ClassNotFoundException {
600620
final CosmosConfig config = CosmosConfig.builder()

sdk/cosmos/azure-spring-data-cosmos-test/src/test/java/com/azure/spring/data/cosmos/repository/SecondaryTestRepositoryConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ public class SecondaryTestRepositoryConfig {
3737
@Value("${cosmos.secondary.maxDegreeOfParallelism}")
3838
private int maxDegreeOfParallelism;
3939

40+
@Value("${cosmos.secondary.maxBufferedItemCount}")
41+
private int maxBufferedItemCount;
42+
4043
@Bean
4144
public CosmosClientBuilder secondaryCosmosClientBuilder() {
4245
return new CosmosClientBuilder()
@@ -61,6 +64,7 @@ public ReactiveCosmosTemplate secondaryReactiveCosmosTemplate(@Qualifier("second
6164
CosmosConfig config = CosmosConfig.builder()
6265
.enableQueryMetrics(queryMetricsEnabled)
6366
.maxDegreeOfParallelism(maxDegreeOfParallelism)
67+
.maxBufferedItemCount(maxBufferedItemCount)
6468
.build();
6569

6670
return new ReactiveCosmosTemplate(new CosmosFactory(client, getFirstDatabase()), config, mappingCosmosConverter);
@@ -78,6 +82,7 @@ public ReactiveCosmosTemplate secondaryReactiveCosmosTemplate1(@Qualifier("secon
7882
CosmosConfig config = CosmosConfig.builder()
7983
.enableQueryMetrics(queryMetricsEnabled)
8084
.maxDegreeOfParallelism(maxDegreeOfParallelism)
85+
.maxBufferedItemCount(maxBufferedItemCount)
8186
.build();
8287

8388
return new ReactiveCosmosTemplate(new CosmosFactory(client, getSecondDatabase()), config, mappingCosmosConverter);

sdk/cosmos/azure-spring-data-cosmos-test/src/test/java/com/azure/spring/data/cosmos/repository/TestRepositoryConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ public class TestRepositoryConfig extends AbstractCosmosConfiguration {
4141
@Value("${cosmos.maxDegreeOfParallelism}")
4242
private int maxDegreeOfParallelism;
4343

44+
@Value("${cosmos.maxBufferedItemCount}")
45+
private int maxBufferedItemCount;
46+
4447
@Bean
4548
public ResponseDiagnosticsTestUtils responseDiagnosticsTestUtils() {
4649
return new ResponseDiagnosticsTestUtils();
@@ -60,6 +63,7 @@ public CosmosConfig cosmosConfig() {
6063
return CosmosConfig.builder()
6164
.enableQueryMetrics(queryMetricsEnabled)
6265
.maxDegreeOfParallelism(maxDegreeOfParallelism)
66+
.maxBufferedItemCount(maxBufferedItemCount)
6367
.responseDiagnosticsProcessor(responseDiagnosticsTestUtils().getResponseDiagnosticsProcessor())
6468
.build();
6569
}

sdk/cosmos/azure-spring-data-cosmos-test/src/test/resources/application.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ dynamic.collection.name=spel-property-collection
77
cosmos.queryMetricsEnabled=true
88
# Max Degree of Parallelism allowed
99
cosmos.maxDegreeOfParallelism=0
10+
# Max number of items to buffer
11+
cosmos.maxBufferedItemCount=0
1012

1113
# Secondary DataSource Config
1214
cosmos.secondary.uri=${NEW_ACCOUNT_HOST}
@@ -17,5 +19,7 @@ cosmos.secondary.secondaryKey=${NEW_SECONDARY_ACCOUNT_KEY}
1719
cosmos.secondary.queryMetricsEnabled=true
1820
# Max Degree of Parallelism allowed
1921
cosmos.secondary.maxDegreeOfParallelism=0
22+
# Max number of items to buffer
23+
cosmos.secondary.maxBufferedItemCount=0
2024

2125

sdk/cosmos/azure-spring-data-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### 3.27.0-beta.1 (Unreleased)
44

55
#### Features Added
6+
* Exposed `maxBufferedItemCount` feature from `CosmosQueryRequestOptions` through `application.properties` flag - See [PR 30921](https://github.com/Azure/azure-sdk-for-java/pull/30921)
67

78
#### Breaking Changes
89

sdk/cosmos/azure-spring-data-cosmos/README.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ SLF4J is only needed if you plan to use logging, please also download an SLF4J b
121121
Set `queryMetricsEnabled` flag to true in application.properties to enable query metrics.
122122
In addition to setting the flag, implement `ResponseDiagnosticsProcessor` to log diagnostics information.
123123
Set `maxDegreeOfParallelism` flag to an integer in application.properties to allow parallel processing; setting the value to -1 will lead to the SDK deciding the optimal value.
124+
Set `maxBufferedItemCount` flag to an integer in application.properties to allow the user to set the max number of items that can be buffered during parallel query execution; if set to less than 0, the system automatically decides the number of items to buffer.
125+
NOTE: Setting this to a very high value can result in high memory consumption.
124126

125127
```java readme-sample-AppConfiguration
126128
@Configuration
@@ -146,6 +148,9 @@ public class AppConfiguration extends AbstractCosmosConfiguration {
146148

147149
@Value("${azure.cosmos.maxDegreeOfParallelism}")
148150
private int maxDegreeOfParallelism;
151+
152+
@Value("${azure.cosmos.maxBufferedItemCount}")
153+
private int maxBufferedItemCount;
149154

150155
private AzureKeyCredential azureKeyCredential;
151156

@@ -165,6 +170,7 @@ public class AppConfiguration extends AbstractCosmosConfiguration {
165170
return CosmosConfig.builder()
166171
.enableQueryMetrics(queryMetricsEnabled)
167172
.maxDegreeOfParallelism(maxDegreeOfParallelism)
173+
.maxBufferedItemCount(maxBufferedItemCount)
168174
.responseDiagnosticsProcessor(new ResponseDiagnosticsProcessorImplementation())
169175
.build();
170176
}
@@ -207,6 +213,7 @@ public CosmosConfig cosmosConfig() {
207213
return CosmosConfig.builder()
208214
.enableQueryMetrics(queryMetricsEnabled)
209215
.maxDegreeOfParallelism(maxDegreeOfParallelism)
216+
.maxBufferedItemCount(maxBufferedItemCount)
210217
.responseDiagnosticsProcessor(new ResponseDiagnosticsProcessorImplementation())
211218
.build();
212219
}
@@ -684,6 +691,7 @@ public class SecondaryDatasourceConfiguration {
684691
return CosmosConfig.builder()
685692
.enableQueryMetrics(true)
686693
.maxDegreeOfParallelism(0)
694+
.maxBufferedItemCount(0)
687695
.responseDiagnosticsProcessor(new ResponseDiagnosticsProcessorImplementation())
688696
.build();
689697
}
@@ -720,19 +728,21 @@ public CosmosConfig getCosmosConfig() {
720728
return CosmosConfig.builder()
721729
.enableQueryMetrics(true)
722730
.maxDegreeOfParallelism(0)
731+
.maxBufferedItemCount(0)
723732
.responseDiagnosticsProcessor(new ResponseDiagnosticsProcessorImplementation())
724733
.build();
725734
}
726735
```
727736

728-
- Besides, if you want to define `queryMetricsEnabled`, `ResponseDiagnosticsProcessor` or `maxDegreeOfParallelism` , you can create the `CosmosConfig` for your cosmos template.
737+
- Besides, if you want to define `queryMetricsEnabled`, `ResponseDiagnosticsProcessor`, `maxDegreeOfParallelism` or `maxBufferedItemCount` , you can create the `CosmosConfig` for your cosmos template.
729738

730739
```java
731740
@Bean("secondaryCosmosConfig")
732741
public CosmosConfig getCosmosConfig() {
733742
return CosmosConfig.builder()
734743
.enableQueryMetrics(true)
735744
.maxDegreeOfParallelism(0)
745+
.maxBufferedItemCount(0)
736746
.responseDiagnosticsProcessor(new ResponseDiagnosticsProcessorImplementation())
737747
.build();
738748
}

sdk/cosmos/azure-spring-data-cosmos/src/main/java/com/azure/spring/data/cosmos/config/CosmosConfig.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ public class CosmosConfig {
1919

2020
private final int maxDegreeOfParallelism;
2121

22+
private final int maxBufferedItemCount;
23+
2224
/**
2325
* Initialization
2426
*
@@ -46,6 +48,7 @@ public CosmosConfig(ResponseDiagnosticsProcessor responseDiagnosticsProcessor,
4648
this.databaseThroughputConfig = databaseThroughputConfig;
4749
this.queryMetricsEnabled = queryMetricsEnabled;
4850
this.maxDegreeOfParallelism = 0;
51+
this.maxBufferedItemCount = 0;
4952
}
5053

5154
/**
@@ -65,6 +68,29 @@ public CosmosConfig(ResponseDiagnosticsProcessor responseDiagnosticsProcessor,
6568
this.databaseThroughputConfig = databaseThroughputConfig;
6669
this.queryMetricsEnabled = queryMetricsEnabled;
6770
this.maxDegreeOfParallelism = maxDegreeOfParallelism;
71+
this.maxBufferedItemCount = 0;
72+
}
73+
74+
/**
75+
* Initialization
76+
*
77+
* @param responseDiagnosticsProcessor must not be {@literal null}
78+
* @param databaseThroughputConfig may be {@literal null}
79+
* @param queryMetricsEnabled must not be {@literal null}
80+
* @param maxDegreeOfParallelism must not be {@literal null}
81+
* @param maxBufferedItemCount must not be {@literal null}
82+
*/
83+
@ConstructorProperties({"responseDiagnosticsProcessor", "databaseThroughputConfig", "queryMetricsEnabled", "maxDegreeOfParallelism", "maxBufferedItemCount"})
84+
CosmosConfig(ResponseDiagnosticsProcessor responseDiagnosticsProcessor,
85+
DatabaseThroughputConfig databaseThroughputConfig,
86+
boolean queryMetricsEnabled,
87+
int maxDegreeOfParallelism,
88+
int maxBufferedItemCount) {
89+
this.responseDiagnosticsProcessor = responseDiagnosticsProcessor;
90+
this.databaseThroughputConfig = databaseThroughputConfig;
91+
this.queryMetricsEnabled = queryMetricsEnabled;
92+
this.maxDegreeOfParallelism = maxDegreeOfParallelism;
93+
this.maxBufferedItemCount = maxBufferedItemCount;
6894
}
6995

7096
/**
@@ -94,6 +120,15 @@ public int getMaxDegreeOfParallelism() {
94120
return maxDegreeOfParallelism;
95121
}
96122

123+
/**
124+
* Gets the value of maxBufferedItemCount
125+
*
126+
* @return int, value of maxBufferedItemCount
127+
*/
128+
public int getMaxBufferedItemCount() {
129+
return maxBufferedItemCount;
130+
}
131+
97132
/**
98133
* Gets the database throughput configuration.
99134
*
@@ -120,6 +155,7 @@ public static class CosmosConfigBuilder {
120155
private DatabaseThroughputConfig databaseThroughputConfig;
121156
private boolean queryMetricsEnabled;
122157
private int maxDegreeOfParallelism;
158+
private int maxBufferedItemCount;
123159
CosmosConfigBuilder() {
124160
}
125161

@@ -158,6 +194,17 @@ public CosmosConfigBuilder maxDegreeOfParallelism(int maxDegreeOfParallelism) {
158194
return this;
159195
}
160196

197+
/**
198+
* Set maxBufferedItemCount
199+
*
200+
* @param maxBufferedItemCount value to initialize
201+
* @return CosmosConfigBuilder
202+
*/
203+
public CosmosConfigBuilder maxBufferedItemCount(int maxBufferedItemCount) {
204+
this.maxBufferedItemCount = maxBufferedItemCount;
205+
return this;
206+
}
207+
161208
/**
162209
* Enable database throughput
163210
*
@@ -176,7 +223,8 @@ public CosmosConfigBuilder enableDatabaseThroughput(boolean autoscale, int reque
176223
* @return CosmosConfig
177224
*/
178225
public CosmosConfig build() {
179-
return new CosmosConfig(this.responseDiagnosticsProcessor, this.databaseThroughputConfig, this.queryMetricsEnabled, this.maxDegreeOfParallelism);
226+
return new CosmosConfig(this.responseDiagnosticsProcessor, this.databaseThroughputConfig, this.queryMetricsEnabled,
227+
this.maxDegreeOfParallelism, this.maxBufferedItemCount);
180228
}
181229

182230
@Override
@@ -186,6 +234,7 @@ public String toString() {
186234
+ ", databaseThroughputConfig=" + databaseThroughputConfig
187235
+ ", queryMetricsEnabled=" + queryMetricsEnabled
188236
+ ", maxDegreeOfParallelism=" + maxDegreeOfParallelism
237+
+ ", maxBufferedItemCount=" + maxBufferedItemCount
189238
+ '}';
190239
}
191240
}

sdk/cosmos/azure-spring-data-cosmos/src/main/java/com/azure/spring/data/cosmos/core/CosmosTemplate.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public class CosmosTemplate implements CosmosOperations, ApplicationContextAware
7878
private final ResponseDiagnosticsProcessor responseDiagnosticsProcessor;
7979
private final boolean queryMetricsEnabled;
8080
private final int maxDegreeOfParallelism;
81+
private final int maxBufferedItemCount;
8182
private final CosmosAsyncClient cosmosAsyncClient;
8283
private final DatabaseThroughputConfig databaseThroughputConfig;
8384

@@ -132,6 +133,7 @@ public CosmosTemplate(CosmosFactory cosmosFactory,
132133
this.responseDiagnosticsProcessor = cosmosConfig.getResponseDiagnosticsProcessor();
133134
this.queryMetricsEnabled = cosmosConfig.isQueryMetricsEnabled();
134135
this.maxDegreeOfParallelism = cosmosConfig.getMaxDegreeOfParallelism();
136+
this.maxBufferedItemCount = cosmosConfig.getMaxBufferedItemCount();
135137
this.databaseThroughputConfig = cosmosConfig.getDatabaseThroughputConfig();
136138
}
137139

@@ -289,6 +291,7 @@ public <T> T findById(String containerName, Object id, Class<T> domainType) {
289291
final CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
290292
options.setQueryMetricsEnabled(this.queryMetricsEnabled);
291293
options.setMaxDegreeOfParallelism(this.maxDegreeOfParallelism);
294+
options.setMaxBufferedItemCount(this.maxBufferedItemCount);
292295
return cosmosAsyncClient
293296
.getDatabase(this.databaseName)
294297
.getContainer(containerName)
@@ -414,6 +417,7 @@ public <T> Iterable<T> findAll(PartitionKey partitionKey, final Class<T> domainT
414417
cosmosQueryRequestOptions.setPartitionKey(partitionKey);
415418
cosmosQueryRequestOptions.setQueryMetricsEnabled(this.queryMetricsEnabled);
416419
cosmosQueryRequestOptions.setMaxDegreeOfParallelism(this.maxDegreeOfParallelism);
420+
cosmosQueryRequestOptions.setMaxBufferedItemCount(this.maxBufferedItemCount);
417421

418422
return cosmosAsyncClient
419423
.getDatabase(this.databaseName)
@@ -740,6 +744,7 @@ private <T> Slice<T> sliceQuery(SqlQuerySpec querySpec,
740744
final CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
741745
cosmosQueryRequestOptions.setQueryMetricsEnabled(this.queryMetricsEnabled);
742746
cosmosQueryRequestOptions.setMaxDegreeOfParallelism(this.maxDegreeOfParallelism);
747+
cosmosQueryRequestOptions.setMaxBufferedItemCount(this.maxBufferedItemCount);
743748
partitionKeyValue.ifPresent(o -> {
744749
LOGGER.debug("Setting partition key {}", o);
745750
cosmosQueryRequestOptions.setPartitionKey(new PartitionKey(o));
@@ -886,6 +891,7 @@ private Long getCountValue(SqlQuerySpec querySpec, String containerName) {
886891
final CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
887892
options.setQueryMetricsEnabled(this.queryMetricsEnabled);
888893
options.setMaxDegreeOfParallelism(this.maxDegreeOfParallelism);
894+
options.setMaxBufferedItemCount(this.maxBufferedItemCount);
889895

890896
return executeQuery(querySpec, containerName, options)
891897
.publishOn(Schedulers.parallel())
@@ -915,6 +921,7 @@ private <T> Flux<JsonNode> findItemsAsFlux(@NonNull CosmosQuery query,
915921
final CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
916922
cosmosQueryRequestOptions.setQueryMetricsEnabled(this.queryMetricsEnabled);
917923
cosmosQueryRequestOptions.setMaxDegreeOfParallelism(this.maxDegreeOfParallelism);
924+
cosmosQueryRequestOptions.setMaxBufferedItemCount(this.maxBufferedItemCount);
918925
Optional<Object> partitionKeyValue = query.getPartitionKeyValue(domainType);
919926
partitionKeyValue.ifPresent(o -> {
920927
LOGGER.debug("Setting partition key {}", o);
@@ -943,6 +950,7 @@ private Flux<JsonNode> getJsonNodeFluxFromQuerySpec(
943950
final CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
944951
cosmosQueryRequestOptions.setQueryMetricsEnabled(this.queryMetricsEnabled);
945952
cosmosQueryRequestOptions.setMaxDegreeOfParallelism(this.maxDegreeOfParallelism);
953+
cosmosQueryRequestOptions.setMaxBufferedItemCount(this.maxBufferedItemCount);
946954

947955
return cosmosAsyncClient
948956
.getDatabase(this.databaseName)

0 commit comments

Comments
 (0)