Skip to content

Commit 1d8930a

Browse files
todvoraCopilot
andauthored
Migrate datanode upgrade service adapter (#24967)
* Migrate datanode upgrade service adapter * Update graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/DatanodeUpgradeServiceAdapterOS.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/DatanodeUpgradeServiceAdapterOS.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update graylog2-server/src/main/java/org/graylog2/datanode/DatanodeUpgradeService.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * code cleanup * code cleanup * better exception type * added license * code cleanup * better enum->json handling * fixed error message --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 0915484 commit 1d8930a

File tree

13 files changed

+414
-227
lines changed

13 files changed

+414
-227
lines changed

graylog-storage-elasticsearch7/src/main/java/org/graylog/storage/elasticsearch7/DatanodeUpgradeServiceAdapterES7.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,12 @@ public ClusterState getClusterState() {
2727
}
2828

2929
@Override
30-
public void disableShardReplication() {
30+
public FlushResponse disableShardReplication() {
3131
throw new UnsupportedOperationException("Not supported for elasticsearch.");
3232
}
3333

3434
@Override
35-
public void enableShardReplication() {
36-
throw new UnsupportedOperationException("Not supported for elasticsearch.");
37-
}
38-
39-
@Override
40-
public FlushResponse flush() {
35+
public FlushResponse enableShardReplication() {
4136
throw new UnsupportedOperationException("Not supported for elasticsearch.");
4237
}
4338
}

graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/DatanodeUpgradeServiceAdapterOS2.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.graylog.shaded.opensearch2.org.opensearch.client.Response;
3636
import org.graylog.shaded.opensearch2.org.opensearch.cluster.health.ClusterHealthStatus;
3737
import org.graylog.shaded.opensearch2.org.opensearch.common.settings.Settings;
38+
import org.graylog2.indexer.indices.HealthStatus;
3839
import org.slf4j.Logger;
3940
import org.slf4j.LoggerFactory;
4041

@@ -67,7 +68,7 @@ public ClusterState getClusterState() {
6768
final String shardReplication = queryShardReplication();
6869
final ManagerNode managerNode = managerNode();
6970
return new ClusterState(
70-
response.getStatus().name(),
71+
HealthStatus.fromString(response.getStatus().name()),
7172
response.getClusterName(),
7273
response.getNumberOfNodes(),
7374
response.getActiveShards(),
@@ -86,20 +87,20 @@ private ClusterHealthResponse getClusterHealthResponse() {
8687
}
8788

8889
@Override
89-
public void disableShardReplication() {
90+
public FlushResponse disableShardReplication() {
9091
LOG.info("Disabling shard replication for opensearch cluster");
9192
final ClusterHealthStatus clusterHealthStatus = getClusterHealthResponse().getStatus();
9293
if (clusterHealthStatus == ClusterHealthStatus.GREEN) {
93-
configureShardReplication(REPLICATION_PRIMARIES);
94+
return configureShardReplication(REPLICATION_PRIMARIES);
9495
} else {
9596
throw new IllegalStateException("Can't disable shard replication, cluster is not in healthy state. Current state: " + clusterHealthStatus);
9697
}
9798
}
9899

99100
@Override
100-
public void enableShardReplication() {
101+
public FlushResponse enableShardReplication() {
101102
LOG.info("Enabling shard replication for opensearch cluster");
102-
configureShardReplication(REPLICATION_ALL);
103+
return configureShardReplication(REPLICATION_ALL);
103104
}
104105

105106
private String queryShardReplication() {
@@ -110,18 +111,18 @@ private String queryShardReplication() {
110111
});
111112
}
112113

113-
private void configureShardReplication(String primaries) {
114+
private FlushResponse configureShardReplication(String primaries) {
114115
final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings(Settings.builder()
115116
.put("cluster.routing.allocation.enable", primaries).build());
116117
final ClusterUpdateSettingsResponse result = client.execute((restHighLevelClient, requestOptions) -> restHighLevelClient.cluster().putSettings(request, requestOptions));
117118
final String value = result.getPersistentSettings().get("cluster.routing.allocation.enable");
118119
if (!value.equals(primaries)) {
119120
throw new IllegalStateException("Failed to disable shard replication. Current cluster.routing.allocation.enable: " + value);
120121
}
122+
return flush();
121123
}
122124

123-
@Override
124-
public FlushResponse flush() {
125+
private FlushResponse flush() {
125126
LOG.info("Flushing opensearch nodes, storing all in-memory operations to segments on disk");
126127
final Response response = client.execute((restHighLevelClient, requestOptions) -> restHighLevelClient.getLowLevelClient().performRequest(new Request("POST", "_flush")));
127128
try {
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright (C) 2020 Graylog, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the Server Side Public License, version 1,
6+
* as published by MongoDB, Inc.
7+
*
8+
* This program is distributed in the hope that it will be useful,
9+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
* Server Side Public License for more details.
12+
*
13+
* You should have received a copy of the Server Side Public License
14+
* along with this program. If not, see
15+
* <http://www.mongodb.com/licensing/server-side-public-license>.
16+
*/
17+
package org.graylog.storage.opensearch2;
18+
19+
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import com.github.zafarkhaja.semver.Version;
21+
import org.graylog.plugins.datanode.DatanodeUpgradeServiceAdapter;
22+
import org.graylog.plugins.datanode.DatanodeUpgradeServiceAdapterIT;
23+
import org.graylog.storage.opensearch2.testing.OpenSearchInstance;
24+
import org.graylog.testing.elasticsearch.SearchInstance;
25+
26+
class DatanodeUpgradeServiceAdapterOS2IT extends DatanodeUpgradeServiceAdapterIT {
27+
28+
@SearchInstance
29+
public final OpenSearchInstance openSearchInstance = OpenSearchInstance.create();
30+
31+
@Override
32+
protected DatanodeUpgradeServiceAdapter createAdapter() {
33+
return new DatanodeUpgradeServiceAdapterOS2(openSearchInstance.openSearchClient(), new ObjectMapper());
34+
}
35+
36+
@Override
37+
protected Version indexerVersion() {
38+
return openSearchInstance.version().version();
39+
}
40+
}
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* Copyright (C) 2020 Graylog, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the Server Side Public License, version 1,
6+
* as published by MongoDB, Inc.
7+
*
8+
* This program is distributed in the hope that it will be useful,
9+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
* Server Side Public License for more details.
12+
*
13+
* You should have received a copy of the Server Side Public License
14+
* along with this program. If not, see
15+
* <http://www.mongodb.com/licensing/server-side-public-license>.
16+
*/
17+
package org.graylog.storage.opensearch3;
18+
19+
import com.fasterxml.jackson.databind.JsonNode;
20+
import com.fasterxml.jackson.databind.ObjectMapper;
21+
import jakarta.inject.Inject;
22+
import jakarta.json.JsonString;
23+
import jakarta.json.JsonValue;
24+
import org.graylog.plugins.datanode.DatanodeUpgradeServiceAdapter;
25+
import org.graylog.plugins.datanode.dto.ClusterState;
26+
import org.graylog.plugins.datanode.dto.FlushResponse;
27+
import org.graylog.plugins.datanode.dto.ManagerNode;
28+
import org.graylog.plugins.datanode.dto.Node;
29+
import org.graylog.plugins.datanode.dto.ShardReplication;
30+
import org.opensearch.client.json.JsonData;
31+
import org.opensearch.client.opensearch._types.HealthStatus;
32+
import org.opensearch.client.opensearch.cluster.GetClusterSettingsResponse;
33+
import org.opensearch.client.opensearch.cluster.HealthResponse;
34+
import org.opensearch.client.opensearch.cluster.PutClusterSettingsResponse;
35+
import org.opensearch.client.opensearch.cluster.StateResponse;
36+
import org.opensearch.client.opensearch.generic.Request;
37+
import org.opensearch.client.opensearch.generic.Requests;
38+
import org.opensearch.client.opensearch.generic.Response;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
41+
42+
import java.io.IOException;
43+
import java.io.InputStream;
44+
import java.util.Comparator;
45+
import java.util.List;
46+
import java.util.Locale;
47+
import java.util.Optional;
48+
import java.util.stream.Collectors;
49+
import java.util.stream.StreamSupport;
50+
51+
public class DatanodeUpgradeServiceAdapterOS implements DatanodeUpgradeServiceAdapter {
52+
53+
private static final Logger LOG = LoggerFactory.getLogger(DatanodeUpgradeServiceAdapterOS.class);
54+
55+
public static final String REPLICATION_PRIMARIES = "primaries";
56+
public static final String REPLICATION_ALL = "all";
57+
public static final String SETTING_CLUSTER_ROUTING_ALLOCATION_ENABLE = "cluster.routing.allocation.enable";
58+
private final OfficialOpensearchClient officialOpensearchClient;
59+
private final ObjectMapper objectMapper;
60+
61+
@Inject
62+
public DatanodeUpgradeServiceAdapterOS(OfficialOpensearchClient officialOpensearchClient, ObjectMapper objectMapper) {
63+
this.officialOpensearchClient = officialOpensearchClient;
64+
this.objectMapper = objectMapper;
65+
}
66+
67+
@Override
68+
public ClusterState getClusterState() {
69+
final HealthResponse response = getClusterHealthResponse();
70+
final String shardReplication = queryShardReplication();
71+
final ManagerNode managerNode = managerNode();
72+
return new ClusterState(
73+
org.graylog2.indexer.indices.HealthStatus.fromString(response.status().name()),
74+
response.clusterName(),
75+
response.numberOfNodes(),
76+
response.activeShards(),
77+
response.relocatingShards(),
78+
response.initializingShards(),
79+
response.unassignedShards(),
80+
response.activePrimaryShards(),
81+
response.delayedUnassignedShards(),
82+
Optional.ofNullable(shardReplication).map(v -> v.toUpperCase(Locale.ROOT)).map(ShardReplication::valueOf).orElse(ShardReplication.ALL),
83+
managerNode,
84+
nodesResponse());
85+
}
86+
87+
private HealthResponse getClusterHealthResponse() {
88+
return officialOpensearchClient.sync(c -> c.cluster().health(), "Failed to obtain cluster health!");
89+
}
90+
91+
@Override
92+
public FlushResponse disableShardReplication() {
93+
LOG.info("Disabling shard replication for opensearch cluster");
94+
final HealthStatus clusterHealthStatus = getClusterHealthResponse().status();
95+
if (clusterHealthStatus == HealthStatus.Green) {
96+
return configureShardReplication(REPLICATION_PRIMARIES);
97+
} else {
98+
throw new IllegalStateException("Can't disable shard replication, cluster is not in healthy state. Current state: " + clusterHealthStatus);
99+
}
100+
}
101+
102+
@Override
103+
public FlushResponse enableShardReplication() {
104+
LOG.info("Enabling shard replication for opensearch cluster");
105+
return configureShardReplication(REPLICATION_ALL);
106+
}
107+
108+
private String queryShardReplication() {
109+
final GetClusterSettingsResponse response = officialOpensearchClient.sync(c -> c.cluster().getSettings(settings -> settings.includeDefaults(true).flatSettings(true)), "Failed to obtain shard replication settings!");
110+
return getSetting(SETTING_CLUSTER_ROUTING_ALLOCATION_ENABLE, response);
111+
}
112+
113+
private static String getSetting(String setting, GetClusterSettingsResponse settings) {
114+
JsonData value = settings.transient_().getOrDefault(setting,
115+
settings.persistent().getOrDefault(setting,
116+
settings.defaults().get(setting)));
117+
if (value == null) {
118+
throw new IllegalStateException("Failed to read setting " + setting + " from cluster state");
119+
}
120+
return value.to(String.class);
121+
}
122+
123+
private FlushResponse configureShardReplication(String shardReplication) {
124+
final PutClusterSettingsResponse response = officialOpensearchClient.sync(c -> c.cluster().putSettings(setting -> setting.flatSettings(true).persistent(SETTING_CLUSTER_ROUTING_ALLOCATION_ENABLE, JsonData.of(shardReplication))), "Failed to configure shard replication!");
125+
final String value = response.persistent().get(SETTING_CLUSTER_ROUTING_ALLOCATION_ENABLE).to(String.class);
126+
if (!value.equals(shardReplication)) {
127+
throw new IllegalStateException("Failed to configure shard replication. Expected cluster.routing.allocation.enable=" + shardReplication + " but was: " + value);
128+
}
129+
return flush();
130+
}
131+
132+
private FlushResponse flush() {
133+
LOG.info("Flushing opensearch nodes, storing all in-memory operations to segments on disk");
134+
final org.opensearch.client.opensearch.indices.FlushResponse response = officialOpensearchClient.sync(c -> c.indices().flush(f -> f.force(true).waitIfOngoing(true)), "Failed to flush opensearch nodes!");
135+
return new FlushResponse(response.shards().total(), response.shards().successful(), response.shards().failed());
136+
}
137+
138+
private List<Node> nodesResponse() {
139+
//https://github.com/opensearch-project/opensearch-java/issues/894
140+
//final NodesInfoResponse nodes = officialOpensearchClient.sync(c -> c.nodes().info(), "Failed to obtain opensearch nodes");
141+
final Request req = Requests.builder()
142+
.method("GET")
143+
.endpoint("/_nodes")
144+
.build();
145+
return officialOpensearchClient.sync(c -> {
146+
try (final Response response = c.generic().execute(req)) {
147+
return parseNodesResponse(response);
148+
}
149+
}, "Failed to obtain node infos");
150+
}
151+
152+
private List<Node> parseNodesResponse(Response response) {
153+
return response.getBody().map(body -> {
154+
try (final InputStream is = body.body()) {
155+
final JsonNode parsed = objectMapper.readValue(is, JsonNode.class);
156+
return parseNodes(parsed.path("nodes"));
157+
} catch (IOException e) {
158+
throw new RuntimeException("Failed to parse node response from /_nodes", e);
159+
}
160+
}).orElseThrow(() -> new IllegalStateException("Failed to obtain node response"));
161+
}
162+
163+
private ManagerNode managerNode() {
164+
// https://github.com/opensearch-project/opensearch-java/issues/1791
165+
final StateResponse response = officialOpensearchClient.sync(c -> c.cluster().state(), "Failed to obtain manager node!");
166+
final JsonValue json = response.valueBody().toJson();
167+
final String managerNodeID = parseString(json.asJsonObject().get("cluster_manager_node"));
168+
final String managerNodeName = parseString(json.asJsonObject().get("nodes").asJsonObject().get(managerNodeID).asJsonObject().get("name"));
169+
return new ManagerNode(managerNodeID, managerNodeName);
170+
}
171+
172+
private static String parseString(JsonValue clusterManagerNode) {
173+
if (clusterManagerNode instanceof JsonString jsonString) {
174+
return jsonString.getString();
175+
} else {
176+
throw new IllegalStateException("Failed to obtain String value from json object!");
177+
}
178+
}
179+
180+
private List<Node> parseNodes(JsonNode nodes) {
181+
return StreamSupport.stream(nodes.spliterator(), false)
182+
.map(node -> new Node(
183+
node.path("host").asText(),
184+
node.path("ip").asText(),
185+
node.path("name").asText(),
186+
node.path("version").asText(),
187+
parseRoles(node.path("roles"))))
188+
.sorted(Comparator.comparing(Node::name))
189+
.collect(Collectors.toList());
190+
}
191+
192+
private List<String> parseRoles(JsonNode roles) {
193+
return StreamSupport.stream(roles.spliterator(), false).map(JsonNode::asText)
194+
.sorted(Comparator.naturalOrder())
195+
.collect(Collectors.toList());
196+
}
197+
}

0 commit comments

Comments
 (0)