
Commit cbba68c

Fix
1 parent a53757b commit cbba68c

File tree: 3 files changed, +240 -1 lines changed

data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java

Lines changed: 1 addition & 1 deletion
@@ -159,7 +159,7 @@ private ExportManager createExportManager(
       throws IOException {
     ProducerTaskFactory taskFactory =
         new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson);
-    if (scalarDbMode.equals(ScalarDbMode.TRANSACTION)) {
+    if (scalarDbMode.equals(ScalarDbMode.STORAGE)) {
       DistributedStorage storage = StorageFactory.create(scalarDbPropertiesFilePath).getStorage();
       return createExportManagerWithStorage(storage, scalarDbDao, fileFormat, taskFactory);
     } else {
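
The one-line change corrects inverted dispatch: the branch body constructs a DistributedStorage, so the guard must match STORAGE mode, leaving TRANSACTION mode to fall through to the else path. Below is a minimal, self-contained sketch of the corrected pattern; all names are simplified stand-ins for illustration, not the dataloader's real signatures. As a side note, `==` is null-safe for enum comparison, whereas `equals` would throw if the mode were null.

/** Sketch of the corrected mode dispatch; names are illustrative stand-ins. */
public class ModeDispatchSketch {
  enum ScalarDbMode { STORAGE, TRANSACTION }

  static String createExportManager(ScalarDbMode mode) {
    // Storage mode is the branch that wires up a storage-backed manager;
    // everything else falls through to the transaction-backed path.
    if (mode == ScalarDbMode.STORAGE) { // == is null-safe for enums, unlike equals()
      return "storage-backed export manager";
    }
    return "transaction-backed export manager";
  }

  public static void main(String[] args) {
    System.out.println(createExportManager(ScalarDbMode.STORAGE));     // storage-backed
    System.out.println(createExportManager(ScalarDbMode.TRANSACTION)); // transaction-backed
  }
}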
data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java

Lines changed: 128 additions & 0 deletions

@@ -0,0 +1,128 @@
package com.scalar.db.dataloader.cli.command.dataimport;

import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Renders live import progress to the console, repainting at a fixed interval. */
public class ConsoleImportProgressListener implements ImportEventListener {

  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Duration updateInterval;
  private final long startTime;
  private final Map<Integer, String> chunkLogs = new ConcurrentHashMap<>();
  private final Map<Integer, String> chunkFailureLogs = new ConcurrentHashMap<>();
  private final AtomicLong totalRecords = new AtomicLong();
  private volatile boolean completed = false;

  public ConsoleImportProgressListener(Duration updateInterval) {
    this.updateInterval = updateInterval;
    this.startTime = System.currentTimeMillis();
    scheduler.scheduleAtFixedRate(
        this::render, 0, updateInterval.toMillis(), TimeUnit.MILLISECONDS);
  }

  @Override
  public void onDataChunkStarted(ImportDataChunkStatus status) {
    chunkLogs.put(
        status.getDataChunkId(),
        String.format(
            "🔄 Chunk %d: Processing... %d records so far",
            status.getDataChunkId(), status.getTotalRecords()));
  }

  @Override
  public void onDataChunkCompleted(ImportDataChunkStatus status) {
    long elapsed = System.currentTimeMillis() - status.getStartTime().toEpochMilli();
    totalRecords.addAndGet(status.getTotalRecords());
    if (status.getSuccessCount() > 0) {
      chunkLogs.put(
          status.getDataChunkId(),
          String.format(
              "✓ Chunk %d: %,d records processed (%.1fs): %,d imported successfully, %,d failed",
              status.getDataChunkId(),
              status.getTotalRecords(),
              elapsed / 1000.0,
              status.getSuccessCount(),
              status.getFailureCount()));
    }
    if (status.getFailureCount() > 0) {
      chunkFailureLogs.put(
          status.getDataChunkId(),
          String.format(
              "❌ Chunk %d: Failed - %d records failed to be imported",
              status.getDataChunkId(), status.getFailureCount()));
    }
  }

  @Override
  public void onAllDataChunksCompleted() {
    completed = true;
    scheduler.shutdown();
    render(); // Final render
  }

  @Override
  public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) {
    // Optional: implement to show more granular batch progress
  }

  @Override
  public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
    if (!batchResult.isSuccess()) {
      chunkFailureLogs.put(
          batchResult.getDataChunkId(),
          String.format(
              "❌ Chunk %d: Transaction batch %d failed - %d records failed to be imported",
              batchResult.getDataChunkId(),
              batchResult.getTransactionBatchId(),
              batchResult.getRecords().size()));
    }
  }

  @Override
  public void onTaskComplete(ImportTaskResult taskResult) {
    // Optional: summary or stats after the final chunk
  }

  private void render() {
    StringBuilder builder = new StringBuilder();
    long now = System.currentTimeMillis();
    long elapsed = now - startTime;
    double recPerSec = (totalRecords.get() * 1000.0) / (elapsed == 0 ? 1 : elapsed);

    builder.append(
        String.format(
            "\rImporting... %,d records | %.0f rec/s | %s\n",
            totalRecords.get(), recPerSec, formatElapsed(elapsed)));

    // Sort by chunk ID so lines keep a stable numeric order across repaints;
    // sorting the formatted strings would put "Chunk 10" before "Chunk 2".
    chunkLogs.entrySet().stream()
        .sorted(Map.Entry.comparingByKey())
        .forEach(entry -> builder.append(entry.getValue()).append("\n"));
    chunkFailureLogs.entrySet().stream()
        .sorted(Map.Entry.comparingByKey())
        .forEach(entry -> builder.append(entry.getValue()).append("\n"));
    if (completed) {
      // Set by onAllDataChunksCompleted; shown only on the final render.
      builder.append("All data chunks processed.\n");
    }

    clearConsole();
    System.out.print(builder);
    System.out.flush();
  }

  private String formatElapsed(long elapsedMillis) {
    long seconds = (elapsedMillis / 1000) % 60;
    long minutes = (elapsedMillis / 1000) / 60;
    return String.format("%dm %ds elapsed", minutes, seconds);
  }

  private void clearConsole() {
    // ANSI escape: move the cursor home and clear the screen before repainting
    System.out.print("\033[H\033[2J");
    System.out.flush();
  }
}
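
For context, here is a minimal, self-contained demo of the scheduled-render pattern the listener relies on: a single-threaded scheduler repaints the console at a fixed rate using the same ANSI clear sequence. All names are illustrative; this is not the dataloader's API.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Standalone demo: clear screen and repaint progress on a fixed schedule. */
public class RenderLoopDemo {
  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicLong records = new AtomicLong();
    long start = System.currentTimeMillis();

    scheduler.scheduleAtFixedRate(
        () -> {
          records.addAndGet(1_000); // simulate import progress
          long elapsed = System.currentTimeMillis() - start;
          System.out.print("\033[H\033[2J"); // same ANSI clear the listener uses
          System.out.printf(
              "Importing... %,d records | %.0f rec/s%n",
              records.get(), records.get() * 1000.0 / Math.max(elapsed, 1L));
          System.out.flush();
        },
        0, 500, TimeUnit.MILLISECONDS); // repaint twice per second

    Thread.sleep(3_000); // let a few frames render
    scheduler.shutdown();
  }
}
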
data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportListener.java

Lines changed: 111 additions & 0 deletions

@@ -0,0 +1,111 @@
package com.scalar.db.dataloader.cli.command.dataimport;

import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ImportListener implements ImportEventListener {

  private static final Logger logger = LoggerFactory.getLogger(ImportListener.class);
  private final AtomicInteger totalSuccessImportCount = new AtomicInteger(0);
  private final AtomicInteger totalFailureImportCount = new AtomicInteger(0);
  private final ConcurrentHashMap<Integer, AtomicInteger> dataChunkImportCountMap =
      new ConcurrentHashMap<>();
  private static final String DATA_CHUNK_COMPLETED_SUCCESS_MSG =
      "\u2713 Chunk %d: %d records imported successfully (%ds)";
  private static final String DATA_CHUNK_COMPLETED_FAILURE_MSG =
      "\u274C Chunk %d: %d records failed to import";
  private static final String DATA_CHUNK_IN_PROCESS_MSG =
      "\uD83D\uDD04 Chunk %d: Processing... %d records imported so far";

  /**
   * Called when processing of a data chunk begins.
   *
   * @param status the current status of the data chunk being processed
   */
  @Override
  public void onDataChunkStarted(ImportDataChunkStatus status) {}

  /**
   * Called when processing of a data chunk is completed.
   *
   * @param status the final status of the completed data chunk
   */
  @Override
  public void onDataChunkCompleted(ImportDataChunkStatus status) {
    if (status.getSuccessCount() > 0) {
      logger.info(
          String.format(
              DATA_CHUNK_COMPLETED_SUCCESS_MSG,
              status.getDataChunkId(),
              status.getSuccessCount(),
              status.getTotalDurationInMilliSeconds() / 1000));
      this.totalSuccessImportCount.addAndGet(status.getSuccessCount());
    }
    if (status.getFailureCount() > 0) {
      logger.info(
          String.format(
              DATA_CHUNK_COMPLETED_FAILURE_MSG, status.getDataChunkId(), status.getFailureCount()));
      this.totalFailureImportCount.addAndGet(status.getFailureCount());
    }
  }

  /**
   * Called when all data chunks have been processed. This indicates that the entire chunked import
   * process is complete.
   */
  @Override
  public void onAllDataChunksCompleted() {
    // Log a final summary from the running counters maintained above.
    logger.info(
        "Import completed: {} records imported successfully, {} records failed",
        totalSuccessImportCount.get(),
        totalFailureImportCount.get());
  }

  /**
   * Called when processing of a transaction batch begins.
   *
   * @param batchStatus the initial status of the transaction batch
   */
  @Override
  public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) {}

  /**
   * Called when processing of a transaction batch is completed.
   *
   * @param batchResult the result of the completed transaction batch
   */
  @Override
  public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
    if (batchResult.isSuccess()) {
      Integer dataChunkId = batchResult.getDataChunkId();
      dataChunkImportCountMap.compute(
          dataChunkId,
          (k, v) -> {
            if (v == null) {
              return new AtomicInteger(batchResult.getRecords().size());
            } else {
              v.addAndGet(batchResult.getRecords().size());
              return v;
            }
          });
      logger.info(
          String.format(
              DATA_CHUNK_IN_PROCESS_MSG,
              dataChunkId,
              dataChunkImportCountMap.get(dataChunkId).get()));
    }
  }

  /**
   * Called when an import task is completed.
   *
   * @param taskResult the result of the completed import task
   */
  @Override
  public void onTaskComplete(ImportTaskResult taskResult) {}
}
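
A design note on the per-chunk counter in onTransactionBatchCompleted: ConcurrentHashMap.compute gives an atomic read-modify-write per key. An equivalent, slightly more compact idiom is computeIfAbsent with a LongAdder, which also scales better under write contention. A self-contained sketch with illustrative names:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Sketch: per-chunk record counting with computeIfAbsent + LongAdder. */
public class ChunkCounterSketch {
  private final ConcurrentHashMap<Integer, LongAdder> counts = new ConcurrentHashMap<>();

  /** Atomically adds {@code records} to the running total for {@code chunkId}. */
  public void add(int chunkId, int records) {
    counts.computeIfAbsent(chunkId, k -> new LongAdder()).add(records);
  }

  public long get(int chunkId) {
    LongAdder adder = counts.get(chunkId);
    return adder == null ? 0 : adder.sum();
  }

  public static void main(String[] args) {
    ChunkCounterSketch counter = new ChunkCounterSketch();
    counter.add(1, 250);
    counter.add(1, 250);
    System.out.println(counter.get(1)); // 500
  }
}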
