
Commit 7ab8b65

Merge branch 'master' into add-scanner-api-to-transaction-abstraction

2 parents 9ad1219 + e45325b

File tree

14 files changed: +245 −317 lines


build.gradle

Lines changed: 1 addition & 1 deletion

@@ -30,7 +30,7 @@ subprojects {
     awssdkVersion = '2.31.3'
     commonsDbcp2Version = '2.13.0'
     mysqlDriverVersion = '8.4.0'
-    postgresqlDriverVersion = '42.7.5'
+    postgresqlDriverVersion = '42.7.6'
     oracleDriverVersion = '23.8.0.25.04'
     sqlserverDriverVersion = '12.8.1.jre8'
     sqliteDriverVersion = '3.49.1.0'

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java

Lines changed: 0 additions & 7 deletions

@@ -18,13 +18,6 @@ public interface ImportEventListener {
    */
   void onDataChunkStarted(ImportDataChunkStatus status);
 
-  /**
-   * Updates or adds new status information for a data chunk.
-   *
-   * @param status the updated status information for the data chunk
-   */
-  void addOrUpdateDataChunkStatus(ImportDataChunkStatus status);
-
   /**
    * Called when processing of a data chunk is completed.
    *
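With addOrUpdateDataChunkStatus removed from the interface, consumers that previously read the status map can instead track statuses from the remaining lifecycle callbacks. Below is a minimal sketch of such a listener; the method set (onDataChunkStarted, onDataChunkCompleted, onTaskComplete, onAllDataChunksCompleted) and the package path of ImportTaskResult are assumptions based only on the diffs shown on this page, so the real interface may declare additional events.

import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: replaces the removed addOrUpdateDataChunkStatus callback
// by letting the listener keep its own status map.
public class StatusTrackingListener implements ImportEventListener {

  private final Map<Integer, ImportDataChunkStatus> statuses = new ConcurrentHashMap<>();

  @Override
  public void onDataChunkStarted(ImportDataChunkStatus status) {
    statuses.put(status.getDataChunkId(), status); // record the chunk as in progress
  }

  @Override
  public void onDataChunkCompleted(ImportDataChunkStatus status) {
    statuses.put(status.getDataChunkId(), status); // overwrite with the final status
  }

  @Override
  public void onTaskComplete(ImportTaskResult taskResult) {
    // Per-task results could be aggregated here if needed.
  }

  @Override
  public void onAllDataChunksCompleted() {
    // statuses now holds the last reported status of every chunk.
  }
}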

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java

Lines changed: 38 additions & 26 deletions

@@ -17,7 +17,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import lombok.AllArgsConstructor;
 import lombok.NonNull;
 
@@ -46,20 +45,15 @@ public class ImportManager implements ImportEventListener {
   private final ScalarDbMode scalarDbMode;
   private final DistributedStorage distributedStorage;
   private final DistributedTransactionManager distributedTransactionManager;
-  private final ConcurrentHashMap<Integer, ImportDataChunkStatus> importDataChunkStatusMap =
-      new ConcurrentHashMap<>();
 
   /**
    * Starts the import process using the configured parameters.
    *
    * <p>If the data chunk size in {@link ImportOptions} is set to 0, the entire file will be
    * processed as a single chunk. Otherwise, the file will be processed in chunks of the specified
    * size.
-   *
-   * @return a map of {@link ImportDataChunkStatus} objects containing the status of each processed
-   *     chunk
    */
-  public ConcurrentHashMap<Integer, ImportDataChunkStatus> startImport() {
+  public void startImport() {
     ImportProcessorParams params =
         ImportProcessorParams.builder()
             .scalarDbMode(scalarDbMode)
@@ -77,8 +71,7 @@ public void startImport() {
         importOptions.getDataChunkSize() == 0
             ? Integer.MAX_VALUE
             : importOptions.getDataChunkSize();
-    return processor.process(
-        dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader);
+    processor.process(dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader);
   }
 
   /**
@@ -108,15 +101,6 @@ public void onDataChunkStarted(ImportDataChunkStatus status) {
     }
   }
 
-  /**
-   * {@inheritDoc} Updates or adds the status of a data chunk in the status map. This method is
-   * thread-safe.
-   */
-  @Override
-  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {
-    importDataChunkStatusMap.put(status.getDataChunkId(), status);
-  }
-
   /** {@inheritDoc} Forwards the event to all registered listeners. */
   @Override
   public void onDataChunkCompleted(ImportDataChunkStatus status) {
@@ -152,18 +136,46 @@ public void onTaskComplete(ImportTaskResult taskResult) {
   /** {@inheritDoc} Forwards the event to all registered listeners. */
   @Override
   public void onAllDataChunksCompleted() {
+    Throwable firstException = null;
+
     for (ImportEventListener listener : listeners) {
-      listener.onAllDataChunksCompleted();
+      try {
+        listener.onAllDataChunksCompleted();
+      } catch (Throwable e) {
+        if (firstException == null) {
+          firstException = e;
+        } else {
+          firstException.addSuppressed(e);
+        }
+      }
+    }
+
+    try {
+      closeResources();
+    } catch (Throwable e) {
+      if (firstException != null) {
+        firstException.addSuppressed(e);
+      } else {
+        firstException = e;
+      }
+    }
+
+    if (firstException != null) {
+      throw new RuntimeException("Error during completion", firstException);
     }
   }
 
-  /**
-   * Returns the current map of import data chunk status objects.
-   *
-   * @return a map of {@link ImportDataChunkStatus} objects
-   */
-  public ConcurrentHashMap<Integer, ImportDataChunkStatus> getImportDataChunkStatus() {
-    return importDataChunkStatusMap;
+  /** Close resources properly once the process is completed */
+  public void closeResources() {
+    try {
+      if (distributedStorage != null) {
+        distributedStorage.close();
+      } else if (distributedTransactionManager != null) {
+        distributedTransactionManager.close();
+      }
+    } catch (Throwable e) {
+      throw new RuntimeException("Failed to close the resource", e);
+    }
   }
 
   /**
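Two behavioral changes stand out in this file: startImport() no longer returns the status map (progress is observed through registered listeners instead), and onAllDataChunksCompleted() now aggregates failures, keeping the first Throwable as the primary cause and attaching later ones via addSuppressed before wrapping everything in one RuntimeException. A caller-side sketch under the new contract; the construction details and the addListener name are assumptions (the diff only shows that a listeners collection exists):

// Hypothetical usage sketch, not confirmed by this diff.
ImportManager manager = createImportManager(); // built elsewhere with storage, options, etc.
manager.addListener(new StatusTrackingListener()); // see the listener sketch above
try {
  manager.startImport(); // now void; resources are closed during completion
} catch (RuntimeException e) {
  // The first failure is the cause; any later listener or close failures
  // ride along as suppressed exceptions on that cause.
  Throwable primary = e.getCause();
  for (Throwable secondary : primary.getSuppressed()) {
    System.err.println("also failed: " + secondary);
  }
}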

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java

Lines changed: 4 additions & 25 deletions

@@ -22,19 +22,10 @@
 import java.util.NoSuchElementException;
 import java.util.Optional;
 import javax.annotation.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** The generic DAO that is used to scan ScalarDB data */
 public class ScalarDbDao {
 
-  /* Class logger */
-  private static final Logger logger = LoggerFactory.getLogger(ScalarDbDao.class);
-  private static final String GET_COMPLETED_MSG = "GET completed for %s";
-  private static final String PUT_COMPLETED_MSG = "PUT completed for %s";
-  private static final String SCAN_START_MSG = "SCAN started...";
-  private static final String SCAN_END_MSG = "SCAN completed";
-
   /**
    * Retrieve record from ScalarDB instance in storage mode
    *
@@ -59,9 +50,7 @@ public Optional<Result> get(
 
     try {
       Get get = createGetWith(namespace, table, partitionKey, clusteringKey);
-      Optional<Result> result = storage.get(get);
-      logger.info(String.format(GET_COMPLETED_MSG, loggingKey));
-      return result;
+      return storage.get(get);
     } catch (ExecutionException e) {
       throw new ScalarDbDaoException("error GET " + loggingKey, e);
     }
@@ -90,9 +79,7 @@ public Optional<Result> get(
     // Retrieving the key data for logging
     String loggingKey = keysToString(partitionKey, clusteringKey);
     try {
-      Optional<Result> result = transaction.get(get);
-      logger.info(String.format(GET_COMPLETED_MSG, loggingKey));
-      return result;
+      return transaction.get(get);
     } catch (CrudException e) {
       throw new ScalarDbDaoException("error GET " + loggingKey, e.getCause());
     }
@@ -125,7 +112,6 @@ public void put(
       throw new ScalarDbDaoException(
           CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e);
     }
-    logger.info(String.format(PUT_COMPLETED_MSG, keysToString(partitionKey, clusteringKey)));
   }
 
   /**
@@ -154,7 +140,6 @@ public void put(
       throw new ScalarDbDaoException(
           CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e);
     }
-    logger.info(String.format(PUT_COMPLETED_MSG, keysToString(partitionKey, clusteringKey)));
   }
 
   /**
@@ -186,11 +171,8 @@ public List<Result> scan(
 
     // scan data
     try {
-      logger.info(SCAN_START_MSG);
       try (Scanner scanner = storage.scan(scan)) {
-        List<Result> allResults = scanner.all();
-        logger.info(SCAN_END_MSG);
-        return allResults;
+        return scanner.all();
       }
     } catch (ExecutionException | IOException e) {
       throw new ScalarDbDaoException(
@@ -229,10 +211,7 @@ public List<Result> scan(
 
     // scan data
     try {
-      logger.info(SCAN_START_MSG);
-      List<Result> results = transaction.scan(scan);
-      logger.info(SCAN_END_MSG);
-      return results;
+      return transaction.scan(scan);
     } catch (CrudException | NoSuchElementException e) {
       // No such element Exception is thrown when the scan is done in transaction mode but
       // ScalarDB is running in storage mode

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java

Lines changed: 0 additions & 9 deletions

@@ -70,15 +70,6 @@ public void onTaskComplete(ImportTaskResult taskResult) {
     }
   }
 
-  /**
-   * Called to add or update the status of a data chunk. This implementation does nothing as the
-   * status is only logged when the data chunk is completed.
-   *
-   * @param status the status of the data chunk
-   */
-  @Override
-  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {}
-
   /**
    * Called when a data chunk is completed. Logs the summary of the data chunk to the summary log
    * file.

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java

Lines changed: 0 additions & 9 deletions

@@ -112,15 +112,6 @@ private void writeLog(ImportTargetResult target, LogFileType logFileType, int da
     writer.write(jsonNode);
   }
 
-  /**
-   * Called to add or update the status of a data chunk. This implementation does nothing as the
-   * status is only logged when the data chunk is completed.
-   *
-   * @param status the status of the data chunk
-   */
-  @Override
-  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {}
-
   /**
    * Called when a data chunk is completed. Logs the summary of the data chunk and closes the log
    * writers for that data chunk.

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java

Lines changed: 3 additions & 63 deletions

@@ -5,7 +5,6 @@
 import com.scalar.db.common.error.CoreError;
 import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
 import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
-import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
 import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow;
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -14,12 +13,6 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -39,7 +32,7 @@
  */
 public class CsvImportProcessor extends ImportProcessor {
   private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();
-  private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0);
+  private final AtomicInteger dataChunkIdCounter = new AtomicInteger(0);
 
   /**
    * Creates a new CsvImportProcessor with the specified parameters.
@@ -50,60 +43,6 @@ public CsvImportProcessor(ImportProcessorParams params) {
     super(params);
   }
 
-  /**
-   * Processes the source data from the given import file.
-   *
-   * <p>This method reads data from the provided {@link BufferedReader}, processes it in chunks, and
-   * batches transactions according to the specified sizes. The method returns a list of {@link
-   * ImportDataChunkStatus} objects, each representing the status of a processed data chunk.
-   *
-   * @param dataChunkSize the number of records to include in each data chunk
-   * @param transactionBatchSize the number of records to include in each transaction batch
-   * @param reader the {@link BufferedReader} used to read the source file
-   * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status of each
-   *     data chunk
-   */
-  @Override
-  public ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
-      int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
-    ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor();
-    BlockingQueue<ImportDataChunk> dataChunkQueue =
-        new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize());
-
-    try {
-      CompletableFuture<Void> readerFuture =
-          CompletableFuture.runAsync(
-              () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor);
-
-      ConcurrentHashMap<Integer, ImportDataChunkStatus> result = new ConcurrentHashMap<>();
-
-      while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
-        ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
-        if (dataChunk != null) {
-          ImportDataChunkStatus status = processDataChunk(dataChunk, transactionBatchSize);
-          result.put(status.getDataChunkId(), status);
-        }
-      }
-      readerFuture.join();
-      return result;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(
-          CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e);
-    } finally {
-      dataChunkExecutor.shutdown();
-      try {
-        if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
-          dataChunkExecutor.shutdownNow();
-        }
-      } catch (InterruptedException e) {
-        dataChunkExecutor.shutdownNow();
-        Thread.currentThread().interrupt();
-      }
-      notifyAllDataChunksCompleted();
-    }
-  }
-
   /**
    * Reads and processes CSV data in chunks from the provided reader.
    *
@@ -122,7 +61,8 @@ public ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
    * @param dataChunkQueue the queue where data chunks are placed for processing
    * @throws RuntimeException if there are errors reading the file or if interrupted
    */
-  private void readDataChunks(
+  @Override
+  protected void readDataChunks(
       BufferedReader reader, int dataChunkSize, BlockingQueue<ImportDataChunk> dataChunkQueue) {
     try {
       String delimiter =
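The deleted process() body has evidently moved up into the ImportProcessor base class, since this file now only overrides readDataChunks: a template-method refactor in which the base class owns the producer/consumer loop and format-specific subclasses (CSV, JSON, ...) supply only the chunk-reading step. Below is a condensed, self-contained sketch of that shape, with the loop logic taken from the deleted code; the actual base-class implementation is not shown in this diff, and the generic parameter C stands in for ImportDataChunk.

import java.io.BufferedReader;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the assumed base-class structure after this refactor.
abstract class ChunkedImportProcessor<C> {

  /** Format-specific subclasses implement only the reading step. */
  protected abstract void readDataChunks(
      BufferedReader reader, int dataChunkSize, BlockingQueue<C> dataChunkQueue);

  /** Processes one chunk; in this commit, status is reported to listeners, not returned. */
  protected abstract void processDataChunk(C dataChunk, int transactionBatchSize);

  /** Drives the producer (reader) / consumer (processor) loop, as the deleted code did. */
  public void process(int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
    ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor();
    BlockingQueue<C> dataChunkQueue = new LinkedBlockingQueue<>();
    try {
      CompletableFuture<Void> readerFuture =
          CompletableFuture.runAsync(
              () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor);
      // Drain chunks as they are produced until the reader is done and the queue is empty.
      while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
        C dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
        if (dataChunk != null) {
          processDataChunk(dataChunk, transactionBatchSize);
        }
      }
      readerFuture.join(); // surfaces reader-side failures
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("data chunk processing interrupted", e);
    } finally {
      dataChunkExecutor.shutdown();
    }
  }
}

Making dataChunkIdCounter an instance field (rather than static) in CsvImportProcessor also means chunk IDs restart at zero for each processor instance instead of accumulating across imports in the same JVM.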
