Skip to content

Commit 8c189e8

Browse files
authored
Merge branch 'master' into feat/data-loader/export-trn
2 parents 37fa8af + 5a45200 commit 8c189e8

File tree

9 files changed

+197
-45
lines changed

9 files changed

+197
-45
lines changed

core/src/main/java/com/scalar/db/common/StateManagedDistributedTransactionManager.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,12 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
146146

147147
@Override
148148
public void rollback() throws RollbackException {
149-
if (status == Status.COMMITTED || status == Status.ROLLED_BACK) {
149+
if (status == Status.ROLLED_BACK) {
150+
return;
151+
}
152+
if (status == Status.COMMITTED) {
150153
throw new IllegalStateException(
151-
CoreError.TRANSACTION_ALREADY_COMMITTED_OR_ROLLED_BACK.buildMessage(status));
154+
CoreError.TRANSACTION_ALREADY_COMMITTED.buildMessage(status));
152155
}
153156
try {
154157
super.rollback();
@@ -159,9 +162,12 @@ public void rollback() throws RollbackException {
159162

160163
@Override
161164
public void abort() throws AbortException {
162-
if (status == Status.COMMITTED || status == Status.ROLLED_BACK) {
165+
if (status == Status.ROLLED_BACK) {
166+
return;
167+
}
168+
if (status == Status.COMMITTED) {
163169
throw new IllegalStateException(
164-
CoreError.TRANSACTION_ALREADY_COMMITTED_OR_ROLLED_BACK.buildMessage(status));
170+
CoreError.TRANSACTION_ALREADY_COMMITTED.buildMessage(status));
165171
}
166172
try {
167173
super.abort();

core/src/main/java/com/scalar/db/common/StateManagedTwoPhaseCommitTransactionManager.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,12 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
183183

184184
@Override
185185
public void rollback() throws RollbackException {
186-
if (status == Status.COMMITTED || status == Status.ROLLED_BACK) {
186+
if (status == Status.ROLLED_BACK) {
187+
return;
188+
}
189+
if (status == Status.COMMITTED) {
187190
throw new IllegalStateException(
188-
CoreError.TRANSACTION_ALREADY_COMMITTED_OR_ROLLED_BACK.buildMessage(status));
191+
CoreError.TRANSACTION_ALREADY_COMMITTED.buildMessage(status));
189192
}
190193
try {
191194
super.rollback();
@@ -196,9 +199,12 @@ public void rollback() throws RollbackException {
196199

197200
@Override
198201
public void abort() throws AbortException {
199-
if (status == Status.COMMITTED || status == Status.ROLLED_BACK) {
202+
if (status == Status.ROLLED_BACK) {
203+
return;
204+
}
205+
if (status == Status.COMMITTED) {
200206
throw new IllegalStateException(
201-
CoreError.TRANSACTION_ALREADY_COMMITTED_OR_ROLLED_BACK.buildMessage(status));
207+
CoreError.TRANSACTION_ALREADY_COMMITTED.buildMessage(status));
202208
}
203209
try {
204210
super.abort();

core/src/main/java/com/scalar/db/common/error/CoreError.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,10 @@ public enum CoreError implements ScalarDbError {
214214
Category.USER_ERROR, "0042", "Invalid ID specified. ID: %d", "", ""),
215215
TRANSACTION_NOT_ACTIVE(
216216
Category.USER_ERROR, "0043", "The transaction is not active. Status: %s", "", ""),
217-
TRANSACTION_ALREADY_COMMITTED_OR_ROLLED_BACK(
217+
TRANSACTION_ALREADY_COMMITTED(
218218
Category.USER_ERROR,
219219
"0044",
220-
"The transaction has already been committed or rolled back. Status: %s",
220+
"The transaction has already been committed. Status: %s",
221221
"",
222222
""),
223223
TRANSACTION_NOT_PREPARED(
@@ -689,13 +689,13 @@ public enum CoreError implements ScalarDbError {
689689
DATA_LOADER_INVALID_BASE64_ENCODING_FOR_COLUMN_VALUE(
690690
Category.USER_ERROR,
691691
"0149",
692-
"Invalid base64 encoding for blob value for column %s in table %s in namespace %s",
692+
"Invalid base64 encoding for blob value '%s' for column %s in table %s in namespace %s",
693693
"",
694694
""),
695695
DATA_LOADER_INVALID_NUMBER_FORMAT_FOR_COLUMN_VALUE(
696696
Category.USER_ERROR,
697697
"0150",
698-
"Invalid number specified for column %s in table %s in namespace %s",
698+
"Invalid number '%s' specified for column %s in table %s in namespace %s",
699699
"",
700700
""),
701701
DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT(
@@ -898,7 +898,7 @@ public enum CoreError implements ScalarDbError {
898898
DATA_LOADER_INVALID_DATE_TIME_FOR_COLUMN_VALUE(
899899
Category.USER_ERROR,
900900
"0199",
901-
"Invalid date time value specified for column %s in table %s in namespace %s.",
901+
"Invalid date time value '%s' specified for column %s in table %s in namespace %s.",
902902
"",
903903
""),
904904
DATA_LOADER_NULL_OR_EMPTY_KEY_VALUE_INPUT(

core/src/test/java/com/scalar/db/common/StateManagedDistributedTransactionManagerTest.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import static org.mockito.ArgumentMatchers.anyString;
88
import static org.mockito.Mockito.doThrow;
99
import static org.mockito.Mockito.mock;
10+
import static org.mockito.Mockito.only;
11+
import static org.mockito.Mockito.verify;
1012
import static org.mockito.Mockito.when;
1113

1214
import com.scalar.db.api.Delete;
@@ -21,6 +23,7 @@
2123
import com.scalar.db.api.SerializableStrategy;
2224
import com.scalar.db.api.Update;
2325
import com.scalar.db.api.Upsert;
26+
import com.scalar.db.exception.transaction.AbortException;
2427
import com.scalar.db.exception.transaction.CommitException;
2528
import com.scalar.db.exception.transaction.RollbackException;
2629
import com.scalar.db.exception.transaction.TransactionException;
@@ -308,12 +311,56 @@ public void rollback_AfterCommit_ShouldThrowIllegalStateException()
308311
}
309312

310313
@Test
311-
public void rollback_Twice_ShouldThrowIllegalStateException() throws RollbackException {
314+
public void rollback_Twice_ShouldNotThrowAnyExceptionAndCallWrappedTransactionRollbackOnlyOnce()
315+
throws RollbackException {
312316
// Arrange
313317
transaction.rollback();
314318

315319
// Act Assert
316-
assertThatThrownBy(() -> transaction.rollback()).isInstanceOf(IllegalStateException.class);
320+
assertThatCode(() -> transaction.rollback()).doesNotThrowAnyException();
321+
322+
verify(wrappedTransaction, only()).rollback();
323+
}
324+
325+
@Test
326+
public void abort_ShouldNotThrowAnyException() {
327+
// Arrange
328+
329+
// Act Assert
330+
assertThatCode(() -> transaction.abort()).doesNotThrowAnyException();
331+
}
332+
333+
@Test
334+
public void abort_AfterCommitFailed_ShouldNotThrowAnyException()
335+
throws CommitException, UnknownTransactionStatusException {
336+
// Arrange
337+
doThrow(CommitException.class).when(wrappedTransaction).commit();
338+
assertThatThrownBy(() -> transaction.commit()).isInstanceOf(CommitException.class);
339+
340+
// Act Assert
341+
assertThatCode(() -> transaction.abort()).doesNotThrowAnyException();
342+
}
343+
344+
@Test
345+
public void abort_AfterCommit_ShouldThrowIllegalStateException()
346+
throws CommitException, UnknownTransactionStatusException {
347+
// Arrange
348+
transaction.commit();
349+
350+
// Act Assert
351+
assertThatThrownBy(() -> transaction.abort()).isInstanceOf(IllegalStateException.class);
352+
}
353+
354+
@Test
355+
public void abort_Twice_ShouldNotThrowAnyExceptionAndCallWrappedTransactionAbortOnlyOnce()
356+
throws AbortException {
357+
// Arrange
358+
transaction.abort();
359+
360+
// Act Assert
361+
assertThatCode(() -> transaction.abort()).doesNotThrowAnyException();
362+
363+
verify(wrappedTransaction, only()).abort();
317364
}
318365
}
319366
}

core/src/test/java/com/scalar/db/common/StateManagedTwoPhaseCommitTransactionManagerTest.java

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import static org.mockito.ArgumentMatchers.anyString;
77
import static org.mockito.Mockito.doThrow;
88
import static org.mockito.Mockito.mock;
9+
import static org.mockito.Mockito.only;
10+
import static org.mockito.Mockito.verify;
911
import static org.mockito.Mockito.when;
1012

1113
import com.scalar.db.api.Delete;
@@ -18,6 +20,7 @@
1820
import com.scalar.db.api.TwoPhaseCommitTransactionManager;
1921
import com.scalar.db.api.Update;
2022
import com.scalar.db.api.Upsert;
23+
import com.scalar.db.exception.transaction.AbortException;
2124
import com.scalar.db.exception.transaction.CommitException;
2225
import com.scalar.db.exception.transaction.PreparationException;
2326
import com.scalar.db.exception.transaction.RollbackException;
@@ -385,12 +388,82 @@ public void rollback_AfterCommit_ShouldThrowIllegalStateException()
385388
}
386389

387390
@Test
388-
public void rollback_Twice_ShouldThrowIllegalStateException() throws RollbackException {
391+
public void rollback_Twice_ShouldNotThrowAnyExceptionAndCallWrappedTransactionRollbackOnlyOnce()
392+
throws RollbackException {
389393
// Arrange
390394
transaction.rollback();
391395

392396
// Act Assert
393-
assertThatThrownBy(() -> transaction.rollback()).isInstanceOf(IllegalStateException.class);
397+
assertThatCode(() -> transaction.rollback()).doesNotThrowAnyException();
398+
399+
verify(wrappedTransaction, only()).rollback();
400+
}
401+
402+
@Test
403+
public void abort_ShouldNotThrowAnyException() {
404+
// Arrange
405+
406+
// Act Assert
407+
assertThatCode(() -> transaction.abort()).doesNotThrowAnyException();
408+
}
409+
410+
@Test
411+
public void abort_AfterCommitFailed_ShouldNotThrowAnyException()
412+
throws PreparationException, CommitException, UnknownTransactionStatusException {
413+
// Arrange
414+
doThrow(CommitException.class).when(wrappedTransaction).commit();
415+
416+
transaction.prepare();
417+
assertThatThrownBy(() -> transaction.commit()).isInstanceOf(CommitException.class);
418+
419+
// Act Assert
420+
assertThatCode(() -> transaction.abort()).doesNotThrowAnyException();
421+
}
422+
423+
@Test
424+
public void abort_AfterPrepareFailed_ShouldNotThrowAnyException() throws PreparationException {
425+
// Arrange
426+
doThrow(PreparationException.class).when(wrappedTransaction).prepare();
427+
assertThatThrownBy(() -> transaction.prepare()).isInstanceOf(PreparationException.class);
428+
429+
// Act Assert
430+
assertThatCode(() -> transaction.abort()).doesNotThrowAnyException();
431+
}
432+
433+
@Test
434+
public void abort_AfterValidateFailed_ShouldNotThrowAnyException()
435+
throws PreparationException, ValidationException {
436+
// Arrange
437+
doThrow(ValidationException.class).when(wrappedTransaction).validate();
438+
439+
transaction.prepare();
440+
assertThatThrownBy(() -> transaction.validate()).isInstanceOf(ValidationException.class);
441+
442+
// Act Assert
443+
assertThatCode(() -> transaction.abort()).doesNotThrowAnyException();
444+
}
445+
446+
@Test
447+
public void abort_AfterCommit_ShouldThrowIllegalStateException()
448+
throws PreparationException, CommitException, UnknownTransactionStatusException {
449+
// Arrange
450+
transaction.prepare();
451+
transaction.commit();
452+
453+
// Act Assert
454+
assertThatThrownBy(() -> transaction.abort()).isInstanceOf(IllegalStateException.class);
455+
}
456+
457+
@Test
458+
public void abort_Twice_ShouldNotThrowAnyExceptionAndCallWrappedTransactionAbortOnlyOnce()
459+
throws AbortException {
460+
// Arrange
461+
transaction.abort();
462+
463+
// Act Assert
464+
assertThatCode(() -> transaction.abort()).doesNotThrowAnyException();
465+
466+
verify(wrappedTransaction, only()).abort();
394467
}
395468
}
396469
}

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -416,19 +416,11 @@ private ImportDataChunkStatus processDataChunkWithTransactions(
416416
ImportTransactionBatchResult importTransactionBatchResult =
417417
processTransactionBatch(dataChunk.getDataChunkId(), transactionBatch);
418418

419-
importTransactionBatchResult
420-
.getRecords()
421-
.forEach(
422-
batchRecords -> {
423-
if (batchRecords.getTargets().stream()
424-
.allMatch(
425-
targetResult ->
426-
targetResult.getStatus().equals(ImportTargetResultStatus.SAVED))) {
427-
successCount.incrementAndGet();
428-
} else {
429-
failureCount.incrementAndGet();
430-
}
431-
});
419+
if (importTransactionBatchResult.isSuccess()) {
420+
successCount.addAndGet(importTransactionBatchResult.getRecords().size());
421+
} else {
422+
failureCount.addAndGet(importTransactionBatchResult.getRecords().size());
423+
}
432424
}
433425
Instant endTime = Instant.now();
434426
int totalDuration = (int) Duration.between(startTime, endTime).toMillis();

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public ImportTaskResult execute() {
7373
return ImportTaskResult.builder()
7474
.rawRecord(params.getSourceRecord())
7575
.rowNumber(params.getRowNumber())
76-
.targets(Collections.singletonList(singleTargetResult))
76+
.targets(new ArrayList<>(Collections.singletonList(singleTargetResult)))
7777
.build();
7878
}
7979

@@ -308,11 +308,14 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
308308
optionalScalarDBResult.orElse(null),
309309
mutableSourceRecord,
310310
importOptions.isIgnoreNullValues(),
311-
tableMetadata);
311+
tableMetadata,
312+
namespace,
313+
table);
312314
} catch (Base64Exception | ColumnParsingException e) {
313315
return ImportTargetResult.builder()
314316
.namespace(namespace)
315317
.tableName(table)
318+
.importedRecord(mutableSourceRecord)
316319
.status(ImportTargetResultStatus.VALIDATION_FAILED)
317320
.errors(Collections.singletonList(e.getMessage()))
318321
.build();

0 commit comments

Comments
 (0)