Skip to content

Commit 2b0eb5c

Browse files
Add native memory circuit breaker. (#689) (#693)
* Add native memory circuit breaker. Refactor all breakers from common to plugin. Add dynamic setting for native memory circuit breaker. Signed-off-by: Jing Zhang <[email protected]> * Address the comments 1. Signed-off-by: Jing Zhang <[email protected]> * Spotless changes. Signed-off-by: Jing Zhang <[email protected]> * Address the comments 2. Signed-off-by: Jing Zhang <[email protected]> Signed-off-by: Jing Zhang <[email protected]> (cherry picked from commit d809dd2) Co-authored-by: Jing Zhang <[email protected]>
1 parent 684235a commit 2b0eb5c

31 files changed

+257
-55
lines changed

plugin/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,8 @@ List<String> jacocoExclusions = [
261261
'org.opensearch.ml.task.MLExecuteTaskRunner',
262262
'org.opensearch.ml.action.profile.MLProfileTransportAction',
263263
'org.opensearch.ml.action.models.DeleteModelTransportAction.1',
264-
'org.opensearch.ml.rest.RestMLPredictionAction'
264+
'org.opensearch.ml.rest.RestMLPredictionAction',
265+
'org.opensearch.ml.breaker.DiskCircuitBreaker'
265266
]
266267

267268
jacocoTestCoverageVerification {

plugin/src/main/java/org/opensearch/ml/action/load/TransportLoadModelOnNodeAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import org.opensearch.common.io.stream.StreamInput;
2929
import org.opensearch.common.settings.Settings;
3030
import org.opensearch.common.xcontent.NamedXContentRegistry;
31+
import org.opensearch.ml.breaker.MLCircuitBreakerService;
3132
import org.opensearch.ml.common.FunctionName;
3233
import org.opensearch.ml.common.MLTask;
33-
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
3434
import org.opensearch.ml.common.transport.forward.MLForwardAction;
3535
import org.opensearch.ml.common.transport.forward.MLForwardInput;
3636
import org.opensearch.ml.common.transport.forward.MLForwardRequest;

common/src/main/java/org/opensearch/ml/common/breaker/BreakerName.java renamed to plugin/src/main/java/org/opensearch/ml/breaker/BreakerName.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.ml.common.breaker;
6+
package org.opensearch.ml.breaker;
77

88
public enum BreakerName {
99
MEMORY,
10-
DISK
10+
DISK,
11+
NATIVE_MEMORY
1112
}

common/src/main/java/org/opensearch/ml/common/breaker/CircuitBreaker.java renamed to plugin/src/main/java/org/opensearch/ml/breaker/CircuitBreaker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.ml.common.breaker;
6+
package org.opensearch.ml.breaker;
77

88
/**
99
* An interface for circuit breaker.

common/src/main/java/org/opensearch/ml/common/breaker/DiskCircuitBreaker.java renamed to plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,23 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.ml.common.breaker;
7-
8-
import org.opensearch.ml.common.exception.MLException;
6+
package org.opensearch.ml.breaker;
97

108
import java.io.File;
119
import java.security.AccessController;
1210
import java.security.PrivilegedActionException;
1311
import java.security.PrivilegedExceptionAction;
1412

13+
import org.opensearch.ml.common.exception.MLException;
14+
1515
/**
1616
* A circuit breaker for disk usage.
1717
*/
1818
public class DiskCircuitBreaker extends ThresholdCircuitBreaker<Long> {
19+
// TODO: make this value configurable as cluster setting
1920
private static final String ML_DISK_CB = "Disk Circuit Breaker";
2021
public static final long DEFAULT_DISK_SHORTAGE_THRESHOLD = 5L;
22+
private static final long GB = 1024 * 1024 * 1024;
2123
private String diskDir;
2224

2325
public DiskCircuitBreaker(String diskDir) {
@@ -32,17 +34,17 @@ public DiskCircuitBreaker(long threshold, String diskDir) {
3234

3335
@Override
3436
public String getName() {
35-
return ML_DISK_CB;
37+
return ML_DISK_CB;
3638
}
3739

3840
@Override
3941
public boolean isOpen() {
4042
try {
4143
return AccessController.doPrivileged((PrivilegedExceptionAction<Boolean>) () -> {
42-
return (new File(diskDir).getFreeSpace()/1024/1024/1024) < getThreshold(); // in GB
44+
return (new File(diskDir).getFreeSpace() / GB) < getThreshold(); // in GB
4345
});
4446
} catch (PrivilegedActionException e) {
4547
throw new MLException("Failed to run disk circuit breaker");
4648
}
4749
}
48-
}
50+
}
Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.ml.common.breaker;
7-
8-
import lombok.extern.log4j.Log4j2;
9-
import org.opensearch.monitor.jvm.JvmService;
6+
package org.opensearch.ml.breaker;
107

118
import java.nio.file.Path;
129
import java.util.concurrent.ConcurrentHashMap;
1310
import java.util.concurrent.ConcurrentMap;
1411

12+
import lombok.extern.log4j.Log4j2;
13+
14+
import org.opensearch.cluster.service.ClusterService;
15+
import org.opensearch.common.settings.Settings;
16+
import org.opensearch.monitor.jvm.JvmService;
17+
import org.opensearch.monitor.os.OsService;
18+
1519
/**
1620
* This service registers internal system breakers and provide API for users to register their own breakers.
1721
*/
@@ -20,14 +24,23 @@ public class MLCircuitBreakerService {
2024

2125
private final ConcurrentMap<BreakerName, CircuitBreaker> breakers = new ConcurrentHashMap<>();
2226
private final JvmService jvmService;
27+
private final OsService osService;
28+
private final Settings settings;
29+
private final ClusterService clusterService;
2330

2431
/**
2532
* Constructor.
2633
*
2734
* @param jvmService jvm info
35+
* @param osService os info
36+
* @param settings settings
37+
* @param clusterService clusterService
2838
*/
29-
public MLCircuitBreakerService(JvmService jvmService) {
39+
public MLCircuitBreakerService(JvmService jvmService, OsService osService, Settings settings, ClusterService clusterService) {
3040
this.jvmService = jvmService;
41+
this.osService = osService;
42+
this.settings = settings;
43+
this.clusterService = clusterService;
3144
}
3245

3346
public void registerBreaker(BreakerName name, CircuitBreaker breaker) {
@@ -65,18 +78,21 @@ public MLCircuitBreakerService init(Path path) {
6578
log.info("Registered ML memory breaker.");
6679
registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(path.toString()));
6780
log.info("Registered ML disk breaker.");
81+
// Register native memory circuit breaker
82+
registerBreaker(BreakerName.NATIVE_MEMORY, new NativeMemoryCircuitBreaker(this.osService, this.settings, this.clusterService));
83+
log.info("Registered ML native memory breaker.");
6884

6985
return this;
7086
}
7187

7288
/**
7389
*
74-
* @return the name of any open circuit breaker; otherwise return null
90+
* @return any open circuit breaker; otherwise return null
7591
*/
76-
public String checkOpenCB() {
92+
public ThresholdCircuitBreaker checkOpenCB() {
7793
for (CircuitBreaker breaker : breakers.values()) {
7894
if (breaker.isOpen()) {
79-
return breaker.getName();
95+
return (ThresholdCircuitBreaker) breaker;
8096
}
8197
}
8298

common/src/main/java/org/opensearch/ml/common/breaker/MemoryCircuitBreaker.java renamed to plugin/src/main/java/org/opensearch/ml/breaker/MemoryCircuitBreaker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.ml.common.breaker;
6+
package org.opensearch.ml.breaker;
77

88
import org.opensearch.monitor.jvm.JvmService;
99

1010
/**
1111
* A circuit breaker for memory usage.
1212
*/
1313
public class MemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {
14-
//TODO: make this value configurable as cluster setting
14+
// TODO: make this value configurable as cluster setting
1515
private static final String ML_MEMORY_CB = "Memory Circuit Breaker";
1616
public static final short DEFAULT_JVM_HEAP_USAGE_THRESHOLD = 85;
1717
private final JvmService jvmService;
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.ml.breaker;
7+
8+
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD;
9+
10+
import org.opensearch.cluster.service.ClusterService;
11+
import org.opensearch.common.settings.Settings;
12+
import org.opensearch.monitor.os.OsService;
13+
14+
/**
15+
* A circuit breaker for native memory usage.
16+
*/
17+
public class NativeMemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {
18+
private static final String ML_MEMORY_CB = "Native Memory Circuit Breaker";
19+
public static final short DEFAULT_NATIVE_MEM_USAGE_THRESHOLD = 90;
20+
private final OsService osService;
21+
private volatile Integer nativeMemThreshold = 90;
22+
23+
public NativeMemoryCircuitBreaker(OsService osService, Settings settings, ClusterService clusterService) {
24+
super(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD);
25+
this.osService = osService;
26+
this.nativeMemThreshold = ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings);
27+
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> nativeMemThreshold = it);
28+
}
29+
30+
public NativeMemoryCircuitBreaker(Integer threshold, OsService osService) {
31+
super(threshold.shortValue());
32+
this.nativeMemThreshold = threshold;
33+
this.osService = osService;
34+
}
35+
36+
@Override
37+
public String getName() {
38+
return ML_MEMORY_CB;
39+
}
40+
41+
@Override
42+
public Short getThreshold() {
43+
return this.nativeMemThreshold.shortValue();
44+
}
45+
46+
@Override
47+
public boolean isOpen() {
48+
return osService.stats().getMem().getUsedPercent() > this.nativeMemThreshold.shortValue();
49+
}
50+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.ml.common.breaker;
6+
package org.opensearch.ml.breaker;
77

88
/**
99
* An abstract class for all breakers with threshold.

plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@
7373
import org.opensearch.index.query.TermQueryBuilder;
7474
import org.opensearch.index.reindex.DeleteByQueryAction;
7575
import org.opensearch.index.reindex.DeleteByQueryRequest;
76+
import org.opensearch.ml.breaker.MLCircuitBreakerService;
7677
import org.opensearch.ml.common.FunctionName;
7778
import org.opensearch.ml.common.MLModel;
7879
import org.opensearch.ml.common.MLTask;
79-
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
8080
import org.opensearch.ml.common.exception.MLException;
8181
import org.opensearch.ml.common.exception.MLResourceNotFoundException;
8282
import org.opensearch.ml.common.model.MLModelState;

0 commit comments

Comments
 (0)