Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public SingleFileImportLogger(ImportLoggerConfig config, LogWriterFactory logWri
*/
@Override
public void onTaskComplete(ImportTaskResult taskResult) {
if (!config.isLogSuccessRecordsEnabled() && !config.isLogRawSourceRecordsEnabled()) return;
try {
writeImportTaskResultDetailToLogs(taskResult);
} catch (Exception e) {
Expand Down Expand Up @@ -199,9 +198,7 @@ private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult
&& target.getStatus().equals(ImportTargetResultStatus.SAVED)) {

writeToLogWriter(successLogWriter, OBJECT_MAPPER.valueToTree(target));
}
if (config.isLogRawSourceRecordsEnabled()
&& !target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
} else if (!target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
writeToLogWriter(failureLogWriter, OBJECT_MAPPER.valueToTree(target));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public SplitByDataChunkImportLogger(
*/
@Override
public void onTaskComplete(ImportTaskResult taskResult) {
if (!config.isLogSuccessRecordsEnabled() && !config.isLogRawSourceRecordsEnabled()) return;
try {
writeImportTaskResultDetailToLogs(taskResult);
} catch (IOException e) {
Expand All @@ -94,8 +93,7 @@ private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult
ImportTargetResultStatus status = target.getStatus();
if (status.equals(ImportTargetResultStatus.SAVED) && config.isLogSuccessRecordsEnabled()) {
writeLog(target, LogFileType.SUCCESS, importTaskResult.getDataChunkId());
} else if (!status.equals(ImportTargetResultStatus.SAVED)
&& config.isLogRawSourceRecordsEnabled()) {
} else if (!status.equals(ImportTargetResultStatus.SAVED)) {
writeLog(target, LogFileType.FAILURE, importTaskResult.getDataChunkId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;

/**
Expand Down Expand Up @@ -183,6 +184,7 @@ private ImportTargetResult importIntoSingleTable(
.tableName(table)
.status(ImportTargetResultStatus.VALIDATION_FAILED)
.errors(Collections.singletonList(DataLoaderError.TABLE_METADATA_MISSING.buildMessage()))
.importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}

Expand All @@ -209,6 +211,7 @@ private ImportTargetResult importIntoSingleTable(
.tableName(table)
.status(ImportTargetResultStatus.VALIDATION_FAILED)
.errors(validationResult.getErrorMessages())
.importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}

Expand All @@ -223,6 +226,7 @@ private ImportTargetResult importIntoSingleTable(
.errors(
Collections.singletonList(
DataLoaderError.COULD_NOT_FIND_PARTITION_KEY.buildMessage()))
.importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}
Optional<Key> optionalClusteringKey = Optional.empty();
Expand All @@ -238,6 +242,7 @@ private ImportTargetResult importIntoSingleTable(
.errors(
Collections.singletonList(
DataLoaderError.COULD_NOT_FIND_CLUSTERING_KEY.buildMessage()))
.importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}
}
Expand All @@ -254,6 +259,7 @@ private ImportTargetResult importIntoSingleTable(
.tableName(table)
.status(ImportTargetResultStatus.RETRIEVAL_FAILED)
.errors(Collections.singletonList(e.getMessage()))
.importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}
ImportTaskAction importAction =
Expand All @@ -273,6 +279,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
.errors(
Collections.singletonList(
DataLoaderError.UPSERT_INSERT_MISSING_COLUMNS.buildMessage()))
.importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}
}
Expand All @@ -281,7 +288,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
return ImportTargetResult.builder()
.namespace(namespace)
.tableName(table)
.importedRecord(mutableSourceRecord)
.importedRecord(getRecordForLogging(mutableSourceRecord))
.importAction(importAction)
.status(ImportTargetResultStatus.DATA_ALREADY_EXISTS)
.errors(Collections.singletonList(DataLoaderError.DATA_ALREADY_EXISTS.buildMessage()))
Expand All @@ -292,7 +299,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
return ImportTargetResult.builder()
.namespace(namespace)
.tableName(table)
.importedRecord(mutableSourceRecord)
.importedRecord(getRecordForLogging(mutableSourceRecord))
.importAction(importAction)
.status(ImportTargetResultStatus.DATA_NOT_FOUND)
.errors(Collections.singletonList(DataLoaderError.DATA_NOT_FOUND.buildMessage()))
Expand All @@ -314,7 +321,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
return ImportTargetResult.builder()
.namespace(namespace)
.tableName(table)
.importedRecord(mutableSourceRecord)
.importedRecord(getRecordForLogging(mutableSourceRecord))
.status(ImportTargetResultStatus.VALIDATION_FAILED)
.errors(Collections.singletonList(e.getMessage()))
.build();
Expand All @@ -333,13 +340,14 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
.namespace(namespace)
.tableName(table)
.importAction(importAction)
.importedRecord(mutableSourceRecord)
.importedRecord(getRecordForLogging(mutableSourceRecord))
.status(ImportTargetResultStatus.SAVED)
.build();

} catch (ScalarDbDaoException e) {
return ImportTargetResult.builder()
.namespace(namespace)
.importedRecord(getRecordForLogging(mutableSourceRecord))
.tableName(table)
.importAction(importAction)
.status(ImportTargetResultStatus.SAVE_FAILED)
Expand Down Expand Up @@ -423,6 +431,21 @@ private boolean shouldFailForExistingData(
&& importOptions.getImportMode() == ImportMode.INSERT;
}

/**
* Returns the given source record only if raw record logging is enabled in the import options.
*
* <p>This helper method centralizes the logic for conditionally including the raw source record
* in {@code ImportTargetResult}. If {@code logRawRecord} is disabled, {@code null} is returned to
* avoid storing or displaying the raw record.
*
* @param record the source record to include conditionally
* @return the provided record if raw record logging is enabled; otherwise {@code null}
*/
@Nullable
private ObjectNode getRecordForLogging(ObjectNode record) {
return params.getImportOptions().isLogRawRecord() ? record : null;
}

/**
* Determines whether the operation should fail if the expected data is missing.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -160,6 +161,60 @@ private void assertTransactionBatchResults(
}
}

@Test
void onTransactionBatchCompleted_HasErrors_ShouldWriteToFailureLogFileWithoutRawRecord()
throws IOException {
testTransactionBatchCompletedForRawRecords(false);
}

@Test
void onTransactionBatchCompleted_HasErrors_ShouldWriteToFailureLogFileWithRawRecord()
throws IOException {
testTransactionBatchCompletedForRawRecords(true);
}

private void testTransactionBatchCompletedForRawRecords(boolean logRawRecords)
throws IOException {
// Arrange
ImportLoggerConfig config =
ImportLoggerConfig.builder()
.logDirectoryPath(tempDir.toString() + "/")
.isLogRawSourceRecordsEnabled(logRawRecords)
.isLogSuccessRecordsEnabled(false)
.build();
SingleFileImportLogger importLogger = new SingleFileImportLogger(config, logWriterFactory);

List<ImportTransactionBatchResult> batchResults = createBatchResults(1, false);

// Act
for (ImportTransactionBatchResult batchResult : batchResults) {
importLogger.onTransactionBatchCompleted(batchResult);
importLogger.onDataChunkCompleted(
ImportDataChunkStatus.builder().dataChunkId(batchResult.getDataChunkId()).build());
}
importLogger.onAllDataChunksCompleted();

// Assert
assertTransactionBatchResultsForRawRecords(logRawRecords);
}

private void assertTransactionBatchResultsForRawRecords(boolean logRawRecord) throws IOException {
DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper();

Path logFileName = tempDir.resolve(SingleFileImportLogger.FAILURE_LOG_FILE_NAME);
assertTrue(Files.exists(logFileName), "Log file should exist");
String logContent = new String(Files.readAllBytes(logFileName), StandardCharsets.UTF_8);
List<ImportTransactionBatchResult> logEntries =
objectMapper.readValue(
logContent, new TypeReference<List<ImportTransactionBatchResult>>() {});
ImportTaskResult importTaskResult = logEntries.get(0).getRecords().get(0);
if (logRawRecord) {
assertEquals(OBJECT_MAPPER.createObjectNode(), importTaskResult.getRawRecord());
} else {
assertNull(importTaskResult.getRawRecord());
}
}

private void assertTransactionBatchResult(
ImportTransactionBatchResult expected, ImportTransactionBatchResult actual) {
assertEquals(expected.getDataChunkId(), actual.getDataChunkId(), "Data chunk ID should match");
Expand Down