Skip to content

Commit ce59c37

Browse files
authored
IGNITE-15126 Fixed in-memory cluster hanging when error is thrown on activation (#11694)
1 parent d1d29d9 commit ce59c37

File tree

15 files changed

+412
-221
lines changed

15 files changed

+412
-221
lines changed

modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsTest.java

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,7 @@ public abstract class BaselineEventsTest extends GridCommandHandlerFactoryAbstra
4141

4242
/** {@inheritDoc} */
4343
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
44-
return super.getConfiguration(igniteInstanceName)
45-
.setConnectorConfiguration(new ConnectorConfiguration())
46-
.setDataStorageConfiguration(
47-
new DataStorageConfiguration()
48-
.setDefaultDataRegionConfiguration(
49-
new DataRegionConfiguration()
50-
.setPersistenceEnabled(true)
51-
)
52-
.setWalSegments(3)
53-
.setWalSegmentSize(512 * 1024)
54-
)
55-
.setConsistentId(igniteInstanceName)
56-
.setIncludeEventTypes(includedEvtTypes);
44+
return getConfiguration(igniteInstanceName, true);
5745
}
5846

5947
/** {@inheritDoc} */
@@ -72,9 +60,51 @@ public abstract class BaselineEventsTest extends GridCommandHandlerFactoryAbstra
7260
cleanPersistenceDir();
7361
}
7462

63+
/** */
64+
private IgniteConfiguration getConfiguration(String igniteInstanceName, boolean isPersistenceEnabled) throws Exception {
65+
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
66+
.setConnectorConfiguration(new ConnectorConfiguration())
67+
.setConsistentId(igniteInstanceName)
68+
.setIncludeEventTypes(includedEvtTypes);
69+
70+
if (isPersistenceEnabled) {
71+
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
72+
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
73+
.setPersistenceEnabled(true))
74+
.setWalSegments(3)
75+
.setWalSegmentSize(512 * 1024)
76+
);
77+
}
78+
79+
return cfg;
80+
}
81+
7582
/** */
7683
protected abstract void listen(IgniteEx ignite, IgnitePredicate<Event> lsnr, int... types);
7784

