Skip to content

Commit c0d37c6

Browse files
authored
IGNITE-26717 [ducktests] Add ducktest for Rolling Upgrade (#12451)
Authored-by: Maksim Davydov <[email protected]>
1 parent 73dd28e commit c0d37c6

File tree

8 files changed

+376
-8
lines changed

8 files changed

+376
-8
lines changed

modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/persistence_upgrade_test/DataLoaderAndCheckerApplication.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,21 @@ public class DataLoaderAndCheckerApplication extends IgniteAwareApplication {
3434
/** {@inheritDoc} */
3535
@Override public void run(JsonNode jNode) throws IgniteInterruptedCheckedException {
3636
boolean check = jNode.get("check").asBoolean();
37+
int backups = jNode.path("backups").asInt(0);
38+
int entryCnt = jNode.path("entryCount").asInt(10_000);
3739

3840
markInitialized();
3941
waitForActivation();
4042

4143
CacheConfiguration<Integer, CustomObject> cacheCfg = new CacheConfiguration<>("cache");
4244

45+
cacheCfg.setBackups(backups);
46+
4347
IgniteCache<Integer, CustomObject> cache = ignite.getOrCreateCache(cacheCfg);
4448

4549
log.info(check ? "Checking..." : " Preparing...");
4650

47-
for (int i = 0; i < 10_000; i++) {
51+
for (int i = 0; i < entryCnt; i++) {
4852
CustomObject obj = new CustomObject(i);
4953

5054
if (!check)

modules/ducktests/tests/ignitetest/services/utils/control_utility.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,33 @@ def snapshot_check(self, snapshot_name: str):
231231

232232
return res
233233

234+
def enable_rolling_upgrade(self, target_version: str, force: bool = False):
235+
"""
236+
Enable Rolling Upgrade with the target Ignite version.
237+
:param target_version: Target Ignite version.
238+
:param force: If {@code true}, skips target version compatibility checks and forcibly enables rolling upgrade.
239+
This flag does not override an already active upgrade configuration.
240+
"""
241+
242+
if force:
243+
result = self.__run(f"--rolling-upgrade enable {target_version} --force --enable-experimental --yes")
244+
else:
245+
result = self.__run(f"--rolling-upgrade enable {target_version} --enable-experimental --yes")
246+
247+
assert "Rolling upgrade enabled" in result, f"Unexpected response: {result}"
248+
249+
return result
250+
251+
def disable_rolling_upgrade(self):
252+
"""
253+
Disable Rolling Upgrade.
254+
"""
255+
result = self.__run(f"--rolling-upgrade disable --enable-experimental --yes")
256+
257+
assert "Rolling upgrade disabled" in result, f"Unexpected response: {result}"
258+
259+
return result
260+
234261
def start_performance_statistics(self):
235262
"""
236263
Start performance statistics collecting in the cluster.

modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,21 +142,31 @@ def await_stopped(self):
142142
self.logger.info("Waiting for IgniteAware(s) to stop ...")
143143

144144
for node in self.nodes:
145-
stopped = self.wait_node(node, timeout_sec=self.shutdown_timeout_sec)
146-
assert stopped, "Node %s's worker thread did not stop in %d seconds" % \
147-
(str(node.account), self.shutdown_timeout_sec)
145+
self.await_stopped_node(node)
148146

149-
for node in self.nodes:
150-
wait_until(lambda: not self.alive(node), timeout_sec=self.shutdown_timeout_sec,
151-
err_msg="Node %s's remote processes failed to stop in %d seconds" %
152-
(str(node.account), self.shutdown_timeout_sec))
147+
def await_stopped_node(self, node):
148+
"""
149+
Awaits node stop finished.
150+
"""
151+
self.logger.info(f"Waiting for {self.who_am_i(node)}) to stop ...")
152+
153+
stopped = self.wait_node(node, timeout_sec=self.shutdown_timeout_sec)
154+
assert stopped, "Node %s's worker thread did not stop in %d seconds" % \
155+
(str(node.account), self.shutdown_timeout_sec)
156+
157+
wait_until(lambda: not self.alive(node), timeout_sec=self.shutdown_timeout_sec,
158+
err_msg="Node %s's remote processes failed to stop in %d seconds" %
159+
(str(node.account), self.shutdown_timeout_sec))
153160

154161
def stop_node(self, node, force_stop=False, **kwargs):
155162
pids = self.pids(node, self.main_java_class)
156163

157164
for pid in pids:
158165
node.account.signal(pid, signal.SIGKILL if force_stop else signal.SIGTERM, allow_fail=False)
159166

167+
if not force_stop:
168+
self.await_stopped_node(node)
169+
160170
def clean(self, **kwargs):
161171
self.__restore_iptables()
162172

modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/discovery.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
from abc import ABCMeta, abstractmethod
2121

22+
from typing import List
23+
24+
from ignitetest.services.ignite import IgniteService
2225
from ignitetest.services.utils.ignite_aware import IgniteAwareService
2326
from ignitetest.services.zk.zookeeper import ZookeeperService
2427

@@ -27,6 +30,7 @@ class DiscoverySpi(metaclass=ABCMeta):
2730
"""
2831
Abstract class for DiscoverySpi.
2932
"""
33+
3034
@property
3135
@abstractmethod
3236
def type(self):
@@ -45,6 +49,7 @@ class ZookeeperDiscoverySpi(DiscoverySpi):
4549
"""
4650
ZookeeperDiscoverySpi.
4751
"""
52+
4853
def __init__(self, zoo_service, root_path):
4954
self.connection_string = zoo_service.connection_string()
5055
self.port = zoo_service.settings.client_port
@@ -63,6 +68,7 @@ class TcpDiscoveryIpFinder(metaclass=ABCMeta):
6368
"""
6469
Abstract class for TcpDiscoveryIpFinder.
6570
"""
71+
6672
@property
6773
@abstractmethod
6874
def type(self):
@@ -81,6 +87,7 @@ class TcpDiscoveryVmIpFinder(TcpDiscoveryIpFinder):
8187
"""
8288
IpFinder with static ips, obtained from cluster nodes.
8389
"""
90+
8491
def __init__(self, nodes=None):
8592
self.addresses = TcpDiscoveryVmIpFinder.__get_addresses(nodes) if nodes else None
8693

@@ -102,6 +109,7 @@ class TcpDiscoverySpi(DiscoverySpi):
102109
"""
103110
TcpDiscoverySpi.
104111
"""
112+
105113
def __init__(self, ip_finder=TcpDiscoveryVmIpFinder(), port=47500, port_range=100, local_address=None):
106114
self.ip_finder = ip_finder
107115
self.port = port
@@ -134,6 +142,23 @@ def from_ignite_cluster(cluster, subset=None):
134142
return TcpDiscoverySpi(ip_finder=TcpDiscoveryVmIpFinder(nodes))
135143

136144

145+
def from_ignite_services(ignite_service_list: List[IgniteService]):
146+
"""
147+
Constructs a `TcpDiscoverySpi` instance from the provided Ignite services.
148+
:param ignite_service_list: A list of `IgniteService` objects representing the cluster.
149+
:return: A configured `TcpDiscoverySpi` containing the static IP addresses of the services.
150+
"""
151+
assert isinstance(ignite_service_list, list)
152+
assert all(isinstance(ignite_service, IgniteService) for ignite_service in ignite_service_list)
153+
154+
nodes = []
155+
156+
for ignite_service in ignite_service_list:
157+
nodes.extend(ignite_service.nodes)
158+
159+
return TcpDiscoverySpi(ip_finder=TcpDiscoveryVmIpFinder(nodes))
160+
161+
137162
def from_zookeeper_cluster(cluster, root_path="/apacheIgnite"):
138163
"""
139164
Form ZookeeperDiscoverySpi from zookeeper service cluster.
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
"""
17+
This package contains rolling upgrade tests.
18+
"""
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
"""
17+
Module contains rolling upgrade tests with new nodes introduced into the topology and older nodes gracefully removed.
18+
"""
19+
from ducktape.mark import defaults, matrix
20+
21+
from ignitetest.services.ignite import IgniteService
22+
from ignitetest.services.utils.control_utility import ControlUtility
23+
from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_services
24+
from ignitetest.tests.rebalance.persistent_test import await_and_check_rebalance
25+
from ignitetest.tests.rolling_upgrade.util import BaseRollingUpgradeTest, PRELOADERS_COUNT, NUM_NODES
26+
from ignitetest.utils import cluster, ignite_versions
27+
from ignitetest.utils.version import LATEST, DEV_BRANCH, IgniteVersion
28+
29+
30+
class AddRemoveNodeUpgradeTest(BaseRollingUpgradeTest):
31+
32+
@cluster(num_nodes=2 * NUM_NODES + PRELOADERS_COUNT)
33+
@ignite_versions(str(LATEST))
34+
@matrix(with_persistence=[True, False])
35+
@defaults(upgrade_version=[str(DEV_BRANCH)], force=[False], backups=[1], entry_count=[15_000])
36+
def test_add_remove_rolling_upgrade(self, ignite_version, upgrade_version, force, with_persistence,
37+
backups, entry_count):
38+
node_count = (self.test_context.expected_num_nodes - PRELOADERS_COUNT) // 2
39+
40+
self.check_rolling_upgrade(ignite_version, upgrade_version, force, with_persistence,
41+
backups, entry_count, self._upgrade_ignite_cluster, node_count)
42+
43+
def _upgrade_ignite_cluster(self, ignites, upgrade_version, force, with_persistence):
44+
control_sh = ControlUtility(ignites)
45+
46+
control_sh.enable_rolling_upgrade(IgniteVersion(upgrade_version).vstring, force)
47+
48+
self.logger.info(f"Starting rolling upgrade.")
49+
50+
upgraded_nodes = []
51+
52+
for ignite in ignites.nodes:
53+
new_node_cfg = ignites.config._replace(
54+
version=IgniteVersion(upgrade_version),
55+
discovery_spi=from_ignite_services([ignites] + upgraded_nodes)
56+
)
57+
58+
new_node = IgniteService(self.test_context, new_node_cfg, num_nodes=1)
59+
60+
new_node.start()
61+
62+
control_sh = ControlUtility(new_node)
63+
64+
if with_persistence:
65+
control_sh.add_to_baseline(new_node.nodes)
66+
67+
await_and_check_rebalance(new_node)
68+
69+
upgraded_nodes.append(new_node)
70+
71+
ignites.stop_node(ignite)
72+
73+
if with_persistence:
74+
control_sh.remove_from_baseline([ignite])
75+
76+
self.logger.info(f"Cluster upgrade is complete.")
77+
78+
control_sh.disable_rolling_upgrade()
79+
80+
return upgraded_nodes
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
"""
17+
Module contains in-place rolling upgrade tests
18+
"""
19+
from ducktape.mark import matrix, defaults
20+
21+
from ignitetest.services.utils.control_utility import ControlUtility
22+
from ignitetest.tests.rolling_upgrade.util import BaseRollingUpgradeTest, NUM_NODES
23+
from ignitetest.utils import cluster, ignite_versions
24+
from ignitetest.utils.version import IgniteVersion, LATEST, DEV_BRANCH
25+
26+
27+
class InPlaceNodeUpgradeTest(BaseRollingUpgradeTest):
28+
@cluster(num_nodes=NUM_NODES)
29+
@ignite_versions(str(LATEST))
30+
@matrix(with_persistence=[True, False], upgrade_coordinator_first=[True, False])
31+
@defaults(upgrade_version=[str(DEV_BRANCH)], force=[False], backups=[1], entry_count=[15_000])
32+
def test_in_place_rolling_upgrade(self, ignite_version, upgrade_version, force, with_persistence,
33+
backups, entry_count, upgrade_coordinator_first):
34+
self.upgrade_coordinator_first = upgrade_coordinator_first
35+
36+
self.check_rolling_upgrade(ignite_version, upgrade_version, force, with_persistence,
37+
backups, entry_count, self._upgrade_ignite_cluster)
38+
39+
def _upgrade_ignite_cluster(self, ignites, upgrade_version, force, with_persistence):
40+
control_sh = ControlUtility(ignites)
41+
42+
control_sh.enable_rolling_upgrade(IgniteVersion(upgrade_version).vstring, force)
43+
44+
self.logger.info(
45+
f"Starting in-place rolling upgrade "
46+
f"{'with coordinator going first' if self.upgrade_coordinator_first else 'from the last node in the ring'}"
47+
)
48+
49+
ignites.config = ignites.config._replace(version=IgniteVersion(upgrade_version))
50+
51+
for ignite in ignites.nodes if self.upgrade_coordinator_first else reversed(ignites.nodes):
52+
self.logger.debug(f"Upgrading {ignites.who_am_i(ignite)}")
53+
54+
ignites.stop_node(ignite)
55+
56+
ignites.start_node(ignite)
57+
58+
ignites.await_started([ignite])
59+
60+
self.logger.info(f"Cluster upgrade is complete.")
61+
62+
control_sh.disable_rolling_upgrade()
63+
64+
return [ignites]

0 commit comments

Comments
 (0)