Skip to content

Commit a148e7b

Browse files
IGNITE-26583 Rebalancing optimization for MultiDC - Fixes #12473.
Signed-off-by: Aleksey Plekhanov <[email protected]>
1 parent 970489e commit a148e7b

File tree

12 files changed

+257
-49
lines changed

12 files changed

+257
-49
lines changed

modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,6 @@ private static AffinityFunctionContext context(
328328
private static ClusterNode node(int idx) {
329329
TcpDiscoveryNode node = new TcpDiscoveryNode(
330330
UUID.randomUUID(),
331-
null,
332331
Collections.singletonList("127.0.0.1"),
333332
Collections.singletonList("127.0.0.1"),
334333
0,

modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.UUID;
2424
import org.apache.ignite.Ignite;
2525
import org.apache.ignite.configuration.IgniteConfiguration;
26+
import org.apache.ignite.internal.IgniteNodeAttributes;
2627
import org.apache.ignite.lang.IgniteExperimental;
2728
import org.apache.ignite.lang.IgniteProductVersion;
2829
import org.jetbrains.annotations.Nullable;
@@ -183,7 +184,7 @@ public interface ClusterNode extends BaselineNode {
183184
*/
184185
@IgniteExperimental
185186
@Nullable public default String dataCenterId() {
186-
return null;
187+
return attribute(IgniteNodeAttributes.ATTR_DATA_CENTER_ID);
187188
}
188189

189190
/**

modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@
221221
import static java.util.Collections.singleton;
222222
import static java.util.Optional.ofNullable;
223223
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
224+
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_CENTER_ID;
224225
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
225226
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
226227
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
@@ -236,6 +237,7 @@
236237
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
237238
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE;
238239
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
240+
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
239241
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG;
240242
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
241243
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
@@ -1606,6 +1608,11 @@ private void fillNodeAttributes(boolean notifyEnabled) throws IgniteCheckedExcep
16061608
add(ATTR_TX_SERIALIZABLE_ENABLED, cfg.getTransactionConfiguration().isTxSerializableEnabled());
16071609
add(ATTR_TX_AWARE_QUERIES_ENABLED, cfg.getTransactionConfiguration().isTxAwareQueriesEnabled());
16081610

1611+
if (IgniteSystemProperties.getString(IGNITE_DATA_CENTER_ID) != null)
1612+
add(ATTR_DATA_CENTER_ID, IgniteSystemProperties.getString(IGNITE_DATA_CENTER_ID));
1613+
else if (userAttrs != null && userAttrs.get(IGNITE_DATA_CENTER_ID) != null)
1614+
add(ATTR_DATA_CENTER_ID, (Serializable)userAttrs.get(IGNITE_DATA_CENTER_ID));
1615+
16091616
// Stick in SPI versions and classes attributes.
16101617
addSpiAttributes(cfg.getCollisionSpi());
16111618
addSpiAttributes(cfg.getDiscoverySpi());

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,6 @@
150150

151151
import static java.util.concurrent.TimeUnit.MILLISECONDS;
152152
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
153-
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_CENTER_ID;
154153
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
155154
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
156155
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SECURITY_COMPATIBILITY_MODE;
@@ -166,7 +165,6 @@
166165
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
167166
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
168167
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
169-
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
170168
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_REGIONS_OFFHEAP_SIZE;
171169
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
172170
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
@@ -486,12 +484,6 @@ private void updateClientNodes(UUID leftNodeId) {
486484
ctx.addNodeAttribute(ATTR_OFFHEAP_SIZE, requiredOffheap());
487485
ctx.addNodeAttribute(ATTR_DATA_REGIONS_OFFHEAP_SIZE, configuredOffheap());
488486

489-
// TODO When exposing to public interface, replace the retrieval in IgniteClusterNode implementations.
490-
String dcId = IgniteSystemProperties.getString(IGNITE_DATA_CENTER_ID);
491-
492-
if (dcId != null)
493-
ctx.addNodeAttribute(ATTR_DATA_CENTER_ID, dcId);
494-
495487
DiscoverySpi spi = getSpi();
496488

497489
discoOrdered = discoOrdered();

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
import java.util.ArrayList;
2121
import java.util.Collection;
22+
import java.util.HashSet;
2223
import java.util.List;
24+
import java.util.Objects;
2325
import java.util.UUID;
2426
import java.util.concurrent.locks.ReadWriteLock;
2527
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -47,7 +49,9 @@
4749
import org.apache.ignite.internal.util.typedef.CI1;
4850
import org.apache.ignite.internal.util.typedef.F;
4951
import org.apache.ignite.internal.util.typedef.internal.SB;
52+
import org.apache.ignite.internal.util.typedef.internal.U;
5053
import org.apache.ignite.lang.IgniteBiPredicate;
54+
import org.apache.ignite.lang.IgnitePredicate;
5155
import org.jetbrains.annotations.Nullable;
5256

5357
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION;
@@ -195,6 +199,10 @@ public boolean disableRebalancingCancellationOptimization() {
195199

196200
CachePartitionFullCountersMap countersMap = grp.topology().fullUpdateCounters();
197201

202+
String dcId = ctx.localNode().dataCenterId();
203+
Collection<UUID> sameDcNodeIds = dcId == null ? null : new HashSet<>(F.viewReadOnly(
204+
ctx.discovery().aliveServerNodes(), ClusterNode::id, n -> Objects.equals(n.dataCenterId(), dcId)));
205+
198206
for (int p = 0; p < partitions; p++) {
199207
if (ctx.exchange().hasPendingServerExchange()) {
200208
if (log.isDebugEnabled())
@@ -228,8 +236,12 @@ public boolean disableRebalancingCancellationOptimization() {
228236
if (grp.persistenceEnabled() && exchFut != null) {
229237
List<UUID> nodeIds = exchFut.partitionHistorySupplier(grp.groupId(), p, part.initialUpdateCounter());
230238

231-
if (!F.isEmpty(nodeIds))
239+
if (!F.isEmpty(nodeIds)) {
240+
if (sameDcNodeIds != null)
241+
nodeIds = retainNodesNotEmpty(nodeIds, sameDcNodeIds::contains);
242+
232243
histSupplier = ctx.discovery().node(nodeIds.get(p % nodeIds.size()));
244+
}
233245
}
234246

235247
if (histSupplier != null && !exchFut.isClearingPartition(grp, p)) {
@@ -260,6 +272,9 @@ public boolean disableRebalancingCancellationOptimization() {
260272
});
261273

262274
if (!picked.isEmpty()) {
275+
if (dcId != null)
276+
picked = retainNodesNotEmpty(picked, n -> Objects.equals(dcId, n.dataCenterId()));
277+
263278
ClusterNode n = picked.get(p % picked.size());
264279

265280
GridDhtPartitionDemandMessage msg = assignments.get(n);
@@ -303,6 +318,15 @@ public boolean disableRebalancingCancellationOptimization() {
303318
return assignments;
304319
}
305320

321+
/**
322+
* Retains nodes which satisfy filter. Returns original list if result set is empty.
323+
*/
324+
private <T> List<T> retainNodesNotEmpty(List<T> nodes, IgnitePredicate<T> filter) {
325+
List<T> nodes0 = U.arrayList(nodes, filter);
326+
327+
return !F.isEmpty(nodes0) ? nodes0 : nodes;
328+
}
329+
306330
/** {@inheritDoc} */
307331
@Override public void onReconnected() {
308332
startFut = new GridFutureAdapter<>();

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@
114114
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT;
115115
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
116116
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
117-
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
118117
import static org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS;
119118

120119
/**
@@ -1169,8 +1168,6 @@ protected void initLocalNode(int srvPort, boolean addExtAddrAttr) {
11691168

11701169
locNode = new TcpDiscoveryNode(
11711170
ignite.configuration().getNodeId(),
1172-
//TODO remove usage of internal API when an alternative from public API is available
1173-
(String)((IgniteEx)ignite).context().nodeAttributes().get(ATTR_DATA_CENTER_ID),
11741171
addrs.get1(),
11751172
addrs.get2(),
11761173
srvPort,

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
4949
import org.jetbrains.annotations.Nullable;
5050

51-
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
5251
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID;
5352
import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.eqNodes;
5453

@@ -74,9 +73,6 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite
7473
@GridToStringExclude
7574
private Map<String, Object> attrs;
7675

77-
/** Data center ID of the node. */
78-
private String dcId;
79-
8076
/** Internal discovery addresses as strings. */
8177
@GridToStringInclude
8278
private Collection<String> addrs;
@@ -159,7 +155,6 @@ public TcpDiscoveryNode() {
159155
* Constructor.
160156
*
161157
* @param id Node Id.
162-
* @param dcId ID of a data center where this node is started ({@code null} if there is only one data center).
163158
* @param addrs Addresses.
164159
* @param hostNames Host names.
165160
* @param discPort Port.
@@ -168,7 +163,6 @@ public TcpDiscoveryNode() {
168163
* @param consistentId Node consistent ID.
169164
*/
170165
public TcpDiscoveryNode(UUID id,
171-
String dcId,
172166
Collection<String> addrs,
173167
Collection<String> hostNames,
174168
int discPort,
@@ -181,7 +175,6 @@ public TcpDiscoveryNode(UUID id,
181175
assert ver != null;
182176

183177
this.id = id;
184-
this.dcId = dcId;
185178

186179
List<String> sortedAddrs = new ArrayList<>(addrs);
187180

@@ -360,11 +353,6 @@ public void version(IgniteProductVersion ver) {
360353
this.ver = ver;
361354
}
362355

363-
/** {@inheritDoc} */
364-
@Override public @Nullable String dataCenterId() {
365-
return dcId;
366-
}
367-
368356
/** {@inheritDoc} */
369357
@Override public Collection<String> addresses() {
370358
return addrs;
@@ -541,7 +529,7 @@ public void onClientDisconnected(UUID newId) {
541529
*/
542530
public TcpDiscoveryNode clientReconnectNode(Map<String, Object> nodeAttrs) {
543531
TcpDiscoveryNode node = new TcpDiscoveryNode(
544-
id, dcId, addrs, hostNames, discPort, metricsProvider, ver, null
532+
id, addrs, hostNames, discPort, metricsProvider, ver, null
545533
);
546534

547535
node.attrs = Collections.unmodifiableMap(new HashMap<>(nodeAttrs));
@@ -627,8 +615,6 @@ public TcpDiscoveryNode clientReconnectNode(Map<String, Object> nodeAttrs) {
627615
consistentId = consistentIdAttr != null ? consistentIdAttr : id;
628616
else
629617
consistentId = consistentIdAttr != null ? consistentIdAttr : U.consistentId(addrs, discPort);
630-
631-
dcId = (String)attrs.get(ATTR_DATA_CENTER_ID);
632618
}
633619

634620
/** {@inheritDoc} */
@@ -643,7 +629,7 @@ public TcpDiscoveryNode clientReconnectNode(Map<String, Object> nodeAttrs) {
643629

644630
/** {@inheritDoc} */
645631
@Override public String toString() {
646-
return S.toString(TcpDiscoveryNode.class, this, "isClient", isClient());
632+
return S.toString(TcpDiscoveryNode.class, this, "isClient", isClient(), "dataCenterId", dataCenterId());
647633
}
648634

649635
/**

modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,6 @@ public void testSerialization() throws IOException, ClassNotFoundException {
237237
protected TcpDiscoveryNode node(DiscoveryMetricsProvider metrics, IgniteProductVersion v, String consistentId) {
238238
TcpDiscoveryNode node = new TcpDiscoveryNode(
239239
UUID.randomUUID(),
240-
null,
241240
Collections.singletonList("127.0.0.1"),
242241
Collections.singletonList("127.0.0.1"),
243242
0,

0 commit comments

Comments
 (0)