85+
/** */
86+
@Test
87+
public void testInMemoryBaselineAutoAdjustNotProduceEvents() throws Exception {
88+
startGrid(getConfiguration(getTestIgniteInstanceName(0), false));
89+
startGrid(getConfiguration(getTestIgniteInstanceName(1), false));
90+
91+
AtomicBoolean isBaselineChangedEvtListened = new AtomicBoolean();
92+
93+
listen(
94+
grid(0),
95+
event -> {
96+
isBaselineChangedEvtListened.set(true);
97+
98+
return true;
99+
},
100+
EventType.EVT_BASELINE_CHANGED
101+
);
102+
103+
startGrid(getConfiguration(getTestIgniteInstanceName(2), false));
104+
105+
assertFalse(GridTestUtils.waitForCondition(isBaselineChangedEvtListened::get, 2000));
106+
}
107+
78108
/** */
79109
@Test
80110
public void testChangeBltWithControlUtility() throws Exception {

modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,6 @@
240240
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG;
241241
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
242242
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
243-
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED;
244243
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
245244
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
246245
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JIT_NAME;
@@ -1684,10 +1683,6 @@ private void fillNodeAttributes(boolean notifyEnabled) throws IgniteCheckedExcep
16841683
if (cfg.getConnectorConfiguration() != null)
16851684
add(ATTR_REST_PORT_RANGE, cfg.getConnectorConfiguration().getPortRange());
16861685

1687-
// Whether rollback of dynamic cache start is supported or not.
1688-
// This property is added because of backward compatibility.
1689-
add(ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED, Boolean.TRUE);
1690-
16911686
// Save data storage configuration.
16921687
addDataStorageConfigurationAttributes();
16931688

modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,6 @@ public final class IgniteNodeAttributes {
193193
/** Rebalance thread pool size. */
194194
public static final String ATTR_REBALANCE_POOL_SIZE = ATTR_PREFIX + ".rebalance.pool.size";
195195

196-
/** Internal attribute name constant. */
197-
public static final String ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED = ATTR_PREFIX + ".dynamic.cache.start.rollback.supported";
198-
199196
/** Supported features. */
200197
public static final String ATTR_IGNITE_FEATURES = ATTR_PREFIX + ".features";
201198

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ public void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNo
559559
* @param failMsg Dynamic change request fail message.
560560
* @param topVer Current topology version.
561561
*/
562-
public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, AffinityTopologyVersion topVer) {
562+
public void onCacheChangeRequested(ExchangeFailureMessage failMsg, AffinityTopologyVersion topVer) {
563563
AffinityTopologyVersion actualTopVer = failMsg.exchangeId().topologyVersion();
564564

565565
ExchangeActions exchangeActions = new ExchangeActions();
@@ -603,7 +603,7 @@ public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, Aff
603603
processStopCacheRequest(exchangeActions, req, res, req.cacheName(), cacheDesc, actualTopVer, true);
604604
}
605605

606-
failMsg.exchangeActions(exchangeActions);
606+
failMsg.exchangeRollbackActions(exchangeActions);
607607
}
608608

609609
/**

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeFailureMessage.java renamed to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.ignite.internal.processors.cache;
1919

2020
import java.util.Collection;
21+
import java.util.Map;
22+
import java.util.UUID;
2123
import org.apache.ignite.IgniteCheckedException;
2224
import org.apache.ignite.cluster.ClusterNode;
2325
import org.apache.ignite.internal.managers.discovery.DiscoCache;
@@ -28,55 +30,55 @@
2830
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
2931
import org.apache.ignite.internal.util.typedef.F;
3032
import org.apache.ignite.internal.util.typedef.internal.S;
33+
import org.apache.ignite.internal.util.typedef.internal.U;
3134
import org.apache.ignite.lang.IgniteUuid;
3235
import org.jetbrains.annotations.Nullable;
3336

3437
/**
3538
* This class represents a discovery message that is used to provide information about an exchange failure (e.g. a dynamic cache start failure).
3639
*/
37-
public class DynamicCacheChangeFailureMessage implements DiscoveryCustomMessage {
40+
public class ExchangeFailureMessage implements DiscoveryCustomMessage {
3841
/** */
3942
private static final long serialVersionUID = 0L;
4043

4144
/** Cache names. */
4245
@GridToStringInclude
43-
private Collection<String> cacheNames;
46+
private final Collection<String> cacheNames;
4447

4548
/** Custom message ID. */
46-
private IgniteUuid id;
49+
private final IgniteUuid id;
4750

4851
/** */
49-
private GridDhtPartitionExchangeId exchId;
52+
private final GridDhtPartitionExchangeId exchId;
5053

5154
/** */
5255
@GridToStringInclude
53-
private IgniteCheckedException cause;
56+
private final Map<UUID, Exception> exchangeErrors;
5457

55-
/** Cache updates to be executed on exchange. */
56-
private transient ExchangeActions exchangeActions;
58+
/** Actions to be done to roll back changes made before the exchange failure. */
59+
private transient ExchangeActions exchangeRollbackActions;
5760

5861
/**
5962
* Creates a new ExchangeFailureMessage instance.
6063
*
6164
* @param locNode Local node.
6265
* @param exchId Exchange Id.
63-
* @param cause Cache start error.
64-
* @param cacheNames Cache names.
66+
* @param exchangeErrors Errors that caused PME to fail.
6567
*/
66-
public DynamicCacheChangeFailureMessage(
68+
public ExchangeFailureMessage(
6769
ClusterNode locNode,
6870
GridDhtPartitionExchangeId exchId,
69-
IgniteCheckedException cause,
71+
Map<UUID, Exception> exchangeErrors,
7072
Collection<String> cacheNames
7173
) {
7274
assert exchId != null;
73-
assert cause != null;
75+
assert !F.isEmpty(exchangeErrors);
7476
assert !F.isEmpty(cacheNames) : cacheNames;
7577

7678
this.id = IgniteUuid.fromUuid(locNode.id());
7779
this.exchId = exchId;
78-
this.cause = cause;
7980
this.cacheNames = cacheNames;
81+
this.exchangeErrors = exchangeErrors;
8082
}
8183

8284
/** {@inheritDoc} */
@@ -91,27 +93,40 @@ public Collection<String> cacheNames() {
9193
return cacheNames;
9294
}
9395

96+
/** @return Errors that caused PME to fail. */
97+
public Map<UUID, Exception> exchangeErrors() {
98+
return exchangeErrors;
99+
}
100+
94101
/**
95-
* @return Cache start error.
102+
* @return Actions to be done to roll back changes made before the exchange failure.
96103
*/
97-
public IgniteCheckedException error() {
98-
return cause;
104+
public ExchangeActions exchangeRollbackActions() {
105+
return exchangeRollbackActions;
99106
}
100107

101108
/**
102-
* @return Cache updates to be executed on exchange.
109+
* @param exchangeRollbackActions Actions to be done to roll back changes made before the exchange failure.
103110
*/
104-
public ExchangeActions exchangeActions() {
105-
return exchangeActions;
111+
public void exchangeRollbackActions(ExchangeActions exchangeRollbackActions) {
112+
assert exchangeRollbackActions != null && !exchangeRollbackActions.empty() : exchangeRollbackActions;
113+
114+
this.exchangeRollbackActions = exchangeRollbackActions;
106115
}
107116

108117
/**
109-
* @param exchangeActions Cache updates to be executed on exchange.
118+
* Creates an IgniteCheckedException that is used as root cause of the exchange initialization failure. This method
119+
* aggregates all the exceptions provided from all participating nodes.
120+
*
121+
* @return Exception that represents a cause of the exchange initialization failure.
110122
*/
111-
public void exchangeActions(ExchangeActions exchangeActions) {
112-
assert exchangeActions != null && !exchangeActions.empty() : exchangeActions;
123+
public IgniteCheckedException createFailureCompoundException() {
124+
IgniteCheckedException ex = new IgniteCheckedException("Failed to complete exchange process.");
125+
126+
for (Map.Entry<UUID, Exception> entry : exchangeErrors.entrySet())
127+
U.addSuppressed(ex, entry.getValue());
113128

114-
this.exchangeActions = exchangeActions;
129+
return ex;
115130
}
116131

117132
/**
@@ -141,6 +156,6 @@ public void exchangeActions(ExchangeActions exchangeActions) {
141156

142157
/** {@inheritDoc} */
143158
@Override public String toString() {
144-
return S.toString(DynamicCacheChangeFailureMessage.class, this);
159+
return S.toString(ExchangeFailureMessage.class, this);
145160
}
146161
}

0 commit comments

Comments
 (0)