diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java
index f1ba3c0b03..b437be4a88 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java
@@ -74,7 +74,6 @@ public SingleFileImportLogger(ImportLoggerConfig config, LogWriterFactory logWri
    */
   @Override
   public void onTaskComplete(ImportTaskResult taskResult) {
-    if (!config.isLogSuccessRecordsEnabled() && !config.isLogRawSourceRecordsEnabled()) return;
     try {
       writeImportTaskResultDetailToLogs(taskResult);
     } catch (Exception e) {
@@ -199,9 +198,7 @@ private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult
       if (config.isLogSuccessRecordsEnabled()
           && target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
         writeToLogWriter(successLogWriter, OBJECT_MAPPER.valueToTree(target));
-      }
-      if (config.isLogRawSourceRecordsEnabled()
-          && !target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
+      } else if (!target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
         writeToLogWriter(failureLogWriter, OBJECT_MAPPER.valueToTree(target));
       }
     }
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java
index 3970ca4017..cfa48327a8 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java
@@ -73,7 +73,6 @@ public SplitByDataChunkImportLogger(
    */
   @Override
   public void onTaskComplete(ImportTaskResult taskResult) {
-    if (!config.isLogSuccessRecordsEnabled() && !config.isLogRawSourceRecordsEnabled()) return;
     try {
       writeImportTaskResultDetailToLogs(taskResult);
     } catch (IOException e) {
@@ -94,8 +93,7 @@ private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult
       ImportTargetResultStatus status = target.getStatus();
       if (status.equals(ImportTargetResultStatus.SAVED) && config.isLogSuccessRecordsEnabled()) {
         writeLog(target, LogFileType.SUCCESS, importTaskResult.getDataChunkId());
-      } else if (!status.equals(ImportTargetResultStatus.SAVED)
-          && config.isLogRawSourceRecordsEnabled()) {
+      } else if (!status.equals(ImportTargetResultStatus.SAVED)) {
         writeLog(target, LogFileType.FAILURE, importTaskResult.getDataChunkId());
       }
     }
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java
index 128854e18b..78abd6a52a 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java
@@ -31,6 +31,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 
 /**
@@ -183,6 +184,7 @@ private ImportTargetResult importIntoSingleTable(
           .tableName(table)
           .status(ImportTargetResultStatus.VALIDATION_FAILED)
           .errors(Collections.singletonList(DataLoaderError.TABLE_METADATA_MISSING.buildMessage()))
+          .importedRecord(getRecordForLogging(mutableSourceRecord))
           .build();
     }
 
@@ -209,6 +211,7 @@ private ImportTargetResult importIntoSingleTable(
           .tableName(table)
           .status(ImportTargetResultStatus.VALIDATION_FAILED)
           .errors(validationResult.getErrorMessages())
+          .importedRecord(getRecordForLogging(mutableSourceRecord))
           .build();
     }
 
@@ -223,6 +226,7 @@ private ImportTargetResult importIntoSingleTable(
           .errors(
               Collections.singletonList(
                   DataLoaderError.COULD_NOT_FIND_PARTITION_KEY.buildMessage()))
+          .importedRecord(getRecordForLogging(mutableSourceRecord))
           .build();
     }
     Optional<Key> optionalClusteringKey = Optional.empty();
@@ -238,6 +242,7 @@ private ImportTargetResult importIntoSingleTable(
             .errors(
                 Collections.singletonList(
                     DataLoaderError.COULD_NOT_FIND_CLUSTERING_KEY.buildMessage()))
+            .importedRecord(getRecordForLogging(mutableSourceRecord))
             .build();
       }
     }
@@ -254,6 +259,7 @@ private ImportTargetResult importIntoSingleTable(
           .tableName(table)
           .status(ImportTargetResultStatus.RETRIEVAL_FAILED)
           .errors(Collections.singletonList(e.getMessage()))
+          .importedRecord(getRecordForLogging(mutableSourceRecord))
           .build();
     }
     ImportTaskAction importAction =
@@ -273,6 +279,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
             .errors(
                 Collections.singletonList(
                     DataLoaderError.UPSERT_INSERT_MISSING_COLUMNS.buildMessage()))
+            .importedRecord(getRecordForLogging(mutableSourceRecord))
             .build();
       }
     }
@@ -281,7 +288,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
       return ImportTargetResult.builder()
           .namespace(namespace)
           .tableName(table)
-          .importedRecord(mutableSourceRecord)
+          .importedRecord(getRecordForLogging(mutableSourceRecord))
           .importAction(importAction)
           .status(ImportTargetResultStatus.DATA_ALREADY_EXISTS)
           .errors(Collections.singletonList(DataLoaderError.DATA_ALREADY_EXISTS.buildMessage()))
@@ -292,7 +299,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
       return ImportTargetResult.builder()
           .namespace(namespace)
           .tableName(table)
-          .importedRecord(mutableSourceRecord)
+          .importedRecord(getRecordForLogging(mutableSourceRecord))
           .importAction(importAction)
           .status(ImportTargetResultStatus.DATA_NOT_FOUND)
           .errors(Collections.singletonList(DataLoaderError.DATA_NOT_FOUND.buildMessage()))
@@ -314,7 +321,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
       return ImportTargetResult.builder()
           .namespace(namespace)
           .tableName(table)
-          .importedRecord(mutableSourceRecord)
+          .importedRecord(getRecordForLogging(mutableSourceRecord))
           .status(ImportTargetResultStatus.VALIDATION_FAILED)
           .errors(Collections.singletonList(e.getMessage()))
           .build();
@@ -333,13 +340,14 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
           .namespace(namespace)
           .tableName(table)
           .importAction(importAction)
-          .importedRecord(mutableSourceRecord)
+          .importedRecord(getRecordForLogging(mutableSourceRecord))
           .status(ImportTargetResultStatus.SAVED)
           .build();
 
     } catch (ScalarDbDaoException e) {
       return ImportTargetResult.builder()
           .namespace(namespace)
+          .importedRecord(getRecordForLogging(mutableSourceRecord))
           .tableName(table)
           .importAction(importAction)
           .status(ImportTargetResultStatus.SAVE_FAILED)
@@ -423,6 +431,21 @@ private boolean shouldFailForExistingData(
         && importOptions.getImportMode() == ImportMode.INSERT;
   }
 
+  /**
+   * Returns the given source record only if raw record logging is enabled in the import options.
+   *
+   * <p>This helper method centralizes the logic for conditionally including the raw source record
+   * in {@code ImportTargetResult}. If {@code logRawRecord} is disabled, {@code null} is returned to
+   * avoid storing or displaying the raw record.
+   *
+   * @param record the source record to include conditionally
+   * @return the provided record if raw record logging is enabled; otherwise {@code null}
+   */
+  @Nullable
+  private ObjectNode getRecordForLogging(ObjectNode record) {
+    return params.getImportOptions().isLogRawRecord() ? record : null;
+  }
+
   /**
    * Determines whether the operation should fail if the expected data is missing.
    *
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java
index 769c55d2a2..603e7b0f10 100644
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java
@@ -2,6 +2,7 @@
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -160,6 +161,60 @@ private void assertTransactionBatchResults(
     }
   }
 
+  @Test
+  void onTransactionBatchCompleted_HasErrors_ShouldWriteToFailureLogFileWithoutRawRecord()
+      throws IOException {
+    testTransactionBatchCompletedForRawRecords(false);
+  }
+
+  @Test
+  void onTransactionBatchCompleted_HasErrors_ShouldWriteToFailureLogFileWithRawRecord()
+      throws IOException {
+    testTransactionBatchCompletedForRawRecords(true);
+  }
+
+  private void testTransactionBatchCompletedForRawRecords(boolean logRawRecords)
+      throws IOException {
+    // Arrange
+    ImportLoggerConfig config =
+        ImportLoggerConfig.builder()
+            .logDirectoryPath(tempDir.toString() + "/")
+            .isLogRawSourceRecordsEnabled(logRawRecords)
+            .isLogSuccessRecordsEnabled(false)
+            .build();
+    SingleFileImportLogger importLogger = new SingleFileImportLogger(config, logWriterFactory);
+
+    List<ImportTransactionBatchResult> batchResults = createBatchResults(1, false);
+
+    // Act
+    for (ImportTransactionBatchResult batchResult : batchResults) {
+      importLogger.onTransactionBatchCompleted(batchResult);
+      importLogger.onDataChunkCompleted(
+          ImportDataChunkStatus.builder().dataChunkId(batchResult.getDataChunkId()).build());
+    }
+    importLogger.onAllDataChunksCompleted();
+
+    // Assert
+    assertTransactionBatchResultsForRawRecords(logRawRecords);
+  }
+
+  private void assertTransactionBatchResultsForRawRecords(boolean logRawRecord) throws IOException {
+    DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper();
+
+    Path logFileName = tempDir.resolve(SingleFileImportLogger.FAILURE_LOG_FILE_NAME);
+    assertTrue(Files.exists(logFileName), "Log file should exist");
+    String logContent = new String(Files.readAllBytes(logFileName), StandardCharsets.UTF_8);
+    List<ImportTransactionBatchResult> logEntries =
+        objectMapper.readValue(
+            logContent, new TypeReference<List<ImportTransactionBatchResult>>() {});
+    ImportTaskResult importTaskResult = logEntries.get(0).getRecords().get(0);
+    if (logRawRecord) {
+      assertEquals(OBJECT_MAPPER.createObjectNode(), importTaskResult.getRawRecord());
+    } else {
+      assertNull(importTaskResult.getRawRecord());
+    }
+  }
+
   private void assertTransactionBatchResult(
       ImportTransactionBatchResult expected, ImportTransactionBatchResult actual) {
     assertEquals(expected.getDataChunkId(), actual.getDataChunkId(), "Data chunk ID should match");
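
To make the behavior change concrete, here is a minimal, self-contained sketch. It is not the ScalarDB API: the class and method names below are hypothetical, and it assumes only jackson-databind on the classpath. It mirrors the two decisions the patch encodes: failure log entries are always written (the early return in `onTaskComplete` is gone), while the raw source record is attached to a result only when raw-record logging is enabled, as in `ImportTask#getRecordForLogging`.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Hypothetical stand-in for the gating logic introduced in ImportTask.
public class RawRecordGatingSketch {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Analogous to ImportTask#getRecordForLogging: pass the record through only
  // when raw-record logging is enabled; otherwise return null so the raw data
  // is neither stored in nor displayed from the result.
  static ObjectNode recordForLogging(ObjectNode record, boolean logRawRecord) {
    return logRawRecord ? record : null;
  }

  public static void main(String[] args) {
    ObjectNode source = MAPPER.createObjectNode().put("id", 1).put("name", "alice");

    // Disabled: a failure entry would still be written, but it carries no
    // copy of the source data.
    System.out.println("rawRecord when disabled: " + recordForLogging(source, false)); // null

    // Enabled: the record rides along for diagnostics.
    System.out.println("rawRecord when enabled: " + recordForLogging(source, true));
  }
}
```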