Skip to content

Commit f0ce024

Browse files
committed
Passing datachunk id only instead of datachunk
1 parent c2cfa91 commit f0ce024

File tree

1 file changed

+12
-11
lines changed
  • data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor

1 file changed

+12
-11
lines changed

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -226,16 +226,16 @@ private List<ImportTransactionBatch> splitIntoTransactionBatches(
226226
* all records in the batch, and commits or aborts the transaction based on the success of all
227227
* operations.
228228
*
229-
* @param dataChunk the parent data chunk containing this batch
229+
* @param dataChunkId the parent data chunk id of the chunk containing this batch
230230
* @param transactionBatch the batch of records to process in a single transaction
231231
* @return an {@link ImportTransactionBatchResult} containing the processing results and any
232232
* errors
233233
*/
234234
private ImportTransactionBatchResult processTransactionBatch(
235-
ImportDataChunk dataChunk, ImportTransactionBatch transactionBatch) {
235+
int dataChunkId, ImportTransactionBatch transactionBatch) {
236236
ImportTransactionBatchStatus status =
237237
ImportTransactionBatchStatus.builder()
238-
.dataChunkId(dataChunk.getDataChunkId())
238+
.dataChunkId(dataChunkId)
239239
.transactionBatchId(transactionBatch.getTransactionBatchId())
240240
.build();
241241
notifyTransactionBatchStarted(status);
@@ -252,7 +252,7 @@ private ImportTransactionBatchResult processTransactionBatch(
252252
ImportTaskParams taskParams =
253253
ImportTaskParams.builder()
254254
.sourceRecord(importRow.getSourceData())
255-
.dataChunkId(dataChunk.getDataChunkId())
255+
.dataChunkId(dataChunkId)
256256
.rowNumber(importRow.getRowNumber())
257257
.importOptions(params.getImportOptions())
258258
.tableColumnDataTypes(params.getTableColumnDataTypes())
@@ -295,7 +295,7 @@ private ImportTransactionBatchResult processTransactionBatch(
295295
ImportTransactionBatchResult.builder()
296296
.transactionBatchId(transactionBatch.getTransactionBatchId())
297297
.success(isSuccess)
298-
.dataChunkId(dataChunk.getDataChunkId())
298+
.dataChunkId(dataChunkId)
299299
.records(importRecordResult)
300300
.errors(Collections.singletonList(error))
301301
.build();
@@ -307,15 +307,15 @@ private ImportTransactionBatchResult processTransactionBatch(
307307
* Processes a single record in storage mode (non-transactional). Each record is processed
308308
* independently without transaction guarantees.
309309
*
310-
* @param dataChunk the parent data chunk containing this record
310+
* @param dataChunkId the parent data chunk id of the chunk containing this record
311311
* @param importRow the record to process
312312
* @return an {@link ImportTaskResult} containing the processing result for the record
313313
*/
314-
private ImportTaskResult processStorageRecord(ImportDataChunk dataChunk, ImportRow importRow) {
314+
private ImportTaskResult processStorageRecord(int dataChunkId, ImportRow importRow) {
315315
ImportTaskParams taskParams =
316316
ImportTaskParams.builder()
317317
.sourceRecord(importRow.getSourceData())
318-
.dataChunkId(dataChunk.getDataChunkId())
318+
.dataChunkId(dataChunkId)
319319
.rowNumber(importRow.getRowNumber())
320320
.importOptions(params.getImportOptions())
321321
.tableColumnDataTypes(params.getTableColumnDataTypes())
@@ -330,7 +330,7 @@ private ImportTaskResult processStorageRecord(ImportDataChunk dataChunk, ImportR
330330
.rowNumber(importRecordResult.getRowNumber())
331331
.rawRecord(importRecordResult.getRawRecord())
332332
.targets(importRecordResult.getTargets())
333-
.dataChunkId(dataChunk.getDataChunkId())
333+
.dataChunkId(dataChunkId)
334334
.build();
335335
notifyStorageRecordCompleted(modifiedTaskResult);
336336
return modifiedTaskResult;
@@ -382,7 +382,7 @@ private ImportDataChunkStatus processDataChunkWithTransactions(
382382
for (ImportTransactionBatch transactionBatch : transactionBatches) {
383383
Future<?> transactionBatchFuture =
384384
transactionBatchExecutor.submit(
385-
() -> processTransactionBatch(dataChunk, transactionBatch));
385+
() -> processTransactionBatch(dataChunk.getDataChunkId(), transactionBatch));
386386
transactionBatchFutures.add(transactionBatchFuture);
387387
}
388388

@@ -446,7 +446,8 @@ private ImportDataChunkStatus processDataChunkWithoutTransactions(ImportDataChun
446446
try {
447447
for (ImportRow importRow : dataChunk.getSourceData()) {
448448
Future<?> recordFuture =
449-
recordExecutor.submit(() -> processStorageRecord(dataChunk, importRow));
449+
recordExecutor.submit(
450+
() -> processStorageRecord(dataChunk.getDataChunkId(), importRow));
450451
recordFutures.add(recordFuture);
451452
}
452453
waitForFuturesToComplete(recordFutures);

0 commit comments

Comments
 (0)