|
| 1 | +/* |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one or more |
| 3 | + * contributor license agreements. See the NOTICE file distributed with |
| 4 | + * this work for additional information regarding copyright ownership. |
| 5 | + * The ASF licenses this file to You under the Apache License, Version 2.0 |
| 6 | + * (the "License"); you may not use this file except in compliance with |
| 7 | + * the License. You may obtain a copy of the License at |
| 8 | + * |
| 9 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | + * |
| 11 | + * Unless required by applicable law or agreed to in writing, software |
| 12 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | + * See the License for the specific language governing permissions and |
| 15 | + * limitations under the License. |
| 16 | + */ |
| 17 | + |
| 18 | +package org.apache.ignite.cache.affinity.rendezvous; |
| 19 | + |
| 20 | +import java.util.List; |
| 21 | +import org.apache.ignite.cluster.ClusterNode; |
| 22 | +import org.apache.ignite.internal.util.typedef.internal.S; |
| 23 | +import org.apache.ignite.lang.IgniteBiPredicate; |
| 24 | + |
| 25 | +/** |
| 26 | + * Multi-data center affinity backup filter that ensures each partition's data is distributed across multiple data centers, |
| 27 | + * providing high availability and fault tolerance. This implementation guarantees at least one copy of the data in each |
| 28 | + * data center and attempts to maintain the configured backup factor without discarding copies. |
| 29 | + * <p> |
| 30 | + * The filter works by grouping nodes based on their data center identification attribute (see {@link ClusterNode#dataCenterId()}) |
| 31 | + * and ensuring that for every partition, at least one node from each data center is included in the primary-backup set. |
| 32 | + * <p> |
| 33 | + * The filter will discard backup copies only if the number of available nodes in a given data center is less |
| 34 | + * than the number of copies assigned to that data center. |
| 35 | + * For example, if a partition has 4 copies (1 primary and 3 backups) and the cluster has 2 data centers, |
| 36 | + * then 2 copies are assigned to each data center. The only scenario when just a single copy is assigned to a node in a data center is when |
| 37 | + * the number of nodes in that data center is one. |
| 38 | + * <p> |
| 39 | + * This class is constructed with a number of data centers the cluster spans and a number of backups of the cache this filter is applied to. |
| 40 | + * Implementation expects that all copies can be spread evenly across all data centers. In other words, (backups + 1) is divisible by |
| 41 | + * number of data centers without remainder. Uneven distributions of copies are not supported. |
| 42 | + * <p> |
| 43 | + * Warning: Ensure that all nodes have a consistent and valid data center identifier attribute. Missing or inconsistent values |
| 44 | + * may lead to unexpected placement of data. |
| 45 | + * |
| 46 | + * <h2 class="header">Spring Example</h2> |
| 47 | + * Create a partitioned cache template where each data center has at least one copy of the data, and the backup count is maintained. |
| 48 | + * <pre name="code" class="xml"> |
| 49 | + * <property name="cacheConfiguration"> |
| 50 | + * <list> |
| 51 | + * <bean id="cache-template-bean" abstract="true" class="org.apache.ignite.configuration.CacheConfiguration"> |
| 52 | + * <property name="name" value="JobcaseDefaultCacheConfig*"/> |
| 53 | + * <property name="cacheMode" value="PARTITIONED" /> |
| 54 | + * <property name="backups" value="3" /> |
| 55 | + * <property name="affinity"> |
| 56 | + * <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction"> |
| 57 | + * <property name="affinityBackupFilter"> |
| 58 | + * <bean class="org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilter"> |
| 59 | + * <constructor-arg value="2"/> <!-- dcsNumber --> |
| 60 | + * <constructor-arg value="3"/> <!-- backups, the same as in the cache template --> |
| 61 | + * </bean> |
| 62 | + * </property> |
| 63 | + * </bean> |
| 64 | + * </property> |
| 65 | + * </bean> |
| 66 | + * </list> |
| 67 | + * </property> |
| 68 | + * </pre> |
| 69 | + * <p> |
| 70 | + * With more backups, additional replicas can be distributed across different data centers to further improve redundancy. |
| 71 | + */ |
| 72 | +public class MdcAffinityBackupFilter implements IgniteBiPredicate<ClusterNode, List<ClusterNode>> { |
| 73 | + /** */ |
| 74 | + private static final long serialVersionUID = 1L; |
| 75 | + |
| 76 | + /** */ |
| 77 | + private final int partCopiesPerDc; |
| 78 | + |
| 79 | + /** |
| 80 | + * @param dcsNum Number of data centers. |
| 81 | + * @param backups Number of backups. |
| 82 | + */ |
| 83 | + public MdcAffinityBackupFilter(int dcsNum, int backups) { |
| 84 | + if (dcsNum < 2) { |
| 85 | + throw new IllegalArgumentException("MdcAffinityBackupFilter cannot be used in an environment with only one datacenter. " + |
| 86 | + "Number of datacenters must be at least 2."); |
| 87 | + } |
| 88 | + |
| 89 | + int numCopies = backups + 1; |
| 90 | + |
| 91 | + partCopiesPerDc = numCopies / dcsNum; |
| 92 | + int remainder = numCopies % dcsNum; |
| 93 | + |
| 94 | + if (remainder != 0) { |
| 95 | + String suggestion = "recommended "; |
| 96 | + if (numCopies - remainder <= 0) |
| 97 | + suggestion += "value is " + (backups + (dcsNum - remainder)); |
| 98 | + else |
| 99 | + suggestion += "values are " + (backups - remainder) + " and " + (backups + (dcsNum - remainder)); |
| 100 | + |
| 101 | + throw new IllegalArgumentException("Number of copies is not completely divisible by number of datacenters, " + |
| 102 | + "copies cannot be distributed evenly across DCs. " + |
| 103 | + "Please adjust the number of backups, " + suggestion); |
| 104 | + } |
| 105 | + } |
| 106 | + |
| 107 | + /** {@inheritDoc} */ |
| 108 | + @Override public boolean apply(ClusterNode candidate, List<ClusterNode> previouslySelected) { |
| 109 | + String candidateDcId = candidate.dataCenterId(); |
| 110 | + int candDcCopiesAssigned = 0; |
| 111 | + |
| 112 | + for (int i = 0; i < previouslySelected.size(); i++) { |
| 113 | + String prevDcId = previouslySelected.get(i).dataCenterId(); |
| 114 | + |
| 115 | + if (prevDcId == null) |
| 116 | + return false; |
| 117 | + |
| 118 | + candDcCopiesAssigned += prevDcId.equals(candidateDcId) ? 1 : 0; |
| 119 | + } |
| 120 | + |
| 121 | + return candDcCopiesAssigned < partCopiesPerDc; |
| 122 | + } |
| 123 | + |
| 124 | + /** {@inheritDoc} */ |
| 125 | + @Override public String toString() { |
| 126 | + return S.toString(MdcAffinityBackupFilter.class, this); |
| 127 | + } |
| 128 | +} |
0 commit comments