Skip to content

Commit 23d53b6

Browse files
Backport to branch (3.16): Fix failure log writing issue in storage mode for Data Loader imports (#3202)
Co-authored-by: inv-jishnu <[email protected]>
1 parent b7de25e commit 23d53b6

File tree

4 files changed

+84
-11
lines changed

4 files changed

+84
-11
lines changed

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ public SingleFileImportLogger(ImportLoggerConfig config, LogWriterFactory logWri
7474
*/
7575
@Override
7676
public void onTaskComplete(ImportTaskResult taskResult) {
77-
if (!config.isLogSuccessRecordsEnabled() && !config.isLogRawSourceRecordsEnabled()) return;
7877
try {
7978
writeImportTaskResultDetailToLogs(taskResult);
8079
} catch (Exception e) {
@@ -199,9 +198,7 @@ private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult
199198
&& target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
200199

201200
writeToLogWriter(successLogWriter, OBJECT_MAPPER.valueToTree(target));
202-
}
203-
if (config.isLogRawSourceRecordsEnabled()
204-
&& !target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
201+
} else if (!target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
205202
writeToLogWriter(failureLogWriter, OBJECT_MAPPER.valueToTree(target));
206203
}
207204
}

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ public SplitByDataChunkImportLogger(
7373
*/
7474
@Override
7575
public void onTaskComplete(ImportTaskResult taskResult) {
76-
if (!config.isLogSuccessRecordsEnabled() && !config.isLogRawSourceRecordsEnabled()) return;
7776
try {
7877
writeImportTaskResultDetailToLogs(taskResult);
7978
} catch (IOException e) {
@@ -94,8 +93,7 @@ private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult
9493
ImportTargetResultStatus status = target.getStatus();
9594
if (status.equals(ImportTargetResultStatus.SAVED) && config.isLogSuccessRecordsEnabled()) {
9695
writeLog(target, LogFileType.SUCCESS, importTaskResult.getDataChunkId());
97-
} else if (!status.equals(ImportTargetResultStatus.SAVED)
98-
&& config.isLogRawSourceRecordsEnabled()) {
96+
} else if (!status.equals(ImportTargetResultStatus.SAVED)) {
9997
writeLog(target, LogFileType.FAILURE, importTaskResult.getDataChunkId());
10098
}
10199
}

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.Optional;
34+
import javax.annotation.Nullable;
3435
import lombok.RequiredArgsConstructor;
3536

3637
/**
@@ -183,6 +184,7 @@ private ImportTargetResult importIntoSingleTable(
183184
.tableName(table)
184185
.status(ImportTargetResultStatus.VALIDATION_FAILED)
185186
.errors(Collections.singletonList(DataLoaderError.TABLE_METADATA_MISSING.buildMessage()))
187+
.importedRecord(getRecordForLogging(mutableSourceRecord))
186188
.build();
187189
}
188190

@@ -209,6 +211,7 @@ private ImportTargetResult importIntoSingleTable(
209211
.tableName(table)
210212
.status(ImportTargetResultStatus.VALIDATION_FAILED)
211213
.errors(validationResult.getErrorMessages())
214+
.importedRecord(getRecordForLogging(mutableSourceRecord))
212215
.build();
213216
}
214217

@@ -223,6 +226,7 @@ private ImportTargetResult importIntoSingleTable(
223226
.errors(
224227
Collections.singletonList(
225228
DataLoaderError.COULD_NOT_FIND_PARTITION_KEY.buildMessage()))
229+
.importedRecord(getRecordForLogging(mutableSourceRecord))
226230
.build();
227231
}
228232
Optional<Key> optionalClusteringKey = Optional.empty();
@@ -238,6 +242,7 @@ private ImportTargetResult importIntoSingleTable(
238242
.errors(
239243
Collections.singletonList(
240244
DataLoaderError.COULD_NOT_FIND_CLUSTERING_KEY.buildMessage()))
245+
.importedRecord(getRecordForLogging(mutableSourceRecord))
241246
.build();
242247
}
243248
}
@@ -254,6 +259,7 @@ private ImportTargetResult importIntoSingleTable(
254259
.tableName(table)
255260
.status(ImportTargetResultStatus.RETRIEVAL_FAILED)
256261
.errors(Collections.singletonList(e.getMessage()))
262+
.importedRecord(getRecordForLogging(mutableSourceRecord))
257263
.build();
258264
}
259265
ImportTaskAction importAction =
@@ -273,6 +279,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
273279
.errors(
274280
Collections.singletonList(
275281
DataLoaderError.UPSERT_INSERT_MISSING_COLUMNS.buildMessage()))
282+
.importedRecord(getRecordForLogging(mutableSourceRecord))
276283
.build();
277284
}
278285
}
@@ -281,7 +288,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
281288
return ImportTargetResult.builder()
282289
.namespace(namespace)
283290
.tableName(table)
284-
.importedRecord(mutableSourceRecord)
291+
.importedRecord(getRecordForLogging(mutableSourceRecord))
285292
.importAction(importAction)
286293
.status(ImportTargetResultStatus.DATA_ALREADY_EXISTS)
287294
.errors(Collections.singletonList(DataLoaderError.DATA_ALREADY_EXISTS.buildMessage()))
@@ -292,7 +299,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
292299
return ImportTargetResult.builder()
293300
.namespace(namespace)
294301
.tableName(table)
295-
.importedRecord(mutableSourceRecord)
302+
.importedRecord(getRecordForLogging(mutableSourceRecord))
296303
.importAction(importAction)
297304
.status(ImportTargetResultStatus.DATA_NOT_FOUND)
298305
.errors(Collections.singletonList(DataLoaderError.DATA_NOT_FOUND.buildMessage()))
@@ -314,7 +321,7 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
314321
return ImportTargetResult.builder()
315322
.namespace(namespace)
316323
.tableName(table)
317-
.importedRecord(mutableSourceRecord)
324+
.importedRecord(getRecordForLogging(mutableSourceRecord))
318325
.status(ImportTargetResultStatus.VALIDATION_FAILED)
319326
.errors(Collections.singletonList(e.getMessage()))
320327
.build();
@@ -333,13 +340,14 @@ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
333340
.namespace(namespace)
334341
.tableName(table)
335342
.importAction(importAction)
336-
.importedRecord(mutableSourceRecord)
343+
.importedRecord(getRecordForLogging(mutableSourceRecord))
337344
.status(ImportTargetResultStatus.SAVED)
338345
.build();
339346

340347
} catch (ScalarDbDaoException e) {
341348
return ImportTargetResult.builder()
342349
.namespace(namespace)
350+
.importedRecord(getRecordForLogging(mutableSourceRecord))
343351
.tableName(table)
344352
.importAction(importAction)
345353
.status(ImportTargetResultStatus.SAVE_FAILED)
@@ -423,6 +431,21 @@ private boolean shouldFailForExistingData(
423431
&& importOptions.getImportMode() == ImportMode.INSERT;
424432
}
425433

434+
/**
435+
* Returns the given source record only if raw record logging is enabled in the import options.
436+
*
437+
* <p>This helper method centralizes the logic for conditionally including the raw source record
438+
* in {@code ImportTargetResult}. If {@code logRawRecord} is disabled, {@code null} is returned to
439+
* avoid storing or displaying the raw record.
440+
*
441+
* @param record the source record to include conditionally
442+
* @return the provided record if raw record logging is enabled; otherwise {@code null}
443+
*/
444+
@Nullable
445+
private ObjectNode getRecordForLogging(ObjectNode record) {
446+
return params.getImportOptions().isLogRawRecord() ? record : null;
447+
}
448+
426449
/**
427450
* Determines whether the operation should fail if the expected data is missing.
428451
*

data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static org.junit.jupiter.api.Assertions.assertEquals;
44
import static org.junit.jupiter.api.Assertions.assertFalse;
5+
import static org.junit.jupiter.api.Assertions.assertNull;
56
import static org.junit.jupiter.api.Assertions.assertTrue;
67

78
import com.fasterxml.jackson.core.type.TypeReference;
@@ -160,6 +161,60 @@ private void assertTransactionBatchResults(
160161
}
161162
}
162163

164+
@Test
165+
void onTransactionBatchCompleted_HasErrors_ShouldWriteToFailureLogFileWithoutRawRecord()
166+
throws IOException {
167+
testTransactionBatchCompletedForRawRecords(false);
168+
}
169+
170+
@Test
171+
void onTransactionBatchCompleted_HasErrors_ShouldWriteToFailureLogFileWithRawRecord()
172+
throws IOException {
173+
testTransactionBatchCompletedForRawRecords(true);
174+
}
175+
176+
private void testTransactionBatchCompletedForRawRecords(boolean logRawRecords)
177+
throws IOException {
178+
// Arrange
179+
ImportLoggerConfig config =
180+
ImportLoggerConfig.builder()
181+
.logDirectoryPath(tempDir.toString() + "/")
182+
.isLogRawSourceRecordsEnabled(logRawRecords)
183+
.isLogSuccessRecordsEnabled(false)
184+
.build();
185+
SingleFileImportLogger importLogger = new SingleFileImportLogger(config, logWriterFactory);
186+
187+
List<ImportTransactionBatchResult> batchResults = createBatchResults(1, false);
188+
189+
// Act
190+
for (ImportTransactionBatchResult batchResult : batchResults) {
191+
importLogger.onTransactionBatchCompleted(batchResult);
192+
importLogger.onDataChunkCompleted(
193+
ImportDataChunkStatus.builder().dataChunkId(batchResult.getDataChunkId()).build());
194+
}
195+
importLogger.onAllDataChunksCompleted();
196+
197+
// Assert
198+
assertTransactionBatchResultsForRawRecords(logRawRecords);
199+
}
200+
201+
private void assertTransactionBatchResultsForRawRecords(boolean logRawRecord) throws IOException {
202+
DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper();
203+
204+
Path logFileName = tempDir.resolve(SingleFileImportLogger.FAILURE_LOG_FILE_NAME);
205+
assertTrue(Files.exists(logFileName), "Log file should exist");
206+
String logContent = new String(Files.readAllBytes(logFileName), StandardCharsets.UTF_8);
207+
List<ImportTransactionBatchResult> logEntries =
208+
objectMapper.readValue(
209+
logContent, new TypeReference<List<ImportTransactionBatchResult>>() {});
210+
ImportTaskResult importTaskResult = logEntries.get(0).getRecords().get(0);
211+
if (logRawRecord) {
212+
assertEquals(OBJECT_MAPPER.createObjectNode(), importTaskResult.getRawRecord());
213+
} else {
214+
assertNull(importTaskResult.getRawRecord());
215+
}
216+
}
217+
163218
private void assertTransactionBatchResult(
164219
ImportTransactionBatchResult expected, ImportTransactionBatchResult actual) {
165220
assertEquals(expected.getDataChunkId(), actual.getDataChunkId(), "Data chunk ID should match");

0 commit comments

Comments (0)