Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public SingleFileImportLogger(ImportLoggerConfig config, LogWriterFactory logWri
*/
@Override
public void onTaskComplete(ImportTaskResult taskResult) {
if (!config.isLogSuccessRecordsEnabled() && !config.isLogRawSourceRecordsEnabled()) return;
try {
writeImportTaskResultDetailToLogs(taskResult);
} catch (Exception e) {
Expand Down Expand Up @@ -199,9 +198,7 @@ private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult
&& target.getStatus().equals(ImportTargetResultStatus.SAVED)) {

writeToLogWriter(successLogWriter, OBJECT_MAPPER.valueToTree(target));
}
if (config.isLogRawSourceRecordsEnabled()
&& !target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
} else if (!target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
writeToLogWriter(failureLogWriter, OBJECT_MAPPER.valueToTree(target));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public SplitByDataChunkImportLogger(
*/
@Override
public void onTaskComplete(ImportTaskResult taskResult) {
if (!config.isLogSuccessRecordsEnabled() && !config.isLogRawSourceRecordsEnabled()) return;
try {
writeImportTaskResultDetailToLogs(taskResult);
} catch (IOException e) {
Expand All @@ -94,8 +93,7 @@ private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult
ImportTargetResultStatus status = target.getStatus();
if (status.equals(ImportTargetResultStatus.SAVED) && config.isLogSuccessRecordsEnabled()) {
writeLog(target, LogFileType.SUCCESS, importTaskResult.getDataChunkId());
} else if (!status.equals(ImportTargetResultStatus.SAVED)
&& config.isLogRawSourceRecordsEnabled()) {
} else if (!status.equals(ImportTargetResultStatus.SAVED)) {
writeLog(target, LogFileType.FAILURE, importTaskResult.getDataChunkId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ private ImportTargetResult importIntoSingleTable(
.tableName(table)
.status(ImportTargetResultStatus.VALIDATION_FAILED)
.errors(Collections.singletonList(DataLoaderError.TABLE_METADATA_MISSING.buildMessage()))
.importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}

Expand All @@ -209,6 +210,7 @@ private ImportTargetResult importIntoSingleTable(
.tableName(table)
.status(ImportTargetResultStatus.VALIDATION_FAILED)
.errors(validationResult.getErrorMessages())
.importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}

Expand All @@ -223,6 +225,7 @@ private ImportTargetResult importIntoSingleTable(
.errors(
Collections.singletonList(
DataLoaderError.COULD_NOT_FIND_PARTITION_KEY.buildMessage()))
.importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}
Optional<Key> optionalClusteringKey = Optional.empty();
Expand All @@ -238,6 +241,7 @@ private ImportTargetResult importIntoSingleTable(
.errors(
Collections.singletonList(
DataLoaderError.COULD_NOT_FIND_CLUSTERING_KEY.buildMessage()))
.importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}
}
Expand All @@ -254,6 +258,7 @@ private ImportTargetResult importIntoSingleTable(
.tableName(table)
.status(ImportTargetResultStatus.RETRIEVAL_FAILED)
.errors(Collections.singletonList(e.getMessage()))
.importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}
ImportTaskAction importAction =
Expand All @@ -273,6 +278,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
.errors(
Collections.singletonList(
DataLoaderError.UPSERT_INSERT_MISSING_COLUMNS.buildMessage()))
.importedRecord(getRecordForLogging(mutableSourceRecord))
.build();
}
}
Expand All @@ -281,7 +287,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
return ImportTargetResult.builder()
.namespace(namespace)
.tableName(table)
.importedRecord(mutableSourceRecord)
.importedRecord(getRecordForLogging(mutableSourceRecord))
.importAction(importAction)
.status(ImportTargetResultStatus.DATA_ALREADY_EXISTS)
.errors(Collections.singletonList(DataLoaderError.DATA_ALREADY_EXISTS.buildMessage()))
Expand All @@ -292,7 +298,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
return ImportTargetResult.builder()
.namespace(namespace)
.tableName(table)
.importedRecord(mutableSourceRecord)
.importedRecord(getRecordForLogging(mutableSourceRecord))
.importAction(importAction)
.status(ImportTargetResultStatus.DATA_NOT_FOUND)
.errors(Collections.singletonList(DataLoaderError.DATA_NOT_FOUND.buildMessage()))
Expand All @@ -314,7 +320,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
return ImportTargetResult.builder()
.namespace(namespace)
.tableName(table)
.importedRecord(mutableSourceRecord)
.importedRecord(getRecordForLogging(mutableSourceRecord))
.status(ImportTargetResultStatus.VALIDATION_FAILED)
.errors(Collections.singletonList(e.getMessage()))
.build();
Expand All @@ -333,13 +339,14 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
.namespace(namespace)
.tableName(table)
.importAction(importAction)
.importedRecord(mutableSourceRecord)
.importedRecord(getRecordForLogging(mutableSourceRecord))
.status(ImportTargetResultStatus.SAVED)
.build();

} catch (ScalarDbDaoException e) {
return ImportTargetResult.builder()
.namespace(namespace)
.importedRecord(getRecordForLogging(mutableSourceRecord))
.tableName(table)
.importAction(importAction)
.status(ImportTargetResultStatus.SAVE_FAILED)
Expand Down Expand Up @@ -423,6 +430,20 @@ private boolean shouldFailForExistingData(
&& importOptions.getImportMode() == ImportMode.INSERT;
}

/**
* Returns the given source record only if raw record logging is enabled in the import options.
*
* <p>This helper method centralizes the logic for conditionally including the raw source record
* in {@code ImportTargetResult}. If {@code logRawRecord} is disabled, {@code null} is returned to
* avoid storing or displaying the raw record.
*
* @param record the source record to include conditionally
* @return the provided record if raw record logging is enabled; otherwise {@code null}
*/
private ObjectNode getRecordForLogging(ObjectNode record) {
return params.getImportOptions().isLogRawRecord() ? record : null;
}

/**
* Determines whether the operation should fail if the expected data is missing.
*
Expand Down