Skip to content

Commit 8f8b4aa

Browse files
authored
Fix: Hybrid Search Scheduling Stopwatch to Use Cumulative Timing Across All Phases (Azure#46591)
* Adding fixes for stopWatch when running hybrid search queries * Updating changelog * Resolving comments * Resolving comments
1 parent 4537eb8 commit 8f8b4aa

File tree

5 files changed

+94
-8
lines changed

5 files changed

+94
-8
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/HybridSearchQueryTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,8 @@ public void hybridQueryTest() {
130130
resultDocs = container.queryItems(querySpec, new CosmosQueryRequestOptions(), Document.class).byPage()
131131
.flatMap(feedResponse -> Flux.fromIterable(feedResponse.getResults()))
132132
.collectList().block();
133-
assertThat(resultDocs).hasSize(13);
134-
validateResults(Arrays.asList("51", "49", "61", "24", "54", "2", "22", "75", "77", "57", "76", "80", "85"), resultDocs);
133+
assertThat(resultDocs).hasSize(15);
134+
validateResults(Arrays.asList("51", "49", "24", "61", "54", "22", "2", "25", "75", "77", "57", "76", "66", "80", "85"), resultDocs);
135135

136136
query = "SELECT c.id, c.title FROM c WHERE FullTextContains(c.title, @term1) " +
137137
"OR FullTextContains(c.text, @term1) OR FullTextContains(c.text, @term2) ORDER BY " +
@@ -143,8 +143,8 @@ public void hybridQueryTest() {
143143
resultDocs = container.queryItems(querySpec, new CosmosQueryRequestOptions(), Document.class).byPage()
144144
.flatMap(feedResponse -> Flux.fromIterable(feedResponse.getResults()))
145145
.collectList().block();
146-
assertThat(resultDocs).hasSize(8);
147-
validateResults(Arrays.asList("2", "22", "75", "77", "57", "76", "80", "85"), resultDocs);
146+
assertThat(resultDocs).hasSize(10);
147+
validateResults(Arrays.asList("22", "2", "25", "75", "77", "57", "76", "66", "80", "85"), resultDocs);
148148

149149
List<Float> vector = getQueryVector();
150150
query = "SELECT TOP 10 c.id, c.text, c.title FROM c " +
@@ -159,7 +159,7 @@ public void hybridQueryTest() {
159159
.flatMap(feedResponse -> Flux.fromIterable(feedResponse.getResults()))
160160
.collectList().block();
161161
assertThat(resultDocs).hasSize(10);
162-
validateResults(Arrays.asList("4", "24", "6", "9", "2", "3", "21", "5", "13", "49"), resultDocs);
162+
validateResults(Arrays.asList("4", "24", "6", "9", "2", "3", "21", "5", "49", "13"), resultDocs);
163163
}
164164

165165
@Test(groups = {"query", "split"}, timeOut = TIMEOUT)

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
* Fixed Null Pointer Exception for query when container recreated with same name. - [PR 45930](https://github.com/Azure/azure-sdk-for-java/pull/45930)
1313
* Fixed Null Pointer Exception for readMany when container recreated with same name. - [PR 45930](https://github.com/Azure/azure-sdk-for-java/pull/45930)
1414
* Fixed parameterized query failures for Hybrid Search queries. - [PR 46446](https://github.com/Azure/azure-sdk-for-java/pull/46446)
15+
* Fixed a rare race in parallel Hybrid Search queries by making internal SchedulingStopwatch start/stop atomic and idempotent. - [PR 46485](https://github.com/Azure/azure-sdk-for-java/pull/46485)
16+
* Fixed the Hybrid Search `SchedulingStopwatch` to use cumulative timing across all parallel queries. - [PR 46591](https://github.com/Azure/azure-sdk-for-java/pull/46591)
1517
* Fixed Strong Consistency violation when a single replica in a partition returns a 410 `Lease Not Found`. - [PR 46433](https://github.com/Azure/azure-sdk-for-java/pull/46433)
1618

1719
#### Other Changes

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/HybridSearchDocumentProducer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
88
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
99
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
10+
import com.azure.cosmos.implementation.query.metrics.SchedulingStopwatch;
1011
import com.azure.cosmos.models.CosmosQueryRequestOptions;
1112
import com.azure.cosmos.models.FeedResponse;
1213
import reactor.core.publisher.Mono;
@@ -16,6 +17,8 @@
1617
import java.util.function.Supplier;
1718

1819
public class HybridSearchDocumentProducer extends DocumentProducer<Document> {
20+
private final SchedulingStopwatch sharedSchedulingStopwatch;
21+
1922
public HybridSearchDocumentProducer(
2023
IDocumentQueryClient client,
2124
String collectionResourceId,
@@ -30,9 +33,11 @@ public HybridSearchDocumentProducer(
3033
int initialPageSize,
3134
String initialContinuationToken,
3235
int top,
33-
Supplier<String> operationContextTextProvider) {
36+
Supplier<String> operationContextTextProvider,
37+
SchedulingStopwatch sharedSchedulingStopwatch) {
3438
super(client, collectionResourceId, cosmosQueryRequestOptions, createRequestFunc, executeRequestFunc,
3539
collectionLink, createRetryPolicyFunc, resourceType, correlatedActivityId, initialPageSize,
3640
initialContinuationToken, top, feedRange, operationContextTextProvider);
41+
this.sharedSchedulingStopwatch = sharedSchedulingStopwatch;
3742
}
3843
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/HybridSearchDocumentQueryExecutionContext.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
1616
import com.azure.cosmos.implementation.Utils;
1717
import com.azure.cosmos.implementation.query.hybridsearch.HybridSearchQueryResult;
18+
import com.azure.cosmos.implementation.query.metrics.HybridSearchCumulativeSchedulingStopWatch;
19+
import com.azure.cosmos.implementation.query.metrics.SchedulingStopwatch;
1820
import com.azure.cosmos.implementation.QueryMetrics;
1921
import com.azure.cosmos.implementation.RequestChargeTracker;
2022
import com.azure.cosmos.implementation.ResourceType;
@@ -69,6 +71,7 @@ public class HybridSearchDocumentQueryExecutionContext extends ParallelDocumentQ
6971
private final Collection<ClientSideRequestStatistics> clientSideRequestStatistics;
7072
private Flux<HybridSearchQueryResult<Document>> hybridObservable;
7173
private Mono<GlobalFullTextSearchQueryStatistics> aggregatedGlobalStatistics;
74+
private final SchedulingStopwatch hybridSearchSchedulingStopwatch;
7275

7376
protected HybridSearchDocumentQueryExecutionContext(
7477
DiagnosticsClientContext diagnosticsClientContext,
@@ -88,6 +91,10 @@ protected HybridSearchDocumentQueryExecutionContext(
8891
this.tracker = new RequestChargeTracker();
8992
this.queryMetricMap = new ConcurrentHashMap<>();
9093
this.clientSideRequestStatistics = ConcurrentHashMap.newKeySet();
94+
95+
// Initialize the shared stopwatch for hybrid search timing
96+
this.hybridSearchSchedulingStopwatch = new HybridSearchCumulativeSchedulingStopWatch();
97+
this.hybridSearchSchedulingStopwatch.ready();
9198
}
9299

93100
public static Flux<IDocumentQueryExecutionComponent<Document>> createAsync(
@@ -130,6 +137,8 @@ private void initialize(
130137
int initialPageSize,
131138
DocumentCollection collection) {
132139

140+
// Start the hybrid search cumulative stopwatch when search begins
141+
this.hybridSearchSchedulingStopwatch.start();
133142

134143
if (hybridSearchQueryInfo.getRequiresGlobalStatistics()) {
135144
Map<FeedRangeEpkImpl, String> partitionKeyRangeToContinuationToken = new HashMap<>();
@@ -175,7 +184,11 @@ private Flux<HybridSearchQueryResult<Document>> hybridSearch(List<FeedRangeEpkIm
175184
Mono<List<List<Integer>>> ranks = computeRanks(componentScoresList);
176185

177186
// Compute the RRF scores
178-
return computeRRFScores(ranks, coalescedAndSortedResults, componentWeights);
187+
return computeRRFScores(ranks, coalescedAndSortedResults, componentWeights)
188+
.doFinally(signalType -> {
189+
// Stop the hybrid search cumulative stopwatch when search completes
190+
this.hybridSearchSchedulingStopwatch.stop();
191+
});
179192
}
180193

181194
@Override
@@ -206,7 +219,8 @@ protected HybridSearchDocumentProducer createDocumentProducer(
206219
initialPageSize,
207220
continuationToken,
208221
top,
209-
this.getOperationContextTextProvider());
222+
this.getOperationContextTextProvider(),
223+
this.hybridSearchSchedulingStopwatch);
210224
}
211225

212226

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.azure.cosmos.implementation.query.metrics;
4+
5+
import com.azure.cosmos.implementation.apachecommons.lang.time.StopWatch;
6+
7+
public class HybridSearchCumulativeSchedulingStopWatch extends SchedulingStopwatch {
8+
private long cumulativeRunTime;
9+
private StopWatch runTimeStopwatch;
10+
private long numPreemptions;
11+
private boolean responded;
12+
13+
public HybridSearchCumulativeSchedulingStopWatch() {
14+
super();
15+
this.cumulativeRunTime = 0;
16+
this.runTimeStopwatch = new StopWatch();
17+
}
18+
19+
@Override
20+
public SchedulingTimeSpan getElapsedTime() {
21+
SchedulingTimeSpan parentTimeSpan = super.getElapsedTime();
22+
23+
long totalRunTime = this.cumulativeRunTime;
24+
if (this.runTimeStopwatch.isStarted()) {
25+
totalRunTime += this.runTimeStopwatch.getTime();
26+
}
27+
28+
return new SchedulingTimeSpan(
29+
parentTimeSpan.getTurnaroundTime(),
30+
parentTimeSpan.getResponseTime(),
31+
totalRunTime,
32+
parentTimeSpan.getTurnaroundTime() - totalRunTime,
33+
this.numPreemptions
34+
);
35+
}
36+
37+
@Override
38+
public void start() {
39+
synchronized (this.runTimeStopwatch) {
40+
if (this.runTimeStopwatch.isStarted()) {
41+
return;
42+
}
43+
if (!this.responded) {
44+
this.responded = true;
45+
}
46+
// Don't reset - allow cumulative timing
47+
this.runTimeStopwatch.start();
48+
}
49+
}
50+
51+
@Override
52+
public void stop() {
53+
synchronized (this.runTimeStopwatch) {
54+
if (!this.runTimeStopwatch.isStarted()) {
55+
return;
56+
}
57+
this.runTimeStopwatch.stop();
58+
// Add elapsed time to cumulative total
59+
this.cumulativeRunTime += this.runTimeStopwatch.getTime();
60+
// Reset for next cycle
61+
this.runTimeStopwatch.reset();
62+
this.numPreemptions++;
63+
}
64+
}
65+
}

0 commit comments

Comments
 (0)