Skip to content

Commit f4c282d

Browse files
committed
Add support for rolling upgrade to LocalClusterHandle.
And push logic for logsdb rolling upgrade to LocalClusterHandle. Upgrading a statefull cluster doesn't require a special ordering, but a serveless cluster needs to first upgrade search nodes and then other node types. This logic should be stay behind LocalClusterHandle abstraction.
1 parent 970b478 commit f4c282d

File tree

10 files changed

+51
-42
lines changed

10 files changed

+51
-42
lines changed

test/test-clusters/src/main/java/org/elasticsearch/test/cluster/local/LocalClusterHandle.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
import org.elasticsearch.test.cluster.MutableSettingsProvider;
1515
import org.elasticsearch.test.cluster.util.Version;
1616

17+
import java.io.IOException;
1718
import java.io.InputStream;
1819
import java.nio.file.Path;
1920
import java.util.List;
21+
import java.util.function.Consumer;
2022

2123
public interface LocalClusterHandle extends ClusterHandle {
2224

@@ -97,6 +99,19 @@ public interface LocalClusterHandle extends ClusterHandle {
9799
*/
98100
void upgradeNodeToVersion(int index, Version version);
99101

102+
/**
103+
* Perform a rolling upgrade to the given version.
104+
* @param version The version to upgrade to.
105+
* @param onNodeUpgradeComplete A callback that is invoked after each node is upgraded.
106+
*/
107+
default void rollingUpgradeToVersion(Version version, Consumer<Integer> onNodeUpgradeComplete) {
108+
int numNodes = getNumNodes();
109+
for (int index = 0; index < numNodes; index++) {
110+
upgradeNodeToVersion(index, version);
111+
onNodeUpgradeComplete.accept(index);
112+
}
113+
}
114+
100115
/**
101116
* Performs a "full cluster restart" upgrade to the given version. Method blocks until the cluster is restarted and available.
102117
*

x-pack/plugin/logsdb/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/AbstractLogsdbRollingUpgradeTestCase.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.time.DateFormatter;
1414
import org.elasticsearch.common.time.FormatNames;
1515
import org.elasticsearch.common.util.concurrent.ThreadContext;
16+
import org.elasticsearch.core.CheckedConsumer;
1617
import org.elasticsearch.features.NodeFeature;
1718
import org.elasticsearch.test.cluster.ElasticsearchCluster;
1819
import org.elasticsearch.test.cluster.util.Version;
@@ -66,7 +67,7 @@ protected Settings restClientSettings() {
6667
return Settings.builder().put(super.restClientSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build();
6768
}
6869

69-
protected void upgradeNode(int n) throws IOException {
70+
protected void clusterRollingUpgrade(CheckedConsumer<Integer, Exception> onNodeUpgradeComplete) throws IOException {
7071
closeClients();
7172

7273
var serverlessBwcStackVersion = System.getProperty("tests.serverless.bwc_stack_version");
@@ -75,9 +76,14 @@ protected void upgradeNode(int n) throws IOException {
7576
logger.info("serverlessBwcStackVersion={}, bwcTag={}, newClusterVersion={}", serverlessBwcStackVersion, bwcTag, newClusterVersion);
7677

7778
var upgradeVersion = newClusterVersion != null ? Version.fromString(newClusterVersion) : Version.CURRENT;
78-
logger.info("Upgrading node {} to version {}", n, upgradeVersion);
79-
getCluster().upgradeNodeToVersion(n, upgradeVersion);
80-
initClient();
79+
getCluster().rollingUpgradeToVersion(upgradeVersion, (index) -> {
80+
try {
81+
initClient();
82+
onNodeUpgradeComplete.accept(index);
83+
} catch (Exception e) {
84+
throw new RuntimeException(e);
85+
}
86+
});
8187
}
8288

8389
protected ElasticsearchCluster getCluster() {

x-pack/plugin/logsdb/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/AbstractStringTypeLogsdbRollingUpgradeTestCase.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,11 @@ public void testIndexing() throws Exception {
120120
verifyIndexMode(IndexMode.LOGSDB, templates.get(0).dataStreamName());
121121

122122
// during upgrade
123-
for (int i = 0; i < numNodes; i++) {
124-
upgradeNode(i);
123+
clusterRollingUpgrade(index -> {
125124
for (TemplateConfig config : templates) {
126125
indexDocumentsAndVerifyResults(config);
127126
}
128-
}
127+
});
129128

130129
// after everything is upgraded
131130
for (TemplateConfig config : templates) {

x-pack/plugin/logsdb/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/FlattenedRollingUpgradeIT.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -263,11 +263,8 @@ public void testIndexing() throws IOException {
263263
List<FlattenedData> indexedData = new ArrayList<>();
264264
indexDocumentsAndVerifyResults(spec, settings, indexedData);
265265

266-
int numNodes = Integer.parseInt(System.getProperty("tests.num_nodes", "3"));
267-
for (int i = 0; i < numNodes; i++) {
268-
upgradeNode(i);
269-
indexDocumentsAndVerifyResults(spec, settings, indexedData);
270-
}
266+
Settings.Builder finalSettings = settings;
267+
clusterRollingUpgrade(index -> { indexDocumentsAndVerifyResults(spec, finalSettings, indexedData); });
271268
}
272269

273270
}

x-pack/plugin/logsdb/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbIndexingRollingUpgradeIT.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.time.Instant;
1616
import java.util.Locale;
1717
import java.util.Map;
18+
import java.util.concurrent.atomic.AtomicReference;
1819

1920
import static org.hamcrest.Matchers.equalTo;
2021
import static org.hamcrest.Matchers.greaterThan;
@@ -82,14 +83,13 @@ public void testIndexing() throws Exception {
8283
search(dataStreamName);
8384
query(dataStreamName);
8485
}
85-
int numNodes = Integer.parseInt(System.getProperty("tests.num_nodes", "3"));
86-
for (int i = 0; i < numNodes; i++) {
87-
upgradeNode(i);
88-
time = time.plusNanos(60 * 30);
89-
bulkIndex(dataStreamName, 4, 1024, time, LogsdbIndexingRollingUpgradeIT::docSupplier);
86+
AtomicReference<Instant> timeRef = new AtomicReference<>(time);
87+
clusterRollingUpgrade(index -> {
88+
timeRef.set(timeRef.get().plusNanos(60 * 30));
89+
bulkIndex(dataStreamName, 4, 1024, timeRef.get(), LogsdbIndexingRollingUpgradeIT::docSupplier);
9090
search(dataStreamName);
9191
query(dataStreamName);
92-
}
92+
});
9393
{
9494
var forceMergeRequest = new Request("POST", "/" + dataStreamName + "/_forcemerge");
9595
forceMergeRequest.addParameter("max_num_segments", "1");

x-pack/plugin/logsdb/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/RandomizedRollingUpgradeIT.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,15 +150,13 @@ private void testIndexing(String indexNameBase, Settings.Builder settings) throw
150150
indexAndQueryDocuments(indexConfigs[i]);
151151
}
152152

153-
int numNodes = Integer.parseInt(System.getProperty("tests.num_nodes", "3"));
154-
for (int i = 0; i < numNodes; i++) {
155-
flush(indexNameBase + "*", true);
156-
upgradeNode(i);
153+
flush(indexNameBase + "*", true);
154+
clusterRollingUpgrade(index -> {
157155
ensureGreen(indexNameBase + "*");
158156
for (int j = 0; j < NUM_INDICES; j++) {
159157
indexAndQueryDocuments(indexConfigs[j]);
160158
}
161-
}
159+
});
162160
}
163161

164162
public void testIndexingStandardSource() throws IOException {

x-pack/plugin/logsdb/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/StandardToLogsDbIndexModeRollingUpgradeIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,11 @@ public void testIndexing() throws Exception {
7373
verifyIndexMode(IndexMode.STANDARD, templates.get(0).dataStreamName());
7474

7575
// during upgrade
76-
for (int i = 0; i < getNumNodes(); i++) {
77-
upgradeNode(i);
76+
clusterRollingUpgrade(index -> {
7877
for (TemplateConfig config : templates) {
7978
indexDocumentsAndVerifyResults(config);
8079
}
81-
}
80+
});
8281

8382
enableLogsDb();
8483
for (TemplateConfig config : templates) {

x-pack/plugin/logsdb/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/SyntheticSourceRollingUpgradeIT.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.List;
1919
import java.util.Locale;
2020
import java.util.Map;
21+
import java.util.concurrent.atomic.AtomicReference;
2122
import java.util.stream.Collectors;
2223

2324
import static org.hamcrest.Matchers.both;
@@ -82,15 +83,13 @@ public void testIndexing() throws Exception {
8283
search(dataStreamName);
8384
query(dataStreamName);
8485

85-
int numNodes = Integer.parseInt(System.getProperty("tests.num_nodes", "3"));
86-
for (int i = 0; i < numNodes; i++) {
87-
upgradeNode(i);
88-
time = time.plusNanos(60 * 30);
86+
AtomicReference<Instant> timeRef = new AtomicReference<>(time);
87+
clusterRollingUpgrade(index -> {
88+
timeRef.set(timeRef.get().plusNanos(60 * 30));
8989
bulkIndex(dataStreamName, 4, 1024, time, SyntheticSourceRollingUpgradeIT::docSupplier);
9090
search(dataStreamName);
9191
query(dataStreamName);
92-
93-
}
92+
});
9493

9594
var forceMergeRequest = new Request("POST", "/" + dataStreamName + "/_forcemerge");
9695
forceMergeRequest.addParameter("max_num_segments", "1");

x-pack/plugin/logsdb/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/TsdbIT.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,11 +136,7 @@ public void testTsdbDataStream() throws Exception {
136136

137137
performOldClustertOperations(templateName, dataStreamName);
138138

139-
int numNodes = Integer.parseInt(System.getProperty("tests.num_nodes", "3"));
140-
for (int i = 0; i < numNodes; i++) {
141-
upgradeNode(i);
142-
performMixedClusterOperations(dataStreamName, i == 0);
143-
}
139+
clusterRollingUpgrade(index -> { performMixedClusterOperations(dataStreamName, index == 0); });
144140
performUpgradedClusterOperations(dataStreamName);
145141
}
146142

x-pack/plugin/logsdb/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/TsdbIndexingRollingUpgradeIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.Locale;
2525
import java.util.Map;
26+
import java.util.concurrent.atomic.AtomicReference;
2627

2728
import static org.elasticsearch.xpack.logsdb.TsdbIT.TEMPLATE;
2829
import static org.elasticsearch.xpack.logsdb.TsdbIT.formatInstant;
@@ -61,14 +62,13 @@ public void testIndexing() throws Exception {
6162
search(dataStreamName);
6263
query(dataStreamName);
6364

64-
int numNodes = Integer.parseInt(System.getProperty("tests.num_nodes", "3"));
65-
for (int i = 0; i < numNodes; i++) {
66-
upgradeNode(i);
67-
startTime = startTime.plusNanos(60 * 30);
65+
AtomicReference<Instant> startTimeRef = new AtomicReference<>(startTime);
66+
clusterRollingUpgrade(index -> {
67+
startTimeRef.set(startTimeRef.get().plusNanos(60 * 30));
6868
bulkIndex(dataStreamName, 4, 1024, startTime, TsdbIndexingRollingUpgradeIT::docSupplier);
6969
search(dataStreamName);
7070
query(dataStreamName);
71-
}
71+
});
7272

7373
var forceMergeRequest = new Request("POST", "/" + dataStreamName + "/_forcemerge");
7474
forceMergeRequest.addParameter("max_num_segments", "1");

0 commit comments

Comments
 (0)