Skip to content

Commit a5d7602

Browse files
committed
Add additional WLM search settings
Signed-off-by: David Zane <davizane@amazon.com>
1 parent f57de46 commit a5d7602

File tree

6 files changed

+261
-7
lines changed

6 files changed

+261
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3939
- Implement FieldMappingIngestionMessageMapper for pull-based ingestion ([#20729](https://github.com/opensearch-project/OpenSearch/pull/20729))
4040
- Added support of WarmerRefreshListener in NRTReplicationEngine to trigger warmer after replication on replica shards ([#20650](https://github.com/opensearch-project/OpenSearch/pull/20650))
4141
- WLM group custom search settings - groundwork and timeout ([#20536](https://github.com/opensearch-project/OpenSearch/issues/20536))
42+
- Add additional WLM search settings ([#20830](https://github.com/opensearch-project/OpenSearch/issues/20830))
4243

4344
### Changed
4445
- Make telemetry `Tags` immutable ([#20788](https://github.com/opensearch-project/OpenSearch/pull/20788))

server/src/main/java/org/opensearch/action/search/SearchRequest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.action.IndicesRequest;
4040
import org.opensearch.action.support.IndicesOptions;
4141
import org.opensearch.common.Nullable;
42+
import org.opensearch.common.annotation.ExperimentalApi;
4243
import org.opensearch.common.annotation.PublicApi;
4344
import org.opensearch.common.io.stream.BytesStreamOutput;
4445
import org.opensearch.common.unit.TimeValue;
@@ -649,6 +650,15 @@ public int getMaxConcurrentShardRequests() {
649650
return maxConcurrentShardRequests == 0 ? 5 : maxConcurrentShardRequests;
650651
}
651652

653+
/**
654+
* Returns the raw value of maxConcurrentShardRequests without applying the default.
655+
* A value of {@code 0} means the user has not explicitly set this parameter.
656+
*/
657+
@ExperimentalApi
658+
public int getMaxConcurrentShardRequestsRaw() {
659+
return maxConcurrentShardRequests;
660+
}
661+
652662
/**
653663
* Sets the number of shard requests that should be executed concurrently on a single node. This value should be used as a
654664
* protection mechanism to reduce the number of shard requests fired per high level search request. Searches that hit the entire

server/src/main/java/org/opensearch/wlm/WorkloadGroupSearchSettings.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ private WorkloadGroupSearchSettings() {
3131
*/
3232
public enum WlmSearchSetting {
3333
// Query parameters (applied to SearchRequest)
34+
/** Setting for batched reduce size */
35+
BATCHED_REDUCE_SIZE("batched_reduce_size", WorkloadGroupSearchSettings::validateBatchedReduceSize),
36+
/** Setting for canceling search requests after a time interval */
37+
CANCEL_AFTER_TIME_INTERVAL("cancel_after_time_interval", WorkloadGroupSearchSettings::validateTimeValue),
38+
/** Setting for maximum concurrent shard requests */
39+
MAX_CONCURRENT_SHARD_REQUESTS("max_concurrent_shard_requests", WorkloadGroupSearchSettings::validatePositiveInt),
3440
/** Setting for search request timeout */
3541
TIMEOUT("timeout", WorkloadGroupSearchSettings::validateTimeValue);
3642

@@ -114,4 +120,38 @@ private static String validateTimeValue(String value) {
114120
return e.getMessage();
115121
}
116122
}
123+
124+
/**
125+
* Validates a positive integer string.
126+
* @param value the string to validate
127+
* @return null if valid, error message if invalid
128+
*/
129+
private static String validatePositiveInt(String value) {
130+
try {
131+
int intValue = Integer.parseInt(value);
132+
if (intValue < 1) {
133+
return "must be positive";
134+
}
135+
return null;
136+
} catch (NumberFormatException e) {
137+
return "must be a valid integer";
138+
}
139+
}
140+
141+
/**
142+
* Validates batched reduce size (must be >= 2).
143+
* @param value the string to validate
144+
* @return null if valid, error message if invalid
145+
*/
146+
private static String validateBatchedReduceSize(String value) {
147+
try {
148+
int intValue = Integer.parseInt(value);
149+
if (intValue < 2) {
150+
return "must be >= 2";
151+
}
152+
return null;
153+
} catch (NumberFormatException e) {
154+
return "must be a valid integer";
155+
}
156+
}
117157
}

server/src/main/java/org/opensearch/wlm/listeners/WorkloadGroupRequestOperationListener.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,34 @@ private void applyWorkloadGroupSearchSettings(String workloadGroupId, SearchRequ
9494
);
9595
}
9696
break;
97+
case BATCHED_REDUCE_SIZE:
98+
// Only apply WLM batched reduce size when the request uses the default value
99+
// TODO: batchedReduceSize is a primitive int with no sentinel value, so we cannot
100+
// distinguish between "not set" and "explicitly set to 512 (the default)". If a user
101+
// explicitly sets batched_reduce_size=512, WLM will still override it. Consider adding
102+
// a raw accessor or tracking boolean similar to maxConcurrentShardRequests.
103+
int wlmBatchedReduceSize = Integer.parseInt(entry.getValue());
104+
if (searchRequest.getBatchedReduceSize() == SearchRequest.DEFAULT_BATCHED_REDUCE_SIZE) {
105+
searchRequest.setBatchedReduceSize(wlmBatchedReduceSize);
106+
}
107+
break;
108+
case CANCEL_AFTER_TIME_INTERVAL:
109+
// Only apply WLM cancel_after_time_interval when the request has none set
110+
if (searchRequest.getCancelAfterTimeInterval() == null) {
111+
searchRequest.setCancelAfterTimeInterval(
112+
TimeValue.parseTimeValue(
113+
entry.getValue(),
114+
WorkloadGroupSearchSettings.WlmSearchSetting.CANCEL_AFTER_TIME_INTERVAL.getSettingName()
115+
)
116+
);
117+
}
118+
break;
119+
case MAX_CONCURRENT_SHARD_REQUESTS:
120+
// Raw value 0 means not explicitly set; only apply WLM when not explicitly set
121+
if (searchRequest.getMaxConcurrentShardRequestsRaw() == 0) {
122+
searchRequest.setMaxConcurrentShardRequests(Integer.parseInt(entry.getValue()));
123+
}
124+
break;
97125
}
98126
} catch (Exception e) {
99127
logger.error("Failed to apply workload group setting [{}={}]: {}", entry.getKey(), entry.getValue(), e);

server/src/test/java/org/opensearch/wlm/WorkloadGroupSearchSettingsTests.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,31 @@
1616
public class WorkloadGroupSearchSettingsTests extends OpenSearchTestCase {
1717

1818
public void testEnumSettingNames() {
19+
assertEquals("batched_reduce_size", WorkloadGroupSearchSettings.WlmSearchSetting.BATCHED_REDUCE_SIZE.getSettingName());
20+
assertEquals(
21+
"cancel_after_time_interval",
22+
WorkloadGroupSearchSettings.WlmSearchSetting.CANCEL_AFTER_TIME_INTERVAL.getSettingName()
23+
);
24+
assertEquals(
25+
"max_concurrent_shard_requests",
26+
WorkloadGroupSearchSettings.WlmSearchSetting.MAX_CONCURRENT_SHARD_REQUESTS.getSettingName()
27+
);
1928
assertEquals("timeout", WorkloadGroupSearchSettings.WlmSearchSetting.TIMEOUT.getSettingName());
2029
}
2130

2231
public void testFromKeyValidSettings() {
32+
assertEquals(
33+
WorkloadGroupSearchSettings.WlmSearchSetting.BATCHED_REDUCE_SIZE,
34+
WorkloadGroupSearchSettings.WlmSearchSetting.fromKey("batched_reduce_size")
35+
);
36+
assertEquals(
37+
WorkloadGroupSearchSettings.WlmSearchSetting.CANCEL_AFTER_TIME_INTERVAL,
38+
WorkloadGroupSearchSettings.WlmSearchSetting.fromKey("cancel_after_time_interval")
39+
);
40+
assertEquals(
41+
WorkloadGroupSearchSettings.WlmSearchSetting.MAX_CONCURRENT_SHARD_REQUESTS,
42+
WorkloadGroupSearchSettings.WlmSearchSetting.fromKey("max_concurrent_shard_requests")
43+
);
2344
assertEquals(WorkloadGroupSearchSettings.WlmSearchSetting.TIMEOUT, WorkloadGroupSearchSettings.WlmSearchSetting.fromKey("timeout"));
2445
}
2546

@@ -33,6 +54,7 @@ public void testValidateTimeValue() {
3354
WorkloadGroupSearchSettings.WlmSearchSetting.TIMEOUT.validate("30s");
3455
WorkloadGroupSearchSettings.WlmSearchSetting.TIMEOUT.validate("5m");
3556
WorkloadGroupSearchSettings.WlmSearchSetting.TIMEOUT.validate("1h");
57+
WorkloadGroupSearchSettings.WlmSearchSetting.CANCEL_AFTER_TIME_INTERVAL.validate("1h");
3658
}
3759

3860
public void testValidateInvalidTimeValue() {
@@ -43,9 +65,35 @@ public void testValidateInvalidTimeValue() {
4365
assertTrue(exception.getMessage().contains("Invalid value"));
4466
}
4567

68+
public void testValidatePositiveInt() {
69+
WorkloadGroupSearchSettings.WlmSearchSetting.MAX_CONCURRENT_SHARD_REQUESTS.validate("1");
70+
WorkloadGroupSearchSettings.WlmSearchSetting.MAX_CONCURRENT_SHARD_REQUESTS.validate("100");
71+
}
72+
73+
public void testValidateInvalidPositiveInt() {
74+
IllegalArgumentException exception = expectThrows(
75+
IllegalArgumentException.class,
76+
() -> WorkloadGroupSearchSettings.WlmSearchSetting.MAX_CONCURRENT_SHARD_REQUESTS.validate("0")
77+
);
78+
assertTrue(exception.getMessage().contains("must be positive"));
79+
80+
exception = expectThrows(
81+
IllegalArgumentException.class,
82+
() -> WorkloadGroupSearchSettings.WlmSearchSetting.MAX_CONCURRENT_SHARD_REQUESTS.validate("-1")
83+
);
84+
assertTrue(exception.getMessage().contains("must be positive"));
85+
86+
exception = expectThrows(
87+
IllegalArgumentException.class,
88+
() -> WorkloadGroupSearchSettings.WlmSearchSetting.MAX_CONCURRENT_SHARD_REQUESTS.validate("abc")
89+
);
90+
assertTrue(exception.getMessage().contains("must be a valid integer"));
91+
}
92+
4693
public void testValidateSearchSettingsValid() {
4794
Map<String, String> settings = new HashMap<>();
4895
settings.put("timeout", "30s");
96+
settings.put("max_concurrent_shard_requests", "5");
4997

5098
// Should not throw exception
5199
WorkloadGroupSearchSettings.validateSearchSettings(settings);
@@ -106,4 +154,29 @@ public void testValidateSearchSettingsEmpty() {
106154
// Should not throw exception for empty map
107155
WorkloadGroupSearchSettings.validateSearchSettings(settings);
108156
}
157+
158+
public void testValidateBatchedReduceSize() {
159+
WorkloadGroupSearchSettings.WlmSearchSetting.BATCHED_REDUCE_SIZE.validate("2");
160+
WorkloadGroupSearchSettings.WlmSearchSetting.BATCHED_REDUCE_SIZE.validate("512");
161+
}
162+
163+
public void testValidateInvalidBatchedReduceSize() {
164+
IllegalArgumentException exception = expectThrows(
165+
IllegalArgumentException.class,
166+
() -> WorkloadGroupSearchSettings.WlmSearchSetting.BATCHED_REDUCE_SIZE.validate("1")
167+
);
168+
assertTrue(exception.getMessage().contains("must be >= 2"));
169+
170+
exception = expectThrows(
171+
IllegalArgumentException.class,
172+
() -> WorkloadGroupSearchSettings.WlmSearchSetting.BATCHED_REDUCE_SIZE.validate("0")
173+
);
174+
assertTrue(exception.getMessage().contains("must be >= 2"));
175+
176+
exception = expectThrows(
177+
IllegalArgumentException.class,
178+
() -> WorkloadGroupSearchSettings.WlmSearchSetting.BATCHED_REDUCE_SIZE.validate("abc")
179+
);
180+
assertTrue(exception.getMessage().contains("must be a valid integer"));
181+
}
109182
}

server/src/test/java/org/opensearch/wlm/listeners/WorkloadGroupRequestOperationListenerTests.java

Lines changed: 109 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -288,33 +288,104 @@ private void setupMockedWorkloadGroupsFromClusterState() {
288288
// Tests for applyWorkloadGroupSearchSettings
289289

290290
public void testApplySearchSettings_NullWorkloadGroupId() {
291+
mockSearchRequest.setBatchedReduceSize(256);
292+
291293
// No workload group ID in thread context
292294
sut.onRequestStart(mockSearchRequestContext);
293295

294-
// Request should remain unchanged - source is null, no timeout set
295-
assertNull(mockSearchRequest.source());
296+
// Request should remain unchanged
297+
assertEquals(256, mockSearchRequest.getBatchedReduceSize());
296298
}
297299

298300
public void testApplySearchSettings_WorkloadGroupNotFound() {
301+
mockSearchRequest.setBatchedReduceSize(256);
302+
299303
testThreadPool.getThreadContext().putHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, "non-existent-id");
300304
when(workloadGroupService.getWorkloadGroupById("non-existent-id")).thenReturn(null);
301305

302306
sut.onRequestStart(mockSearchRequestContext);
303307

304-
assertNull(mockSearchRequest.source());
308+
assertEquals(256, mockSearchRequest.getBatchedReduceSize());
305309
}
306310

307-
public void testApplySearchSettings_EmptySearchSettings() {
308-
mockSearchRequest.source(new SearchSourceBuilder());
311+
public void testApplySearchSettings_BatchedReduceSize_WlmAppliedWhenDefault() {
312+
// Request uses default value (512)
313+
assertEquals(SearchRequest.DEFAULT_BATCHED_REDUCE_SIZE, mockSearchRequest.getBatchedReduceSize());
309314

310315
String wgId = "test-wg";
311-
WorkloadGroup wg = createWorkloadGroup(wgId, Map.of());
316+
WorkloadGroup wg = createWorkloadGroup(wgId, Map.of("batched_reduce_size", "100"));
312317
when(workloadGroupService.getWorkloadGroupById(wgId)).thenReturn(wg);
313318
testThreadPool.getThreadContext().putHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, wgId);
314319

315320
sut.onRequestStart(mockSearchRequestContext);
316321

317-
assertNull(mockSearchRequest.source().timeout()); // No settings applied
322+
assertEquals(100, mockSearchRequest.getBatchedReduceSize()); // WLM applied
323+
}
324+
325+
public void testApplySearchSettings_BatchedReduceSize_RequestAlreadySet() {
326+
mockSearchRequest.setBatchedReduceSize(50); // explicitly set by user
327+
328+
String wgId = "test-wg";
329+
WorkloadGroup wg = createWorkloadGroup(wgId, Map.of("batched_reduce_size", "100"));
330+
when(workloadGroupService.getWorkloadGroupById(wgId)).thenReturn(wg);
331+
testThreadPool.getThreadContext().putHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, wgId);
332+
333+
sut.onRequestStart(mockSearchRequestContext);
334+
335+
assertEquals(50, mockSearchRequest.getBatchedReduceSize()); // Request value preserved
336+
}
337+
338+
public void testApplySearchSettings_MaxConcurrentShardRequests_WlmAppliedWhenDefault() {
339+
// Request has default value (not explicitly set), raw field is 0
340+
assertEquals(0, mockSearchRequest.getMaxConcurrentShardRequestsRaw());
341+
342+
String wgId = "test-wg";
343+
WorkloadGroup wg = createWorkloadGroup(wgId, Map.of("max_concurrent_shard_requests", "10"));
344+
when(workloadGroupService.getWorkloadGroupById(wgId)).thenReturn(wg);
345+
testThreadPool.getThreadContext().putHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, wgId);
346+
347+
sut.onRequestStart(mockSearchRequestContext);
348+
349+
assertEquals(10, mockSearchRequest.getMaxConcurrentShardRequests()); // WLM applied
350+
}
351+
352+
public void testApplySearchSettings_MaxConcurrentShardRequests_RequestAlreadySet() {
353+
mockSearchRequest.setMaxConcurrentShardRequests(20); // explicitly set by user
354+
355+
String wgId = "test-wg";
356+
WorkloadGroup wg = createWorkloadGroup(wgId, Map.of("max_concurrent_shard_requests", "5"));
357+
when(workloadGroupService.getWorkloadGroupById(wgId)).thenReturn(wg);
358+
testThreadPool.getThreadContext().putHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, wgId);
359+
360+
sut.onRequestStart(mockSearchRequestContext);
361+
362+
assertEquals(20, mockSearchRequest.getMaxConcurrentShardRequests()); // Request value preserved
363+
}
364+
365+
public void testApplySearchSettings_CancelAfterTimeInterval_WlmAppliedWhenNull() {
366+
assertNull(mockSearchRequest.getCancelAfterTimeInterval());
367+
368+
String wgId = "test-wg";
369+
WorkloadGroup wg = createWorkloadGroup(wgId, Map.of("cancel_after_time_interval", "30s"));
370+
when(workloadGroupService.getWorkloadGroupById(wgId)).thenReturn(wg);
371+
testThreadPool.getThreadContext().putHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, wgId);
372+
373+
sut.onRequestStart(mockSearchRequestContext);
374+
375+
assertEquals(TimeValue.timeValueSeconds(30), mockSearchRequest.getCancelAfterTimeInterval());
376+
}
377+
378+
public void testApplySearchSettings_CancelAfterTimeInterval_RequestAlreadySet() {
379+
mockSearchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10));
380+
381+
String wgId = "test-wg";
382+
WorkloadGroup wg = createWorkloadGroup(wgId, Map.of("cancel_after_time_interval", "30s"));
383+
when(workloadGroupService.getWorkloadGroupById(wgId)).thenReturn(wg);
384+
testThreadPool.getThreadContext().putHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, wgId);
385+
386+
sut.onRequestStart(mockSearchRequestContext);
387+
388+
assertEquals(TimeValue.timeValueSeconds(10), mockSearchRequest.getCancelAfterTimeInterval()); // Request value preserved
318389
}
319390

320391
public void testApplySearchSettings_Timeout_WlmAppliedWhenNull() {
@@ -357,6 +428,37 @@ public void testApplySearchSettings_Timeout_NullSource() {
357428
assertNull(mockSearchRequest.source()); // Should not throw, source remains null
358429
}
359430

431+
public void testApplySearchSettings_EmptySearchSettings() {
432+
mockSearchRequest.source(new SearchSourceBuilder());
433+
434+
String wgId = "test-wg";
435+
WorkloadGroup wg = createWorkloadGroup(wgId, Map.of());
436+
when(workloadGroupService.getWorkloadGroupById(wgId)).thenReturn(wg);
437+
testThreadPool.getThreadContext().putHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, wgId);
438+
439+
sut.onRequestStart(mockSearchRequestContext);
440+
441+
assertNull(mockSearchRequest.source().timeout()); // No settings applied
442+
}
443+
444+
public void testApplySearchSettings_MultipleSettings() {
445+
mockSearchRequest.source(new SearchSourceBuilder());
446+
447+
String wgId = "test-wg";
448+
WorkloadGroup wg = createWorkloadGroup(
449+
wgId,
450+
Map.of("batched_reduce_size", "100", "max_concurrent_shard_requests", "5", "timeout", "30s")
451+
);
452+
when(workloadGroupService.getWorkloadGroupById(wgId)).thenReturn(wg);
453+
testThreadPool.getThreadContext().putHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, wgId);
454+
455+
sut.onRequestStart(mockSearchRequestContext);
456+
457+
assertEquals(100, mockSearchRequest.getBatchedReduceSize());
458+
assertEquals(5, mockSearchRequest.getMaxConcurrentShardRequests());
459+
assertEquals(TimeValue.timeValueSeconds(30), mockSearchRequest.source().timeout());
460+
}
461+
360462
private WorkloadGroup createWorkloadGroup(String id, Map<String, String> searchSettings) {
361463
return new WorkloadGroup(
362464
"test-name",

0 commit comments

Comments
 (0)