Skip to content

Commit 1232ff1

Browse files
authored
IGNITE-27319 Rework PartitionListener to be TablePartitionProcessor and not RaftGroupListener (#7218)
1 parent b7ba2f3 commit 1232ff1

File tree

12 files changed

+104
-360
lines changed

12 files changed

+104
-360
lines changed

modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@
110110
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
111111
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
112112
import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
113-
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
113+
import org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
114114
import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
115115
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
116116
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -394,7 +394,7 @@ private RaftTableProcessor createTableProcessor(int tableId) {
394394
ClockService clockService = mock(ClockService.class);
395395
lenient().when(clockService.current()).thenReturn(clock.current());
396396

397-
return new PartitionListener(
397+
return new TablePartitionProcessor(
398398
txManager,
399399
storage,
400400
storageUpdateHandler,

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import static org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
5858
import static org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignments;
5959
import static org.apache.ignite.internal.raft.PeersAndLearners.fromAssignments;
60+
import static org.apache.ignite.internal.raft.RaftGroupConfiguration.UNKNOWN_INDEX;
61+
import static org.apache.ignite.internal.raft.RaftGroupConfiguration.UNKNOWN_TERM;
6062
import static org.apache.ignite.internal.tostring.IgniteToStringBuilder.COLLECTION_LIMIT;
6163
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
6264
import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -920,7 +922,7 @@ private CompletableFuture<List<Assignments>> writeZoneAssignmentsToMetastore(
920922

921923
if (haMode) {
922924
ByteArray assignmentsChainKey = assignmentsChainKey(zonePartitionId);
923-
byte[] assignmentChain = AssignmentsChain.of(newAssignments.get(i)).toBytes();
925+
byte[] assignmentChain = AssignmentsChain.of(UNKNOWN_TERM, UNKNOWN_INDEX, newAssignments.get(i)).toBytes();
924926
Operation chainOp = put(assignmentsChainKey, assignmentChain);
925927
partitionAssignments.add(chainOp);
926928
}

modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
8181
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
8282
import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
83-
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
83+
import org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
8484
import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
8585
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
8686
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
@@ -266,9 +266,9 @@ void propagatesRaftMetaToPartitionListeners(
266266

267267
listener.onConfigurationCommitted(raftGroupConfiguration, 2L, 3L);
268268

269-
PartitionListener partitionListener = partitionListener(TABLE_ID);
269+
TablePartitionProcessor tablePartitionProcessor = partitionListener(TABLE_ID);
270270

271-
listener.addTableProcessor(TABLE_ID, partitionListener);
271+
listener.addTableProcessor(TABLE_ID, tablePartitionProcessor);
272272

273273
verify(mvPartitionStorage).lastApplied(2L, 3L);
274274
verify(mvPartitionStorage).committedGroupConfiguration(any());
@@ -410,7 +410,7 @@ void processorsAreNotInitializedWithoutSnapshot(@Mock RaftTableProcessor tablePr
410410
void testSkipWriteCommandByAppliedIndex() {
411411
mvPartitionStorage = spy(new TestMvPartitionStorage(PARTITION_ID));
412412

413-
PartitionListener tableProcessor = partitionListener(TABLE_ID);
413+
TablePartitionProcessor tableProcessor = partitionListener(TABLE_ID);
414414

415415
listener.addTableProcessor(TABLE_ID, tableProcessor);
416416
// Update(All)Command handling requires both information about raft group topology and the primary replica,
@@ -552,14 +552,14 @@ private static CommandClosure<WriteCommand> writeCommandClosure(
552552
return commandClosure;
553553
}
554554

555-
private PartitionListener partitionListener(int tableId) {
555+
private TablePartitionProcessor partitionListener(int tableId) {
556556
LeasePlacementDriver placementDriver = mock(LeasePlacementDriver.class);
557557
lenient().when(placementDriver.getCurrentPrimaryReplica(any(), any())).thenReturn(null);
558558

559559
ClockService clockService = mock(ClockService.class);
560560
lenient().when(clockService.current()).thenReturn(clock.current());
561561

562-
return new PartitionListener(
562+
return new TablePartitionProcessor(
563563
txManager,
564564
new SnapshotAwarePartitionDataStorage(
565565
tableId,

modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import org.apache.ignite.internal.replicator.ReplicationGroupId;
4040
import org.apache.ignite.internal.table.TableViewInternal;
4141
import org.apache.ignite.internal.table.TxInfrastructureTest;
42-
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
42+
import org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
4343
import org.apache.ignite.internal.testframework.IgniteTestUtils;
4444
import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
4545
import org.apache.ignite.internal.testframework.WithSystemProperty;
@@ -138,7 +138,7 @@ public void testImplicitObservableTimePropagation() {
138138
}
139139

140140
var fsm = (JraftServerImpl.DelegatingStateMachine) raftNode.getOptions().getFsm();
141-
PartitionListener listener = extractPartitionListener(fsm, accounts);
141+
TablePartitionProcessor listener = extractPartitionListener(fsm, accounts);
142142
PendingComparableValuesTracker<HybridTimestamp, Void> safeTime = listener.getSafeTimeTracker();
143143

144144
try {
@@ -172,7 +172,7 @@ public void testImplicitObservableTimePropagation() {
172172
assertTrue(commitTs2.compareTo(commitTs) > 0, "Invalid safe time");
173173
}
174174

175-
private static PartitionListener extractPartitionListener(DelegatingStateMachine fsm, TableViewInternal table) {
176-
return (PartitionListener) ((ZonePartitionRaftListener) fsm.getListener()).tableProcessor(table.tableId());
175+
private static TablePartitionProcessor extractPartitionListener(DelegatingStateMachine fsm, TableViewInternal table) {
176+
return (TablePartitionProcessor) ((ZonePartitionRaftListener) fsm.getListener()).tableProcessor(table.tableId());
177177
}
178178
}

modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@
4343
import java.util.stream.IntStream;
4444
import java.util.stream.Stream;
4545
import org.apache.ignite.internal.TestHybridClock;
46-
import org.apache.ignite.internal.catalog.CatalogService;
4746
import org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
4847
import org.apache.ignite.internal.configuration.ComponentWorkingDir;
4948
import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
5049
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
5150
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
51+
import org.apache.ignite.internal.failure.NoOpFailureManager;
5252
import org.apache.ignite.internal.hlc.ClockService;
5353
import org.apache.ignite.internal.hlc.HybridClock;
5454
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -59,7 +59,8 @@
5959
import org.apache.ignite.internal.network.ClusterService;
6060
import org.apache.ignite.internal.network.StaticNodeFinder;
6161
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
62-
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
62+
import org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
63+
import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
6364
import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
6465
import org.apache.ignite.internal.raft.Loza;
6566
import org.apache.ignite.internal.raft.Peer;
@@ -77,14 +78,11 @@
7778
import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
7879
import org.apache.ignite.internal.replicator.ZonePartitionId;
7980
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
80-
import org.apache.ignite.internal.schema.SchemaRegistry;
81-
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
82-
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
83-
import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
84-
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
8581
import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
8682
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
8783
import org.apache.ignite.internal.tx.TxManager;
84+
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
85+
import org.apache.ignite.internal.util.PendingIndependentComparableValuesTracker;
8886
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
8987
import org.apache.ignite.network.NetworkAddress;
9088
import org.apache.ignite.raft.jraft.conf.Configuration;
@@ -383,23 +381,23 @@ void start() throws Exception {
383381
ClockService clockService = mock(ClockService.class);
384382
when(clockService.current()).thenReturn(clock.current());
385383

384+
OutgoingSnapshotsManager outgoingSnapshotsManager = new OutgoingSnapshotsManager(
385+
clusterService.nodeName(),
386+
clusterService.messagingService(),
387+
new NoOpFailureManager()
388+
);
389+
386390
this.raftClient = raftManager.startRaftGroupNode(
387391
new RaftNodeId(GROUP_ID, new Peer(nodeName)),
388392
fromConsistentIds(cluster.keySet()),
389-
new PartitionListener(
393+
new ZonePartitionRaftListener(
394+
GROUP_ID,
395+
mock(TxStatePartitionStorage.class),
390396
txManagerMock,
391-
mock(PartitionDataStorage.class),
392-
mock(StorageUpdateHandler.class),
393397
safeTs,
394-
mock(CatalogService.class),
395-
mock(SchemaRegistry.class),
396-
mock(IndexMetaStorage.class),
397-
clusterService.topologyService().localMember().id(),
398-
mock(MinimumRequiredTimeCollectorService.class),
399-
mock(Executor.class),
400-
placementDriver,
401-
clockService,
402-
GROUP_ID
398+
mock(PendingIndependentComparableValuesTracker.class),
399+
outgoingSnapshotsManager,
400+
mock(Executor.class)
403401
) {
404402
@Override
405403
public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {

modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@
166166
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
167167
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
168168
import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
169-
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
169+
import org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
170170
import org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
171171
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl;
172172
import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
@@ -933,7 +933,7 @@ private void preparePartitionResourcesAndLoadToZoneReplicaBusy(
933933
raftClient
934934
);
935935

936-
var tablePartitionRaftListener = new PartitionListener(
936+
var tablePartitionRaftListener = new TablePartitionProcessor(
937937
txManager,
938938
partitionDataStorage,
939939
partitionUpdateHandlers.storageUpdateHandler,

modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java renamed to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java

Lines changed: 2 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,14 @@
3030
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
3131
import static org.apache.ignite.internal.tx.TxState.PENDING;
3232

33-
import java.nio.file.Path;
3433
import java.util.HashSet;
35-
import java.util.Iterator;
3634
import java.util.Set;
3735
import java.util.UUID;
3836
import java.util.concurrent.CompletableFuture;
3937
import java.util.concurrent.Executor;
40-
import java.util.function.Consumer;
4138
import org.apache.ignite.internal.catalog.CatalogService;
4239
import org.apache.ignite.internal.hlc.ClockService;
4340
import org.apache.ignite.internal.hlc.HybridTimestamp;
44-
import org.apache.ignite.internal.logger.IgniteLogger;
45-
import org.apache.ignite.internal.logger.Loggers;
4641
import org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
4742
import org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommandV2;
4843
import org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
@@ -55,15 +50,10 @@
5550
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
5651
import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
5752
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
58-
import org.apache.ignite.internal.raft.Command;
5953
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
60-
import org.apache.ignite.internal.raft.ReadCommand;
6154
import org.apache.ignite.internal.raft.WriteCommand;
62-
import org.apache.ignite.internal.raft.service.CommandClosure;
63-
import org.apache.ignite.internal.raft.service.RaftGroupListener;
6455
import org.apache.ignite.internal.replicator.TablePartitionId;
6556
import org.apache.ignite.internal.replicator.ZonePartitionId;
66-
import org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
6757
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
6858
import org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
6959
import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -85,11 +75,7 @@
8575
/**
8676
* Partition command handler.
8777
*/
88-
// TODO ignite-22522 Rename to TablePartitionProcessor and remove implements RaftGroupListener
89-
public class PartitionListener implements RaftGroupListener, RaftTableProcessor {
90-
/** Logger. */
91-
private static final IgniteLogger LOG = Loggers.forClass(PartitionListener.class);
92-
78+
public class TablePartitionProcessor implements RaftTableProcessor {
9379
/** Transaction manager. */
9480
private final TxManager txManager;
9581

@@ -123,7 +109,7 @@ public class PartitionListener implements RaftGroupListener, RaftTableProcessor
123109
private ReplicaMeta lastKnownLease;
124110

125111
/** Constructor. */
126-
public PartitionListener(
112+
public TablePartitionProcessor(
127113
TxManager txManager,
128114
PartitionDataStorage partitionDataStorage,
129115
StorageUpdateHandler storageUpdateHandler,
@@ -195,66 +181,6 @@ private boolean shouldUpdateStorage(boolean isFull, LeaseInfo storageLeaseInfo)
195181
}
196182
}
197183

198-
@Override
199-
public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
200-
iterator.forEachRemaining((CommandClosure<? extends ReadCommand> clo) -> {
201-
Command command = clo.command();
202-
203-
assert false : "No read commands expected, [cmd=" + command + ']';
204-
});
205-
}
206-
207-
@Override
208-
public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
209-
iterator.forEachRemaining((CommandClosure<? extends WriteCommand> clo) -> {
210-
WriteCommand command = clo.command();
211-
212-
long commandIndex = clo.index();
213-
long commandTerm = clo.term();
214-
@Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp();
215-
assert safeTimestamp == null || command instanceof SafeTimePropagatingCommand : command;
216-
217-
long storagesAppliedIndex = storage.lastAppliedIndex();
218-
219-
assert commandIndex > storagesAppliedIndex :
220-
"Write command must have an index greater than that of storages [commandIndex=" + commandIndex
221-
+ ", mvAppliedIndex=" + storage.lastAppliedIndex() + "]";
222-
223-
CommandResult result;
224-
225-
// NB: Make sure that ANY command we accept here updates lastAppliedIndex+term info in one of the underlying
226-
// storages!
227-
// Otherwise, a gap between lastAppliedIndex from the point of view of JRaft and our storage might appear.
228-
// If a leader has such a gap, and does doSnapshot(), it will subsequently truncate its log too aggressively
229-
// in comparison with 'snapshot' state stored in our storages; and if we install a snapshot from our storages
230-
// to a follower at this point, for a subsequent AppendEntries the leader will not be able to get prevLogTerm
231-
// (because it's already truncated in the leader's log), so it will have to install a snapshot again, and then
232-
// repeat same thing over and over again.
233-
234-
storage.acquirePartitionSnapshotsReadLock();
235-
236-
try {
237-
result = processCommand(command, commandIndex, commandTerm, safeTimestamp);
238-
} catch (Throwable t) {
239-
LOG.error(
240-
"Got error while processing command [commandIndex={}, commandTerm={}, command={}]",
241-
t,
242-
clo.index(), clo.index(), command
243-
);
244-
245-
clo.result(t);
246-
247-
throw t;
248-
} finally {
249-
storage.releasePartitionSnapshotsReadLock();
250-
}
251-
252-
// Completing the closure out of the partition snapshots lock to reduce possibility of deadlocks as it might
253-
// trigger other actions taking same locks.
254-
clo.result(result.result());
255-
});
256-
}
257-
258184
@Override
259185
public CommandResult processCommand(
260186
WriteCommand command,
@@ -567,17 +493,6 @@ private void setCurrentGroupTopology(RaftGroupConfiguration config) {
567493
currentGroupTopology.addAll(config.learners());
568494
}
569495

570-
@Override
571-
public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
572-
throw new UnsupportedOperationException("!!! It's not expected that PartitionListener onSnapshotSave will be called.");
573-
574-
}
575-
576-
@Override
577-
public boolean onSnapshotLoad(Path path) {
578-
return true;
579-
}
580-
581496
@Override
582497
public void onShutdown() {
583498
storage.close();

modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
/**
1919
* This package contains RAFT command handlers that is used by
20-
* {@link org.apache.ignite.internal.table.distributed.raft.PartitionListener} aka table raft processor.
20+
* {@link org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor} aka table raft processor.
2121
*/
2222

2323
package org.apache.ignite.internal.table.distributed.raft.handlers;

0 commit comments

Comments
 (0)