@@ -74,7 +74,6 @@ public SingleFileImportLogger(ImportLoggerConfig config, LogWriterFactory logWri
*/
@Override
public void onTaskComplete(ImportTaskResult taskResult) {
- if (!config.isLogSuccessRecordsEnabled() && !config.isLogRawSourceRecordsEnabled()) return;
try {
writeImportTaskResultDetailToLogs(taskResult);
} catch (Exception e) {
@@ -199,9 +198,7 @@ private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult
&& target.getStatus().equals(ImportTargetResultStatus.SAVED)) {

writeToLogWriter(successLogWriter, OBJECT_MAPPER.valueToTree(target));
- }
- if (config.isLogRawSourceRecordsEnabled()
- && !target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
+ } else if (!target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
writeToLogWriter(failureLogWriter, OBJECT_MAPPER.valueToTree(target));
}
}
@@ -73,7 +73,6 @@ public SplitByDataChunkImportLogger(
*/
@Override
public void onTaskComplete(ImportTaskResult taskResult) {
- if (!config.isLogSuccessRecordsEnabled() && !config.isLogRawSourceRecordsEnabled()) return;
try {
writeImportTaskResultDetailToLogs(taskResult);
} catch (IOException e) {
@@ -94,8 +93,7 @@ private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult
ImportTargetResultStatus status = target.getStatus();
if (status.equals(ImportTargetResultStatus.SAVED) && config.isLogSuccessRecordsEnabled()) {
writeLog(target, LogFileType.SUCCESS, importTaskResult.getDataChunkId());
- } else if (!status.equals(ImportTargetResultStatus.SAVED)
- && config.isLogRawSourceRecordsEnabled()) {
+ } else if (!status.equals(ImportTargetResultStatus.SAVED)) {
writeLog(target, LogFileType.FAILURE, importTaskResult.getDataChunkId());
}
}
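With the early-return guard removed from both loggers, records that fail to import are now always written to the failure log; only the success log (isLogSuccessRecordsEnabled) and the embedded raw source record (see getRecordForLogging further down) remain opt-in. A minimal, self-contained sketch of the resulting branch logic — the Status enum, the boolean flag, and the Consumer-based writers are illustrative stand-ins, not the actual data loader types:

import java.util.function.Consumer;

public class LoggingBranchSketch {
  enum Status { SAVED, VALIDATION_FAILED, SAVE_FAILED }

  // Success logging stays opt-in; failures are written unconditionally.
  static void logTarget(
      boolean logSuccessEnabled,
      Status status,
      Consumer<String> successLog,
      Consumer<String> failureLog) {
    if (logSuccessEnabled && status == Status.SAVED) {
      successLog.accept("saved record");
    } else if (status != Status.SAVED) {
      failureLog.accept("failed record");
    }
  }

  public static void main(String[] args) {
    // Even with success logging disabled, a failed record reaches the failure log.
    logTarget(
        false,
        Status.VALIDATION_FAILED,
        s -> System.out.println("success: " + s),
        s -> System.out.println("failure: " + s));
  }
}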
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+ import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;

/**
@@ -183,6 +184,7 @@ private ImportTargetResult importIntoSingleTable(
.tableName(table)
.status(ImportTargetResultStatus.VALIDATION_FAILED)
.errors(Collections.singletonList(DataLoaderError.TABLE_METADATA_MISSING.buildMessage()))
+ .importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}

@@ -209,6 +211,7 @@ private ImportTargetResult importIntoSingleTable(
.tableName(table)
.status(ImportTargetResultStatus.VALIDATION_FAILED)
.errors(validationResult.getErrorMessages())
+ .importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}

@@ -223,6 +226,7 @@ private ImportTargetResult importIntoSingleTable(
.errors(
Collections.singletonList(
DataLoaderError.COULD_NOT_FIND_PARTITION_KEY.buildMessage()))
+ .importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}
Optional<Key> optionalClusteringKey = Optional.empty();
@@ -238,6 +242,7 @@ private ImportTargetResult importIntoSingleTable(
.errors(
Collections.singletonList(
DataLoaderError.COULD_NOT_FIND_CLUSTERING_KEY.buildMessage()))
+ .importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}
}
@@ -254,6 +259,7 @@ private ImportTargetResult importIntoSingleTable(
.tableName(table)
.status(ImportTargetResultStatus.RETRIEVAL_FAILED)
.errors(Collections.singletonList(e.getMessage()))
+ .importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}
ImportTaskAction importAction =
@@ -273,6 +279,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
.errors(
Collections.singletonList(
DataLoaderError.UPSERT_INSERT_MISSING_COLUMNS.buildMessage()))
+ .importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}
}
@@ -281,7 +288,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
return ImportTargetResult.builder()
.namespace(namespace)
.tableName(table)
- .importedRecord(mutableSourceRecord)
+ .importedRecord(getRecordForLogging(mutableSourceRecord))
.importAction(importAction)
.status(ImportTargetResultStatus.DATA_ALREADY_EXISTS)
.errors(Collections.singletonList(DataLoaderError.DATA_ALREADY_EXISTS.buildMessage()))
@@ -292,7 +299,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
return ImportTargetResult.builder()
.namespace(namespace)
.tableName(table)
- .importedRecord(mutableSourceRecord)
+ .importedRecord(getRecordForLogging(mutableSourceRecord))
.importAction(importAction)
.status(ImportTargetResultStatus.DATA_NOT_FOUND)
.errors(Collections.singletonList(DataLoaderError.DATA_NOT_FOUND.buildMessage()))
@@ -314,7 +321,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
return ImportTargetResult.builder()
.namespace(namespace)
.tableName(table)
- .importedRecord(mutableSourceRecord)
+ .importedRecord(getRecordForLogging(mutableSourceRecord))
.status(ImportTargetResultStatus.VALIDATION_FAILED)
.errors(Collections.singletonList(e.getMessage()))
.build();
@@ -333,13 +340,14 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
.namespace(namespace)
.tableName(table)
.importAction(importAction)
- .importedRecord(mutableSourceRecord)
+ .importedRecord(getRecordForLogging(mutableSourceRecord))
.status(ImportTargetResultStatus.SAVED)
.build();

} catch (ScalarDbDaoException e) {
return ImportTargetResult.builder()
.namespace(namespace)
+ .importedRecord(getRecordForLogging(mutableSourceRecord))
.tableName(table)
.importAction(importAction)
.status(ImportTargetResultStatus.SAVE_FAILED)
@@ -423,6 +431,21 @@ private boolean shouldFailForExistingData(
&& importOptions.getImportMode() == ImportMode.INSERT;
}

+ /**
+ * Returns the given source record only if raw record logging is enabled in the import options.
+ *
+ * <p>This helper method centralizes the logic for conditionally including the raw source record
+ * in {@code ImportTargetResult}. If {@code logRawRecord} is disabled, {@code null} is returned to
+ * avoid storing or displaying the raw record.
+ *
+ * @param record the source record to include conditionally
+ * @return the provided record if raw record logging is enabled; otherwise {@code null}
+ */
+ @Nullable
+ private ObjectNode getRecordForLogging(ObjectNode record) {
+ return params.getImportOptions().isLogRawRecord() ? record : null;
+ }

/**
* Determines whether the operation should fail if the expected data is missing.
*
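The getRecordForLogging helper above is what replaces the per-logger raw-record checks: every ImportTargetResult now carries the source record only when raw-record logging is enabled, and null otherwise, so failure entries can still be written without exposing source data. A small standalone sketch of that gating pattern, assuming Jackson and javax.annotation are on the classpath and using a plain boolean in place of params.getImportOptions().isLogRawRecord():

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import javax.annotation.Nullable;

public class RawRecordGateSketch {
  private final boolean logRawRecord;

  RawRecordGateSketch(boolean logRawRecord) {
    this.logRawRecord = logRawRecord;
  }

  // Hand back the record only when raw-record logging is enabled; results built
  // with the returned value never store source data when the option is off.
  @Nullable
  ObjectNode recordForLogging(ObjectNode record) {
    return logRawRecord ? record : null;
  }

  public static void main(String[] args) {
    ObjectNode record = new ObjectMapper().createObjectNode().put("name", "alice");
    System.out.println(new RawRecordGateSketch(true).recordForLogging(record));  // {"name":"alice"}
    System.out.println(new RawRecordGateSketch(false).recordForLogging(record)); // null
  }
}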
@@ -2,6 +2,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.core.type.TypeReference;
@@ -160,6 +161,60 @@ private void assertTransactionBatchResults(
}
}

+ @Test
+ void onTransactionBatchCompleted_HasErrors_ShouldWriteToFailureLogFileWithoutRawRecord()
+ throws IOException {
+ testTransactionBatchCompletedForRawRecords(false);
+ }
+
+ @Test
+ void onTransactionBatchCompleted_HasErrors_ShouldWriteToFailureLogFileWithRawRecord()
+ throws IOException {
+ testTransactionBatchCompletedForRawRecords(true);
+ }
+
+ private void testTransactionBatchCompletedForRawRecords(boolean logRawRecords)
+ throws IOException {
+ // Arrange
+ ImportLoggerConfig config =
+ ImportLoggerConfig.builder()
+ .logDirectoryPath(tempDir.toString() + "/")
+ .isLogRawSourceRecordsEnabled(logRawRecords)
+ .isLogSuccessRecordsEnabled(false)
+ .build();
+ SingleFileImportLogger importLogger = new SingleFileImportLogger(config, logWriterFactory);
+
+ List<ImportTransactionBatchResult> batchResults = createBatchResults(1, false);
+
+ // Act
+ for (ImportTransactionBatchResult batchResult : batchResults) {
+ importLogger.onTransactionBatchCompleted(batchResult);
+ importLogger.onDataChunkCompleted(
+ ImportDataChunkStatus.builder().dataChunkId(batchResult.getDataChunkId()).build());
+ }
+ importLogger.onAllDataChunksCompleted();
+
+ // Assert
+ assertTransactionBatchResultsForRawRecords(logRawRecords);
+ }
+
+ private void assertTransactionBatchResultsForRawRecords(boolean logRawRecord) throws IOException {
+ DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper();
+
+ Path logFileName = tempDir.resolve(SingleFileImportLogger.FAILURE_LOG_FILE_NAME);
+ assertTrue(Files.exists(logFileName), "Log file should exist");
+ String logContent = new String(Files.readAllBytes(logFileName), StandardCharsets.UTF_8);
+ List<ImportTransactionBatchResult> logEntries =
+ objectMapper.readValue(
+ logContent, new TypeReference<List<ImportTransactionBatchResult>>() {});
+ ImportTaskResult importTaskResult = logEntries.get(0).getRecords().get(0);
+ if (logRawRecord) {
+ assertEquals(OBJECT_MAPPER.createObjectNode(), importTaskResult.getRawRecord());
+ } else {
+ assertNull(importTaskResult.getRawRecord());
+ }
+ }

private void assertTransactionBatchResult(
ImportTransactionBatchResult expected, ImportTransactionBatchResult actual) {
assertEquals(expected.getDataChunkId(), actual.getDataChunkId(), "Data chunk ID should match");