Skip to content

Commit 37c7b3b

Browse files
authored
Merge branch 'master' into fix/data-loader/logger-rename
2 parents bcc3547 + dad6075 commit 37c7b3b

File tree

9 files changed

+406
-9
lines changed

9 files changed

+406
-9
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.List;
2929
import java.util.Optional;
3030
import java.util.stream.Collectors;
31+
import javax.annotation.Nullable;
3132
import javax.annotation.concurrent.NotThreadSafe;
3233
import org.slf4j.Logger;
3334
import org.slf4j.LoggerFactory;
@@ -76,17 +77,27 @@ public CrudHandler(
7677
public Optional<Result> get(Get originalGet) throws CrudException {
7778
List<String> originalProjections = new ArrayList<>(originalGet.getProjections());
7879
Get get = (Get) prepareStorageSelection(originalGet);
79-
Snapshot.Key key = new Snapshot.Key(get);
80-
readUnread(key, get);
8180

8281
TableMetadata metadata = getTableMetadata(get);
82+
83+
Snapshot.Key key;
84+
if (ScalarDbUtils.isSecondaryIndexSpecified(get, metadata)) {
85+
// In case of a Get with index, we don't know the key until we read the record
86+
key = null;
87+
} else {
88+
key = new Snapshot.Key(get);
89+
}
90+
91+
readUnread(key, get);
92+
8393
return snapshot
8494
.getResult(key, get)
8595
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
8696
}
8797

98+
// Only for a Get with index, the argument `key` is null
8899
@VisibleForTesting
89-
void readUnread(Snapshot.Key key, Get get) throws CrudException {
100+
void readUnread(@Nullable Snapshot.Key key, Get get) throws CrudException {
90101
if (!snapshot.containsKeyInGetSet(get)) {
91102
read(key, get);
92103
}
@@ -95,7 +106,7 @@ void readUnread(Snapshot.Key key, Get get) throws CrudException {
95106
// Although this class is not thread-safe, this method is actually thread-safe, so we call it
96107
// concurrently in the implicit pre-read
97108
@VisibleForTesting
98-
void read(Snapshot.Key key, Get get) throws CrudException {
109+
void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
99110
Optional<TransactionResult> result = getFromStorage(get);
100111
if (!result.isPresent() || result.get().isCommitted()) {
101112
if (result.isPresent() || get.getConjunctions().isEmpty()) {
@@ -104,7 +115,18 @@ void read(Snapshot.Key key, Get get) throws CrudException {
104115
// transaction read it first. However, we update it only if a get operation has no
105116
// conjunction or the result exists. This is because we don’t know whether the record
106117
// actually exists or not due to the conjunction.
107-
snapshot.putIntoReadSet(key, result);
118+
if (key != null) {
119+
snapshot.putIntoReadSet(key, result);
120+
} else {
121+
// Only for a Get with index, the argument `key` is null
122+
123+
if (result.isPresent()) {
124+
// Only when we can get the record with the Get with index, we can put it into the read
125+
// set
126+
key = new Snapshot.Key(get, result.get());
127+
snapshot.putIntoReadSet(key, result);
128+
}
129+
}
108130
}
109131
snapshot.putIntoGetSet(get, result); // for re-read and validation
110132
return;

core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,13 @@ public Key(Get get) {
720720
this((Operation) get);
721721
}
722722

723+
public Key(Get get, Result result) {
724+
this.namespace = get.forNamespace().get();
725+
this.table = get.forTable().get();
726+
this.partitionKey = result.getPartitionKey().get();
727+
this.clusteringKey = result.getClusteringKey();
728+
}
729+
723730
public Key(Put put) {
724731
this((Operation) put);
725732
}

core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class CrudHandlerTest {
5656
private static final String ANY_ID_2 = "id2";
5757
private static final String ANY_NAME_1 = "name1";
5858
private static final String ANY_NAME_2 = "name2";
59+
private static final String ANY_NAME_3 = "name3";
5960
private static final String ANY_TEXT_1 = "text1";
6061
private static final String ANY_TEXT_2 = "text2";
6162
private static final String ANY_TEXT_3 = "text3";
@@ -66,8 +67,10 @@ public class CrudHandlerTest {
6667
TableMetadata.newBuilder()
6768
.addColumn(ANY_NAME_1, DataType.TEXT)
6869
.addColumn(ANY_NAME_2, DataType.TEXT)
70+
.addColumn(ANY_NAME_3, DataType.TEXT)
6971
.addPartitionKey(ANY_NAME_1)
7072
.addClusteringKey(ANY_NAME_2)
73+
.addSecondaryIndex(ANY_NAME_3)
7174
.build());
7275
private static final TransactionTableMetadata TRANSACTION_TABLE_METADATA =
7376
new TransactionTableMetadata(TABLE_METADATA);
@@ -928,6 +931,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
928931

929932
// Assert
930933
verify(storage, never()).get(any());
934+
verify(snapshot, never()).putIntoReadSet(any(Snapshot.Key.class), any(Optional.class));
931935
verify(snapshot, never()).putIntoGetSet(any(Get.class), any(Optional.class));
932936
}
933937

@@ -1014,6 +1018,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
10141018
// Assert
10151019
verify(storage).get(any());
10161020
verify(snapshot).putIntoReadSet(key, Optional.of(new TransactionResult(result)));
1021+
verify(snapshot).putIntoGetSet(getForKey, Optional.of(new TransactionResult(result)));
10171022
}
10181023

10191024
@Test
@@ -1050,6 +1055,88 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
10501055
});
10511056
}
10521057

1058+
@Test
1059+
public void
1060+
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_EmptyResultReturnedByStorage_ShouldCallAppropriateMethods()
1061+
throws CrudException, ExecutionException {
1062+
// Arrange
1063+
Get getWithIndex =
1064+
Get.newBuilder()
1065+
.namespace(ANY_NAMESPACE_NAME)
1066+
.table(ANY_TABLE_NAME)
1067+
.indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1))
1068+
.build();
1069+
when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false);
1070+
when(storage.get(any())).thenReturn(Optional.empty());
1071+
1072+
// Act
1073+
handler.readUnread(null, getWithIndex);
1074+
1075+
// Assert
1076+
verify(storage).get(any());
1077+
verify(snapshot, never()).putIntoReadSet(any(), any());
1078+
verify(snapshot).putIntoGetSet(getWithIndex, Optional.empty());
1079+
}
1080+
1081+
@Test
1082+
public void
1083+
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_CommittedRecordReturnedByStorage_ShouldCallAppropriateMethods()
1084+
throws CrudException, ExecutionException {
1085+
// Arrange
1086+
Result result = mock(Result.class);
1087+
when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get());
1088+
when(result.getPartitionKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_1, ANY_TEXT_1)));
1089+
when(result.getClusteringKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_2, ANY_TEXT_2)));
1090+
when(storage.get(any())).thenReturn(Optional.of(result));
1091+
1092+
Get getWithIndex =
1093+
Get.newBuilder()
1094+
.namespace(ANY_NAMESPACE_NAME)
1095+
.table(ANY_TABLE_NAME)
1096+
.indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1))
1097+
.build();
1098+
when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false);
1099+
1100+
// Act
1101+
handler.readUnread(null, getWithIndex);
1102+
1103+
// Assert
1104+
verify(storage).get(any());
1105+
verify(snapshot)
1106+
.putIntoReadSet(
1107+
new Snapshot.Key(getWithIndex, result), Optional.of(new TransactionResult(result)));
1108+
verify(snapshot).putIntoGetSet(getWithIndex, Optional.of(new TransactionResult(result)));
1109+
}
1110+
1111+
@Test
1112+
public void
1113+
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_UncommittedRecordReturnedByStorage_ShouldThrowUncommittedRecordException()
1114+
throws ExecutionException {
1115+
// Arrange
1116+
Result result = mock(Result.class);
1117+
when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.PREPARED.get());
1118+
when(storage.get(any())).thenReturn(Optional.of(result));
1119+
1120+
Get getWithIndex =
1121+
Get.newBuilder()
1122+
.namespace(ANY_NAMESPACE_NAME)
1123+
.table(ANY_TABLE_NAME)
1124+
.indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1))
1125+
.build();
1126+
when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false);
1127+
1128+
// Act Assert
1129+
assertThatThrownBy(() -> handler.readUnread(null, getWithIndex))
1130+
.isInstanceOf(UncommittedRecordException.class)
1131+
.satisfies(
1132+
e -> {
1133+
UncommittedRecordException exception = (UncommittedRecordException) e;
1134+
assertThat(exception.getSelection()).isEqualTo(getWithIndex);
1135+
assertThat(exception.getResults().size()).isEqualTo(1);
1136+
assertThat(exception.getResults().get(0)).isEqualTo(result);
1137+
});
1138+
}
1139+
10531140
@Test
10541141
public void readIfImplicitPreReadEnabled_ShouldCallAppropriateMethods() throws CrudException {
10551142
// Arrange

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,45 @@ public void onTaskComplete(ImportTaskResult taskResult) {
153153
/** {@inheritDoc} Forwards the event to all registered listeners. */
154154
@Override
155155
public void onAllDataChunksCompleted() {
156+
Throwable firstException = null;
157+
156158
for (ImportEventListener listener : listeners) {
157-
listener.onAllDataChunksCompleted();
159+
try {
160+
listener.onAllDataChunksCompleted();
161+
} catch (Throwable e) {
162+
if (firstException == null) {
163+
firstException = e;
164+
} else {
165+
firstException.addSuppressed(e);
166+
}
167+
}
168+
}
169+
170+
try {
171+
closeResources();
172+
} catch (Throwable e) {
173+
if (firstException != null) {
174+
firstException.addSuppressed(e);
175+
} else {
176+
firstException = e;
177+
}
178+
}
179+
180+
if (firstException != null) {
181+
throw new RuntimeException("Error during completion", firstException);
182+
}
183+
}
184+
185+
/** Close resources properly once the process is completed */
186+
public void closeResources() {
187+
try {
188+
if (distributedStorage != null) {
189+
distributedStorage.close();
190+
} else if (distributedTransactionManager != null) {
191+
distributedTransactionManager.close();
192+
}
193+
} catch (Throwable e) {
194+
throw new RuntimeException("Failed to close the resource", e);
158195
}
159196
}
160197

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
*/
4040
public class CsvImportProcessor extends ImportProcessor {
4141
private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();
42-
private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0);
42+
private final AtomicInteger dataChunkIdCounter = new AtomicInteger(0);
4343

4444
/**
4545
* Creates a new CsvImportProcessor with the specified parameters.

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
public class JsonImportProcessor extends ImportProcessor {
4444

4545
private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();
46-
private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0);
46+
private final AtomicInteger dataChunkIdCounter = new AtomicInteger(0);
4747

4848
public JsonImportProcessor(ImportProcessorParams params) {
4949
super(params);

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
public class JsonLinesImportProcessor extends ImportProcessor {
3838

3939
private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();
40-
private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0);
40+
private final AtomicInteger dataChunkIdCounter = new AtomicInteger(0);
4141

4242
/**
4343
* Creates a new JsonLinesImportProcessor with the specified parameters.
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package com.scalar.db.dataloader.core.dataimport;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertThrows;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
import static org.mockito.Mockito.doThrow;
7+
import static org.mockito.Mockito.mock;
8+
import static org.mockito.Mockito.verify;
9+
10+
import com.scalar.db.api.DistributedStorage;
11+
import com.scalar.db.api.DistributedTransactionManager;
12+
import com.scalar.db.api.TableMetadata;
13+
import com.scalar.db.dataloader.core.ScalarDbMode;
14+
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
15+
import java.io.BufferedReader;
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
import org.junit.jupiter.api.BeforeEach;
19+
import org.junit.jupiter.api.Test;
20+
21+
public class ImportManagerTest {
22+
23+
private ImportManager importManager;
24+
private ImportEventListener listener1;
25+
private ImportEventListener listener2;
26+
private DistributedStorage distributedStorage;
27+
private DistributedTransactionManager distributedTransactionManager;
28+
29+
@BeforeEach
30+
void setUp() {
31+
Map<String, TableMetadata> tableMetadata = new HashMap<>();
32+
BufferedReader reader = mock(BufferedReader.class);
33+
ImportOptions options = mock(ImportOptions.class);
34+
ImportProcessorFactory processorFactory = mock(ImportProcessorFactory.class);
35+
36+
listener1 = mock(ImportEventListener.class);
37+
listener2 = mock(ImportEventListener.class);
38+
distributedStorage = mock(DistributedStorage.class);
39+
distributedTransactionManager = mock(DistributedTransactionManager.class);
40+
41+
importManager =
42+
new ImportManager(
43+
tableMetadata,
44+
reader,
45+
options,
46+
processorFactory,
47+
ScalarDbMode.STORAGE,
48+
distributedStorage,
49+
null); // Only one resource present
50+
importManager.addListener(listener1);
51+
importManager.addListener(listener2);
52+
}
53+
54+
@Test
55+
void onAllDataChunksCompleted_shouldNotifyListenersAndCloseStorage() {
56+
importManager.onAllDataChunksCompleted();
57+
58+
verify(listener1).onAllDataChunksCompleted();
59+
verify(listener2).onAllDataChunksCompleted();
60+
verify(distributedStorage).close();
61+
}
62+
63+
@Test
64+
void onAllDataChunksCompleted_shouldAggregateListenerExceptionAndStillCloseResources() {
65+
doThrow(new RuntimeException("Listener1 failed")).when(listener1).onAllDataChunksCompleted();
66+
67+
RuntimeException thrown =
68+
assertThrows(RuntimeException.class, () -> importManager.onAllDataChunksCompleted());
69+
70+
assertTrue(thrown.getMessage().contains("Error during completion"));
71+
assertEquals("Listener1 failed", thrown.getCause().getMessage());
72+
verify(distributedStorage).close();
73+
}
74+
75+
@Test
76+
void closeResources_shouldCloseTransactionManagerIfStorageIsNull() {
77+
ImportManager managerWithTx =
78+
new ImportManager(
79+
new HashMap<>(),
80+
mock(BufferedReader.class),
81+
mock(ImportOptions.class),
82+
mock(ImportProcessorFactory.class),
83+
ScalarDbMode.TRANSACTION,
84+
null,
85+
distributedTransactionManager);
86+
87+
managerWithTx.closeResources();
88+
verify(distributedTransactionManager).close();
89+
}
90+
91+
@Test
92+
void closeResources_shouldThrowIfResourceCloseFails() {
93+
doThrow(new RuntimeException("Close failed")).when(distributedStorage).close();
94+
95+
RuntimeException ex =
96+
assertThrows(RuntimeException.class, () -> importManager.closeResources());
97+
98+
assertEquals("Failed to close the resource", ex.getMessage());
99+
assertEquals("Close failed", ex.getCause().getMessage());
100+
}
101+
}

0 commit comments

Comments
 (0)