1717
1818package org .apache .ignite .internal .client .thin ;
1919
20+ import java .util .ArrayList ;
2021import java .util .Collection ;
2122import java .util .Collections ;
23+ import java .util .EnumSet ;
2224import java .util .HashMap ;
2325import java .util .HashSet ;
26+ import java .util .List ;
2427import java .util .Map ;
2528import java .util .Objects ;
2629import java .util .Set ;
3235import org .apache .ignite .IgniteBinary ;
3336import org .apache .ignite .client .ClientPartitionAwarenessMapper ;
3437import org .apache .ignite .client .ClientPartitionAwarenessMapperFactory ;
38+ import org .apache .ignite .internal .binary .BinaryUtils ;
39+ import org .apache .ignite .internal .binary .BinaryWriterEx ;
3540import org .apache .ignite .internal .processors .affinity .AffinityTopologyVersion ;
3641import org .apache .ignite .internal .util .GridConcurrentHashSet ;
3742import org .apache .ignite .internal .util .typedef .internal .U ;
@@ -46,6 +51,20 @@ public class ClientCacheAffinityContext {
4651 /** If a factory needs to be removed. */
4752 private static final long REMOVED_TS = 0 ;
4853
54+ /** Affinity operations, allowed to be executed on backup nodes. */
55+ private static final EnumSet <ClientOperation > OPS_ALLOWED_ON_BACKUPS = EnumSet .of (
56+ ClientOperation .CACHE_GET ,
57+ ClientOperation .CACHE_CONTAINS_KEY ,
58+ ClientOperation .ATOMIC_LONG_VALUE_GET ,
59+ ClientOperation .OP_SET_VALUE_CONTAINS ,
60+ ClientOperation .OP_SET_VALUE_CONTAINS_ALL ,
61+ ClientOperation .OP_SET_ITERATOR_START ,
62+ ClientOperation .QUERY_SQL ,
63+ ClientOperation .QUERY_SQL_FIELDS ,
64+ ClientOperation .QUERY_SCAN ,
65+ ClientOperation .QUERY_INDEX
66+ );
67+
4968 /**
5069 * Factory for each cache id to produce key to partition mapping functions.
5170 * This factory is also used to resolve cacheName from cacheId. If a cache has default affinity mappings then
@@ -74,18 +93,24 @@ public class ClientCacheAffinityContext {
7493 /** Predicate to check whether a connection to the node with the specified ID is open. */
7594 private final Predicate <UUID > connectionEstablishedPredicate ;
7695
96+ /** Data center ID. */
97+ private final String dataCenterId ;
98+
7799 /**
78100 * @param binary Binary data processor.
79101 * @param factory Factory for caches with custom affinity.
102+ * @param dataCenterId Data center ID.
80103 */
81104 public ClientCacheAffinityContext (
82105 IgniteBinary binary ,
83106 @ Nullable ClientPartitionAwarenessMapperFactory factory ,
84- Predicate <UUID > connectionEstablishedPredicate
107+ Predicate <UUID > connectionEstablishedPredicate ,
108+ String dataCenterId
85109 ) {
86110 this .paMapFactory = factory ;
87111 this .binary = binary ;
88112 this .connectionEstablishedPredicate = connectionEstablishedPredicate ;
113+ this .dataCenterId = dataCenterId ;
89114 }
90115
91116 /**
@@ -150,7 +175,7 @@ public void writePartitionsUpdateRequest(PayloadOutputChannel ch) {
150175
151176 // In case of IO error rq can hold previous mapping request. Just overwrite it, we don't need it anymore.
152177 rq = new CacheMappingRequest (cacheIds , lastAccessed );
153- ClientCacheAffinityMapping .writeRequest (ch , rq .caches , rq .ts > 0 );
178+ ClientCacheAffinityMapping .writeRequest (ch , rq .caches , rq .ts > 0 , dataCenterId );
154179 }
155180
156181 /**
@@ -221,6 +246,36 @@ else if (newMapping.topologyVersion().equals(oldMapping.topologyVersion()))
221246 return true ;
222247 }
223248
249+ /**
250+ * @param ch Payload output channel.
251+ */
252+ public void writeDataCenterNodesRequest (PayloadOutputChannel ch ) {
253+ try (BinaryWriterEx w = BinaryUtils .writer (null , ch .out (), null )) {
254+ w .writeString (dataCenterId );
255+ }
256+ }
257+
258+ /**
259+ * @param ch Payload input channel.
260+ */
261+ public boolean readDataCenterNodesResponse (PayloadInputChannel ch ) {
262+ TopologyNodes top = lastTop .get ();
263+
264+ if (top == null )
265+ return false ;
266+
267+ int cnt = ch .in ().readInt ();
268+
269+ List <UUID > dcNodes = new ArrayList <>(cnt );
270+
271+ for (int i = 0 ; i < cnt ; i ++)
272+ dcNodes .add (new UUID (ch .in ().readLong (), ch .in ().readLong ()));
273+
274+ top .dcNodes = dcNodes ;
275+
276+ return true ;
277+ }
278+
224279 /**
225280 * Gets last topology information.
226281 */
@@ -246,25 +301,40 @@ public synchronized void reset(TopologyNodes top) {
246301 *
247302 * @param cacheId Cache ID.
248303 * @param key Key.
304+ * @param op Client operation.
249305 * @return Affinity node id or {@code null} if affinity node can't be determined for given cache and key.
250306 */
251- public UUID affinityNode (int cacheId , Object key ) {
307+ public UUID affinityNode (int cacheId , Object key , ClientOperation op ) {
252308 ClientCacheAffinityMapping mapping = currentMapping ();
253309
254- return mapping == null ? null : mapping .affinityNode (binary , cacheId , key );
310+ boolean primary = !OPS_ALLOWED_ON_BACKUPS .contains (op );
311+
312+ return mapping == null ? null : mapping .affinityNode (binary , cacheId , key , primary );
255313 }
256314
257315 /**
258316 * Calculates affinity node for given cache and partition.
259317 *
260318 * @param cacheId Cache ID.
261319 * @param part Partition.
320+ * @param op Client operation.
262321 * @return Affinity node id or {@code null} if affinity node can't be determined for given cache and partition.
263322 */
264- public UUID affinityNode (int cacheId , int part ) {
323+ public UUID affinityNode (int cacheId , int part , ClientOperation op ) {
265324 ClientCacheAffinityMapping mapping = currentMapping ();
266325
267- return mapping == null ? null : mapping .affinityNode (cacheId , part );
326+ boolean primary = !OPS_ALLOWED_ON_BACKUPS .contains (op );
327+
328+ return mapping == null ? null : mapping .affinityNode (cacheId , part , primary );
329+ }
330+
331+ /**
332+ * @return List of nodes in current data center.
333+ */
334+ public List <UUID > dataCenterNodes () {
335+ TopologyNodes top = lastTop .get ();
336+
337+ return top == null ? null : top .dataCenterNodes ();
268338 }
269339
270340 /**
@@ -314,6 +384,11 @@ public void unregisterCache(String cacheName) {
314384 }
315385 }
316386
387+ /** */
388+ public String dataCenterId () {
389+ return dataCenterId ;
390+ }
391+
317392 /** */
318393 private boolean isTopologyOutdated (TopologyNodes top , AffinityTopologyVersion srvSideTopVer ) {
319394 if (top == null )
@@ -340,6 +415,9 @@ static class TopologyNodes {
340415 /** Nodes. */
341416 private final Collection <UUID > nodes = new ConcurrentLinkedQueue <>();
342417
418+ /** Current data center nodes. */
419+ private volatile List <UUID > dcNodes ;
420+
343421 /**
344422 * @param topVer Topology version.
345423 * @param nodeId Node id.
@@ -357,6 +435,13 @@ public Iterable<UUID> nodes() {
357435 return Collections .unmodifiableCollection (nodes );
358436 }
359437
438+ /**
439+ * Gets nodes of current data center.
440+ */
441+ public List <UUID > dataCenterNodes () {
442+ return dcNodes ;
443+ }
444+
360445 /**
361446 * @return Topology version.
362447 */
0 commit comments