@@ -76,6 +76,7 @@ abstract class AsyncBenchmark<T> {
7676
7777 final Logger logger ;
7878 final CosmosAsyncClient benchmarkWorkloadClient ;
79+ final CosmosClient resultUploaderClient ;
7980 CosmosAsyncContainer cosmosAsyncContainer ;
8081 CosmosAsyncDatabase cosmosAsyncDatabase ;
8182 final String partitionKey ;
@@ -170,174 +171,174 @@ abstract class AsyncBenchmark<T> {
170171 }
171172
172173 benchmarkWorkloadClient = benchmarkSpecificClientBuilder .buildAsyncClient ();
173- try ( CosmosClient syncClient = resultUploadClientBuilder
174+ this . resultUploaderClient = resultUploadClientBuilder
174175 .endpoint (StringUtils .isNotEmpty (configuration .getServiceEndpointForRunResultsUploadAccount ()) ? configuration .getServiceEndpointForRunResultsUploadAccount () : configuration .getServiceEndpoint ())
175176 .key (StringUtils .isNotEmpty (configuration .getMasterKeyForRunResultsUploadAccount ()) ? configuration .getMasterKeyForRunResultsUploadAccount () : configuration .getMasterKey ())
176- .buildClient ()) {
177+ .buildClient ();
177178
178- try {
179- cosmosAsyncDatabase = benchmarkWorkloadClient .getDatabase (this .configuration .getDatabaseId ());
180- cosmosAsyncDatabase .read ().block ();
181- } catch (CosmosException e ) {
182- if (e .getStatusCode () == HttpConstants .StatusCodes .NOTFOUND ) {
179+ try {
180+ cosmosAsyncDatabase = benchmarkWorkloadClient .getDatabase (this .configuration .getDatabaseId ());
181+ cosmosAsyncDatabase .read ().block ();
182+ } catch (CosmosException e ) {
183+ if (e .getStatusCode () == HttpConstants .StatusCodes .NOTFOUND ) {
183184
184- if (isManagedIdentityRequired ) {
185- throw new IllegalStateException ("If managed identity is required, " +
186- "either pre-create a database and a container or use the management SDK." );
187- }
188-
189- benchmarkWorkloadClient .createDatabase (cfg .getDatabaseId ()).block ();
190- cosmosAsyncDatabase = benchmarkWorkloadClient .getDatabase (cfg .getDatabaseId ());
191- logger .info ("Database {} is created for this test" , this .configuration .getDatabaseId ());
192- databaseCreated = true ;
193- } else {
194- throw e ;
185+ if (isManagedIdentityRequired ) {
186+ throw new IllegalStateException ("If managed identity is required, " +
187+ "either pre-create a database and a container or use the management SDK." );
195188 }
196- }
197189
198- try {
199- cosmosAsyncContainer = cosmosAsyncDatabase .getContainer (this .configuration .getCollectionId ());
200- cosmosAsyncContainer .read ().block ();
190+ benchmarkWorkloadClient .createDatabase (cfg .getDatabaseId ()).block ();
191+ cosmosAsyncDatabase = benchmarkWorkloadClient .getDatabase (cfg .getDatabaseId ());
192+ logger .info ("Database {} is created for this test" , this .configuration .getDatabaseId ());
193+ databaseCreated = true ;
194+ } else {
195+ throw e ;
196+ }
197+ }
201198
202- } catch (CosmosException e ) {
203- if (e .getStatusCode () == HttpConstants .StatusCodes .NOTFOUND ) {
199+ try {
200+ cosmosAsyncContainer = cosmosAsyncDatabase .getContainer (this .configuration .getCollectionId ());
201+ cosmosAsyncContainer .read ().block ();
204202
205- if (isManagedIdentityRequired ) {
206- throw new IllegalStateException ("If managed identity is required, " +
207- "either pre-create a database and a container or use the management SDK." );
208- }
203+ } catch (CosmosException e ) {
204+ if (e .getStatusCode () == HttpConstants .StatusCodes .NOTFOUND ) {
209205
210- cosmosAsyncDatabase .createContainer (
211- this .configuration .getCollectionId (),
212- Configuration .DEFAULT_PARTITION_KEY_PATH ,
213- ThroughputProperties .createManualThroughput (this .configuration .getThroughput ())
214- ).block ();
206+ if (isManagedIdentityRequired ) {
207+ throw new IllegalStateException ("If managed identity is required, " +
208+ "either pre-create a database and a container or use the management SDK." );
209+ }
215210
216- cosmosAsyncContainer = cosmosAsyncDatabase .getContainer (this .configuration .getCollectionId ());
211+ cosmosAsyncDatabase .createContainer (
212+ this .configuration .getCollectionId (),
213+ Configuration .DEFAULT_PARTITION_KEY_PATH ,
214+ ThroughputProperties .createManualThroughput (this .configuration .getThroughput ())
215+ ).block ();
217216
218- // add some delay to allow container to be created across multiple regions
219- // container creation across regions is an async operation
220- // without the delay a container may not be available to process reads / writes
217+ cosmosAsyncContainer = cosmosAsyncDatabase .getContainer (this .configuration .getCollectionId ());
221218
222- try {
223- Thread .sleep (30_000 );
224- } catch (Exception exception ) {
225- throw new RuntimeException (exception );
226- }
219+ // add some delay to allow container to be created across multiple regions
220+ // container creation across regions is an async operation
221+ // without the delay a container may not be available to process reads / writes
227222
228- logger . info ( "Collection {} is created for this test" , this . configuration . getCollectionId ());
229- collectionCreated = true ;
230- } else {
231- throw e ;
223+ try {
224+ Thread . sleep ( 30_000 ) ;
225+ } catch ( Exception exception ) {
226+ throw new RuntimeException ( exception ) ;
232227 }
228+
229+ logger .info ("Collection {} is created for this test" , this .configuration .getCollectionId ());
230+ collectionCreated = true ;
231+ } else {
232+ throw e ;
233233 }
234+ }
234235
235- partitionKey = cosmosAsyncContainer .read ().block ().getProperties ().getPartitionKeyDefinition ()
236- .getPaths ().iterator ().next ().split ("/" )[1 ];
237-
238- concurrencyControlSemaphore = new Semaphore (cfg .getConcurrency ());
239-
240- ArrayList <Flux <PojoizedJson >> createDocumentObservables = new ArrayList <>();
241-
242- if (configuration .getOperationType () != Configuration .Operation .WriteLatency
243- && configuration .getOperationType () != Configuration .Operation .WriteThroughput
244- && configuration .getOperationType () != Configuration .Operation .ReadMyWrites ) {
245- logger .info ("PRE-populating {} documents ...." , cfg .getNumberOfPreCreatedDocuments ());
246- String dataFieldValue = RandomStringUtils .randomAlphabetic (cfg .getDocumentDataFieldSize ());
247- for (int i = 0 ; i < cfg .getNumberOfPreCreatedDocuments (); i ++) {
248- String uuid = UUID .randomUUID ().toString ();
249- PojoizedJson newDoc = BenchmarkHelper .generateDocument (uuid ,
250- dataFieldValue ,
251- partitionKey ,
252- configuration .getDocumentDataFieldCount ());
253- Flux <PojoizedJson > obs = cosmosAsyncContainer
254- .createItem (newDoc )
255- .retryWhen (Retry .max (5 ).filter ((error ) -> {
256- if (!(error instanceof CosmosException )) {
257- return false ;
258- }
259- final CosmosException cosmosException = (CosmosException ) error ;
260- if (cosmosException .getStatusCode () == 410 ||
261- cosmosException .getStatusCode () == 408 ||
262- cosmosException .getStatusCode () == 429 ||
263- cosmosException .getStatusCode () == 500 ||
264- cosmosException .getStatusCode () == 503 ) {
265- return true ;
266- }
236+ partitionKey = cosmosAsyncContainer .read ().block ().getProperties ().getPartitionKeyDefinition ()
237+ .getPaths ().iterator ().next ().split ("/" )[1 ];
238+
239+ concurrencyControlSemaphore = new Semaphore (cfg .getConcurrency ());
240+
241+ ArrayList <Flux <PojoizedJson >> createDocumentObservables = new ArrayList <>();
242+
243+ if (configuration .getOperationType () != Configuration .Operation .WriteLatency
244+ && configuration .getOperationType () != Configuration .Operation .WriteThroughput
245+ && configuration .getOperationType () != Configuration .Operation .ReadMyWrites ) {
246+ logger .info ("PRE-populating {} documents ...." , cfg .getNumberOfPreCreatedDocuments ());
247+ String dataFieldValue = RandomStringUtils .randomAlphabetic (cfg .getDocumentDataFieldSize ());
248+ for (int i = 0 ; i < cfg .getNumberOfPreCreatedDocuments (); i ++) {
249+ String uuid = UUID .randomUUID ().toString ();
250+ PojoizedJson newDoc = BenchmarkHelper .generateDocument (uuid ,
251+ dataFieldValue ,
252+ partitionKey ,
253+ configuration .getDocumentDataFieldCount ());
254+ Flux <PojoizedJson > obs = cosmosAsyncContainer
255+ .createItem (newDoc )
256+ .retryWhen (Retry .max (5 ).filter ((error ) -> {
257+ if (!(error instanceof CosmosException )) {
258+ return false ;
259+ }
260+ final CosmosException cosmosException = (CosmosException ) error ;
261+ if (cosmosException .getStatusCode () == 410 ||
262+ cosmosException .getStatusCode () == 408 ||
263+ cosmosException .getStatusCode () == 429 ||
264+ cosmosException .getStatusCode () == 500 ||
265+ cosmosException .getStatusCode () == 503 ) {
266+ return true ;
267+ }
267268
269+ return false ;
270+ }))
271+ .onErrorResume (
272+ (error ) -> {
273+ if (!(error instanceof CosmosException )) {
268274 return false ;
269- }))
270- .onErrorResume (
271- (error ) -> {
272- if (!(error instanceof CosmosException )) {
273- return false ;
274- }
275- final CosmosException cosmosException = (CosmosException ) error ;
276- if (cosmosException .getStatusCode () == 409 ) {
277- return true ;
278- }
279-
280- return false ;
281- },
282- (conflictException ) -> cosmosAsyncContainer .readItem (
283- uuid , new PartitionKey (partitionKey ), PojoizedJson .class )
284- )
285- .map (resp -> {
286- PojoizedJson x =
287- resp .getItem ();
288- return x ;
289- })
290- .flux ();
291- createDocumentObservables .add (obs );
292- }
275+ }
276+ final CosmosException cosmosException = (CosmosException ) error ;
277+ if (cosmosException .getStatusCode () == 409 ) {
278+ return true ;
279+ }
280+
281+ return false ;
282+ },
283+ (conflictException ) -> cosmosAsyncContainer .readItem (
284+ uuid , new PartitionKey (partitionKey ), PojoizedJson .class )
285+ )
286+ .map (resp -> {
287+ PojoizedJson x =
288+ resp .getItem ();
289+ return x ;
290+ })
291+ .flux ();
292+ createDocumentObservables .add (obs );
293293 }
294+ }
294295
295- docsToRead = Flux .merge (Flux .fromIterable (createDocumentObservables ), 100 ).collectList ().block ();
296- logger .info ("Finished pre-populating {} documents" , cfg .getNumberOfPreCreatedDocuments ());
296+ docsToRead = Flux .merge (Flux .fromIterable (createDocumentObservables ), 100 ).collectList ().block ();
297+ logger .info ("Finished pre-populating {} documents" , cfg .getNumberOfPreCreatedDocuments ());
297298
298- init ();
299+ init ();
299300
300- if (configuration .isEnableJvmStats ()) {
301- metricsRegistry .register ("gc" , new GarbageCollectorMetricSet ());
302- metricsRegistry .register ("threads" , new CachedThreadStatesGaugeSet (10 , TimeUnit .SECONDS ));
303- metricsRegistry .register ("memory" , new MemoryUsageGaugeSet ());
304- }
301+ if (configuration .isEnableJvmStats ()) {
302+ metricsRegistry .register ("gc" , new GarbageCollectorMetricSet ());
303+ metricsRegistry .register ("threads" , new CachedThreadStatesGaugeSet (10 , TimeUnit .SECONDS ));
304+ metricsRegistry .register ("memory" , new MemoryUsageGaugeSet ());
305+ }
305306
306- if (configuration .getGraphiteEndpoint () != null ) {
307- final Graphite graphite = new Graphite (new InetSocketAddress (
308- configuration .getGraphiteEndpoint (),
309- configuration .getGraphiteEndpointPort ()));
310- reporter = GraphiteReporter .forRegistry (metricsRegistry )
311- .prefixedWith (configuration .getOperationType ().name ())
312- .convertDurationsTo (TimeUnit .MILLISECONDS )
313- .convertRatesTo (TimeUnit .SECONDS )
314- .filter (MetricFilter .ALL )
315- .build (graphite );
316- } else if (configuration .getReportingDirectory () != null ) {
317- reporter = CsvReporter .forRegistry (metricsRegistry )
318- .convertDurationsTo (TimeUnit .MILLISECONDS )
319- .convertRatesTo (TimeUnit .SECONDS )
320- .build (configuration .getReportingDirectory ());
321- } else {
322- reporter = ConsoleReporter .forRegistry (metricsRegistry )
323- .convertDurationsTo (TimeUnit .MILLISECONDS )
324- .convertRatesTo (TimeUnit .SECONDS )
325- .build ();
326- }
307+ if (configuration .getGraphiteEndpoint () != null ) {
308+ final Graphite graphite = new Graphite (new InetSocketAddress (
309+ configuration .getGraphiteEndpoint (),
310+ configuration .getGraphiteEndpointPort ()));
311+ reporter = GraphiteReporter .forRegistry (metricsRegistry )
312+ .prefixedWith (configuration .getOperationType ().name ())
313+ .convertDurationsTo (TimeUnit .MILLISECONDS )
314+ .convertRatesTo (TimeUnit .SECONDS )
315+ .filter (MetricFilter .ALL )
316+ .build (graphite );
317+ } else if (configuration .getReportingDirectory () != null ) {
318+ reporter = CsvReporter .forRegistry (metricsRegistry )
319+ .convertDurationsTo (TimeUnit .MILLISECONDS )
320+ .convertRatesTo (TimeUnit .SECONDS )
321+ .build (configuration .getReportingDirectory ());
322+ } else {
323+ reporter = ConsoleReporter .forRegistry (metricsRegistry )
324+ .convertDurationsTo (TimeUnit .MILLISECONDS )
325+ .convertRatesTo (TimeUnit .SECONDS )
326+ .build ();
327+ }
327328
328- if (configuration .getResultUploadDatabase () != null && configuration .getResultUploadContainer () != null ) {
329- resultReporter = CosmosTotalResultReporter
330- .forRegistry (
331- metricsRegistry ,
332- syncClient .getDatabase (configuration .getResultUploadDatabase ()).getContainer (configuration .getResultUploadContainer ()),
333- configuration )
334- .convertRatesTo (TimeUnit .SECONDS )
335- .convertDurationsTo (TimeUnit .MILLISECONDS ).build ();
336- } else {
337- resultReporter = null ;
338- }
329+ if (configuration .getResultUploadDatabase () != null && configuration .getResultUploadContainer () != null ) {
330+ resultReporter = CosmosTotalResultReporter
331+ .forRegistry (
332+ metricsRegistry ,
333+ this .resultUploaderClient .getDatabase (configuration .getResultUploadDatabase ()).getContainer (configuration .getResultUploadContainer ()),
334+ configuration )
335+ .convertRatesTo (TimeUnit .SECONDS )
336+ .convertDurationsTo (TimeUnit .MILLISECONDS ).build ();
337+ } else {
338+ resultReporter = null ;
339339 }
340340
341+
341342 boolean shouldOpenConnectionsAndInitCaches = configuration .getConnectionMode () == ConnectionMode .DIRECT
342343 && configuration .isProactiveConnectionManagementEnabled ()
343344 && !configuration .isUseUnWarmedUpContainer ();
@@ -429,7 +430,11 @@ uuid, new PartitionKey(partitionKey), PojoizedJson.class)
429430
430431 databaseForUnwarmedContainer = clientForUnwarmedContainer .getDatabase (configuration .getDatabaseId ());
431432 }
432- databaseForUnwarmedContainer .createContainerIfNotExists (configuration .getCollectionId (), "/id" ).block ();
433+
434+ if (!isManagedIdentityRequired ) {
435+ databaseForUnwarmedContainer .createContainerIfNotExists (configuration .getCollectionId (), "/id" ).block ();
436+ }
437+
433438 cosmosAsyncContainer = databaseForUnwarmedContainer .getContainer (configuration .getCollectionId ());
434439 }
435440 }
@@ -447,6 +452,7 @@ void shutdown() {
447452 }
448453
449454 benchmarkWorkloadClient .close ();
455+ resultUploaderClient .close ();
450456 }
451457
452458 protected void onSuccess () {
0 commit comments