Skip to content

Commit 02f03c8

Browse files
committed
IGNITE-28050 Use message serializer for start continuous routine messages
1 parent 9eb8603 commit 02f03c8

18 files changed

+267
-668
lines changed

modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.apache.ignite.internal.util.future.GridFutureAdapter;
5050
import org.apache.ignite.internal.util.typedef.F;
5151
import org.apache.ignite.internal.util.typedef.P2;
52-
import org.apache.ignite.internal.util.typedef.T2;
5352
import org.apache.ignite.internal.util.typedef.T3;
5453
import org.apache.ignite.internal.util.typedef.internal.U;
5554
import org.apache.ignite.lang.IgniteBiPredicate;
@@ -142,13 +141,13 @@ public GridEventConsumeHandler() {
142141
}
143142

144143
/** {@inheritDoc} */
145-
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
146-
Map<Integer, T2<Long, Long>> cntrs) {
144+
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
145+
Map<Integer, Long> cntrs) {
147146
// No-op.
148147
}
149148

150149
/** {@inheritDoc} */
151-
@Override public Map<Integer, T2<Long, Long>> updateCounters() {
150+
@Override public Map<Integer, Long> updateCounters() {
152151
return Collections.emptyMap();
153152
}
154153

modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.apache.ignite.internal.util.future.GridFinishedFuture;
3737
import org.apache.ignite.internal.util.future.GridFutureAdapter;
3838
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
39-
import org.apache.ignite.internal.util.typedef.T2;
4039
import org.apache.ignite.internal.util.typedef.internal.S;
4140
import org.apache.ignite.internal.util.typedef.internal.U;
4241
import org.apache.ignite.lang.IgniteBiPredicate;
@@ -117,13 +116,13 @@ public GridMessageListenHandler(@Nullable Object topic, IgniteBiPredicate<UUID,
117116
}
118117

119118
/** {@inheritDoc} */
120-
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
121-
Map<Integer, T2<Long, Long>> cntrs) {
119+
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
120+
Map<Integer, Long> cntrs) {
122121
// No-op.
123122
}
124123

125124
/** {@inheritDoc} */
126-
@Override public Map<Integer, T2<Long, Long>> updateCounters() {
125+
@Override public Map<Integer, Long> updateCounters() {
127126
return Collections.emptyMap();
128127
}
129128

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,14 @@
9191
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessageSerializer;
9292
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
9393
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessageMarshallableSerializer;
94+
import org.apache.ignite.internal.processors.continuous.StartRequestData;
95+
import org.apache.ignite.internal.processors.continuous.StartRequestDataSerializer;
96+
import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
97+
import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessageSerializer;
98+
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryImmutableMessage;
99+
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryImmutableMessageSerializer;
100+
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
101+
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageSerializer;
94102
import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage;
95103
import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessageSerializer;
96104
import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
@@ -241,6 +249,7 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
241249
factory.register(-200, TcpDiscoveryCollectionMessage::new,
242250
new TcpDiscoveryCollectionMessageMarshallableSerializer(marsh, clsLdr));
243251

252+
factory.register(-118, StartRequestData::new, new StartRequestDataSerializer());
244253
factory.register(-117, TcpDiscoveryNode::new, new TcpDiscoveryNodeMarshallableSerializer(marsh, clsLdr));
245254
factory.register(-116, IgniteProductVersion::new, new IgniteProductVersionSerializer());
246255
factory.register(-115, SchemaAlterTableAddColumnOperation::new, new SchemaAlterTableAddColumnOperationSerializer());
@@ -361,5 +370,8 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
361370
factory.register(537, ServiceDeploymentRequest::new,
362371
new ServiceDeploymentRequestMarshallableSerializer(marsh, clsLdr));
363372
factory.register(538, ServiceUndeploymentRequest::new, new ServiceUndeploymentRequestSerializer());
373+
factory.register(539, StartRoutineDiscoveryMessage::new, new StartRoutineDiscoveryMessageSerializer());
374+
factory.register(540, StartRoutineAckDiscoveryMessage::new, new StartRoutineAckDiscoveryMessageSerializer());
375+
factory.register(541, StartRoutineDiscoveryImmutableMessage::new, new StartRoutineDiscoveryImmutableMessageSerializer());
364376
}
365377
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.Collections;
2323
import java.util.Map;
2424
import org.apache.ignite.internal.Order;
25-
import org.apache.ignite.internal.util.typedef.T2;
2625
import org.apache.ignite.internal.util.typedef.internal.U;
2726
import org.apache.ignite.plugin.extensions.communication.Message;
2827

