Skip to content

Commit b7b1daf

Browse files
JAkutenshizstan
authored andcommitted
IGNITE-24119 Fix getAll in explicit RO transaction (#6813)
1 parent 933d78f commit b7b1daf

File tree

4 files changed

+84
-19
lines changed

4 files changed

+84
-19
lines changed

modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ default CompletableFuture<List<BinaryRow>> getAll(
173173
* @param readTimestamp Read timestamp.
174174
* @param transactionId Transaction ID (might be {@code null}).
175175
* @param coordinatorId Ephemeral ID of the transaction coordinator.
176-
* @param recipientNode Cluster node that will handle given get request.
176+
* @param recipientNode Cluster node that will handle given getAll request. In case if given node is {@code null} then for each
177+
* partition inside of the method recipient node will be calculated separately.
177178
* @return Future that will return rows with all columns filled from the table. The order of collection elements is
178179
* guaranteed to be the same as the order of {@code keyRows}. If a record does not exist, the
179180
* element at the corresponding index of the resulting collection is {@code null}.
@@ -183,7 +184,7 @@ CompletableFuture<List<BinaryRow>> getAll(
183184
HybridTimestamp readTimestamp,
184185
@Nullable UUID transactionId,
185186
@Nullable UUID coordinatorId,
186-
InternalClusterNode recipientNode
187+
@Nullable InternalClusterNode recipientNode
187188
);
188189

189190
/**

modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,10 +1035,7 @@ public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> keyRows
10351035
if (tx != null && tx.isReadOnly()) {
10361036
assert !tx.implicit() : "implicit RO getAll not supported";
10371037

1038-
BinaryRowEx firstRow = keyRows.iterator().next();
1039-
1040-
return evaluateReadOnlyRecipientNode(partitionId(firstRow), tx.readTimestamp())
1041-
.thenCompose(recipientNode -> getAll(keyRows, tx.readTimestamp(), tx.id(), tx.coordinatorId(), recipientNode));
1038+
return getAll(keyRows, tx.readTimestamp(), tx.id(), tx.coordinatorId(), null);
10421039
}
10431040

10441041
return enlistInTx(
@@ -1058,12 +1055,14 @@ public CompletableFuture<List<BinaryRow>> getAll(
10581055
HybridTimestamp readTimestamp,
10591056
@Nullable UUID transactionId,
10601057
@Nullable UUID coordinatorId,
1061-
InternalClusterNode recipientNode
1058+
@Nullable InternalClusterNode recipientNode
10621059
) {
10631060
Int2ObjectMap<RowBatch> rowBatchByPartitionId = toRowBatchByPartitionId(keyRows);
10641061

10651062
for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
1066-
ReplicationGroupId replicationGroupId = targetReplicationGroupId(partitionRowBatch.getIntKey());
1063+
int partitionId = partitionRowBatch.getIntKey();
1064+
1065+
ReplicationGroupId replicationGroupId = targetReplicationGroupId(partitionId);
10671066

10681067
ReadOnlyMultiRowPkReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
10691068
.groupId(serializeReplicationGroupId(replicationGroupId))
@@ -1076,7 +1075,10 @@ public CompletableFuture<List<BinaryRow>> getAll(
10761075
.coordinatorId(coordinatorId)
10771076
.build();
10781077

1079-
partitionRowBatch.getValue().resultFuture = replicaSvc.invoke(recipientNode, request);
1078+
partitionRowBatch.getValue().resultFuture = recipientNode != null
1079+
? replicaSvc.invoke(recipientNode, request)
1080+
: evaluateReadOnlyRecipientNode(partitionId, readTimestamp)
1081+
.thenCompose(targetNode -> replicaSvc.invoke(targetNode, request));
10801082
}
10811083

10821084
return collectMultiRowsResponsesWithRestoreOrder(rowBatchByPartitionId.values());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.tx.readonly;
19+
20+
import static java.util.stream.Collectors.toList;
21+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
22+
import static org.junit.jupiter.api.Assertions.assertEquals;
23+
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.stream.IntStream;
27+
import org.apache.ignite.Ignite;
28+
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
29+
import org.apache.ignite.table.KeyValueView;
30+
import org.apache.ignite.tx.Transaction;
31+
import org.apache.ignite.tx.TransactionOptions;
32+
import org.junit.jupiter.api.Test;
33+
34+
class ItMultiGetInExplicitReadOnlyTxTest extends ClusterPerTestIntegrationTest {
35+
private static final String TABLE_NAME = "TEST_TABLE";
36+
37+
private static final int KEY_COUNT = 100;
38+
39+
@Override
40+
protected int initialNodes() {
41+
return 2;
42+
}
43+
44+
@Test
45+
void roTransactionWithGetAllOperation() {
46+
assertEquals(2, cluster.nodes().size());
47+
48+
Ignite coordinator = node(0);
49+
50+
coordinator.sql().executeScript("CREATE ZONE NEW_ZONE (PARTITIONS 2, REPLICAS 1) STORAGE PROFILES ['default']");
51+
52+
coordinator.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR) ZONE NEW_ZONE");
53+
54+
KeyValueView<Integer, String> kvView = coordinator.tables().table(TABLE_NAME).keyValueView(Integer.class, String.class);
55+
56+
insertOriginalValues(KEY_COUNT, kvView);
57+
58+
Transaction roTx = coordinator.transactions().begin(new TransactionOptions().readOnly(true));
59+
60+
List<Integer> keys = IntStream.range(0, KEY_COUNT).boxed().collect(toList());
61+
62+
Map<Integer, String> getAllResult = assertDoesNotThrow(() -> kvView.getAll(roTx, keys));
63+
64+
assertEquals(KEY_COUNT, getAllResult.size());
65+
}
66+
67+
private static void insertOriginalValues(int keyCount, KeyValueView<Integer, String> kvView) {
68+
for (int i = 0; i < keyCount; i++) {
69+
kvView.put(null, i, "original-" + i);
70+
}
71+
}
72+
}

modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxAndLowWatermarkTest.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import static org.hamcrest.Matchers.isA;
3333
import static org.junit.jupiter.api.Assertions.assertThrows;
3434
import static org.junit.jupiter.api.Assertions.assertTrue;
35-
import static org.junit.jupiter.api.Assumptions.assumeFalse;
3635

3736
import java.util.HashSet;
3837
import java.util.List;
@@ -103,9 +102,6 @@ void createTable() {
103102
@ParameterizedTest
104103
@EnumSource(TransactionalReader.class)
105104
void roTransactionNoticesTupleVersionsMissingDueToGcOnDataNodes(TransactionalReader reader) throws Exception {
106-
// TODO: remove the assumption when IGNITE-24119 is fixed.
107-
assumeFalse(reader == TransactionalReader.MULTI_GET);
108-
109105
updateDataAvailabilityTimeToShortPeriod();
110106

111107
Ignite coordinator = node(0);
@@ -185,9 +181,6 @@ void lwmIsAllowedToBeRaisedOnDataNodesAfterRoTransactionFinish(
185181
@Enum(TransactionalReader.class) TransactionalReader reader,
186182
@Values(booleans = {true, false}) boolean commit
187183
) throws Exception {
188-
// TODO: remove the assumption when IGNITE-24119 is fixed.
189-
assumeFalse(reader == TransactionalReader.MULTI_GET);
190-
191184
Ignite coordinator = node(0);
192185
KeyValueView<Integer, String> kvView = kvView(coordinator);
193186

@@ -230,9 +223,6 @@ private static void assertLwmGrowsAbove(HybridTimestamp ts, Ignite node) throws
230223
@EnumSource(TransactionalReader.class)
231224
@WithSystemProperty(key = ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "100")
232225
void nonFinishedRoTransactionsOfCoordinatorsThatLeftDontHoldLwm(TransactionalReader reader) throws Exception {
233-
// TODO: remove the assumption when IGNITE-24119 is fixed.
234-
assumeFalse(reader == TransactionalReader.MULTI_GET);
235-
236226
Ignite coordinator = node(1);
237227
KeyValueView<Integer, String> kvView = kvView(coordinator);
238228

0 commit comments

Comments
 (0)