Skip to content

Commit 0ee5538

Browse files
committed
Fix comments
Signed-off-by: David Zane <davizane@amazon.com>
1 parent 2ccda1c commit 0ee5538

File tree

9 files changed

+35
-66
lines changed

9 files changed

+35
-66
lines changed

plugins/workload-management/src/javaRestTest/java/org/opensearch/rest/WorkloadManagementRestIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void testUpdate() throws Exception {
9191
"cpu" : 1.1,
9292
"memory" : -0.1
9393
}
94-
}'""";
94+
}""";
9595
assertThrows(ResponseException.class, () -> performOperation("PUT", "_wlm/workload_group/analytics5", json));
9696
assertThrows(
9797
ResponseException.class,

server/src/main/java/org/opensearch/action/search/SearchRequest.java

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -649,20 +649,15 @@ public int getMaxConcurrentShardRequests() {
649649
return maxConcurrentShardRequests == 0 ? 5 : maxConcurrentShardRequests;
650650
}
651651

652-
/**
653-
* Returns the raw internal value of maxConcurrentShardRequests without applying the default.
654-
*/
655-
public int getMaxConcurrentShardRequestsRaw() {
656-
return maxConcurrentShardRequests;
657-
}
658-
659652
/**
660653
* Sets the number of shard requests that should be executed concurrently on a single node. This value should be used as a
661654
* protection mechanism to reduce the number of shard requests fired per high level search request. Searches that hit the entire
662655
* cluster can be throttled with this number to reduce the cluster load. The default is {@code 5}
663656
*/
664657
public void setMaxConcurrentShardRequests(int maxConcurrentShardRequests) {
665-
validatePositiveInteger(maxConcurrentShardRequests, "maxConcurrentShardRequests");
658+
if (maxConcurrentShardRequests < 1) {
659+
throw new IllegalArgumentException("maxConcurrentShardRequests must be >= 1");
660+
}
666661
this.maxConcurrentShardRequests = maxConcurrentShardRequests;
667662
}
668663

@@ -680,7 +675,9 @@ public void setMaxConcurrentShardRequests(int maxConcurrentShardRequests) {
680675
* </ul>
681676
*/
682677
public void setPreFilterShardSize(int preFilterShardSize) {
683-
validatePositiveInteger(preFilterShardSize, "preFilterShardSize");
678+
if (preFilterShardSize < 1) {
679+
throw new IllegalArgumentException("preFilterShardSize must be >= 1");
680+
}
684681
this.preFilterShardSize = preFilterShardSize;
685682
}
686683

@@ -738,18 +735,6 @@ public static int resolveTrackTotalHitsUpTo(Scroll scroll, SearchSourceBuilder s
738735
: source.trackTotalHitsUpTo();
739736
}
740737

741-
/**
742-
* Validates that an integer value is positive (>= 1).
743-
* @param value the value to validate
744-
* @param parameterName the name of the parameter for error messages
745-
* @throws IllegalArgumentException if the value is not positive
746-
*/
747-
public static void validatePositiveInteger(int value, String parameterName) {
748-
if (value < 1) {
749-
throw new IllegalArgumentException(parameterName + " must be >= 1");
750-
}
751-
}
752-
753738
public void setCancelAfterTimeInterval(TimeValue cancelAfterTimeInterval) {
754739
this.cancelAfterTimeInterval = cancelAfterTimeInterval;
755740
}

server/src/main/java/org/opensearch/cluster/metadata/WorkloadGroup.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
import java.io.IOException;
2727
import java.util.HashMap;
28-
import java.util.Locale;
2928
import java.util.Map;
3029
import java.util.Objects;
3130
import java.util.Optional;

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,8 +1140,7 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
11401140

11411141
final WorkloadGroupRequestOperationListener workloadGroupRequestOperationListener = new WorkloadGroupRequestOperationListener(
11421142
workloadGroupService,
1143-
threadPool,
1144-
clusterService
1143+
threadPool
11451144
);
11461145

11471146
// register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory

server/src/main/java/org/opensearch/wlm/MutableWorkloadGroupFragment.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public MutableWorkloadGroupFragment(StreamInput in) throws IOException {
7373
if (in.getVersion().onOrAfter(Version.V_3_5_0)) {
7474
// Read null marker: true means searchSettings is null (not specified)
7575
boolean isNull = in.readBoolean();
76-
searchSettings = isNull ? null : in.readMap(StreamInput::readOptionalString, StreamInput::readOptionalString);
76+
searchSettings = isNull ? null : in.readMap(StreamInput::readString, StreamInput::readString);
7777
} else {
7878
searchSettings = new HashMap<>();
7979
}
@@ -195,7 +195,7 @@ public void writeTo(StreamOutput out) throws IOException {
195195
if (out.getVersion().onOrAfter(Version.V_3_5_0)) {
196196
out.writeBoolean(searchSettings == null);
197197
if (searchSettings != null) {
198-
out.writeMap(searchSettings, StreamOutput::writeOptionalString, StreamOutput::writeOptionalString);
198+
out.writeMap(searchSettings, StreamOutput::writeString, StreamOutput::writeString);
199199
}
200200
}
201201
}

server/src/main/java/org/opensearch/wlm/WorkloadGroupSearchSettings.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public String getSettingName() {
5656
void validate(String value) {
5757
String error = validator.apply(value);
5858
if (error != null) {
59-
throw new IllegalArgumentException("Invalid value for " + settingName + ": " + error);
59+
throw new IllegalArgumentException("Invalid value '" + value + "' for " + settingName + ": " + error);
6060
}
6161
}
6262

@@ -85,6 +85,12 @@ public static void validateSearchSettings(Map<String, String> searchSettings) {
8585
return;
8686
}
8787
for (Map.Entry<String, String> entry : searchSettings.entrySet()) {
88+
if (entry.getKey() == null) {
89+
throw new IllegalArgumentException("Search setting key cannot be null");
90+
}
91+
if (entry.getValue() == null) {
92+
throw new IllegalArgumentException("Search setting value cannot be null for key: " + entry.getKey());
93+
}
8894
WlmSearchSetting setting = WlmSearchSetting.fromKey(entry.getKey());
8995
if (setting == null) {
9096
throw new IllegalArgumentException("Unknown search setting: " + entry.getKey());

server/src/main/java/org/opensearch/wlm/WorkloadGroupService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,15 @@ public Set<WorkloadGroup> getActiveWorkloadGroups() {
325325
return activeWorkloadGroups;
326326
}
327327

328+
/**
329+
* Returns the workload group with the given ID, or null if not found.
330+
* @param workloadGroupId the workload group identifier
331+
* @return the WorkloadGroup or null
332+
*/
333+
public WorkloadGroup getWorkloadGroupById(String workloadGroupId) {
334+
return clusterService.state().metadata().workloadGroups().get(workloadGroupId);
335+
}
336+
328337
public Set<WorkloadGroup> getDeletedWorkloadGroups() {
329338
return deletedWorkloadGroups;
330339
}

server/src/main/java/org/opensearch/wlm/listeners/WorkloadGroupRequestOperationListener.java

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
import org.opensearch.action.search.SearchRequestContext;
1616
import org.opensearch.action.search.SearchRequestOperationsListener;
1717
import org.opensearch.cluster.metadata.WorkloadGroup;
18-
import org.opensearch.cluster.metadata.WorkloadGroupMetadata;
19-
import org.opensearch.cluster.service.ClusterService;
2018
import org.opensearch.threadpool.ThreadPool;
2119
import org.opensearch.wlm.WorkloadGroupSearchSettings;
2220
import org.opensearch.wlm.WorkloadGroupService;
@@ -32,16 +30,10 @@ public class WorkloadGroupRequestOperationListener extends SearchRequestOperatio
3230
private static final Logger logger = LogManager.getLogger(WorkloadGroupRequestOperationListener.class);
3331
private final WorkloadGroupService workloadGroupService;
3432
private final ThreadPool threadPool;
35-
private final ClusterService clusterService;
3633

37-
public WorkloadGroupRequestOperationListener(
38-
WorkloadGroupService workloadGroupService,
39-
ThreadPool threadPool,
40-
ClusterService clusterService
41-
) {
34+
public WorkloadGroupRequestOperationListener(WorkloadGroupService workloadGroupService, ThreadPool threadPool) {
4235
this.workloadGroupService = workloadGroupService;
4336
this.threadPool = threadPool;
44-
this.clusterService = clusterService;
4537
}
4638

4739
/**
@@ -69,24 +61,18 @@ protected void onRequestFailure(SearchPhaseContext context, SearchRequestContext
6961
* @param searchRequest the search request to modify
7062
*/
7163
private void applyWorkloadGroupSearchSettings(String workloadGroupId, SearchRequest searchRequest) {
72-
// Skip if no workload group ID (default group is added later)
7364
if (workloadGroupId == null) {
65+
// Return if request contains no WLM group assignment (default group is added later)
7466
return;
7567
}
7668

77-
WorkloadGroupMetadata metadata = clusterService.state().metadata().custom(WorkloadGroupMetadata.TYPE);
78-
if (metadata == null) {
79-
return;
80-
}
69+
WorkloadGroup workloadGroup = workloadGroupService.getWorkloadGroupById(workloadGroupId);
8170

82-
// Get the workload group object by ID
83-
WorkloadGroup workloadGroup = metadata.workloadGroups().get(workloadGroupId);
8471
if (workloadGroup == null) {
8572
return;
8673
}
8774

8875
// Loop through WLM group search settings and apply them as needed
89-
// WLM settings are applied only if the corresponding setting is not already set in the request
9076
for (Map.Entry<String, String> entry : workloadGroup.getSearchSettings().entrySet()) {
9177
try {
9278
WorkloadGroupSearchSettings.WlmSearchSetting settingKey = WorkloadGroupSearchSettings.WlmSearchSetting.fromKey(

server/src/test/java/org/opensearch/wlm/listeners/WorkloadGroupRequestOperationListenerTests.java

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.opensearch.cluster.ClusterState;
1414
import org.opensearch.cluster.metadata.Metadata;
1515
import org.opensearch.cluster.metadata.WorkloadGroup;
16-
import org.opensearch.cluster.metadata.WorkloadGroupMetadata;
1716
import org.opensearch.cluster.service.ClusterService;
1817
import org.opensearch.common.util.concurrent.ThreadContext;
1918
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
@@ -69,7 +68,7 @@ public void setUp() throws Exception {
6968
Metadata mockMetaData = mock(Metadata.class);
7069
when(mockClusterState.metadata()).thenReturn(mockMetaData);
7170
workloadGroupService = mock(WorkloadGroupService.class);
72-
sut = new WorkloadGroupRequestOperationListener(workloadGroupService, testThreadPool, mockClusterService);
71+
sut = new WorkloadGroupRequestOperationListener(workloadGroupService, testThreadPool);
7372
mockSearchRequest = new SearchRequest();
7473
mockSearchRequestContext = mock(SearchRequestContext.class);
7574
when(mockSearchRequestContext.getRequest()).thenReturn(mockSearchRequest);
@@ -147,7 +146,7 @@ public void testMultiThreadedValidWorkloadGroupRequestFailures() {
147146
Collections.emptySet()
148147
);
149148

150-
sut = new WorkloadGroupRequestOperationListener(workloadGroupService, testThreadPool, mockClusterService);
149+
sut = new WorkloadGroupRequestOperationListener(workloadGroupService, testThreadPool);
151150

152151
List<Thread> threads = new ArrayList<>();
153152
for (int i = 0; i < ITERATIONS; i++) {
@@ -265,7 +264,7 @@ private void assertSuccess(
265264
Collections.emptySet(),
266265
Collections.emptySet()
267266
);
268-
sut = new WorkloadGroupRequestOperationListener(workloadGroupService, testThreadPool, mockClusterService);
267+
sut = new WorkloadGroupRequestOperationListener(workloadGroupService, testThreadPool);
269268
sut.onRequestFailure(null, null);
270269

271270
HashSet<String> set = new HashSet<>();
@@ -296,11 +295,7 @@ public void testApplySearchSettings_NullWorkloadGroupId() {
296295

297296
public void testApplySearchSettings_NullMetadata() {
298297
testThreadPool.getThreadContext().putHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, "test-wg-id");
299-
ClusterState state = mock(ClusterState.class);
300-
Metadata metadata = mock(Metadata.class);
301-
when(mockClusterService.state()).thenReturn(state);
302-
when(state.metadata()).thenReturn(metadata);
303-
when(metadata.custom(WorkloadGroupMetadata.TYPE)).thenReturn(null);
298+
when(workloadGroupService.getWorkloadGroupById("test-wg-id")).thenReturn(null);
304299

305300
sut.onRequestStart(mockSearchRequestContext);
306301

@@ -309,7 +304,7 @@ public void testApplySearchSettings_NullMetadata() {
309304

310305
public void testApplySearchSettings_WorkloadGroupNotFound() {
311306
testThreadPool.getThreadContext().putHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, "non-existent-id");
312-
setupWorkloadGroupMetadata(Map.of());
307+
when(workloadGroupService.getWorkloadGroupById("non-existent-id")).thenReturn(null);
313308

314309
sut.onRequestStart(mockSearchRequestContext);
315310

@@ -321,24 +316,14 @@ public void testApplySearchSettings_PhaseTook() {
321316

322317
String wgId = "test-wg";
323318
WorkloadGroup wg = createWorkloadGroup(wgId, Map.of("phase_took", "true"));
324-
setupWorkloadGroupMetadata(Map.of(wgId, wg));
319+
when(workloadGroupService.getWorkloadGroupById(wgId)).thenReturn(wg);
325320
testThreadPool.getThreadContext().putHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, wgId);
326321

327322
sut.onRequestStart(mockSearchRequestContext);
328323

329324
assertTrue(mockSearchRequest.isPhaseTook());
330325
}
331326

332-
private void setupWorkloadGroupMetadata(Map<String, WorkloadGroup> workloadGroups) {
333-
ClusterState state = mock(ClusterState.class);
334-
Metadata metadata = mock(Metadata.class);
335-
WorkloadGroupMetadata wgMetadata = new WorkloadGroupMetadata(workloadGroups);
336-
when(mockClusterService.state()).thenReturn(state);
337-
when(state.metadata()).thenReturn(metadata);
338-
when(metadata.custom(WorkloadGroupMetadata.TYPE)).thenReturn(wgMetadata);
339-
when(metadata.workloadGroups()).thenReturn(workloadGroups);
340-
}
341-
342327
private WorkloadGroup createWorkloadGroup(String id, Map<String, String> searchSettings) {
343328
return new WorkloadGroup(
344329
"test-name",

0 commit comments

Comments
 (0)