Skip to content

Commit 6aaa8d9

Browse files
authored
Merge branch 'master' into fix/data-loader/raw-record-issue
2 parents 33ec6e7 + 4cfac72 commit 6aaa8d9

File tree

6 files changed

+225
-18
lines changed

6 files changed

+225
-18
lines changed

data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
import static com.scalar.db.dataloader.cli.util.CommandLineInputUtils.validatePositiveValue;
55

66
import com.fasterxml.jackson.databind.ObjectMapper;
7+
import com.scalar.db.api.DistributedTransaction;
78
import com.scalar.db.api.DistributedTransactionAdmin;
89
import com.scalar.db.api.TableMetadata;
910
import com.scalar.db.dataloader.core.DataLoaderError;
1011
import com.scalar.db.dataloader.core.FileFormat;
12+
import com.scalar.db.dataloader.core.ScalarDbMode;
1113
import com.scalar.db.dataloader.core.dataimport.ImportManager;
1214
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
1315
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
@@ -67,20 +69,32 @@ public Integer call() throws Exception {
6769
spec.commandLine(), dataChunkQueueSize, DataLoaderError.INVALID_DATA_CHUNK_QUEUE_SIZE);
6870
ControlFile controlFile = parseControlFileFromPath(controlFilePath).orElse(null);
6971
ImportOptions importOptions = createImportOptions(controlFile);
70-
ImportLoggerConfig config =
72+
ImportLoggerConfig importLoggerConfig =
7173
ImportLoggerConfig.builder()
7274
.logDirectoryPath(logDirectory)
7375
.isLogRawSourceRecordsEnabled(importOptions.isLogRawRecord())
7476
.isLogSuccessRecordsEnabled(importOptions.isLogSuccessRecords())
7577
.prettyPrint(prettyPrint)
7678
.build();
77-
LogWriterFactory logWriterFactory = createLogWriterFactory(config);
79+
LogWriterFactory logWriterFactory = createLogWriterFactory(importLoggerConfig);
80+
File configFile = new File(configFilePath);
81+
TransactionFactory transactionFactory = TransactionFactory.create(configFile);
82+
83+
// Validate transaction mode configuration before proceeding
84+
validateTransactionMode(transactionFactory);
85+
7886
Map<String, TableMetadata> tableMetadataMap =
79-
createTableMetadataMap(controlFile, namespace, tableName);
87+
createTableMetadataMap(controlFile, namespace, tableName, transactionFactory);
8088
try (BufferedReader reader =
8189
Files.newBufferedReader(Paths.get(sourceFilePath), Charset.defaultCharset())) {
8290
ImportManager importManager =
83-
createImportManager(importOptions, tableMetadataMap, reader, logWriterFactory, config);
91+
createImportManager(
92+
importOptions,
93+
tableMetadataMap,
94+
reader,
95+
importLoggerConfig,
96+
logWriterFactory,
97+
transactionFactory);
8498
importManager.startImport();
8599
}
86100
return 0;
@@ -101,14 +115,16 @@ private LogWriterFactory createLogWriterFactory(ImportLoggerConfig config) {
101115
* @param controlFile control file
102116
* @param namespace Namespace
103117
* @param tableName Single table name
118+
* @param transactionFactory transaction factory to use
104119
* @return {@code Map<String, TableMetadata>} a table metadata map
105120
* @throws ParameterException if one of the argument values is wrong
106121
*/
107122
private Map<String, TableMetadata> createTableMetadataMap(
108-
ControlFile controlFile, String namespace, String tableName)
109-
throws IOException, TableMetadataException {
110-
File configFile = new File(configFilePath);
111-
TransactionFactory transactionFactory = TransactionFactory.create(configFile);
123+
ControlFile controlFile,
124+
String namespace,
125+
String tableName,
126+
TransactionFactory transactionFactory)
127+
throws TableMetadataException {
112128
try (DistributedTransactionAdmin transactionAdmin = transactionFactory.getTransactionAdmin()) {
113129
TableMetadataService tableMetadataService = new TableMetadataService(transactionAdmin);
114130
Map<String, TableMetadata> tableMetadataMap = new HashMap<>();
@@ -133,18 +149,19 @@ private Map<String, TableMetadata> createTableMetadataMap(
133149
* @param importOptions import options
134150
* @param tableMetadataMap table metadata map
135151
* @param reader buffered reader with source data
152+
* @param importLoggerConfig import logging config
136153
* @param logWriterFactory log writer factory object
137-
* @param config import logging config
154+
* @param transactionFactory transaction factory to use
138155
* @return ImportManager object
139156
*/
140157
private ImportManager createImportManager(
141158
ImportOptions importOptions,
142159
Map<String, TableMetadata> tableMetadataMap,
143160
BufferedReader reader,
161+
ImportLoggerConfig importLoggerConfig,
144162
LogWriterFactory logWriterFactory,
145-
ImportLoggerConfig config)
163+
TransactionFactory transactionFactory)
146164
throws IOException {
147-
File configFile = new File(configFilePath);
148165
ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory();
149166
ImportManager importManager =
150167
new ImportManager(
@@ -153,11 +170,12 @@ private ImportManager createImportManager(
153170
importOptions,
154171
importProcessorFactory,
155172
scalarDbMode,
156-
TransactionFactory.create(configFile).getTransactionManager());
173+
transactionFactory.getTransactionManager());
157174
if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
158-
importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory));
175+
importManager.addListener(
176+
new SplitByDataChunkImportLogger(importLoggerConfig, logWriterFactory));
159177
} else {
160-
importManager.addListener(new SingleFileImportLogger(config, logWriterFactory));
178+
importManager.addListener(new SingleFileImportLogger(importLoggerConfig, logWriterFactory));
161179
}
162180
return importManager;
163181
}
@@ -236,6 +254,48 @@ private void validateLogDirectory(String logDirectory) throws ParameterException
236254
}
237255
}
238256

