Skip to content

Commit cad5823

Browse files
committed
Rename ImportProcessor methods for clarity
1 parent 1ee50bd commit cad5823

File tree

1 file changed

+11
-10
lines changed
  • data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor

1 file changed

+11
-10
lines changed

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,11 @@ public void removeListener(ImportEventListener listener) {
169169
}
170170

171171
/**
172-
* Notify once the task is completed
172+
* Notify once a single CRUD task is completed
173173
*
174174
* @param result task result object
175175
*/
176-
protected void notifyStorageRecordCompleted(ImportTaskResult result) {
176+
protected void notifySingleCrudRecordCompleted(ImportTaskResult result) {
177177
// Add data to summary, success logs with/without raw data
178178
for (ImportEventListener listener : listeners) {
179179
listener.onTaskComplete(result);
@@ -366,14 +366,14 @@ private void abortTransactionSafely(@Nullable DistributedTransaction transaction
366366
}
367367

368368
/**
369-
* Processes a single record in storage mode (non-transactional). Each record is processed
369+
* Processes a single record in single CRUD mode (non-transactional). Each record is processed
370370
* independently without transaction guarantees.
371371
*
372372
* @param dataChunkId the parent data chunk id of the chunk containing this record
373373
* @param importRow the record to process
374374
* @return an {@link ImportTaskResult} containing the processing result for the record
375375
*/
376-
private ImportTaskResult processStorageRecord(int dataChunkId, ImportRow importRow) {
376+
private ImportTaskResult processSingleCrudRecord(int dataChunkId, ImportRow importRow) {
377377
ImportTaskParams taskParams =
378378
ImportTaskParams.builder()
379379
.sourceRecord(importRow.getSourceData())
@@ -394,7 +394,7 @@ private ImportTaskResult processStorageRecord(int dataChunkId, ImportRow importR
394394
.targets(importRecordResult.getTargets())
395395
.dataChunkId(dataChunkId)
396396
.build();
397-
notifyStorageRecordCompleted(modifiedTaskResult);
397+
notifySingleCrudRecordCompleted(modifiedTaskResult);
398398
return modifiedTaskResult;
399399
}
400400

@@ -416,9 +416,10 @@ private void processDataChunk(ImportDataChunk dataChunk, int transactionBatchSiz
416416
notifyDataChunkStarted(status);
417417
ImportDataChunkStatus importDataChunkStatus;
418418
if (params.getTransactionMode() == TransactionMode.CONSENSUS_COMMIT) {
419-
importDataChunkStatus = processDataChunkWithTransactions(dataChunk, transactionBatchSize);
419+
importDataChunkStatus =
420+
processDataChunkInConsensusCommitMode(dataChunk, transactionBatchSize);
420421
} else {
421-
importDataChunkStatus = processDataChunkWithoutTransactions(dataChunk);
422+
importDataChunkStatus = processDataChunkInSingleCrudMode(dataChunk);
422423
}
423424
notifyDataChunkCompleted(importDataChunkStatus);
424425
}
@@ -431,7 +432,7 @@ private void processDataChunk(ImportDataChunk dataChunk, int transactionBatchSiz
431432
* @param transactionBatchSize the number of records per transaction batch
432433
* @return an {@link ImportDataChunkStatus} containing processing results and metrics
433434
*/
434-
private ImportDataChunkStatus processDataChunkWithTransactions(
435+
private ImportDataChunkStatus processDataChunkInConsensusCommitMode(
435436
ImportDataChunk dataChunk, int transactionBatchSize) {
436437
Instant startTime = Instant.now();
437438
List<ImportTransactionBatch> transactionBatches =
@@ -471,13 +472,13 @@ private ImportDataChunkStatus processDataChunkWithTransactions(
471472
* @param dataChunk the data chunk to process
472473
* @return an {@link ImportDataChunkStatus} containing processing results and metrics
473474
*/
474-
private ImportDataChunkStatus processDataChunkWithoutTransactions(ImportDataChunk dataChunk) {
475+
private ImportDataChunkStatus processDataChunkInSingleCrudMode(ImportDataChunk dataChunk) {
475476
Instant startTime = Instant.now();
476477
AtomicInteger successCount = new AtomicInteger(0);
477478
AtomicInteger failureCount = new AtomicInteger(0);
478479

479480
for (ImportRow importRow : dataChunk.getSourceData()) {
480-
ImportTaskResult result = processStorageRecord(dataChunk.getDataChunkId(), importRow);
481+
ImportTaskResult result = processSingleCrudRecord(dataChunk.getDataChunkId(), importRow);
481482
boolean allSaved =
482483
result.getTargets().stream()
483484
.allMatch(t -> t.getStatus().equals(ImportTargetResultStatus.SAVED));

0 commit comments

Comments
 (0)