Skip to content

Commit 50116bb

Browse files
authored
skip running syncup job if no model index (#717)
Signed-off-by: Yaliang Wu <[email protected]>
1 parent 46a8d6a commit 50116bb

File tree

2 files changed

+54
-10
lines changed

2 files changed

+54
-10
lines changed

plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ public MLSyncUpCron(Client client, ClusterService clusterService, DiscoveryNodeH
6161

6262
@Override
6363
public void run() {
64+
if (!clusterService.state().metadata().indices().containsKey(ML_MODEL_INDEX)) {
65+
// no need to run sync up job if no model index
66+
return;
67+
}
6468
log.debug("ML sync job starts");
6569
DiscoveryNode[] allNodes = nodeHelper.getAllNodes();
6670
MLSyncUpInput gatherInfoInput = MLSyncUpInput.builder().getLoadedModels(true).build();
@@ -131,16 +135,14 @@ public void run() {
131135
);
132136

133137
// refresh model status
134-
if (clusterService.state().getRoutingTable().hasIndex(ML_MODEL_INDEX)) {
135-
mlIndicesHandler
136-
.initModelIndexIfAbsent(
137-
ActionListener
138-
.wrap(
139-
res -> { refreshModelState(modelWorkerNodes, loadingModels); },
140-
e -> { log.error("Failed to init model index", e); }
141-
)
142-
);
143-
}
138+
mlIndicesHandler
139+
.initModelIndexIfAbsent(
140+
ActionListener
141+
.wrap(
142+
res -> { refreshModelState(modelWorkerNodes, loadingModels); },
143+
e -> { log.error("Failed to init model index", e); }
144+
)
145+
);
144146
}, e -> { log.error("Failed to sync model routing", e); }));
145147
}
146148

plugin/src/test/java/org/opensearch/ml/cluster/MLSyncUpCronTests.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static org.mockito.Mockito.when;
1717
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
1818
import static org.opensearch.ml.utils.TestHelper.ML_ROLE;
19+
import static org.opensearch.ml.utils.TestHelper.setupTestClusterState;
1920

2021
import java.io.IOException;
2122
import java.time.Instant;
@@ -26,6 +27,7 @@
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.Set;
30+
import java.util.concurrent.atomic.AtomicInteger;
2931

3032
import org.apache.lucene.search.TotalHits;
3133
import org.junit.Before;
@@ -41,9 +43,16 @@
4143
import org.opensearch.action.update.UpdateRequest;
4244
import org.opensearch.client.Client;
4345
import org.opensearch.cluster.ClusterName;
46+
import org.opensearch.cluster.ClusterState;
47+
import org.opensearch.cluster.metadata.IndexMetadata;
48+
import org.opensearch.cluster.metadata.Metadata;
4449
import org.opensearch.cluster.node.DiscoveryNode;
50+
import org.opensearch.cluster.node.DiscoveryNodeRole;
51+
import org.opensearch.cluster.node.DiscoveryNodes;
4552
import org.opensearch.cluster.service.ClusterService;
4653
import org.opensearch.common.bytes.BytesReference;
54+
import org.opensearch.common.collect.ImmutableOpenMap;
55+
import org.opensearch.common.transport.TransportAddress;
4756
import org.opensearch.common.xcontent.XContentBuilder;
4857
import org.opensearch.ml.common.MLModel;
4958
import org.opensearch.ml.common.model.MLModelState;
@@ -77,12 +86,45 @@ public class MLSyncUpCronTests extends OpenSearchTestCase {
7786
private final String mlNode1Id = "mlNode1";
7887
private final String mlNode2Id = "mlNode2";
7988

89+
private ClusterState testState;
90+
8091
@Before
8192
public void setup() throws IOException {
8293
MockitoAnnotations.openMocks(this);
8394
mlNode1 = new DiscoveryNode(mlNode1Id, buildNewFakeTransportAddress(), emptyMap(), ImmutableSet.of(ML_ROLE), Version.CURRENT);
8495
mlNode2 = new DiscoveryNode(mlNode2Id, buildNewFakeTransportAddress(), emptyMap(), ImmutableSet.of(ML_ROLE), Version.CURRENT);
8596
syncUpCron = new MLSyncUpCron(client, clusterService, nodeHelper, null);
97+
98+
testState = setupTestClusterState();
99+
when(clusterService.state()).thenReturn(testState);
100+
}
101+
102+
public void testRun_NoMLModelIndex() {
103+
Metadata metadata = new Metadata.Builder().indices(ImmutableOpenMap.<String, IndexMetadata>builder().build()).build();
104+
DiscoveryNode node = new DiscoveryNode(
105+
"node",
106+
new TransportAddress(TransportAddress.META_ADDRESS, new AtomicInteger().incrementAndGet()),
107+
new HashMap<>(),
108+
ImmutableSet.of(DiscoveryNodeRole.DATA_ROLE),
109+
Version.CURRENT
110+
);
111+
ClusterState state = new ClusterState(
112+
new ClusterName("test cluster"),
113+
123l,
114+
"111111",
115+
metadata,
116+
null,
117+
DiscoveryNodes.builder().add(node).build(),
118+
null,
119+
null,
120+
0,
121+
false
122+
);
123+
;
124+
when(clusterService.state()).thenReturn(state);
125+
126+
syncUpCron.run();
127+
verify(client, never()).execute(eq(MLSyncUpAction.INSTANCE), any(), any());
86128
}
87129

88130
public void testRun() {

0 commit comments

Comments
 (0)