257+
/**
258+
* Validate transaction mode configuration by attempting to start and abort a transaction
259+
*
260+
* @param transactionFactory transaction factory to test
261+
* @throws ParameterException if transaction mode is incompatible with the configured transaction
262+
* manager
263+
*/
264+
void validateTransactionMode(TransactionFactory transactionFactory) {
265+
// Only validate when in TRANSACTION mode
266+
if (scalarDbMode != ScalarDbMode.TRANSACTION) {
267+
return;
268+
}
269+
270+
DistributedTransaction transaction = null;
271+
try {
272+
// Try to start a read only transaction to verify the transaction manager is properly
273+
// configured
274+
transaction = transactionFactory.getTransactionManager().startReadOnly();
275+
} catch (UnsupportedOperationException e) {
276+
// Transaction mode is not supported by the configured transaction manager
277+
throw new ParameterException(
278+
spec.commandLine(),
279+
DataLoaderError.INVALID_TRANSACTION_MODE.buildMessage(e.getMessage()),
280+
e);
281+
} catch (Exception e) {
282+
// Other exceptions - configuration or runtime error
283+
throw new ParameterException(
284+
spec.commandLine(),
285+
DataLoaderError.TRANSACTION_MODE_VALIDATION_FAILED.buildMessage(e.getMessage()),
286+
e);
287+
} finally {
288+
// Ensure transaction is aborted
289+
if (transaction != null) {
290+
try {
291+
transaction.abort();
292+
} catch (Exception ignored) {
293+
// Ignore errors during cleanup
294+
}
295+
}
296+
}
297+
}
298+
239299
/**
240300
* Generate control file from a valid control file path
241301
*

data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,14 @@
33
import static org.junit.jupiter.api.Assertions.assertEquals;
44
import static org.junit.jupiter.api.Assertions.assertThrows;
55
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
import static org.mockito.Mockito.mock;
7+
import static org.mockito.Mockito.when;
68

9+
import com.scalar.db.api.DistributedTransactionManager;
710
import com.scalar.db.dataloader.core.FileFormat;
11+
import com.scalar.db.dataloader.core.ScalarDbMode;
812
import com.scalar.db.dataloader.core.dataimport.ImportMode;
13+
import com.scalar.db.service.TransactionFactory;
914
import java.io.IOException;
1015
import java.nio.file.Files;
1116
import java.nio.file.Path;
@@ -279,4 +284,52 @@ void call_withoutMaxThreads_shouldDefaultToAvailableProcessors() {
279284
// Verify it was set to available processors
280285
assertEquals(Runtime.getRuntime().availableProcessors(), command.maxThreads);
281286
}
287+
288+
@Test
289+
void validateTransactionMode_withUnsupportedOperation_shouldThrowException() throws Exception {
290+
// Arrange - Mock TransactionFactory to throw UnsupportedOperationException
291+
TransactionFactory mockFactory = mock(TransactionFactory.class);
292+
DistributedTransactionManager mockManager = mock(DistributedTransactionManager.class);
293+
294+
when(mockFactory.getTransactionManager()).thenReturn(mockManager);
295+
when(mockManager.startReadOnly())
296+
.thenThrow(new UnsupportedOperationException("Transaction mode is not supported"));
297+
298+
ImportCommand command = new ImportCommand();
299+
CommandLine cmd = new CommandLine(command);
300+
command.spec = cmd.getCommandSpec();
301+
command.scalarDbMode = ScalarDbMode.TRANSACTION;
302+
303+
// Act & Assert
304+
CommandLine.ParameterException thrown =
305+
assertThrows(
306+
CommandLine.ParameterException.class,
307+
() -> command.validateTransactionMode(mockFactory));
308+
309+
assertTrue(thrown.getMessage().contains("TRANSACTION mode is not compatible"));
310+
}
311+
312+
@Test
313+
void validateTransactionMode_withOtherException_shouldThrowException() throws Exception {
314+
// Arrange - Mock TransactionFactory to throw a different exception
315+
TransactionFactory mockFactory = mock(TransactionFactory.class);
316+
DistributedTransactionManager mockManager = mock(DistributedTransactionManager.class);
317+
318+
when(mockFactory.getTransactionManager()).thenReturn(mockManager);
319+
when(mockManager.startReadOnly()).thenThrow(new RuntimeException("Connection failed"));
320+
321+
ImportCommand command = new ImportCommand();
322+
CommandLine cmd = new CommandLine(command);
323+
command.spec = cmd.getCommandSpec();
324+
command.scalarDbMode = ScalarDbMode.TRANSACTION;
325+
326+
// Act & Assert
327+
CommandLine.ParameterException thrown =
328+
assertThrows(
329+
CommandLine.ParameterException.class,
330+
() -> command.validateTransactionMode(mockFactory));
331+
332+
assertTrue(thrown.getMessage().contains("Failed to validate TRANSACTION mode"));
333+
assertTrue(thrown.getMessage().contains("Connection failed"));
334+
}
282335
}

data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderError.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,14 @@ public enum DataLoaderError implements ScalarDbError {
216216
"Cannot specify both deprecated option '%s' and new option '%s'. Please use only '%s'",
217217
"",
218218
""),
219+
INVALID_TRANSACTION_MODE(
220+
Category.USER_ERROR,
221+
"0058",
222+
"TRANSACTION mode is not compatible with the current configuration. Please try with STORAGE mode or check your ScalarDB configuration. Details: %s",
223+
"",
224+
""),
225+
TRANSACTION_MODE_VALIDATION_FAILED(
226+
Category.USER_ERROR, "0059", "Failed to validate TRANSACTION mode. Details: %s", "", ""),
219227

220228
//
221229
// Errors for the internal error category

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,11 @@ public ExportReport startExport(
8080
ExecutorService executorService =
8181
Executors.newFixedThreadPool(exportOptions.getMaxThreadCount());
8282

83-
BufferedWriter bufferedWriter = new BufferedWriter(writer);
8483
boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON;
8584

8685
try (TransactionManagerCrudOperable.Scanner scanner =
87-
createScanner(exportOptions, dao, manager)) {
86+
createScanner(exportOptions, dao, manager);
87+
BufferedWriter bufferedWriter = new BufferedWriter(writer)) {
8888

8989
Iterator<Result> iterator = scanner.iterator();
9090
AtomicBoolean isFirstBatch = new AtomicBoolean(true);
@@ -106,18 +106,20 @@ public ExportReport startExport(
106106
executorService.shutdown();
107107
if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
108108
logger.info("All tasks completed");
109+
processFooter(exportOptions, tableMetadata, bufferedWriter);
109110
} else {
110111
logger.error("Timeout occurred while waiting for tasks to complete");
111112
// TODO: handle this
112113
}
113-
processFooter(exportOptions, tableMetadata, bufferedWriter);
114114
} catch (InterruptedException
115115
| IOException
116116
| UnknownTransactionStatusException
117117
| CrudException e) {
118118
logger.error("Error during export: ", e);
119119
} finally {
120-
bufferedWriter.flush();
120+
if (!executorService.isShutdown()) {
121+
executorService.shutdownNow();
122+
}
121123
}
122124
} catch (ExportOptionsValidationException | IOException | ScalarDbDaoException e) {
123125
logger.error("Error during export: {}", e.getMessage());

data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ public static Column<?> createColumnFromValue(
8383
DataType dataType, ColumnInfo columnInfo, @Nullable String value)
8484
throws ColumnParsingException {
8585
String columnName = columnInfo.getColumnName();
86+
if (value != null && !dataType.equals(DataType.TEXT) && value.equalsIgnoreCase("null")) {
87+
value = null;
88+
}
8689
try {
8790
switch (dataType) {
8891
case BOOLEAN:

data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,4 +228,85 @@ void getColumnsFromResult_withResultNull_withValidData_shouldReturnColumns()
228228
null, sourceRecord, false, mockMetadata, "namespace", "table");
229229
assertEquals(8, columns.size());
230230
}
231+
232+
/**
233+
* Tests that the string "null" (lowercase) is correctly treated as null for numeric, boolean, and
234+
* date/time types.
235+
*/
236+
@Test
237+
void createColumnFromValue_valueIsLowercaseNull_shouldReturnNullColumn()
238+
throws ColumnParsingException {
239+
String columnName = "testColumn";
240+
ColumnInfo columnInfo = ColumnInfo.builder().columnName(columnName).build();
241+
242+
// Integer type
243+
Column<?> intColumn = ColumnUtils.createColumnFromValue(DataType.INT, columnInfo, "null");
244+
assertEquals(IntColumn.ofNull(columnName), intColumn);
245+
intColumn = ColumnUtils.createColumnFromValue(DataType.INT, columnInfo, "Null");
246+
assertEquals(IntColumn.ofNull(columnName), intColumn);
247+
248+
// Double type
249+
Column<?> doubleColumn = ColumnUtils.createColumnFromValue(DataType.DOUBLE, columnInfo, "null");
250+
assertEquals(DoubleColumn.ofNull(columnName), doubleColumn);
251+
doubleColumn = ColumnUtils.createColumnFromValue(DataType.DOUBLE, columnInfo, "NULL");
252+
assertEquals(DoubleColumn.ofNull(columnName), doubleColumn);
253+
254+
// Boolean type
255+
Column<?> boolColumn = ColumnUtils.createColumnFromValue(DataType.BOOLEAN, columnInfo, "null");
256+
assertEquals(BooleanColumn.ofNull(columnName), boolColumn);
257+
258+
boolColumn = ColumnUtils.createColumnFromValue(DataType.BOOLEAN, columnInfo, "nuLL");
259+
assertEquals(BooleanColumn.ofNull(columnName), boolColumn);
260+
261+
// Date type
262+
Column<?> dateColumn = ColumnUtils.createColumnFromValue(DataType.DATE, columnInfo, "null");
263+
assertEquals(DateColumn.ofNull(columnName), dateColumn);
264+
dateColumn = ColumnUtils.createColumnFromValue(DataType.DATE, columnInfo, "NULL");
265+
assertEquals(DateColumn.ofNull(columnName), dateColumn);
266+
267+
// Time type
268+
Column<?> timeColumn = ColumnUtils.createColumnFromValue(DataType.TIME, columnInfo, "null");
269+
assertEquals(TimeColumn.ofNull(columnName), timeColumn);
270+
271+
timeColumn = ColumnUtils.createColumnFromValue(DataType.TIME, columnInfo, "nuLL");
272+
assertEquals(TimeColumn.ofNull(columnName), timeColumn);
273+
274+
// Timestamp type
275+
Column<?> timestampColumn =
276+
ColumnUtils.createColumnFromValue(DataType.TIMESTAMP, columnInfo, "null");
277+
assertEquals(TimestampColumn.ofNull(columnName), timestampColumn);
278+
timestampColumn = ColumnUtils.createColumnFromValue(DataType.TIMESTAMP, columnInfo, "NULL");
279+
assertEquals(TimestampColumn.ofNull(columnName), timestampColumn);
280+
281+
// Timestamp type
282+
Column<?> timestamprtzColumn =
283+
ColumnUtils.createColumnFromValue(DataType.TIMESTAMPTZ, columnInfo, "null");
284+
assertEquals(TimestampTZColumn.ofNull(columnName), timestamprtzColumn);
285+
timestamprtzColumn =
286+
ColumnUtils.createColumnFromValue(DataType.TIMESTAMPTZ, columnInfo, "Null");
287+
assertEquals(TimestampTZColumn.ofNull(columnName), timestamprtzColumn);
288+
}
289+
290+
/**
291+
* Tests that when the string value "null" is provided for TEXT columns, it is treated as a
292+
* literal string and not converted to null.
293+
*/
294+
@Test
295+
void createColumnFromValue_valueIsNullString_shouldRemainLiteralForTextType()
296+
throws ColumnParsingException {
297+
String columnName = "textColumn";
298+
ColumnInfo columnInfo = ColumnInfo.builder().columnName(columnName).build();
299+
300+
Column<?> textCol = ColumnUtils.createColumnFromValue(DataType.TEXT, columnInfo, "null");
301+
assertEquals(TextColumn.of(columnName, "null"), textCol);
302+
303+
textCol = ColumnUtils.createColumnFromValue(DataType.TEXT, columnInfo, "NULL");
304+
assertEquals(TextColumn.of(columnName, "NULL"), textCol);
305+
306+
textCol = ColumnUtils.createColumnFromValue(DataType.TEXT, columnInfo, "Null");
307+
assertEquals(TextColumn.of(columnName, "Null"), textCol);
308+
309+
textCol = ColumnUtils.createColumnFromValue(DataType.TEXT, columnInfo, "nuLL");
310+
assertEquals(TextColumn.of(columnName, "nuLL"), textCol);
311+
}
231312
}

0 commit comments

Comments
 (0)