@@ -203,15 +202,14 @@ public long updateCounterAt(int idx) {
203202
* @param cntrsMap Partial local counters map.
204203
* @return Partition ID to partition counters map.
205204
*/
206-
public static Map<Integer, T2<Long, Long>> toCountersMap(CachePartitionPartialCountersMap cntrsMap) {
205+
public static Map<Integer, Long> toCountersMap(CachePartitionPartialCountersMap cntrsMap) {
207206
if (cntrsMap.size() == 0)
208207
return Collections.emptyMap();
209208

210-
Map<Integer, T2<Long, Long>> res = U.newHashMap(cntrsMap.size());
209+
Map<Integer, Long> res = U.newHashMap(cntrsMap.size());
211210

212211
for (int idx = 0; idx < cntrsMap.size(); idx++)
213-
res.put(cntrsMap.partitionAt(idx),
214-
new T2<>(cntrsMap.initialUpdateCounterAt(idx), cntrsMap.updateCounterAt(idx)));
212+
res.put(cntrsMap.partitionAt(idx), cntrsMap.updateCounterAt(idx));
215213

216214
return res;
217215
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
7676
import org.apache.ignite.internal.util.typedef.CI1;
7777
import org.apache.ignite.internal.util.typedef.F;
78-
import org.apache.ignite.internal.util.typedef.T2;
7978
import org.apache.ignite.internal.util.typedef.internal.CU;
8079
import org.apache.ignite.internal.util.typedef.internal.S;
8180
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -200,10 +199,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
200199
private transient int cacheId;
201200

202201
/** */
203-
private transient volatile Map<Integer, T2<Long, Long>> initUpdCntrs;
202+
private transient volatile Map<Integer, Long> initUpdCntrs;
204203

205204
/** */
206-
private transient volatile Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode;
205+
private transient volatile Map<UUID, Map<Integer, Long>> initUpdCntrsPerNode;
207206

208207
/** */
209208
private transient volatile AffinityTopologyVersion initTopVer;
@@ -224,7 +223,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
224223
private transient UUID routineId;
225224

226225
/** Local update counters values on listener start. Used for skipping events fired before the listener start. */
227-
private transient volatile Map<Integer, T2<Long, Long>> locInitUpdCntrs;
226+
private transient volatile Map<Integer, Long> locInitUpdCntrs;
228227

229228
/** */
230229
private transient GridKernalContext ctx;
@@ -361,15 +360,15 @@ public void keepBinary(boolean keepBinary) {
361360
}
362361

363362
/** {@inheritDoc} */
364-
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
365-
Map<Integer, T2<Long, Long>> cntrs) {
366-
this.initUpdCntrsPerNode = cntrsPerNode;
367-
this.initUpdCntrs = cntrs;
368-
this.initTopVer = topVer;
363+
@Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
364+
Map<Integer, Long> cntrs) {
365+
initUpdCntrsPerNode = cntrsPerNode;
366+
initUpdCntrs = cntrs;
367+
initTopVer = topVer;
369368
}
370369

371370
/** {@inheritDoc} */
372-
@Override public Map<Integer, T2<Long, Long>> updateCounters() {
371+
@Override public Map<Integer, Long> updateCounters() {
373372
return locInitUpdCntrs;
374373
}
375374

@@ -1163,32 +1162,31 @@ private String taskName() {
11631162
CacheContinuousQueryPartitionRecovery rec = rcvs.get(partId);
11641163

11651164
if (rec == null) {
1166-
T2<Long, Long> partCntrs = null;
1165+
Long partCntr = null;
11671166

1168-
Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode = this.initUpdCntrsPerNode;
1167+
Map<UUID, Map<Integer, Long>> initUpdCntrsPerNode = this.initUpdCntrsPerNode;
11691168

11701169
if (initUpdCntrsPerNode != null) {
11711170
GridCacheContext<K, V> cctx = cacheContext(ctx);
11721171

11731172
GridCacheAffinityManager aff = cctx.affinity();
11741173

11751174
for (ClusterNode node : aff.nodesByPartition(partId, topVer)) {
1176-
Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
1175+
Map<Integer, Long> map = initUpdCntrsPerNode.get(node.id());
11771176

11781177
if (map != null) {
1179-
partCntrs = map.get(partId);
1178+
partCntr = map.get(partId);
11801179

11811180
break;
11821181
}
11831182
}
11841183
}
11851184
else if (initUpdCntrs != null)
1186-
partCntrs = initUpdCntrs.get(partId);
1185+
partCntr = initUpdCntrs.get(partId);
11871186

1188-
T2<Long, Long> partCntrs0 = partCntrs;
1187+
Long partCntr0 = partCntr;
11891188
CacheContinuousQueryPartitionRecovery oldRec = rcvs.computeIfAbsent(partId, k ->
1190-
new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
1191-
partCntrs0 != null ? partCntrs0.get2() : null));
1189+
new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer, partCntr0));
11921190

11931191
if (oldRec != null)
11941192
rec = oldRec;

modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@
2121
import org.apache.ignite.internal.Order;
2222
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2323
import org.apache.ignite.lang.IgniteUuid;
24+
import org.apache.ignite.plugin.extensions.communication.Message;
2425

2526
/**
2627
*
2728
*/
28-
public abstract class AbstractContinuousMessage implements DiscoveryCustomMessage {
29+
public abstract class AbstractContinuousMessage implements Message, DiscoveryCustomMessage {
2930
/** */
3031
private static final long serialVersionUID = 2781778657738703012L;
3132

modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
import java.io.Serializable;
2121
import java.util.UUID;
22+
import org.apache.ignite.cluster.ClusterNode;
2223
import org.apache.ignite.internal.util.typedef.internal.S;
24+
import org.apache.ignite.lang.IgnitePredicate;
2325

2426
/**
2527
*
@@ -35,10 +37,10 @@ class ContinuousRoutineInfo implements Serializable {
3537
final UUID routineId;
3638

3739
/** */
38-
final byte[] hnd;
40+
final GridContinuousHandler hnd;
3941

4042
/** */
41-
final byte[] nodeFilter;
43+
final IgnitePredicate<ClusterNode> nodeFilter;
4244

4345
/** */
4446
final int bufSize;
@@ -55,17 +57,17 @@ class ContinuousRoutineInfo implements Serializable {
5557
/**
5658
* @param srcNodeId Source node ID.
5759
* @param routineId Routine ID.
58-
* @param hnd Marshalled handler.
59-
* @param nodeFilter Marshalled node filter.
60+
* @param hnd Handler.
61+
* @param nodeFilter Node filter.
6062
* @param bufSize Handler buffer size.
6163
* @param interval Time interval.
6264
* @param autoUnsubscribe Auto unsubscribe flag.
6365
*/
6466
ContinuousRoutineInfo(
6567
UUID srcNodeId,
6668
UUID routineId,
67-
byte[] hnd,
68-
byte[] nodeFilter,
69+
GridContinuousHandler hnd,
70+
IgnitePredicate<ClusterNode> nodeFilter,
6971
int bufSize,
7072
long interval,
7173
boolean autoUnsubscribe

modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.ignite.IgniteCheckedException;
2525
import org.apache.ignite.internal.GridKernalContext;
2626
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
27-
import org.apache.ignite.internal.util.typedef.T2;
2827
import org.jetbrains.annotations.Nullable;
2928

3029
/**
@@ -163,11 +162,11 @@ public default void flushOnNodeLeft() {
163162
* @param cntrs Init state for partition counters.
164163
* @param topVer Topology version.
165164
*/
166-
public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
167-
Map<Integer, T2<Long, Long>> cntrs);
165+
public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
166+
Map<Integer, Long> cntrs);
168167

169168
/**
170169
* @return Init state for partition counters.
171170
*/
172-
public Map<Integer, T2<Long, Long>> updateCounters();
171+
public Map<Integer, Long> updateCounters();
173172
}

0 commit comments

Comments
 (0)