diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index 28942a31f7..15337bd170 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -802,6 +802,26 @@ public enum CoreError implements ScalarDbError { "Duplicated data mappings found for column '%s' in table '%s'", "", ""), + DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN( + Category.USER_ERROR, + "0175", + "Missing required field or column mapping for clustering key %s", + "", + ""), + DATA_LOADER_MISSING_PARTITION_KEY_COLUMN( + Category.USER_ERROR, + "0176", + "Missing required field or column mapping for partition key %s", + "", + ""), + DATA_LOADER_MISSING_COLUMN( + Category.USER_ERROR, "0177", "Missing field or column mapping for %s", "", ""), + DATA_LOADER_MISSING_SOURCE_FIELD( + Category.USER_ERROR, + "0178", + "The data mapping source field '%s' for table '%s' is missing in the json data record", + "", + ""), // // Errors for the concurrency error category diff --git a/data-loader/build.gradle b/data-loader/build.gradle index 87a057933b..836151e924 100644 --- a/data-loader/build.gradle +++ b/data-loader/build.gradle @@ -17,6 +17,7 @@ subprojects { implementation("org.apache.commons:commons-lang3:${commonsLangVersion}") implementation("commons-io:commons-io:${commonsIoVersion}") implementation("org.slf4j:slf4j-simple:${slf4jVersion}") + implementation("software.amazon.awssdk:s3:2.25.31") // Mockito testImplementation "org.mockito:mockito-core:${mockitoVersion}" diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderObjectMapper.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderObjectMapper.java new file mode 100644 index 0000000000..d90fd49b65 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderObjectMapper.java @@ -0,0 +1,14 @@ +package com.scalar.db.dataloader.core; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +public class DataLoaderObjectMapper extends ObjectMapper { + + public DataLoaderObjectMapper() { + super(); + this.setSerializationInclusion(JsonInclude.Include.NON_NULL); + this.registerModule(new JavaTimeModule()); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java new file mode 100644 index 0000000000..10157569b4 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java @@ -0,0 +1,23 @@ +package com.scalar.db.dataloader.core.dataimport; + +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; + +public interface ImportEventListener { + + void onDataChunkStarted(ImportDataChunkStatus status); + + void addOrUpdateDataChunkStatus(ImportDataChunkStatus status); + + void onDataChunkCompleted(ImportDataChunkStatus status); + + void onAllDataChunksCompleted(); + + void onTransactionBatchStarted(ImportTransactionBatchStatus 
batchStatus); + + void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult); + + void onTaskComplete(ImportTaskResult taskResult); +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java new file mode 100644 index 0000000000..cbc0fc02a2 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java @@ -0,0 +1,141 @@ +package com.scalar.db.dataloader.core.dataimport; + +import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.dataloader.core.ScalarDBMode; +import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessor; +import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory; +import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorParams; +import com.scalar.db.dataloader.core.dataimport.processor.TableColumnDataTypes; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; +import java.io.BufferedReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.NonNull; + +@AllArgsConstructor +public class ImportManager implements ImportEventListener { + + @NonNull private final Map tableMetadata; + @NonNull private final BufferedReader importFileReader; + @NonNull private final ImportOptions importOptions; + private final ImportProcessorFactory importProcessorFactory; + private final List listeners = new ArrayList<>(); + private final ScalarDBMode scalarDBMode; + private final DistributedStorage distributedStorage; + private final DistributedTransactionManager distributedTransactionManager; + private final List importDataChunkStatusList = new ArrayList<>(); + + /** + * * Start the import process + * + * @return list of import data chunk status objects + */ + public List startImport() { + ImportProcessorParams params = + ImportProcessorParams.builder() + .scalarDBMode(scalarDBMode) + .importOptions(importOptions) + .tableMetadataByTableName(tableMetadata) + .dao(new ScalarDBDao()) + .distributedTransactionManager(distributedTransactionManager) + .distributedStorage(distributedStorage) + .tableColumnDataTypes(getTableColumnDataTypes()) + .build(); + ImportProcessor processor = importProcessorFactory.createImportProcessor(params); + processor.addListener(this); + // If the data chunk size is 0, then process the entire file in a single data chunk + int dataChunkSize = + importOptions.getDataChunkSize() == 0 + ? 
Integer.MAX_VALUE + : importOptions.getDataChunkSize(); + return processor.process( + dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader); + } + + public void addListener(ImportEventListener listener) { + listeners.add(listener); + } + + public void removeListener(ImportEventListener listener) { + listeners.remove(listener); + } + + @Override + public void onDataChunkStarted(ImportDataChunkStatus status) { + for (ImportEventListener listener : listeners) { + listener.onDataChunkStarted(status); + } + } + + @Override + public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) { + synchronized (importDataChunkStatusList) { + for (int i = 0; i < importDataChunkStatusList.size(); i++) { + if (importDataChunkStatusList.get(i).getDataChunkId() == status.getDataChunkId()) { + // Object found, replace it with the new one + importDataChunkStatusList.set(i, status); + return; + } + } + // If object is not found, add it to the list + importDataChunkStatusList.add(status); + } + } + + @Override + public void onDataChunkCompleted(ImportDataChunkStatus status) { + for (ImportEventListener listener : listeners) { + listener.onDataChunkCompleted(status); + } + } + + @Override + public void onTransactionBatchStarted(ImportTransactionBatchStatus status) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchStarted(status); + } + } + + @Override + public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchCompleted(batchResult); + } + } + + @Override + public void onTaskComplete(ImportTaskResult taskResult) { + for (ImportEventListener listener : listeners) { + listener.onTaskComplete(taskResult); + } + } + + @Override + public void onAllDataChunksCompleted() { + for (ImportEventListener listener : listeners) { + listener.onAllDataChunksCompleted(); + } + } + + public List getImportDataChunkStatusList() { + return importDataChunkStatusList; + } + + public TableColumnDataTypes getTableColumnDataTypes() { + TableColumnDataTypes tableColumnDataTypes = new TableColumnDataTypes(); + tableMetadata.forEach( + (name, metadata) -> + metadata + .getColumnDataTypes() + .forEach((k, v) -> tableColumnDataTypes.addColumnDataType(name, k, v))); + return tableColumnDataTypes; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java new file mode 100644 index 0000000000..9cb6225d30 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java @@ -0,0 +1,38 @@ +package com.scalar.db.dataloader.core.dataimport; + +import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile; +import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel; +import com.scalar.db.dataloader.core.dataimport.log.LogMode; +import lombok.Builder; +import lombok.Data; + +/** Import options to import data into one or more ScalarDB tables */ +@Builder +@Data +public class ImportOptions { + + @Builder.Default private final ImportMode importMode = ImportMode.UPSERT; + @Builder.Default private final boolean requireAllColumns = false; + @Builder.Default private final FileFormat fileFormat = FileFormat.JSON; + @Builder.Default private final boolean prettyPrint = false; + @Builder.Default private final boolean 
ignoreNullValues = false; + @Builder.Default private final LogMode logMode = LogMode.SPLIT_BY_DATA_CHUNK; + + @Builder.Default + private final ControlFileValidationLevel controlFileValidationLevel = + ControlFileValidationLevel.MAPPED; + + @Builder.Default private final char delimiter = ','; + + @Builder.Default private final boolean logSuccessRecords = false; + @Builder.Default private final boolean logRawRecord = false; + + private final int dataChunkSize; + private final int transactionBatchSize; + private final ControlFile controlFile; + private final String namespace; + private final String tableName; + private final int maxThreads; + private final String customHeaderRow; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java new file mode 100644 index 0000000000..11a7493ca9 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java @@ -0,0 +1,169 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.Constants; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.ImportEventListener; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public abstract class AbstractImportLogger implements ImportEventListener { + + protected static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + + protected final ImportLoggerConfig config; + protected final LogWriterFactory logWriterFactory; + protected final List listeners = new ArrayList<>(); + + public void addListener(ImportEventListener listener) { + listeners.add(listener); + } + + public void removeListener(ImportEventListener listener) { + listeners.remove(listener); + } + + @Override + public void onDataChunkStarted(ImportDataChunkStatus importDataChunkStatus) { + // Currently we are not logging the start of a data chunk + } + + @Override + public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) { + // Currently we are not logging the start of a transaction batch + notifyTransactionBatchStarted(batchStatus); + } + + @Override + public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { + // skip logging success records if the configuration is set to skip + if (shouldSkipLoggingSuccess(batchResult)) { + return; + } + + logTransactionBatch(batchResult); + notifyTransactionBatchCompleted(batchResult); + } + + @Override + public void onTaskComplete(ImportTaskResult taskResult) { + // TODO: we can remove this event 
if it's current not being used in the import Manager as well + } + + protected abstract void logTransactionBatch(ImportTransactionBatchResult batchResult); + + protected boolean shouldSkipLoggingSuccess(ImportTransactionBatchResult batchResult) { + return batchResult.isSuccess() && !config.isLogSuccessRecords(); + } + + protected JsonNode createFilteredTransactionBatchLogJsonNode( + ImportTransactionBatchResult batchResult) { + + // If the batch result does not contain any records, return the batch result as is + if (batchResult.getRecords() == null) { + return OBJECT_MAPPER.valueToTree(batchResult); + } + + // Create a new list to store the modified import task results + List modifiedRecords = new ArrayList<>(); + + // Loop over the records in the batchResult + for (ImportTaskResult taskResult : batchResult.getRecords()) { + // Create a new ImportTaskResult and not add the raw record yet + List targetResults = + batchResult.isSuccess() + ? taskResult.getTargets() + : updateTargetStatusForAbortedTransactionBatch(taskResult.getTargets()); + ImportTaskResult.ImportTaskResultBuilder builder = + ImportTaskResult.builder() + .rowNumber(taskResult.getRowNumber()) + .targets(targetResults) + .dataChunkId(taskResult.getDataChunkId()) + .rowNumber(taskResult.getRowNumber()); + + // Only add the raw record if the configuration is set to log raw source data + if (config.isLogRawSourceRecords()) { + builder.rawRecord(taskResult.getRawRecord()); + } + ImportTaskResult modifiedTaskResult = builder.build(); + + // Add the modified task result to the list + modifiedRecords.add(modifiedTaskResult); + } + + // Create a new transaction batch result with the modified import task results + ImportTransactionBatchResult modifiedBatchResult = + ImportTransactionBatchResult.builder() + .dataChunkId(batchResult.getDataChunkId()) + .transactionBatchId(batchResult.getTransactionBatchId()) + .transactionId(batchResult.getTransactionId()) + .records(modifiedRecords) + .errors(batchResult.getErrors()) + .success(batchResult.isSuccess()) + .build(); + + // Convert the modified batch result to a JsonNode + return OBJECT_MAPPER.valueToTree(modifiedBatchResult); + } + + protected void closeLogWriter(LogWriter logWriter) { + if (logWriter != null) { + try { + logWriter.close(); + } catch (IOException e) { + logError("Failed to close a log writer", e); + } + } + } + + protected abstract void logError(String errorMessage, Exception e); + + protected LogWriter createLogWriter(String logFilePath) throws IOException { + return logWriterFactory.createLogWriter(logFilePath); + } + + private void notifyTransactionBatchStarted(ImportTransactionBatchStatus status) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchStarted(status); + } + } + + private void notifyTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchCompleted(batchResult); + } + } + + private List updateTargetStatusForAbortedTransactionBatch( + List targetResults) { + for (int i = 0; i < targetResults.size(); i++) { + ImportTargetResult target = targetResults.get(i); + if (target.getStatus().equals(ImportTargetResultStatus.SAVED)) { + ImportTargetResult newTarget = + ImportTargetResult.builder() + .importAction(target.getImportAction()) + .status(ImportTargetResultStatus.ABORTED) + .importedRecord(target.getImportedRecord()) + .namespace(target.getNamespace()) + .tableName(target.getTableName()) + .dataMapped(target.isDataMapped()) + 
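+ // Record the shared transaction-abort status as this target's error, since the whole batch was rolled back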
.errors(Collections.singletonList(Constants.ABORT_TRANSACTION_STATUS)) + .build(); + targetResults.set(i, newTarget); + } + } + return targetResults; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java new file mode 100644 index 0000000000..fc0039bf90 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java @@ -0,0 +1,13 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import lombok.Builder; +import lombok.Value; + +@Value +@Builder +public class ImportLoggerConfig { + String logDirectoryPath; + boolean logSuccessRecords; + boolean logRawSourceRecords; + boolean prettyPrint; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerException.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerException.java new file mode 100644 index 0000000000..52424c9975 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerException.java @@ -0,0 +1,12 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +public class ImportLoggerException extends Exception { + + public ImportLoggerException(String message) { + super(message); + } + + public ImportLoggerException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogConstants.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogConstants.java new file mode 100644 index 0000000000..379896bbaf --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogConstants.java @@ -0,0 +1,3 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +public class LogConstants {} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogMode.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogMode.java new file mode 100644 index 0000000000..cf0349366c --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogMode.java @@ -0,0 +1,7 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +/** Log modes available for import logging */ +public enum LogMode { + SINGLE_FILE, + SPLIT_BY_DATA_CHUNK +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogStorageLocation.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogStorageLocation.java new file mode 100644 index 0000000000..396cb3d8e4 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogStorageLocation.java @@ -0,0 +1,7 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +/** The location where the logs are stored. 
*/ +public enum LogStorageLocation { + LOCAL_FILE_STORAGE, + AWS_S3 +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java new file mode 100644 index 0000000000..fc70770761 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java @@ -0,0 +1,139 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SingleFileImportLogger extends AbstractImportLogger { + + protected static final String SUMMARY_LOG_FILE_NAME = "summary.log"; + protected static final String SUCCESS_LOG_FILE_NAME = "success.json"; + protected static final String FAILURE_LOG_FILE_NAME = "failure.json"; + private static final Logger LOGGER = LoggerFactory.getLogger(SingleFileImportLogger.class); + private LogWriter summaryLogWriter; + private LogWriter successLogWriter; + private LogWriter failureLogWriter; + + public SingleFileImportLogger(ImportLoggerConfig config, LogWriterFactory logWriterFactory) + throws IOException { + super(config, logWriterFactory); + successLogWriter = createLogWriter(config.getLogDirectoryPath() + SUCCESS_LOG_FILE_NAME); + failureLogWriter = createLogWriter(config.getLogDirectoryPath() + FAILURE_LOG_FILE_NAME); + } + + @Override + public void onTaskComplete(ImportTaskResult taskResult) { + if (!config.isLogSuccessRecords() && !config.isLogRawSourceRecords()) return; + try { + writeImportTaskResultDetailToLogs(taskResult); + } catch (Exception e) { + logError("Failed to write success/failure logs", e); + } + } + + @Override + public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {} + + @Override + public void onDataChunkCompleted(ImportDataChunkStatus dataChunkStatus) { + try { + logDataChunkSummary(dataChunkStatus); + } catch (IOException e) { + logError("Failed to log the data chunk summary", e); + } + } + + @Override + public void onAllDataChunksCompleted() { + closeAllLogWriters(); + } + + @Override + protected void logTransactionBatch(ImportTransactionBatchResult batchResult) { + try { + LogWriter logWriter = getLogWriterForTransactionBatch(batchResult); + JsonNode jsonNode = createFilteredTransactionBatchLogJsonNode(batchResult); + writeToLogWriter(logWriter, jsonNode); + } catch (IOException e) { + logError("Failed to write a transaction batch record to the log file", e); + } + } + + @Override + protected void logError(String errorMessage, Exception exception) { + LOGGER.error(errorMessage, exception); + } + + private void logDataChunkSummary(ImportDataChunkStatus dataChunkStatus) throws IOException { + if (summaryLogWriter == null) { + summaryLogWriter = createLogWriter(config.getLogDirectoryPath() + SUMMARY_LOG_FILE_NAME); + } 
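+ // The summary log writer is created lazily for the first data chunk and reused for subsequent chunks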
+ writeImportDataChunkSummary(dataChunkStatus, summaryLogWriter); + } + + private void writeImportDataChunkSummary( + ImportDataChunkStatus dataChunkStatus, LogWriter logWriter) throws IOException { + JsonNode jsonNode = OBJECT_MAPPER.valueToTree(dataChunkStatus); + writeToLogWriter(logWriter, jsonNode); + } + + private LogWriter getLogWriterForTransactionBatch(ImportTransactionBatchResult batchResult) + throws IOException { + String logFileName = batchResult.isSuccess() ? SUCCESS_LOG_FILE_NAME : FAILURE_LOG_FILE_NAME; + LogWriter logWriter = batchResult.isSuccess() ? successLogWriter : failureLogWriter; + if (logWriter == null) { + logWriter = createLogWriter(config.getLogDirectoryPath() + logFileName); + if (batchResult.isSuccess()) { + successLogWriter = logWriter; + } else { + failureLogWriter = logWriter; + } + } + return logWriter; + } + + private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult) + throws IOException { + JsonNode jsonNode; + for (ImportTargetResult target : importTaskResult.getTargets()) { + if (config.isLogSuccessRecords() + && target.getStatus().equals(ImportTargetResultStatus.SAVED)) { + synchronized (successLogWriter) { + jsonNode = OBJECT_MAPPER.valueToTree(target); + successLogWriter.write(jsonNode); + successLogWriter.flush(); + } + } + if (config.isLogRawSourceRecords() + && !target.getStatus().equals(ImportTargetResultStatus.SAVED)) { + synchronized (failureLogWriter) { + jsonNode = OBJECT_MAPPER.valueToTree(target); + failureLogWriter.write(jsonNode); + failureLogWriter.flush(); + } + } + } + } + + private void writeToLogWriter(LogWriter logWriter, JsonNode jsonNode) throws IOException { + logWriter.write(jsonNode); + logWriter.flush(); + } + + private void closeAllLogWriters() { + closeLogWriter(summaryLogWriter); + closeLogWriter(successLogWriter); + closeLogWriter(failureLogWriter); + summaryLogWriter = null; + successLogWriter = null; + failureLogWriter = null; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java new file mode 100644 index 0000000000..bec306ef9b --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java @@ -0,0 +1,184 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogFileType; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SplitByDataChunkImportLogger extends AbstractImportLogger { + + protected static final String SUMMARY_LOG_FILE_NAME_FORMAT = "data_chunk_%s_summary.json"; + protected static final String FAILURE_LOG_FILE_NAME_FORMAT = "data_chunk_%s_failure.json"; + 
protected static final String SUCCESS_LOG_FILE_NAME_FORMAT = "data_chunk_%s_success.json"; + + private static final Logger LOGGER = LoggerFactory.getLogger(SplitByDataChunkImportLogger.class); + private final Map summaryLogWriters = new HashMap<>(); + private final Map successLogWriters = new HashMap<>(); + private final Map failureLogWriters = new HashMap<>(); + + public SplitByDataChunkImportLogger( + ImportLoggerConfig config, LogWriterFactory logWriterFactory) { + super(config, logWriterFactory); + } + + @Override + public void onTaskComplete(ImportTaskResult taskResult) { + if (!config.isLogSuccessRecords() && !config.isLogRawSourceRecords()) return; + try { + writeImportTaskResultDetailToLogs(taskResult); + } catch (IOException e) { + LOGGER.error("Failed to write success/failure logs"); + } + } + + private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult) + throws IOException { + JsonNode jsonNode; + for (ImportTargetResult target : importTaskResult.getTargets()) { + if (config.isLogSuccessRecords() + && target.getStatus().equals(ImportTargetResultStatus.SAVED)) { + jsonNode = OBJECT_MAPPER.valueToTree(target); + synchronized (successLogWriters) { + LogWriter successLogWriter = + initializeLogWriterIfNeeded(LogFileType.SUCCESS, importTaskResult.getDataChunkId()); + successLogWriter.write(jsonNode); + successLogWriter.flush(); + } + } + if (config.isLogRawSourceRecords() + && !target.getStatus().equals(ImportTargetResultStatus.SAVED)) { + jsonNode = OBJECT_MAPPER.valueToTree(target); + synchronized (failureLogWriters) { + LogWriter failureLogWriter = + initializeLogWriterIfNeeded(LogFileType.FAILURE, importTaskResult.getDataChunkId()); + failureLogWriter.write(jsonNode); + failureLogWriter.flush(); + } + } + } + } + + @Override + public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {} + + @Override + public void onDataChunkCompleted(ImportDataChunkStatus dataChunkStatus) { + try { + logDataChunkSummary(dataChunkStatus); + // Close the split log writers per data chunk if they exist for this data chunk id + closeLogWritersForDataChunk(dataChunkStatus.getDataChunkId()); + } catch (IOException e) { + LOGGER.error("Failed to log the data chunk summary", e); + } + } + + @Override + public void onAllDataChunksCompleted() { + closeAllDataChunkLogWriters(); + } + + @Override + protected void logTransactionBatch(ImportTransactionBatchResult batchResult) { + LogFileType logFileType = batchResult.isSuccess() ? 
LogFileType.SUCCESS : LogFileType.FAILURE; + try (LogWriter logWriter = + initializeLogWriterIfNeeded(logFileType, batchResult.getDataChunkId())) { + JsonNode jsonNode = createFilteredTransactionBatchLogJsonNode(batchResult); + synchronized (logWriter) { + logWriter.write(jsonNode); + logWriter.flush(); + } + } catch (IOException e) { + LOGGER.error("Failed to write a transaction batch record to a split mode log file", e); + } + } + + @Override + protected void logError(String errorMessage, Exception exception) { + LOGGER.error(errorMessage, exception); + } + + private void logDataChunkSummary(ImportDataChunkStatus dataChunkStatus) throws IOException { + try (LogWriter logWriter = + initializeLogWriterIfNeeded(LogFileType.SUMMARY, dataChunkStatus.getDataChunkId())) { + logWriter.write(OBJECT_MAPPER.valueToTree(dataChunkStatus)); + logWriter.flush(); + } + } + + private void closeLogWritersForDataChunk(int dataChunkId) { + closeLogWriter(successLogWriters.remove(dataChunkId)); + closeLogWriter(failureLogWriters.remove(dataChunkId)); + closeLogWriter(summaryLogWriters.remove(dataChunkId)); + } + + private void closeAllDataChunkLogWriters() { + summaryLogWriters.values().forEach(this::closeLogWriter); + successLogWriters.values().forEach(this::closeLogWriter); + failureLogWriters.values().forEach(this::closeLogWriter); + summaryLogWriters.clear(); + successLogWriters.clear(); + failureLogWriters.clear(); + } + + private String getLogFilePath(long batchId, LogFileType logFileType) { + String logfilePath; + switch (logFileType) { + case SUCCESS: + logfilePath = + config.getLogDirectoryPath() + String.format(SUCCESS_LOG_FILE_NAME_FORMAT, batchId); + break; + case FAILURE: + logfilePath = + config.getLogDirectoryPath() + String.format(FAILURE_LOG_FILE_NAME_FORMAT, batchId); + break; + case SUMMARY: + logfilePath = + config.getLogDirectoryPath() + String.format(SUMMARY_LOG_FILE_NAME_FORMAT, batchId); + break; + default: + logfilePath = ""; + } + return logfilePath; + } + + private LogWriter initializeLogWriterIfNeeded(LogFileType logFileType, int dataChunkId) + throws IOException { + Map logWriters = getLogWriters(logFileType); + if (!logWriters.containsKey(dataChunkId)) { + LogWriter logWriter = createLogWriter(logFileType, dataChunkId); + logWriters.put(dataChunkId, logWriter); + } + return logWriters.get(dataChunkId); + } + + private LogWriter createLogWriter(LogFileType logFileType, int dataChunkId) throws IOException { + String logFilePath = getLogFilePath(dataChunkId, logFileType); + return createLogWriter(logFilePath); + } + + private Map getLogWriters(LogFileType logFileType) { + Map logWriterMap = null; + switch (logFileType) { + case SUCCESS: + logWriterMap = successLogWriters; + break; + case FAILURE: + logWriterMap = failureLogWriters; + break; + case SUMMARY: + logWriterMap = summaryLogWriters; + break; + } + return logWriterMap; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/AwsS3LogWriter.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/AwsS3LogWriter.java new file mode 100644 index 0000000000..c11fab0b23 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/AwsS3LogWriter.java @@ -0,0 +1,29 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import lombok.AllArgsConstructor; +import software.amazon.awssdk.services.s3.S3AsyncClient; + 
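+/**
+ * A log writer intended to upload import log records to an AWS S3 bucket through the S3 async client.
+ * The write, flush, and close methods are placeholders in this change and are not implemented yet.
+ */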
+@AllArgsConstructor +public class AwsS3LogWriter implements LogWriter { + + private final S3AsyncClient s3AsyncClient; + private final String bucketName; + private final String objectKey; + + @Override + public void write(JsonNode sourceRecord) throws IOException { + // Implementation to write content to cloud storage + } + + @Override + public void flush() throws IOException { + // Implementation to flush content to cloud storage + } + + @Override + public void close() throws IOException { + // Implementation to close the cloud storage connection + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactory.java new file mode 100644 index 0000000000..5ced1e804d --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactory.java @@ -0,0 +1,36 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig; +import java.io.IOException; +import lombok.AllArgsConstructor; + +/** A factory class to create log writers. */ +@AllArgsConstructor +public class DefaultLogWriterFactory implements LogWriterFactory { + + private final LogWriterFactoryConfig config; + private final ImportLoggerConfig importLoggerConfig; + + /** + * Creates a log writer based on the configuration. + * + * @param logFilePath the path of the log file + * @return the log writer + */ + @Override + public LogWriter createLogWriter(String logFilePath) throws IOException { + LogWriter logWriter = null; + switch (config.getLogStorageLocation()) { + case LOCAL_FILE_STORAGE: + logWriter = new LocalFileLogWriter(logFilePath, importLoggerConfig); + break; + case AWS_S3: + logWriter = + new AwsS3LogWriter( + config.getS3AsyncClient(), config.getBucketName(), config.getObjectKey()); + break; + } + ; + return logWriter; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java new file mode 100644 index 0000000000..b29395e8ec --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java @@ -0,0 +1,63 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; + +public class LocalFileLogWriter implements LogWriter { + private final JsonGenerator logWriter; + private final DataLoaderObjectMapper objectMapper; + + /** + * Creates an instance of LocalFileLogWriter with the specified file path and log file type. 
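+ * Records are written to the file as elements of a JSON array; the file is created if it does not exist.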
+ * + * @param filePath the file path + * @throws IOException if an I/O error occurs + */ + public LocalFileLogWriter(String filePath, ImportLoggerConfig importLoggerConfig) + throws IOException { + Path path = Paths.get(filePath); + this.objectMapper = new DataLoaderObjectMapper(); + this.logWriter = + objectMapper + .getFactory() + .createGenerator( + Files.newBufferedWriter( + path, StandardOpenOption.CREATE, StandardOpenOption.APPEND)); + // Start the JSON array + if (importLoggerConfig.isPrettyPrint()) this.logWriter.useDefaultPrettyPrinter(); + this.logWriter.writeStartArray(); + this.logWriter.flush(); + } + + @Override + public void write(JsonNode sourceRecord) throws IOException { + if (sourceRecord == null) { + return; + } + synchronized (logWriter) { + objectMapper.writeValue(logWriter, sourceRecord); + } + } + + @Override + public void flush() throws IOException { + logWriter.flush(); + } + + @Override + public void close() throws IOException { + if (logWriter.isClosed()) { + return; + } + logWriter.writeEndArray(); + logWriter.flush(); + logWriter.close(); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogFileType.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogFileType.java new file mode 100644 index 0000000000..5483aefc91 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogFileType.java @@ -0,0 +1,8 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +/** The type of the log writer. */ +public enum LogFileType { + SUCCESS, + FAILURE, + SUMMARY +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriter.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriter.java new file mode 100644 index 0000000000..f10917901f --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriter.java @@ -0,0 +1,14 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; + +public interface LogWriter extends AutoCloseable { + + void write(JsonNode sourceRecord) throws IOException; + + void flush() throws IOException; + + @Override + void close() throws IOException; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactory.java new file mode 100644 index 0000000000..b3c4dfc080 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactory.java @@ -0,0 +1,8 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import java.io.IOException; + +public interface LogWriterFactory { + + LogWriter createLogWriter(String logFilePath) throws IOException; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactoryConfig.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactoryConfig.java new file mode 100644 index 0000000000..901d0aae6f --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactoryConfig.java @@ -0,0 +1,15 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import 
com.scalar.db.dataloader.core.dataimport.log.LogStorageLocation; +import lombok.Builder; +import lombok.Value; +import software.amazon.awssdk.services.s3.S3AsyncClient; + +@Builder +@Value +public class LogWriterFactoryConfig { + LogStorageLocation logStorageLocation; + S3AsyncClient s3AsyncClient; + String bucketName; + String objectKey; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java new file mode 100644 index 0000000000..01f1dbcf10 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java @@ -0,0 +1,141 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow; +import java.io.BufferedReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +public class CsvImportProcessor extends ImportProcessor { + private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0); + + public CsvImportProcessor(ImportProcessorParams params) { + super(params); + } + + /** + * Process the data from the import file + * + * @param dataChunkSize size of data chunk + * @param transactionBatchSize size of transaction batch + * @param reader reader which reads the source file + * @return process data chunk status list + */ + @Override + public List process( + int dataChunkSize, int transactionBatchSize, BufferedReader reader) { + int numCores = Runtime.getRuntime().availableProcessors(); + ExecutorService dataChunkExecutor = Executors.newFixedThreadPool(numCores); + // Create a queue to hold data batches + Queue dataChunkQueue = new LinkedList<>(); + Thread readerThread = + new Thread( + () -> { + try { + String header = params.getImportOptions().getCustomHeaderRow(); + String delimiter = Character.toString(params.getImportOptions().getDelimiter()); + if (delimiter.trim().isEmpty()) { + delimiter = ","; + } + if (header == null) { + header = reader.readLine(); + } + String[] headerArray = header.split(delimiter); + String line; + int rowNumber = 1; + List currentDataChunk = new ArrayList<>(); + while ((line = reader.readLine()) != null) { + String[] dataArray = line.split(delimiter); + if (headerArray.length != dataArray.length) { + // Throw a custom exception for related issue + throw new RuntimeException(); + } + JsonNode jsonNode = combineHeaderAndData(headerArray, dataArray); + if (jsonNode == null || jsonNode.isEmpty()) { + continue; + } + + ImportRow importRow = new ImportRow(rowNumber, jsonNode); + currentDataChunk.add(importRow); + // If the data chunk is full, add it to the queue + if (currentDataChunk.size() == dataChunkSize) { + int dataChunkId = dataChunkIdCounter.getAndIncrement(); + 
ImportDataChunk importDataChunk = + ImportDataChunk.builder() + .dataChunkId(dataChunkId) + .sourceData(currentDataChunk) + .build(); + dataChunkQueue.offer(importDataChunk); + currentDataChunk = new ArrayList<>(); + } + rowNumber++; + } + + // Add the last data chunk to the queue + if (!currentDataChunk.isEmpty()) { + int dataChunkId = dataChunkIdCounter.getAndIncrement(); + ImportDataChunk importDataChunk = + ImportDataChunk.builder() + .dataChunkId(dataChunkId) + .sourceData(currentDataChunk) + .build(); + dataChunkQueue.offer(importDataChunk); + } + + } catch (IOException e) { + throw new RuntimeException(); + } + }); + + readerThread.start(); + try { + // Wait for readerThread to finish + readerThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + // Process data chunks in parallel + List> dataChunkFutures = new ArrayList<>(); + while (!dataChunkQueue.isEmpty()) { + ImportDataChunk dataChunk = dataChunkQueue.poll(); + Future dataChunkFuture = + dataChunkExecutor.submit( + () -> processDataChunk(dataChunk, transactionBatchSize, numCores)); + dataChunkFutures.add(dataChunkFuture); + } + + List importDataChunkStatusList = new ArrayList<>(); + // Wait for all data chunk threads to complete + for (Future dataChunkFuture : dataChunkFutures) { + try { + importDataChunkStatusList.add((ImportDataChunkStatus) dataChunkFuture.get()); + } catch (Exception e) { + // TODO: handle the exception + e.printStackTrace(); + } + } + dataChunkExecutor.shutdown(); + notifyAllDataChunksCompleted(); + return importDataChunkStatusList; + } + + private JsonNode combineHeaderAndData(String[] header, String[] data) { + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + for (int i = 0; i < header.length; i++) { + objectNode.put(header[i], data[i]); + } + return objectNode; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactory.java new file mode 100644 index 0000000000..30c1c26085 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactory.java @@ -0,0 +1,29 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +public class DefaultImportProcessorFactory implements ImportProcessorFactory { + + /** + * Create import processor object based in file format in import params + * + * @param params import processor params objects + * @return generated import processor object + */ + @Override + public ImportProcessor createImportProcessor(ImportProcessorParams params) { + ImportProcessor importProcessor; + switch (params.getImportOptions().getFileFormat()) { + case JSONL: + importProcessor = new JsonLinesImportProcessor(params); + break; + case JSON: + importProcessor = new JsonImportProcessor(params); + break; + case CSV: + importProcessor = new CsvImportProcessor(params); + break; + default: + importProcessor = null; + } + return importProcessor; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java new file mode 100644 index 0000000000..2d16b9d189 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -0,0 +1,414 @@ +package 
com.scalar.db.dataloader.core.dataimport.processor; + +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.dataloader.core.ScalarDBMode; +import com.scalar.db.dataloader.core.dataimport.ImportEventListener; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatusState; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow; +import com.scalar.db.dataloader.core.dataimport.task.ImportStorageTask; +import com.scalar.db.dataloader.core.dataimport.task.ImportTaskParams; +import com.scalar.db.dataloader.core.dataimport.task.ImportTransactionalTask; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatch; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; +import com.scalar.db.exception.transaction.TransactionException; +import java.io.BufferedReader; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.RequiredArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RequiredArgsConstructor +public abstract class ImportProcessor { + + final ImportProcessorParams params; + private static final Logger LOGGER = LoggerFactory.getLogger(ImportProcessor.class); + private final List listeners = new ArrayList<>(); + + /** + * * Process the source data from import file + * + * @param dataChunkSize size of data chunk + * @param transactionBatchSize size of transaction batch + * @param reader reader which reads the source file + * @return list of import data chunk status objects + */ + public List process( + int dataChunkSize, int transactionBatchSize, BufferedReader reader) { + return Collections.emptyList(); + } + + /** + * Add import event listener to listener list + * + * @param listener import event listener + */ + public void addListener(ImportEventListener listener) { + listeners.add(listener); + } + + /** + * Remove import event listener from listener list + * + * @param listener import event listener + */ + public void removeListener(ImportEventListener listener) { + listeners.remove(listener); + } + + /** + * Notify once the task is completed + * + * @param result task result object + */ + protected void notifyStorageRecordCompleted(ImportTaskResult result) { + // Add data to summary, success logs with/without raw data + for (ImportEventListener listener : listeners) { + listener.onTaskComplete(result); + } + } + + /** + * Notify once the data chunk process is started + * + * @param status data chunk status object + */ + protected void notifyDataChunkStarted(ImportDataChunkStatus status) { + for (ImportEventListener listener : listeners) { + listener.onDataChunkStarted(status); + listener.addOrUpdateDataChunkStatus(status); + } + } + + /** + * Notify once the data chunk process is completed + * + * @param status 
data chunk status object + */ + protected void notifyDataChunkCompleted(ImportDataChunkStatus status) { + for (ImportEventListener listener : listeners) { + listener.onDataChunkCompleted(status); + listener.addOrUpdateDataChunkStatus(status); + } + } + + /** + * Notify once the import transaction batch is started + * + * @param batchStatus import transaction batch status object + */ + protected void notifyTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchStarted(batchStatus); + } + } + + /** + * Notify once the import transaction batch is completed + * + * @param batchResult import transaction batch result object + */ + protected void notifyTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchCompleted(batchResult); + } + } + + /** Notify when all data chunks processes are completed */ + protected void notifyAllDataChunksCompleted() { + for (ImportEventListener listener : listeners) { + listener.onAllDataChunksCompleted(); + } + } + + /** + * Split the data chunk into transaction batches + * + * @param dataChunk data chunk object + * @param batchSize batch size + * @return created list of transaction batches + */ + private List splitIntoTransactionBatches( + ImportDataChunk dataChunk, int batchSize) { + List transactionBatches = new ArrayList<>(); + AtomicInteger transactionBatchIdCounter = new AtomicInteger(0); + + List importRows = dataChunk.getSourceData(); + for (int i = 0; i < importRows.size(); i += batchSize) { + int endIndex = Math.min(i + batchSize, importRows.size()); + List transactionBatchData = importRows.subList(i, endIndex); + int transactionBatchId = transactionBatchIdCounter.getAndIncrement(); + ImportTransactionBatch transactionBatch = + ImportTransactionBatch.builder() + .transactionBatchId(transactionBatchId) + .sourceData(transactionBatchData) + .build(); + transactionBatches.add(transactionBatch); + } + return transactionBatches; + } + + /** + * To process a transaction batch and return the result + * + * @param dataChunk data chunk object + * @param transactionBatch transaction batch object + * @return processed transaction batch result + */ + private ImportTransactionBatchResult processTransactionBatch( + ImportDataChunk dataChunk, ImportTransactionBatch transactionBatch) { + ImportTransactionBatchStatus status = + ImportTransactionBatchStatus.builder() + .dataChunkId(dataChunk.getDataChunkId()) + .transactionBatchId(transactionBatch.getTransactionBatchId()) + .build(); + notifyTransactionBatchStarted(status); + List importRecordResult = new ArrayList<>(); + boolean isSuccess; + String error = ""; + try { + // Create the ScalarDB transaction + DistributedTransaction transaction = params.getDistributedTransactionManager().start(); + + // Loop over the transaction batch and process each record + for (ImportRow importRow : transactionBatch.getSourceData()) { + ImportTaskParams taskParams = + ImportTaskParams.builder() + .sourceRecord(importRow.getSourceData()) + .dataChunkId(dataChunk.getDataChunkId()) + .rowNumber(importRow.getRowNumber()) + .importOptions(params.getImportOptions()) + .tableColumnDataTypes(params.getTableColumnDataTypes()) + .tableMetadataByTableName(params.getTableMetadataByTableName()) + .dao(params.getDao()) + .build(); + importRecordResult.add(new ImportTransactionalTask(taskParams, transaction).execute()); + } + isSuccess = + importRecordResult.stream() + 
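+ // The batch is considered successful only if every target of every record in it was saved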
.allMatch( + importTaskResult -> + importTaskResult.getTargets().stream() + .allMatch( + targetResult -> + targetResult.getStatus().equals(ImportTargetResultStatus.SAVED))); + + // Check and Commit the transaction + if (isSuccess) { + transaction.commit(); + } else { + transaction.abort(); + error = "All transactions are aborted"; + } + + } catch (TransactionException e) { + isSuccess = false; + LOGGER.error(e.getMessage()); + } + ImportTransactionBatchResult importTransactionBatchResult = + ImportTransactionBatchResult.builder() + .transactionBatchId(transactionBatch.getTransactionBatchId()) + .success(isSuccess) + .dataChunkId(dataChunk.getDataChunkId()) + .records(importRecordResult) + .errors(Collections.singletonList(error)) + .build(); + notifyTransactionBatchCompleted(importTransactionBatchResult); + return importTransactionBatchResult; + } + + /** + * @param dataChunk data chunk object + * @param importRow data row object + * @return thr task result after processing the row data + */ + private ImportTaskResult processStorageRecord(ImportDataChunk dataChunk, ImportRow importRow) { + ImportTaskParams taskParams = + ImportTaskParams.builder() + .sourceRecord(importRow.getSourceData()) + .dataChunkId(dataChunk.getDataChunkId()) + .rowNumber(importRow.getRowNumber()) + .importOptions(params.getImportOptions()) + .tableColumnDataTypes(params.getTableColumnDataTypes()) + .tableMetadataByTableName(params.getTableMetadataByTableName()) + .dao(params.getDao()) + .build(); + ImportTaskResult importRecordResult = + new ImportStorageTask(taskParams, params.getDistributedStorage()).execute(); + + ImportTaskResult modifiedTaskResult = + ImportTaskResult.builder() + .rowNumber(importRecordResult.getRowNumber()) + .rawRecord(importRecordResult.getRawRecord()) + .targets(importRecordResult.getTargets()) + .dataChunkId(dataChunk.getDataChunkId()) + .build(); + notifyStorageRecordCompleted(modifiedTaskResult); + return modifiedTaskResult; + } + + /** + * Process data chunk data + * + * @param dataChunk data chunk object + * @param transactionBatchSize transaction batch size + * @param numCores num of cpu cores + * @return import data chunk status object after processing the data chunk + */ + protected ImportDataChunkStatus processDataChunk( + ImportDataChunk dataChunk, int transactionBatchSize, int numCores) { + ImportDataChunkStatus status = + ImportDataChunkStatus.builder() + .dataChunkId(dataChunk.getDataChunkId()) + .startTime(Instant.now()) + .status(ImportDataChunkStatusState.IN_PROGRESS) + .build(); + notifyDataChunkStarted(status); + ImportDataChunkStatus importDataChunkStatus; + if (params.getScalarDBMode() == ScalarDBMode.TRANSACTION) { + importDataChunkStatus = + processDataChunkWithTransactions(dataChunk, transactionBatchSize, numCores); + } else { + importDataChunkStatus = processDataChunkWithoutTransactions(dataChunk, numCores); + } + notifyDataChunkCompleted(importDataChunkStatus); + return importDataChunkStatus; + } + + /** + * Process data chunk data with transactions + * + * @param dataChunk data chunk object + * @param transactionBatchSize transaction batch size + * @param numCores num of cpu cores + * @return import data chunk status object after processing the data chunk + */ + private ImportDataChunkStatus processDataChunkWithTransactions( + ImportDataChunk dataChunk, int transactionBatchSize, int numCores) { + Instant startTime = Instant.now(); + List transactionBatches = + splitIntoTransactionBatches(dataChunk, transactionBatchSize); + ExecutorService transactionBatchExecutor = 
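+ // Size the pool to at most one thread per transaction batch, capped by the number of available cores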
+ Executors.newFixedThreadPool(Math.min(transactionBatches.size(), numCores)); + List> transactionBatchFutures = new ArrayList<>(); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failureCount = new AtomicInteger(0); + for (ImportTransactionBatch transactionBatch : transactionBatches) { + Future transactionBatchFuture = + transactionBatchExecutor.submit( + () -> processTransactionBatch(dataChunk, transactionBatch)); + transactionBatchFutures.add(transactionBatchFuture); + } + + waitForFuturesToComplete(transactionBatchFutures); + transactionBatchExecutor.shutdown(); + transactionBatchFutures.forEach( + batchResult -> { + try { + ImportTransactionBatchResult importTransactionBatchResult = + (ImportTransactionBatchResult) batchResult.get(); + importTransactionBatchResult + .getRecords() + .forEach( + batchRecords -> { + if (batchRecords.getTargets().stream() + .allMatch( + targetResult -> + targetResult + .getStatus() + .equals(ImportTargetResultStatus.SAVED))) { + successCount.incrementAndGet(); + } else { + failureCount.incrementAndGet(); + } + }); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + Instant endTime = Instant.now(); + int totalDuration = (int) Duration.between(startTime, endTime).toMillis(); + return ImportDataChunkStatus.builder() + .dataChunkId(dataChunk.getDataChunkId()) + .failureCount(failureCount.get()) + .successCount(successCount.get()) + .totalRecords(dataChunk.getSourceData().size()) + .batchCount(transactionBatches.size()) + .status(ImportDataChunkStatusState.COMPLETE) + .startTime(startTime) + .endTime(endTime) + .totalDurationInMilliSeconds(totalDuration) + .build(); + } + + /** + * Process data chunk data without transactions + * + * @param dataChunk data chunk object + * @param numCores num of cpu cores + * @return import data chunk status object after processing the data chunk + */ + private ImportDataChunkStatus processDataChunkWithoutTransactions( + ImportDataChunk dataChunk, int numCores) { + Instant startTime = Instant.now(); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failureCount = new AtomicInteger(0); + ExecutorService recordExecutor = Executors.newFixedThreadPool(numCores); + List> recordFutures = new ArrayList<>(); + for (ImportRow importRow : dataChunk.getSourceData()) { + Future recordFuture = + recordExecutor.submit(() -> processStorageRecord(dataChunk, importRow)); + recordFutures.add(recordFuture); + } + waitForFuturesToComplete(recordFutures); + recordExecutor.shutdown(); + recordFutures.forEach( + r -> { + try { + ImportTaskResult result = (ImportTaskResult) r.get(); + boolean allSaved = + result.getTargets().stream() + .allMatch(t -> t.getStatus().equals(ImportTargetResultStatus.SAVED)); + if (allSaved) successCount.incrementAndGet(); + else failureCount.incrementAndGet(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + Instant endTime = Instant.now(); + int totalDuration = (int) Duration.between(startTime, endTime).toMillis(); + return ImportDataChunkStatus.builder() + .dataChunkId(dataChunk.getDataChunkId()) + .totalRecords(dataChunk.getSourceData().size()) + .successCount(successCount.get()) + .failureCount(failureCount.get()) + .startTime(startTime) + .endTime(endTime) + .totalDurationInMilliSeconds(totalDuration) + .status(ImportDataChunkStatusState.COMPLETE) + .build(); + } + + private void waitForFuturesToComplete(List> futures) { + for (Future future : futures) { + try { + 
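The wait loop here catches Exception and logs the message, which also swallows thread interruption. One hedged alternative sketch (generic Java, not part of the diff) that preserves the interrupt flag and logs the worker's underlying failure:

    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the flag and stop waiting
        break;
      } catch (ExecutionException e) {
        LOGGER.error("Import task failed", e.getCause()); // the real cause, not the wrapper
      }
    }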
future.get(); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorFactory.java new file mode 100644 index 0000000000..e953b12228 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorFactory.java @@ -0,0 +1,5 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +public interface ImportProcessorFactory { + ImportProcessor createImportProcessor(ImportProcessorParams params); +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java new file mode 100644 index 0000000000..632b1dc245 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java @@ -0,0 +1,23 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.dataloader.core.ScalarDBMode; +import com.scalar.db.dataloader.core.dataimport.ImportOptions; +import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao; +import java.util.Map; +import lombok.Builder; +import lombok.Value; + +@Builder +@Value +public class ImportProcessorParams { + ScalarDBMode scalarDBMode; + ImportOptions importOptions; + Map tableMetadataByTableName; + TableColumnDataTypes tableColumnDataTypes; + ScalarDBDao dao; + DistributedStorage distributedStorage; + DistributedTransactionManager distributedTransactionManager; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java new file mode 100644 index 0000000000..c02fa625b0 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java @@ -0,0 +1,136 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow; +import java.io.BufferedReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +public class JsonImportProcessor extends ImportProcessor { + + private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0); + + public JsonImportProcessor(ImportProcessorParams params) { + super(params); + } + + /** + * 
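ImportProcessorFactory leaves processor selection to the implementation. A hypothetical sketch of one (the format switch is an assumption; this diff only defines JsonImportProcessor and JsonLinesImportProcessor, and does not show how the input format is detected):

    public class DefaultImportProcessorFactory implements ImportProcessorFactory {
      @Override
      public ImportProcessor createImportProcessor(ImportProcessorParams params) {
        // Assumption: something on ImportOptions distinguishes a JSON array file from
        // JSON Lines input; the boolean below is a placeholder for that check.
        boolean jsonLines = true;
        return jsonLines
            ? new JsonLinesImportProcessor(params)
            : new JsonImportProcessor(params);
      }
    }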
Process the data from the import file + * + * @param dataChunkSize size of data chunk + * @param transactionBatchSize size of transaction batch + * @param reader reader which reads the source file + * @return process data chunk status list + */ + @Override + public List process( + int dataChunkSize, int transactionBatchSize, BufferedReader reader) { + // Set the number of threads based on the available CPU cores + int numCores = Runtime.getRuntime().availableProcessors(); + + // Create a thread pool for processing data batches + ExecutorService dataChunkExecutor = Executors.newFixedThreadPool(numCores); + + // Create a queue to hold data batches + Queue dataChunkQueue = new LinkedList<>(); + + // Create a thread to read JSON lines and populate data batches + Thread readerThread = + new Thread( + () -> { + try (JsonParser jsonParser = new JsonFactory().createParser(reader)) { + if (jsonParser.nextToken() != JsonToken.START_ARRAY) { + throw new IOException("Expected content to be an array"); + } + + List currentDataChunk = new ArrayList<>(); + int rowNumber = 1; + while (jsonParser.nextToken() != JsonToken.END_ARRAY) { + JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonParser); + // TODO: do something with the null jsonNode + if (jsonNode == null || jsonNode.isEmpty()) { + continue; + } + + ImportRow importRow = new ImportRow(rowNumber, jsonNode); + + currentDataChunk.add(importRow); + + // If the data chunk is full, add it to the queue + if (currentDataChunk.size() == dataChunkSize) { + int dataChunkId = dataChunkIdCounter.getAndIncrement(); + ImportDataChunk importDataChunk = + ImportDataChunk.builder() + .dataChunkId(dataChunkId) + .sourceData(currentDataChunk) + .build(); + dataChunkQueue.offer(importDataChunk); + currentDataChunk = new ArrayList<>(); + } + + rowNumber++; + } + + // Add the last data chunk to the queue + if (!currentDataChunk.isEmpty()) { + int dataChunkId = dataChunkIdCounter.getAndIncrement(); + ImportDataChunk importDataChunk = + ImportDataChunk.builder() + .dataChunkId(dataChunkId) + .sourceData(currentDataChunk) + .build(); + dataChunkQueue.offer(importDataChunk); + } + } catch (IOException e) { + // TODO: handle this exception + throw new RuntimeException(e); + } + }); + readerThread.start(); + + try { + // Wait for readerThread to finish + readerThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.err.println("Main thread was interrupted."); + } + + // Process data chunks in parallel + List> dataChunkFutures = new ArrayList<>(); + while (!dataChunkQueue.isEmpty()) { + ImportDataChunk dataChunk = dataChunkQueue.poll(); + Future dataChunkFuture = + dataChunkExecutor.submit( + () -> processDataChunk(dataChunk, transactionBatchSize, numCores)); + dataChunkFutures.add(dataChunkFuture); + } + List importDataChunkStatusList = new ArrayList<>(); + // Wait for all data chunk threads to complete + for (Future dataChunkFuture : dataChunkFutures) { + try { + importDataChunkStatusList.add((ImportDataChunkStatus) dataChunkFuture.get()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + dataChunkExecutor.shutdown(); + notifyAllDataChunksCompleted(); + return importDataChunkStatusList; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java new file mode 100644 index 0000000000..b63f897cbe --- /dev/null +++ 
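A worked example of the chunking performed by JsonImportProcessor, assuming a data chunk size of 2 and the chunk-id counter at its initial value (values are made up):

    // Input file (a single top-level JSON array is required):
    //   [{"id": 1}, {"id": 2}, {"id": 3}]
    // Chunks handed to processDataChunk:
    //   dataChunkId 0 -> rows 1-2  ({"id": 1}, {"id": 2})
    //   dataChunkId 1 -> row 3     ({"id": 3})  // partial last chunk, flushed after END_ARRAY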
b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java @@ -0,0 +1,126 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow; +import java.io.BufferedReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +public class JsonLinesImportProcessor extends ImportProcessor { + + private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0); + + public JsonLinesImportProcessor(ImportProcessorParams params) { + super(params); + } + + /** + * Process the data from the import file + * + * @param dataChunkSize size of data chunk + * @param transactionBatchSize size of transaction batch + * @param reader reader which reads the source file + * @return process data chunk status list + */ + @Override + public List process( + int dataChunkSize, int transactionBatchSize, BufferedReader reader) { + int numCores = Runtime.getRuntime().availableProcessors(); + + // Create a thread pool for processing data batches + ExecutorService dataChunkExecutor = Executors.newFixedThreadPool(numCores); + + // Create a queue to hold data batches + Queue dataChunkQueue = new LinkedList<>(); + + // Create a thread to read JSON lines and populate data batches + Thread readerThread = + new Thread( + () -> { + try { + List currentDataChunk = new ArrayList<>(); + int rowNumber = 1; + String line; + while ((line = reader.readLine()) != null) { + JsonNode jsonNode = OBJECT_MAPPER.readTree(line); + // TODO: do something with the null jsonNode + if (jsonNode == null || jsonNode.isEmpty()) { + continue; + } + + ImportRow importRow = new ImportRow(rowNumber, jsonNode); + currentDataChunk.add(importRow); + + // If the data chunk is full, add it to the queue + if (currentDataChunk.size() == dataChunkSize) { + int dataChunkId = dataChunkIdCounter.getAndIncrement(); + ImportDataChunk importDataChunk = + ImportDataChunk.builder() + .dataChunkId(dataChunkId) + .sourceData(currentDataChunk) + .build(); + dataChunkQueue.offer(importDataChunk); + currentDataChunk = new ArrayList<>(); + } + rowNumber++; + } + + // Add the last data chunk to the queue + if (!currentDataChunk.isEmpty()) { + int dataChunkId = dataChunkIdCounter.getAndIncrement(); + ImportDataChunk importDataChunk = + ImportDataChunk.builder() + .dataChunkId(dataChunkId) + .sourceData(currentDataChunk) + .build(); + dataChunkQueue.offer(importDataChunk); + } + } catch (IOException e) { + // TODO: handle this exception + throw new RuntimeException(e); + } + }); + readerThread.start(); + try { + // Wait for readerThread to finish + readerThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.err.println("Main thread was interrupted."); + } + // Process data chunks in parallel + List> dataChunkFutures = new ArrayList<>(); + while (!dataChunkQueue.isEmpty()) { + 
ImportDataChunk dataChunk = dataChunkQueue.poll(); + Future dataChunkFuture = + dataChunkExecutor.submit( + () -> processDataChunk(dataChunk, transactionBatchSize, numCores)); + dataChunkFutures.add(dataChunkFuture); + } + + List importDataChunkStatusList = new ArrayList<>(); + // Wait for all data chunk threads to complete + for (Future dataChunkFuture : dataChunkFutures) { + try { + importDataChunkStatusList.add((ImportDataChunkStatus) dataChunkFuture.get()); + } catch (Exception e) { + // TODO: handle the exception + e.printStackTrace(); + } + } + dataChunkExecutor.shutdown(); + notifyAllDataChunksCompleted(); + return importDataChunkStatusList; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypes.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypes.java new file mode 100644 index 0000000000..54268b2ccf --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypes.java @@ -0,0 +1,31 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import com.scalar.db.io.DataType; +import java.util.HashMap; +import java.util.Map; + +public class TableColumnDataTypes { + private final Map> dataTypesByColumnsByTable; + + public TableColumnDataTypes() { + this.dataTypesByColumnsByTable = new HashMap<>(); + } + + public void addColumnDataType(String tableName, String columnName, DataType dataType) { + dataTypesByColumnsByTable + .computeIfAbsent(tableName, key -> new HashMap<>()) + .put(columnName, dataType); + } + + public DataType getDataType(String tableName, String columnName) { + Map columnDataTypes = dataTypesByColumnsByTable.get(tableName); + if (columnDataTypes != null) { + return columnDataTypes.get(columnName); + } + return null; + } + + public Map getColumnDataTypes(String tableName) { + return dataTypesByColumnsByTable.get(tableName); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java new file mode 100644 index 0000000000..07d3458072 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java @@ -0,0 +1,37 @@ +package com.scalar.db.dataloader.core.dataimport.task; + +import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.Result; +import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException; +import com.scalar.db.io.Column; +import com.scalar.db.io.Key; +import java.util.List; +import java.util.Optional; + +public class ImportStorageTask extends ImportTask { + + private final DistributedStorage storage; + + public ImportStorageTask(ImportTaskParams params, DistributedStorage storage) { + super(params); + this.storage = storage; + } + + @Override + protected Optional getDataRecord( + String namespace, String tableName, Key partitionKey, Key clusteringKey) + throws ScalarDBDaoException { + return params.getDao().get(namespace, tableName, partitionKey, clusteringKey, this.storage); + } + + @Override + protected void saveRecord( + String namespace, + String tableName, + Key partitionKey, + Key clusteringKey, + List> columns) + throws ScalarDBDaoException { + params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, this.storage); + } +} diff --git 
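A small usage sketch for TableColumnDataTypes (illustrative; the "namespace.table" lookup-key format is an assumption, the real key comes from TableMetadataUtil.getTableLookupKey):

    TableColumnDataTypes dataTypes = new TableColumnDataTypes();
    dataTypes.addColumnDataType("ns.customer", "customer_id", DataType.INT);
    dataTypes.addColumnDataType("ns.customer", "name", DataType.TEXT);

    DataType idType = dataTypes.getDataType("ns.customer", "customer_id");   // DataType.INT
    DataType unknown = dataTypes.getDataType("ns.customer", "missing");      // null
    Map<String, DataType> all = dataTypes.getColumnDataTypes("ns.customer"); // both columns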
a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java new file mode 100644 index 0000000000..8038e70ecf --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java @@ -0,0 +1,365 @@ +package com.scalar.db.dataloader.core.dataimport.task; + +import static com.scalar.db.dataloader.core.dataimport.task.ImportTaskConstants.ERROR_COULD_NOT_FIND_CLUSTERING_KEY; +import static com.scalar.db.dataloader.core.dataimport.task.ImportTaskConstants.ERROR_COULD_NOT_FIND_PARTITION_KEY; +import static com.scalar.db.dataloader.core.dataimport.task.ImportTaskConstants.ERROR_DATA_ALREADY_EXISTS; +import static com.scalar.db.dataloader.core.dataimport.task.ImportTaskConstants.ERROR_DATA_NOT_FOUND; +import static com.scalar.db.dataloader.core.dataimport.task.ImportTaskConstants.ERROR_TABLE_METADATA_MISSING; +import static com.scalar.db.dataloader.core.dataimport.task.ImportTaskConstants.ERROR_UPSERT_INSERT_MISSING_COLUMNS; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.scalar.db.api.Result; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.error.CoreError; +import com.scalar.db.dataloader.core.dataimport.ImportMode; +import com.scalar.db.dataloader.core.dataimport.ImportOptions; +import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile; +import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable; +import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping; +import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException; +import com.scalar.db.dataloader.core.dataimport.processor.TableColumnDataTypes; +import com.scalar.db.dataloader.core.dataimport.task.mapping.ImportDataMapping; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.task.validation.ImportSourceRecordValidationResult; +import com.scalar.db.dataloader.core.dataimport.task.validation.ImportSourceRecordValidator; +import com.scalar.db.dataloader.core.exception.Base64Exception; +import com.scalar.db.dataloader.core.exception.ColumnParsingException; +import com.scalar.db.dataloader.core.util.ColumnUtils; +import com.scalar.db.dataloader.core.util.KeyUtils; +import com.scalar.db.dataloader.core.util.TableMetadataUtil; +import com.scalar.db.io.Column; +import com.scalar.db.io.DataType; +import com.scalar.db.io.Key; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public abstract class ImportTask { + + protected final ImportTaskParams params; + + public ImportTaskResult execute() { + + ObjectNode mutableSourceRecord = params.getSourceRecord().deepCopy(); + ImportOptions importOptions = params.getImportOptions(); + + // Single table import + if (importOptions.getControlFile() == null) { + String tableLookupKey = + TableMetadataUtil.getTableLookupKey( + importOptions.getNamespace(), importOptions.getTableName()); + ImportTargetResult singleTargetResult = + importIntoSingleTable( + importOptions.getNamespace(), + 
importOptions.getTableName(), + params.getTableMetadataByTableName().get(tableLookupKey), + params.getTableColumnDataTypes().getColumnDataTypes(tableLookupKey), + null, + mutableSourceRecord); + // Add the single target result to the list of targets and return the result + return ImportTaskResult.builder() + .rawRecord(params.getSourceRecord()) + .rowNumber(params.getRowNumber()) + .targets(Collections.singletonList(singleTargetResult)) + .build(); + } + + // Multi-table import + List multiTargetResults = + startMultiTableImportProcess( + importOptions.getControlFile(), + params.getTableMetadataByTableName(), + params.getTableColumnDataTypes(), + mutableSourceRecord); + + return ImportTaskResult.builder() + .targets(multiTargetResults) + .rawRecord(params.getSourceRecord()) + .rowNumber(params.getRowNumber()) + .build(); + } + + private List startMultiTableImportProcess( + ControlFile controlFile, + Map tableMetadataByTableName, + TableColumnDataTypes tableColumnDataTypes, + ObjectNode mutableSourceRecord) { + + List targetResults = new ArrayList<>(); + + // Import for every table mapping specified in the control file + for (ControlFileTable controlFileTable : controlFile.getTables()) { + for (ControlFileTableFieldMapping mapping : controlFileTable.getMappings()) { + if (!mutableSourceRecord.has(mapping.getSourceField()) + && !mutableSourceRecord.has(mapping.getTargetColumn())) { + String errorMessage = + CoreError.DATA_LOADER_MISSING_SOURCE_FIELD.buildMessage( + mapping.getSourceField(), controlFileTable.getTable()); + + ImportTargetResult targetResult = + ImportTargetResult.builder() + .namespace(controlFileTable.getNamespace()) + .tableName(controlFileTable.getTable()) + .errors(Collections.singletonList(errorMessage)) + .status(ImportTargetResultStatus.VALIDATION_FAILED) + .build(); + return Collections.singletonList(targetResult); + } + } + + // Import into a single table + String tableLookupKey = TableMetadataUtil.getTableLookupKey(controlFileTable); + TableMetadata tableMetadata = tableMetadataByTableName.get(tableLookupKey); + Map dataTypesByColumns = + tableColumnDataTypes.getColumnDataTypes(tableLookupKey); + // Copied data to an object node data was overwritten by following operations and data check + // fails when same object is referenced again in logic before + ObjectNode copyNode = mutableSourceRecord.deepCopy(); + ImportTargetResult result = + importIntoSingleTable( + controlFileTable.getNamespace(), + controlFileTable.getTable(), + tableMetadata, + dataTypesByColumns, + controlFileTable, + copyNode); + targetResults.add(result); + } + return targetResults; + } + + private ImportTargetResult importIntoSingleTable( + String namespace, + String tableName, + TableMetadata tableMetadata, + Map dataTypeByColumnName, + ControlFileTable controlFileTable, + ObjectNode mutableSourceRecord) { + + ImportOptions importOptions = params.getImportOptions(); + + if (dataTypeByColumnName == null || tableMetadata == null) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(tableName) + .status(ImportTargetResultStatus.VALIDATION_FAILED) + .errors(Collections.singletonList(ERROR_TABLE_METADATA_MISSING)) + .build(); + } + + LinkedHashSet partitionKeyNames = tableMetadata.getPartitionKeyNames(); + LinkedHashSet clusteringKeyNames = tableMetadata.getClusteringKeyNames(); + LinkedHashSet columnNames = tableMetadata.getColumnNames(); + + applyDataMapping(controlFileTable, mutableSourceRecord); + + boolean checkForMissingColumns = 
shouldCheckForMissingColumns(importOptions); + + ImportSourceRecordValidationResult validationResult = + validateSourceRecord( + partitionKeyNames, + clusteringKeyNames, + columnNames, + mutableSourceRecord, + checkForMissingColumns, + tableMetadata); + + if (!validationResult.isValid()) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(tableName) + .status(ImportTargetResultStatus.VALIDATION_FAILED) + .errors(validationResult.getErrorMessages()) + .build(); + } + + Optional optionalPartitionKey = + KeyUtils.createPartitionKeyFromSource( + partitionKeyNames, dataTypeByColumnName, mutableSourceRecord); + if (!optionalPartitionKey.isPresent()) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(tableName) + .status(ImportTargetResultStatus.VALIDATION_FAILED) + .errors(Collections.singletonList(ERROR_COULD_NOT_FIND_PARTITION_KEY)) + .build(); + } + Optional optionalClusteringKey = Optional.empty(); + if (!clusteringKeyNames.isEmpty()) { + optionalClusteringKey = + KeyUtils.createClusteringKeyFromSource( + clusteringKeyNames, dataTypeByColumnName, mutableSourceRecord); + if (!optionalClusteringKey.isPresent()) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(tableName) + .status(ImportTargetResultStatus.VALIDATION_FAILED) + .errors(Collections.singletonList(ERROR_COULD_NOT_FIND_CLUSTERING_KEY)) + .build(); + } + } + + Optional optionalScalarDBResult; + + try { + optionalScalarDBResult = + getDataRecord( + namespace, tableName, optionalPartitionKey.get(), optionalClusteringKey.orElse(null)); + } catch (ScalarDBDaoException e) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(tableName) + .status(ImportTargetResultStatus.RETRIEVAL_FAILED) + .errors(Collections.singletonList(e.getMessage())) + .build(); + } + ImportTaskAction importAction = + optionalScalarDBResult.isPresent() ? 
ImportTaskAction.UPDATE : ImportTaskAction.INSERT; + + if (importAction == ImportTaskAction.INSERT + && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) { + ImportSourceRecordValidationResult validationResultForMissingColumns = + new ImportSourceRecordValidationResult(); + ImportSourceRecordValidator.checkMissingColumns( + mutableSourceRecord, columnNames, validationResultForMissingColumns, tableMetadata); + if (!validationResultForMissingColumns.isValid()) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(tableName) + .status(ImportTargetResultStatus.MISSING_COLUMNS) + .errors(Collections.singletonList(ERROR_UPSERT_INSERT_MISSING_COLUMNS)) + .build(); + } + } + + if (shouldFailForExistingData(importAction, importOptions)) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(tableName) + .importedRecord(mutableSourceRecord) + .importAction(importAction) + .status(ImportTargetResultStatus.DATA_ALREADY_EXISTS) + .errors(Collections.singletonList(ERROR_DATA_ALREADY_EXISTS)) + .build(); + } + + if (shouldFailForMissingData(importAction, importOptions)) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(tableName) + .importedRecord(mutableSourceRecord) + .importAction(importAction) + .status(ImportTargetResultStatus.DATA_NOT_FOUND) + .errors(Collections.singletonList(ERROR_DATA_NOT_FOUND)) + .build(); + } + + List> columns; + + try { + columns = + ColumnUtils.getColumnsFromResult( + optionalScalarDBResult.orElse(null), + mutableSourceRecord, + importOptions.isIgnoreNullValues(), + tableMetadata); + } catch (Base64Exception | ColumnParsingException e) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(tableName) + .status(ImportTargetResultStatus.VALIDATION_FAILED) + .errors(Collections.singletonList(e.getMessage())) + .build(); + } + + // Time to save the record + try { + saveRecord( + namespace, + tableName, + optionalPartitionKey.get(), + optionalClusteringKey.orElse(null), + columns); + + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(tableName) + .importAction(importAction) + .importedRecord(mutableSourceRecord) + .status(ImportTargetResultStatus.SAVED) + .build(); + + } catch (ScalarDBDaoException e) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(tableName) + .importAction(importAction) + .status(ImportTargetResultStatus.SAVE_FAILED) + .errors(Collections.singletonList(e.getMessage())) + .build(); + } + } + + private void applyDataMapping(ControlFileTable controlFileTable, ObjectNode mutableSourceRecord) { + if (controlFileTable != null) { + ImportDataMapping.apply(mutableSourceRecord, controlFileTable); + } + } + + private boolean shouldCheckForMissingColumns(ImportOptions importOptions) { + return importOptions.getImportMode() == ImportMode.INSERT + || importOptions.isRequireAllColumns(); + } + + private ImportSourceRecordValidationResult validateSourceRecord( + LinkedHashSet partitionKeyNames, + LinkedHashSet clusteringKeyNames, + LinkedHashSet columnNames, + ObjectNode mutableSourceRecord, + boolean checkForMissingColumns, + TableMetadata tableMetadata) { + return ImportSourceRecordValidator.validateSourceRecord( + partitionKeyNames, + clusteringKeyNames, + columnNames, + mutableSourceRecord, + checkForMissingColumns, + tableMetadata); + } + + private boolean shouldRevalidateMissingColumns( + ImportOptions importOptions, boolean checkForMissingColumns) { + return !checkForMissingColumns && 
importOptions.getImportMode() == ImportMode.UPSERT; + } + + private boolean shouldFailForExistingData( + ImportTaskAction importAction, ImportOptions importOptions) { + return importAction == ImportTaskAction.UPDATE + && importOptions.getImportMode() == ImportMode.INSERT; + } + + private boolean shouldFailForMissingData( + ImportTaskAction importAction, ImportOptions importOptions) { + return importAction == ImportTaskAction.INSERT + && importOptions.getImportMode() == ImportMode.UPDATE; + } + + protected abstract Optional getDataRecord( + String namespace, String tableName, Key partitionKey, Key clusteringKey) + throws ScalarDBDaoException; + + protected abstract void saveRecord( + String namespace, + String tableName, + Key partitionKey, + Key clusteringKey, + List> columns) + throws ScalarDBDaoException; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskConstants.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskConstants.java new file mode 100644 index 0000000000..eb30211a0d --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskConstants.java @@ -0,0 +1,17 @@ +package com.scalar.db.dataloader.core.dataimport.task; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ImportTaskConstants { + public static final String ERROR_COULD_NOT_FIND_PARTITION_KEY = + "could not find the partition key"; + public static final String ERROR_UPSERT_INSERT_MISSING_COLUMNS = + "the source record needs to contain all fields if the UPSERT turns into an INSERT"; + public static final String ERROR_DATA_ALREADY_EXISTS = "record already exists"; + public static final String ERROR_DATA_NOT_FOUND = "record was not found"; + public static final String ERROR_COULD_NOT_FIND_CLUSTERING_KEY = + "could not find the clustering key"; + public static final String ERROR_TABLE_METADATA_MISSING = "No table metadata found"; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java new file mode 100644 index 0000000000..f85671140d --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java @@ -0,0 +1,24 @@ +package com.scalar.db.dataloader.core.dataimport.task; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.dataloader.core.dataimport.ImportOptions; +import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao; +import com.scalar.db.dataloader.core.dataimport.processor.TableColumnDataTypes; +import java.util.Map; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; + +@Builder +@Value +public class ImportTaskParams { + + @NonNull JsonNode sourceRecord; + int dataChunkId; + int rowNumber; + @NonNull ImportOptions importOptions; + @NonNull Map tableMetadataByTableName; + @NonNull TableColumnDataTypes tableColumnDataTypes; + @NonNull ScalarDBDao dao; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java new file mode 100644 index 0000000000..71e0d3ae23 --- /dev/null +++ 
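Taken together, the checks in ImportTask (shouldCheckForMissingColumns, shouldRevalidateMissingColumns, shouldFailForExistingData, shouldFailForMissingData) imply the following outcomes; a summary sketch in comments, derived from the code above rather than new behavior:

    // ImportMode.INSERT, record not found -> plain insert (all non-metadata columns required)
    // ImportMode.INSERT, record exists    -> DATA_ALREADY_EXISTS
    // ImportMode.UPDATE, record exists    -> plain update
    // ImportMode.UPDATE, record not found -> DATA_NOT_FOUND
    // ImportMode.UPSERT, record not found -> treated as INSERT; every non-metadata column
    //                                        must be present, otherwise MISSING_COLUMNS
    // ImportMode.UPSERT, record exists    -> treated as UPDATE; columns missing from the
    //                                        source are taken from the stored record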
b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java
@@ -0,0 +1,54 @@
+package com.scalar.db.dataloader.core.dataimport.task;
+
+import com.scalar.db.api.DistributedTransaction;
+import com.scalar.db.api.Result;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.exception.transaction.AbortException;
+import com.scalar.db.exception.transaction.TransactionException;
+import com.scalar.db.io.Column;
+import com.scalar.db.io.Key;
+import java.util.List;
+import java.util.Optional;
+
+public class ImportTransactionalTask extends ImportTask {
+
+  private final DistributedTransaction transaction;
+
+  public ImportTransactionalTask(ImportTaskParams params, DistributedTransaction transaction) {
+    super(params);
+    this.transaction = transaction;
+  }
+
+  @Override
+  protected Optional<Result> getDataRecord(
+      String namespace, String tableName, Key partitionKey, Key clusteringKey)
+      throws ScalarDBDaoException {
+    return params.getDao().get(namespace, tableName, partitionKey, clusteringKey, transaction);
+  }
+
+  @Override
+  protected void saveRecord(
+      String namespace,
+      String tableName,
+      Key partitionKey,
+      Key clusteringKey,
+      List<Column<?>> columns)
+      throws ScalarDBDaoException {
+    params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, transaction);
+  }
+
+  /**
+   * Aborts the active ScalarDB transaction.
+   *
+   * @throws TransactionException if something goes wrong while aborting the transaction
+   */
+  private void abortActiveTransaction(DistributedTransaction tx) throws TransactionException {
+    if (tx != null) {
+      try {
+        tx.abort();
+      } catch (AbortException e) {
+        throw new TransactionException(e.getMessage(), tx.getId());
+      }
+    }
+  }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMapping.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMapping.java
new file mode 100644
index 0000000000..7f7524d263
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMapping.java
@@ -0,0 +1,28 @@
+package com.scalar.db.dataloader.core.dataimport.task.mapping;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;
+
+public class ImportDataMapping {
+
+  /**
+   * Updates the source data by replacing each source field name with the target column name
+   * defined in the control file table mapping.
+   *
+   * @param source source data
+   * @param controlFileTable control file table used to map the source data
+   */
+  public static void apply(ObjectNode source, ControlFileTable controlFileTable) {
+    // Copy the source field data to the target column if the target column is missing
+    for (ControlFileTableFieldMapping mapping : controlFileTable.getMappings()) {
+      String sourceField = mapping.getSourceField();
+      String targetColumn = mapping.getTargetColumn();
+
+      if (source.has(sourceField) && !source.has(targetColumn)) {
+        source.set(targetColumn, source.get(sourceField));
+        source.remove(sourceField);
+      }
+    }
+  }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java
new file mode 100644
index 
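A worked example of ImportDataMapping.apply (illustrative; the field and column names are made up):

    // Control file mapping: sourceField "customer_id" -> targetColumn "id"
    ObjectNode source = new DataLoaderObjectMapper().createObjectNode();
    source.put("customer_id", 42);
    source.put("name", "Alice");

    // After ImportDataMapping.apply(source, controlFileTable):
    //   {"name": "Alice", "id": 42}
    // "customer_id" was moved to "id"; a record that already contained an "id" field
    // would have been left untouched.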
0000000000..30b878b9e6 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java @@ -0,0 +1,48 @@ +package com.scalar.db.dataloader.core.dataimport.task.validation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import javax.annotation.concurrent.Immutable; + +/** The validation result for a data source record */ +@Immutable +public final class ImportSourceRecordValidationResult { + + private final List errorMessages; + private final Set columnsWithErrors; + + /** Constructor */ + public ImportSourceRecordValidationResult() { + this.errorMessages = new ArrayList<>(); + this.columnsWithErrors = new HashSet<>(); + } + + /** + * Add a validation error message for a column. Also marking the column as containing an error. + * + * @param columnName column name + * @param errorMessage error message + */ + public void addErrorMessage(String columnName, String errorMessage) { + this.columnsWithErrors.add(columnName); + this.errorMessages.add(errorMessage); + } + + /** @return Immutable list of validation error messages */ + public List getErrorMessages() { + return Collections.unmodifiableList(this.errorMessages); + } + + /** @return Immutable set of columns that had errors */ + public Set getColumnsWithErrors() { + return Collections.unmodifiableSet(this.columnsWithErrors); + } + + /** @return Validation is valid or not */ + public boolean isValid() { + return this.errorMessages.isEmpty(); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java new file mode 100644 index 0000000000..6d773ffccc --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java @@ -0,0 +1,119 @@ +package com.scalar.db.dataloader.core.dataimport.task.validation; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.error.CoreError; +import com.scalar.db.dataloader.core.DatabaseKeyType; +import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils; +import java.util.Set; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ImportSourceRecordValidator { + + /** + * Create list for validation error messages. Validate everything and not return when one single + * error is found. 
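The result container accumulates every problem before the caller decides what to do; a minimal usage sketch (the column name and message are made up):

    ImportSourceRecordValidationResult result = new ImportSourceRecordValidationResult();
    result.addErrorMessage("customer_id", "missing partition key: customer_id");
    result.isValid();               // false
    result.getErrorMessages();      // immutable list with the single message above
    result.getColumnsWithErrors();  // immutable set containing "customer_id"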
Avoiding trial and error imports where every time a new error appears + * + * @param partitionKeyNames List of partition keys in table + * @param clusteringKeyNames List of clustering keys in table + * @param columnNames List of all column names in table + * @param sourceRecord source data + * @param allColumnsRequired If true treat missing columns as an error + * @return Source record validation result + */ + public static ImportSourceRecordValidationResult validateSourceRecord( + Set partitionKeyNames, + Set clusteringKeyNames, + Set columnNames, + JsonNode sourceRecord, + boolean allColumnsRequired, + TableMetadata tableMetadata) { + ImportSourceRecordValidationResult validationResult = new ImportSourceRecordValidationResult(); + + // check if partition keys are found + checkMissingKeys(DatabaseKeyType.PARTITION, partitionKeyNames, sourceRecord, validationResult); + + // check if clustering keys are found + checkMissingKeys( + DatabaseKeyType.CLUSTERING, clusteringKeyNames, sourceRecord, validationResult); + + // Check if the record is missing any columns + if (allColumnsRequired) { + checkMissingColumns( + sourceRecord, + columnNames, + validationResult, + validationResult.getColumnsWithErrors(), + tableMetadata); + } + + return validationResult; + } + + /** + * Check if the required keys are found in the data file. + * + * @param keyType Type of key to validate + * @param keyColumnNames List of required column names + * @param sourceRecord source data + * @param validationResult Source record validation result + */ + public static void checkMissingKeys( + DatabaseKeyType keyType, + Set keyColumnNames, + JsonNode sourceRecord, + ImportSourceRecordValidationResult validationResult) { + for (String columnName : keyColumnNames) { + if (!sourceRecord.has(columnName)) { + String errorMessageFormat = + keyType == DatabaseKeyType.PARTITION + ? CoreError.DATA_LOADER_MISSING_PARTITION_KEY_COLUMN.buildMessage(columnName) + : CoreError.DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN.buildMessage(columnName); + validationResult.addErrorMessage(columnName, errorMessageFormat); + } + } + } + + /** + * Make sure the json object is not missing any columns. Error added to validation errors lists + * + * @param sourceRecord Source json object + * @param columnNames List of column names for a table + * @param validationResult Source record validation result + * @param ignoreColumns Columns that can be ignored in the check + */ + public static void checkMissingColumns( + JsonNode sourceRecord, + Set columnNames, + ImportSourceRecordValidationResult validationResult, + Set ignoreColumns, + TableMetadata tableMetadata) { + for (String columnName : columnNames) { + // If the field is not a metadata column and is missing and should not be ignored + if ((ignoreColumns == null || !ignoreColumns.contains(columnName)) + && !ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata) + && !sourceRecord.has(columnName)) { + validationResult.addErrorMessage( + columnName, CoreError.DATA_LOADER_MISSING_COLUMN.buildMessage(columnName)); + } + } + } + + /** + * Make sure the json object is not missing any columns. 
Error added to validation errors lists + * + * @param sourceRecord Source json object + * @param columnNames List of column names for a table + * @param validationResult Source record validation result + */ + public static void checkMissingColumns( + JsonNode sourceRecord, + Set columnNames, + ImportSourceRecordValidationResult validationResult, + TableMetadata tableMetadata) { + ImportSourceRecordValidator.checkMissingColumns( + sourceRecord, columnNames, validationResult, null, tableMetadata); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java index 58f10d0f84..8574336d2f 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java @@ -1,7 +1,11 @@ package com.scalar.db.dataloader.core.util; +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.api.Result; +import com.scalar.db.api.TableMetadata; import com.scalar.db.common.error.CoreError; import com.scalar.db.dataloader.core.ColumnInfo; +import com.scalar.db.dataloader.core.exception.Base64Exception; import com.scalar.db.dataloader.core.exception.ColumnParsingException; import com.scalar.db.io.BigIntColumn; import com.scalar.db.io.BlobColumn; @@ -12,7 +16,8 @@ import com.scalar.db.io.FloatColumn; import com.scalar.db.io.IntColumn; import com.scalar.db.io.TextColumn; -import java.util.Base64; +import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils; +import java.util.*; import javax.annotation.Nullable; /** @@ -88,4 +93,124 @@ public static Column createColumnFromValue( e); } } + + /** + * Get columns from result data + * + * @param scalarDBResult result record + * @param sourceRecord source data + * @param ignoreNullValues ignore null values or not + * @return list of columns + * @throws Base64Exception if an error occurs while base64 decoding + */ + public static List> getColumnsFromResult( + Result scalarDBResult, + JsonNode sourceRecord, + boolean ignoreNullValues, + TableMetadata tableMetadata) + throws Base64Exception, ColumnParsingException { + + List> columns = new ArrayList<>(); + for (String columnName : tableMetadata.getColumnNames()) { + if (ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata)) { + continue; + } + + Column column = + getColumn( + scalarDBResult, + sourceRecord, + columnName, + ignoreNullValues, + tableMetadata.getColumnDataTypes()); + + if (column != null) { + columns.add(column); + } + } + + return columns; + } + + /** + * Create a set of columns to ignore + * + * @param partitionKeyNames a set of partition key names + * @param clusteringKeyNames a set of clustering key names + * @return a set of columns to ignore + */ + private static Set getColumnsToIgnore( + Set partitionKeyNames, Set clusteringKeyNames) { + Set columnsToIgnore = + new HashSet<>(ConsensusCommitUtils.getTransactionMetaColumns().keySet()); + columnsToIgnore.addAll(partitionKeyNames); + columnsToIgnore.addAll(clusteringKeyNames); + return columnsToIgnore; + } + + /** + * Get columns from result data + * + * @param scalarDBResult result record + * @param sourceRecord source data + * @param columnName column name + * @param ignoreNullValues ignore null values or not + * @param dataTypesByColumns data types of columns + * @return column data + * @throws Base64Exception if an error occurs while base64 decoding + */ + private static Column 
getColumn( + Result scalarDBResult, + JsonNode sourceRecord, + String columnName, + boolean ignoreNullValues, + Map dataTypesByColumns) + throws Base64Exception, ColumnParsingException { + if (scalarDBResult != null && !sourceRecord.has(columnName)) { + return getColumnFromResult(scalarDBResult, columnName); + } else { + return getColumnFromSourceRecord( + sourceRecord, columnName, ignoreNullValues, dataTypesByColumns); + } + } + + /** + * Get column from result + * + * @param scalarDBResult result record + * @param columnName column name + * @return column data + */ + private static Column getColumnFromResult(Result scalarDBResult, String columnName) { + Map> columnValues = scalarDBResult.getColumns(); + return columnValues.get(columnName); + } + + /** + * Get column from result + * + * @param sourceRecord source data + * @param columnName column name + * @param ignoreNullValues ignore null values or not + * @param dataTypesByColumns data types of columns + * @return column data + * @throws Base64Exception if an error occurs while base64 decoding + */ + private static Column getColumnFromSourceRecord( + JsonNode sourceRecord, + String columnName, + boolean ignoreNullValues, + Map dataTypesByColumns) + throws Base64Exception, ColumnParsingException { + DataType dataType = dataTypesByColumns.get(columnName); + String columnValue = + sourceRecord.has(columnName) && !sourceRecord.get(columnName).isNull() + ? sourceRecord.get(columnName).asText() + : null; + if (!ignoreNullValues || columnValue != null) { + ColumnInfo columnInfo = ColumnInfo.builder().columnName(columnName).build(); + return createColumnFromValue(dataType, columnInfo, columnValue); + } + return null; + } } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/KeyUtils.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/KeyUtils.java index c2491df0f4..e46311545d 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/KeyUtils.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/KeyUtils.java @@ -1,14 +1,18 @@ package com.scalar.db.dataloader.core.util; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.scalar.db.api.TableMetadata; import com.scalar.db.common.error.CoreError; import com.scalar.db.dataloader.core.ColumnInfo; import com.scalar.db.dataloader.core.ColumnKeyValue; +import com.scalar.db.dataloader.core.exception.Base64Exception; import com.scalar.db.dataloader.core.exception.ColumnParsingException; import com.scalar.db.dataloader.core.exception.KeyParsingException; import com.scalar.db.io.Column; import com.scalar.db.io.DataType; import com.scalar.db.io.Key; +import java.util.*; import javax.annotation.Nullable; /** @@ -22,6 +26,22 @@ public final class KeyUtils { /** Restrict instantiation via private constructor */ private KeyUtils() {} + public static Optional createClusteringKeyFromSource( + Set clusteringKeyNames, + Map dataTypeByColumnName, + ObjectNode sourceRecord) { + return clusteringKeyNames.isEmpty() + ? Optional.empty() + : createKeyFromSource(clusteringKeyNames, dataTypeByColumnName, sourceRecord); + } + + public static Optional createPartitionKeyFromSource( + Set partitionKeyNames, + Map dataTypeByColumnName, + ObjectNode sourceRecord) { + return createKeyFromSource(partitionKeyNames, dataTypeByColumnName, sourceRecord); + } + /** * Converts a key-value pair, in the format of =, into a ScalarDB Key instance for a * specific ScalarDB table. 
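The helpers above give stored values and source values a clear precedence; a summary sketch in comments, derived from getColumn and getColumnFromSourceRecord and assuming a column named "name":

    // source record contains "name"                         -> value parsed from the source JSON
    // source omits "name" but a stored row was found        -> existing Column copied from the Result
    // source has "name": null, ignoreNullValues is true     -> column skipped (stored value not overwritten)
    // source has "name": null, ignoreNullValues is false    -> an explicit null column is written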
@@ -85,4 +105,51 @@ public static Key createKey(DataType dataType, ColumnInfo columnInfo, String val throw new KeyParsingException(e.getMessage(), e); } } + + /** + * Create a new composite ScalarDB key. + * + * @param dataTypes List of data types for the columns + * @param columnNames List of column names + * @param values List of key values + * @return ScalarDB Key instance, or empty if the provided arrays are not of the same length + * @throws Base64Exception if there is an error creating the key values + */ + public static Optional createCompositeKey( + List dataTypes, List columnNames, List values) + throws Base64Exception, ColumnParsingException { + if (!CollectionUtil.areSameLength(dataTypes, columnNames, values)) { + return Optional.empty(); + } + Key.Builder builder = Key.newBuilder(); + for (int i = 0; i < dataTypes.size(); i++) { + ColumnInfo columnInfo = ColumnInfo.builder().columnName(columnNames.get(i)).build(); + Column keyValue = + ColumnUtils.createColumnFromValue(dataTypes.get(i), columnInfo, values.get(i)); + builder.add(keyValue); + } + return Optional.of(builder.build()); + } + + private static Optional createKeyFromSource( + Set keyNames, Map columnDataTypes, JsonNode sourceRecord) { + List dataTypes = new ArrayList<>(); + List columnNames = new ArrayList<>(); + List values = new ArrayList<>(); + + for (String keyName : keyNames) { + if (!columnDataTypes.containsKey(keyName) || !sourceRecord.has(keyName)) { + return Optional.empty(); + } + dataTypes.add(columnDataTypes.get(keyName)); + columnNames.add(keyName); + values.add(sourceRecord.get(keyName).asText()); + } + + try { + return createCompositeKey(dataTypes, columnNames, values); + } catch (Base64Exception | ColumnParsingException e) { + return Optional.empty(); + } + } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java new file mode 100644 index 0000000000..98e58109e7 --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java @@ -0,0 +1,270 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactoryConfig; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import 
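A usage sketch for the new composite-key helper (illustrative values; the checked Base64Exception and ColumnParsingException are omitted for brevity):

    Optional<Key> key =
        KeyUtils.createCompositeKey(
            Arrays.asList(DataType.INT, DataType.TEXT),
            Arrays.asList("customer_id", "region"),
            Arrays.asList("42", "eu"));
    // -> a Key holding an INT column and a TEXT column, in that order;
    //    Optional.empty() if the three lists have different lengths.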
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SingleFileImportLoggerTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(SingleFileImportLoggerTest.class); + private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + + @TempDir Path tempDir; + + private LogWriterFactory logWriterFactory; + + @BeforeEach + void setUp() { + LogWriterFactoryConfig logWriterFactoryConfig = + LogWriterFactoryConfig.builder() + .logStorageLocation(LogStorageLocation.LOCAL_FILE_STORAGE) + .build(); + ImportLoggerConfig importLoggerConfig = + ImportLoggerConfig.builder() + .prettyPrint(false) + .logSuccessRecords(false) + .logRawSourceRecords(false) + .logDirectoryPath("path") + .build(); + logWriterFactory = new DefaultLogWriterFactory(logWriterFactoryConfig, importLoggerConfig); + } + + @AfterEach + void tearDown() throws IOException { + cleanUpTempDir(); + } + + private void cleanUpTempDir() throws IOException { + try (Stream paths = Files.list(tempDir)) { + paths.forEach(this::deleteFile); + } + } + + private void deleteFile(Path file) { + try { + Files.deleteIfExists(file); + } catch (IOException e) { + LOGGER.error("Failed to delete file: {}", file, e); + } + } + + @Test + void onTransactionBatchCompleted_NoErrors_ShouldWriteToSuccessLogFile() throws IOException { + testTransactionBatchCompleted(true, true); + } + + @Test + void onTransactionBatchCompleted_HasErrors_ShouldWriteToFailureLogFile() throws IOException { + testTransactionBatchCompleted(false, true); + } + + private void testTransactionBatchCompleted(boolean success, boolean logSuccessRecords) + throws IOException { + // Arrange + ImportLoggerConfig config = + ImportLoggerConfig.builder() + .logDirectoryPath(tempDir.toString() + "/") + .logRawSourceRecords(true) + .logSuccessRecords(logSuccessRecords) + .build(); + SingleFileImportLogger importLogger = new SingleFileImportLogger(config, logWriterFactory); + + List batchResults = createBatchResults(1, success); + + // Act + for (ImportTransactionBatchResult batchResult : batchResults) { + importLogger.onTransactionBatchCompleted(batchResult); + importLogger.onDataChunkCompleted( + ImportDataChunkStatus.builder().dataChunkId(batchResult.getDataChunkId()).build()); + } + importLogger.onAllDataChunksCompleted(); + + // Assert + assertTransactionBatchResults(batchResults, success, logSuccessRecords); + } + + private List createBatchResults(int count, boolean success) { + List batchResults = new ArrayList<>(); + + for (int i = 1; i <= count; i++) { + List records = + Collections.singletonList( + ImportTaskResult.builder() + .rowNumber(i) + .rawRecord(OBJECT_MAPPER.createObjectNode()) + .targets(Collections.EMPTY_LIST) + .build()); + ImportTransactionBatchResult result = + ImportTransactionBatchResult.builder() + .dataChunkId(i) + .transactionBatchId(1) + .records(records) + .success(success) + .build(); + batchResults.add(result); + } + + return batchResults; + } + + private void assertTransactionBatchResults( + List batchResults, boolean success, boolean logSuccessRecords) + throws IOException { + DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); + + // Single file log mode + Path logFileName = + tempDir.resolve( + success + ? 
SingleFileImportLogger.SUCCESS_LOG_FILE_NAME + : SingleFileImportLogger.FAILURE_LOG_FILE_NAME); + if (logSuccessRecords || !success) { + assertTrue(Files.exists(logFileName), "Log file should exist"); + + String logContent = new String(Files.readAllBytes(logFileName), StandardCharsets.UTF_8); + + List logEntries = + objectMapper.readValue( + logContent, new TypeReference>() {}); + + assertEquals( + batchResults.size(), + logEntries.size(), + "Number of log entries should match the number of batch results"); + + for (int i = 0; i < batchResults.size(); i++) { + assertTransactionBatchResult(batchResults.get(i), logEntries.get(i)); + } + } else { + assertFalse(Files.exists(logFileName), "Log file should not exist"); + } + } + + private void assertTransactionBatchResult( + ImportTransactionBatchResult expected, ImportTransactionBatchResult actual) { + assertEquals(expected.getDataChunkId(), actual.getDataChunkId(), "Data chunk ID should match"); + assertEquals( + expected.getTransactionBatchId(), + actual.getTransactionBatchId(), + "Transaction batch ID should match"); + assertEquals( + expected.getTransactionId(), actual.getTransactionId(), "Transaction ID should match"); + assertEquals(expected.isSuccess(), actual.isSuccess(), "Success status should match"); + + List expectedRecords = expected.getRecords(); + List actualRecords = actual.getRecords(); + assertEquals(expectedRecords.size(), actualRecords.size(), "Number of records should match"); + for (int j = 0; j < expectedRecords.size(); j++) { + ImportTaskResult expectedRecord = expectedRecords.get(j); + ImportTaskResult actualRecord = actualRecords.get(j); + assertEquals( + expectedRecord.getRowNumber(), actualRecord.getRowNumber(), "Row number should match"); + assertEquals( + expectedRecord.getRawRecord(), actualRecord.getRawRecord(), "Raw record should match"); + assertEquals(expectedRecord.getTargets(), actualRecord.getTargets(), "Targets should match"); + } + } + + @Test + void onDataChunkCompleted_NoErrors_ShouldWriteToSummaryLogFile() throws IOException { + testDataChunkCompleted(false); + } + + @Test + void onDataChunkCompleted_HasErrors_ShouldWriteToSummaryLogFile() throws IOException { + testDataChunkCompleted(true); + } + + private void testDataChunkCompleted(boolean hasErrors) throws IOException { + ImportLoggerConfig config = + ImportLoggerConfig.builder() + .logDirectoryPath(tempDir.toString() + "/") + .logRawSourceRecords(true) + .logSuccessRecords(true) + .build(); + SingleFileImportLogger importLogger = new SingleFileImportLogger(config, logWriterFactory); + + List dataChunkStatuses = + Stream.of(1, 2) + .map(id -> createDataChunkStatus(id, hasErrors)) + .collect(Collectors.toList()); + + dataChunkStatuses.forEach(importLogger::onDataChunkCompleted); + importLogger.onAllDataChunksCompleted(); + + assertDataChunkStatusLog(SingleFileImportLogger.SUMMARY_LOG_FILE_NAME, dataChunkStatuses); + } + + private ImportDataChunkStatus createDataChunkStatus(int dataChunkId, boolean hasErrors) { + return ImportDataChunkStatus.builder() + .dataChunkId(dataChunkId) + .startTime(Instant.now()) + .endTime(Instant.now()) + .totalRecords(100) + .successCount(hasErrors ? 90 : 100) + .failureCount(hasErrors ? 
10 : 0) + .batchCount(5) + .totalDurationInMilliSeconds(1000) + .build(); + } + + private void assertDataChunkStatusLog( + String logFilePattern, List dataChunkStatuses) throws IOException { + assertSingleFileLog(tempDir, logFilePattern, dataChunkStatuses); + } + + private void assertSingleFileLog( + Path tempDir, String logFileName, List dataChunkStatuses) + throws IOException { + Path summaryLogFile = tempDir.resolve(logFileName); + assertTrue(Files.exists(summaryLogFile)); + + String logContent = new String(Files.readAllBytes(summaryLogFile), StandardCharsets.UTF_8); + DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); + List logEntries = + objectMapper.readValue(logContent, new TypeReference>() {}); + + assertEquals(dataChunkStatuses.size(), logEntries.size()); + for (int i = 0; i < dataChunkStatuses.size(); i++) { + assertDataChunkStatusEquals(dataChunkStatuses.get(i), logEntries.get(i)); + } + } + + private void assertDataChunkStatusEquals( + ImportDataChunkStatus expected, ImportDataChunkStatus actual) { + assertEquals(expected.getDataChunkId(), actual.getDataChunkId()); + assertEquals(expected.getStartTime(), actual.getStartTime()); + assertEquals(expected.getEndTime(), actual.getEndTime()); + assertEquals(expected.getTotalRecords(), actual.getTotalRecords()); + assertEquals(expected.getSuccessCount(), actual.getSuccessCount()); + assertEquals(expected.getFailureCount(), actual.getFailureCount()); + assertEquals(expected.getBatchCount(), actual.getBatchCount()); + assertEquals( + expected.getTotalDurationInMilliSeconds(), actual.getTotalDurationInMilliSeconds()); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java new file mode 100644 index 0000000000..800ae4e97a --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java @@ -0,0 +1,243 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactoryConfig; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class SplitByDataChunkImportLoggerTest { + + private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + + @TempDir Path tempDir; + + private 
LogWriterFactory logWriterFactory; + + @BeforeEach + void setUp() { + LogWriterFactoryConfig logWriterFactoryConfig = + LogWriterFactoryConfig.builder() + .logStorageLocation(LogStorageLocation.LOCAL_FILE_STORAGE) + .build(); + ImportLoggerConfig importLoggerConfig = + ImportLoggerConfig.builder() + .prettyPrint(false) + .logSuccessRecords(false) + .logRawSourceRecords(false) + .logDirectoryPath("path") + .build(); + logWriterFactory = new DefaultLogWriterFactory(logWriterFactoryConfig, importLoggerConfig); + } + + @Test + void onTransactionBatchCompleted_NoErrors_ShouldWriteToDataChunkSuccessFiles() + throws IOException { + testTransactionBatchCompleted(true, true); + } + + @Test + void onTransactionBatchCompleted_HasErrors_ShouldWriteToDataChunkFailureFiles() + throws IOException { + testTransactionBatchCompleted(false, true); + } + + @Test + void onTransactionBatchCompleted_NoErrorsAndNoSuccessFileLogging_ShouldNotWriteToSuccessFiles() + throws IOException { + testTransactionBatchCompleted(true, false); + } + + private void testTransactionBatchCompleted(boolean success, boolean logSuccessRecords) + throws IOException { + // Arrange + ImportLoggerConfig config = + ImportLoggerConfig.builder() + .logDirectoryPath(tempDir.toString() + "/") + .logRawSourceRecords(true) + .logSuccessRecords(logSuccessRecords) + .build(); + SplitByDataChunkImportLogger importLogger = + new SplitByDataChunkImportLogger(config, logWriterFactory); + + List batchResults = new ArrayList<>(); + + for (int i = 1; i <= 3; i++) { + List records = + Collections.singletonList( + ImportTaskResult.builder() + .rowNumber(i) + .targets(Collections.EMPTY_LIST) + .rawRecord(OBJECT_MAPPER.createObjectNode()) + .build()); + ImportTransactionBatchResult result = + ImportTransactionBatchResult.builder() + .dataChunkId(i) + .transactionBatchId(1) + .records(records) + .success(success) + .build(); + batchResults.add(result); + } + + // Act + for (ImportTransactionBatchResult batchResult : batchResults) { + importLogger.onTransactionBatchCompleted(batchResult); + importLogger.onDataChunkCompleted( + ImportDataChunkStatus.builder().dataChunkId(batchResult.getDataChunkId()).build()); + } + importLogger.onAllDataChunksCompleted(); + + // Assert + for (int i = 0; i < batchResults.size(); i++) { + ImportTransactionBatchResult batchResult = batchResults.get(i); + String logFileNameFormat = + success + ? 
SplitByDataChunkImportLogger.SUCCESS_LOG_FILE_NAME_FORMAT + : SplitByDataChunkImportLogger.FAILURE_LOG_FILE_NAME_FORMAT; + Path dataChunkLogFileName = tempDir.resolve(String.format(logFileNameFormat, i + 1)); + + if (success && logSuccessRecords) { + assertTrue(Files.exists(dataChunkLogFileName), "Data chunk success log file should exist"); + assertTransactionBatchResult(batchResult, dataChunkLogFileName); + } else if (!success) { + assertTrue(Files.exists(dataChunkLogFileName), "Data chunk failure log file should exist"); + assertTransactionBatchResult(batchResult, dataChunkLogFileName); + } else { + assertFalse( + Files.exists(dataChunkLogFileName), "Data chunk success log file should not exist"); + } + } + } + + private void assertTransactionBatchResult( + ImportTransactionBatchResult expected, Path dataChunkLogFileName) throws IOException { + // String logContent = Files.readString(dataChunkLogFileName); + String logContent = + new String(Files.readAllBytes(dataChunkLogFileName), StandardCharsets.UTF_8); + DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); + List logEntries = + objectMapper.readValue( + logContent, new TypeReference>() {}); + ImportTransactionBatchResult actual = logEntries.get(0); + + assertEquals(expected.getDataChunkId(), actual.getDataChunkId(), "Data chunk ID should match"); + assertEquals( + expected.getTransactionBatchId(), + actual.getTransactionBatchId(), + "Transaction batch ID should match"); + assertEquals( + expected.getTransactionId(), actual.getTransactionId(), "Transaction ID should match"); + assertEquals(expected.isSuccess(), actual.isSuccess(), "Success status should match"); + + List expectedRecords = expected.getRecords(); + List actualRecords = actual.getRecords(); + assertEquals(expectedRecords.size(), actualRecords.size(), "Number of records should match"); + for (int j = 0; j < expectedRecords.size(); j++) { + ImportTaskResult expectedRecord = expectedRecords.get(j); + ImportTaskResult actualRecord = actualRecords.get(j); + assertEquals( + expectedRecord.getRowNumber(), actualRecord.getRowNumber(), "Row number should match"); + assertEquals( + expectedRecord.getRawRecord(), actualRecord.getRawRecord(), "Raw record should match"); + assertEquals(expectedRecord.getTargets(), actualRecord.getTargets(), "Targets should match"); + } + } + + @Test + void onDataChunkCompleted_NoErrors_ShouldWriteToSummaryLogFile() throws IOException { + testDataChunkCompleted( + String.format(SplitByDataChunkImportLogger.SUMMARY_LOG_FILE_NAME_FORMAT, "%d"), false); + } + + @Test + void onDataChunkCompleted_HasErrors_ShouldWriteToSummaryLogFile() throws IOException { + testDataChunkCompleted( + String.format(SplitByDataChunkImportLogger.SUMMARY_LOG_FILE_NAME_FORMAT, "%d"), true); + } + + private void testDataChunkCompleted(String logFilePattern, boolean hasErrors) throws IOException { + ImportLoggerConfig config = + ImportLoggerConfig.builder() + .logDirectoryPath(tempDir.toString() + "/") + .logRawSourceRecords(true) + .logSuccessRecords(true) + .build(); + SplitByDataChunkImportLogger importLogger = + new SplitByDataChunkImportLogger(config, logWriterFactory); + + List dataChunkStatuses = + IntStream.rangeClosed(1, 2) + .mapToObj(id -> createDataChunkStatus(id, hasErrors)) + .collect(Collectors.toList()); + + dataChunkStatuses.forEach(importLogger::onDataChunkCompleted); + importLogger.onAllDataChunksCompleted(); + + assertDataChunkStatusLog(logFilePattern, dataChunkStatuses); + } + + private ImportDataChunkStatus createDataChunkStatus(int 
dataChunkId, boolean hasErrors) { + return ImportDataChunkStatus.builder() + .dataChunkId(dataChunkId) + .startTime(Instant.now()) + .endTime(Instant.now()) + .totalRecords(100) + .successCount(hasErrors ? 90 : 100) + .failureCount(hasErrors ? 10 : 0) + .batchCount(5) + .totalDurationInMilliSeconds(1000) + .build(); + } + + private void assertDataChunkStatusLog( + String logFilePattern, List dataChunkStatuses) throws IOException { + for (ImportDataChunkStatus dataChunkStatus : dataChunkStatuses) { + String logFileName = String.format(logFilePattern, dataChunkStatus.getDataChunkId()); + Path dataChunkLogFile = tempDir.resolve(logFileName); + assertTrue(Files.exists(dataChunkLogFile), "Data chunk summary log file should exist"); + + // String logContent = Files.readString(dataChunkLogFile); + String logContent = new String(Files.readAllBytes(dataChunkLogFile), StandardCharsets.UTF_8); + DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); + List logEntries = + objectMapper.readValue(logContent, new TypeReference>() {}); + + assertEquals(1, logEntries.size()); + assertDataChunkStatusEquals(dataChunkStatus, logEntries.get(0)); + } + } + + private void assertDataChunkStatusEquals( + ImportDataChunkStatus expected, ImportDataChunkStatus actual) { + assertEquals(expected.getDataChunkId(), actual.getDataChunkId()); + assertEquals(expected.getStartTime(), actual.getStartTime()); + assertEquals(expected.getEndTime(), actual.getEndTime()); + assertEquals(expected.getTotalRecords(), actual.getTotalRecords()); + assertEquals(expected.getSuccessCount(), actual.getSuccessCount()); + assertEquals(expected.getFailureCount(), actual.getFailureCount()); + assertEquals(expected.getBatchCount(), actual.getBatchCount()); + assertEquals( + expected.getTotalDurationInMilliSeconds(), actual.getTotalDurationInMilliSeconds()); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactoryTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactoryTest.java new file mode 100644 index 0000000000..e9102bca61 --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactoryTest.java @@ -0,0 +1,66 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig; +import com.scalar.db.dataloader.core.dataimport.log.LogStorageLocation; +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import software.amazon.awssdk.services.s3.S3AsyncClient; + +class DefaultLogWriterFactoryTest { + + String filePath = Paths.get("").toAbsolutePath() + "/sample.log"; + DefaultLogWriterFactory defaultLogWriterFactory; + + @AfterEach + void removeFileIfCreated() { + File file = new File(filePath); + if (file.exists()) { + file.deleteOnExit(); + } + } + + @Test + void createLogWriter_withValidLocalLogFilePath_shouldReturnLocalFileLogWriterObject() + throws IOException { + defaultLogWriterFactory = + new DefaultLogWriterFactory( + LogWriterFactoryConfig.builder() + .logStorageLocation(LogStorageLocation.LOCAL_FILE_STORAGE) + .build(), + ImportLoggerConfig.builder() + .prettyPrint(false) + .logSuccessRecords(false) + .logRawSourceRecords(false) + .logDirectoryPath("path") + .build()); + LogWriter 
logWriter = defaultLogWriterFactory.createLogWriter(filePath); + Assertions.assertEquals(LocalFileLogWriter.class, logWriter.getClass()); + logWriter.close(); + } + + @Test + void createLogWriter_withValidFilePath_shouldReturnLogWriterObject() throws IOException { + defaultLogWriterFactory = + new DefaultLogWriterFactory( + LogWriterFactoryConfig.builder() + .logStorageLocation(LogStorageLocation.AWS_S3) + .bucketName("bucket") + .objectKey("ObjectKay") + .s3AsyncClient(Mockito.mock(S3AsyncClient.class)) + .build(), + ImportLoggerConfig.builder() + .prettyPrint(false) + .logSuccessRecords(false) + .logRawSourceRecords(false) + .logDirectoryPath("path") + .build()); + LogWriter logWriter = defaultLogWriterFactory.createLogWriter(filePath); + Assertions.assertEquals(AwsS3LogWriter.class, logWriter.getClass()); + logWriter.close(); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactoryTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactoryTest.java new file mode 100644 index 0000000000..e78b019dd1 --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactoryTest.java @@ -0,0 +1,60 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.dataimport.ImportOptions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class DefaultImportProcessorFactoryTest { + + private DefaultImportProcessorFactory factory; + + @BeforeEach + void setUp() { + factory = new DefaultImportProcessorFactory(); + } + + @Test + void createImportProcessor_givenFileFormatIsJsonl_shouldReturnJsonLinesImportProcessor() { + // Arrange + ImportOptions importOptions = ImportOptions.builder().fileFormat(FileFormat.JSONL).build(); + ImportProcessorParams params = + ImportProcessorParams.builder().importOptions(importOptions).build(); + + // Act + ImportProcessor result = factory.createImportProcessor(params); + + // Assert + assertInstanceOf(JsonLinesImportProcessor.class, result); + } + + @Test + void createImportProcessor_givenFileFormatIsJson_shouldReturnJsonImportProcessor() { + // Given + ImportOptions importOptions = ImportOptions.builder().fileFormat(FileFormat.JSON).build(); + ImportProcessorParams params = + ImportProcessorParams.builder().importOptions(importOptions).build(); + + // When + ImportProcessor result = factory.createImportProcessor(params); + + // Then + assertInstanceOf(JsonImportProcessor.class, result); + } + + @Test + void createImportProcessor_givenFileFormatIsCsv_shouldReturnCsvImportProcessor() { + // Given + ImportOptions importOptions = ImportOptions.builder().fileFormat(FileFormat.CSV).build(); + ImportProcessorParams params = + ImportProcessorParams.builder().importOptions(importOptions).build(); + + // When + ImportProcessor result = factory.createImportProcessor(params); + + // Then + assertInstanceOf(CsvImportProcessor.class, result); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypesTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypesTest.java new file mode 100644 index 0000000000..2d72827f4f --- /dev/null +++ 
b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypesTest.java @@ -0,0 +1,33 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import com.scalar.db.io.DataType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TableColumnDataTypesTest { + + TableColumnDataTypes tableColumnDataTypes; + + @Test + void addColumnDataType_withValidData_shouldAddColumnDataType() { + tableColumnDataTypes = new TableColumnDataTypes(); + tableColumnDataTypes.addColumnDataType("table", "id", DataType.BIGINT); + tableColumnDataTypes.addColumnDataType("table", "name", DataType.TEXT); + Assertions.assertEquals( + DataType.BIGINT, tableColumnDataTypes.getColumnDataTypes("table").get("id")); + } + + @Test + void getDataType_withValidTableAndColumnName_shouldReturnCorrectDataType() { + tableColumnDataTypes = new TableColumnDataTypes(); + tableColumnDataTypes.addColumnDataType("table", "id", DataType.BIGINT); + tableColumnDataTypes.addColumnDataType("table", "name", DataType.TEXT); + Assertions.assertEquals(DataType.TEXT, tableColumnDataTypes.getDataType("table", "name")); + } + + @Test + void getDataType_withInvalidTableAndColumnName_shouldReturnNull() { + tableColumnDataTypes = new TableColumnDataTypes(); + Assertions.assertNull(tableColumnDataTypes.getDataType("table", "name")); + } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMappingTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMappingTest.java new file mode 100644 index 0000000000..e2b9364ff3 --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMappingTest.java @@ -0,0 +1,45 @@ +package com.scalar.db.dataloader.core.dataimport.task.mapping; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable; +import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping; +import java.util.ArrayList; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class ImportDataMappingTest { + + ControlFileTable controlFileTable; + + @BeforeEach + void setup() { + controlFileTable = new ControlFileTable("namespace", "table"); + ControlFileTableFieldMapping m1 = new ControlFileTableFieldMapping("source_id", "target_id"); + ControlFileTableFieldMapping m2 = + new ControlFileTableFieldMapping("source_name", "target_name"); + ControlFileTableFieldMapping m3 = + new ControlFileTableFieldMapping("source_email", "target_email"); + ArrayList<ControlFileTableFieldMapping> mappingArrayList = new ArrayList<>(); + mappingArrayList.add(m1); + mappingArrayList.add(m2); + mappingArrayList.add(m3); + controlFileTable.getMappings().addAll(mappingArrayList); + } + + @Test + void apply_withValidData_shouldUpdateSourceData() throws JsonProcessingException { + ObjectMapper objectMapper = new ObjectMapper(); + ObjectNode source = objectMapper.createObjectNode(); + source.put("source_id", "111"); + source.put("source_name", "abc"); + source.put("source_email", "sam@dsd.com"); + ImportDataMapping.apply(source, controlFileTable); + // Assert changes + Assertions.assertEquals("111", source.get("target_id").asText()); + Assertions.assertEquals("abc", 
source.get("target_name").asText()); + Assertions.assertEquals("sam@dsd.com", source.get("target_email").asText()); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidatorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidatorTest.java new file mode 100644 index 0000000000..65a85b3c3d --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidatorTest.java @@ -0,0 +1,87 @@ +package com.scalar.db.dataloader.core.dataimport.task.validation; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.error.CoreError; +import com.scalar.db.dataloader.core.UnitTestUtils; +import java.util.HashSet; +import java.util.Set; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class ImportSourceRecordValidatorTest { + + TableMetadata mockMetadata = UnitTestUtils.createTestTableMetadata(); + + @Test + void + validateSourceRecord_withValidData_shouldReturnValidImportSourceRecordValidationResultWithoutErrors() { + Set partitionKeyNames = mockMetadata.getPartitionKeyNames(); + Set clusteringKeyNames = mockMetadata.getClusteringKeyNames(); + Set columnNames = mockMetadata.getColumnNames(); + JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata(); + ImportSourceRecordValidationResult result = + ImportSourceRecordValidator.validateSourceRecord( + partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata); + Assertions.assertTrue(result.getColumnsWithErrors().isEmpty()); + } + + @Test + void + validateSourceRecord_withValidDataWithAllColumnsRequired_shouldReturnValidImportSourceRecordValidationResultWithoutErrors() { + Set partitionKeyNames = mockMetadata.getPartitionKeyNames(); + Set clusteringKeyNames = mockMetadata.getClusteringKeyNames(); + Set columnNames = mockMetadata.getColumnNames(); + JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata(); + ImportSourceRecordValidationResult result = + ImportSourceRecordValidator.validateSourceRecord( + partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, true, mockMetadata); + Assertions.assertTrue(result.getColumnsWithErrors().isEmpty()); + } + + @Test + void + validateSourceRecord_withInValidPartitionKey_shouldReturnValidImportSourceRecordValidationResultWithErrors() { + Set partitionKeyNames = new HashSet<>(); + partitionKeyNames.add("id1"); + Set clusteringKeyNames = mockMetadata.getClusteringKeyNames(); + Set columnNames = mockMetadata.getColumnNames(); + JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata(); + ImportSourceRecordValidationResult result = + ImportSourceRecordValidator.validateSourceRecord( + partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata); + Assertions.assertFalse(result.getColumnsWithErrors().isEmpty()); + } + + @Test + void + validateSourceRecord_withInValidPartitionKeyWithAllColumnsRequired_shouldReturnValidImportSourceRecordValidationResultWithErrors() { + Set partitionKeyNames = new HashSet<>(); + partitionKeyNames.add("id1"); + Set clusteringKeyNames = mockMetadata.getClusteringKeyNames(); + Set columnNames = mockMetadata.getColumnNames(); + JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata(); + ImportSourceRecordValidationResult result = + ImportSourceRecordValidator.validateSourceRecord( + 
partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, true, mockMetadata); + Assertions.assertFalse(result.getColumnsWithErrors().isEmpty()); + Assertions.assertEquals(1, result.getErrorMessages().size()); + } + + @Test + void + validateSourceRecord_withInValidClusteringKey_shouldReturnValidImportSourceRecordValidationResultWithErrors() { + Set partitionKeyNames = mockMetadata.getPartitionKeyNames(); + Set clusteringKeyNames = new HashSet<>(); + clusteringKeyNames.add("id1"); + Set columnNames = mockMetadata.getColumnNames(); + JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata(); + ImportSourceRecordValidationResult result = + ImportSourceRecordValidator.validateSourceRecord( + partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata); + Assertions.assertFalse(result.getColumnsWithErrors().isEmpty()); + Assertions.assertEquals( + CoreError.DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN.buildMessage("id1"), + result.getErrorMessages().get(0)); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java index cd47243b16..f38ca7fd72 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java @@ -2,8 +2,14 @@ import static org.junit.jupiter.api.Assertions.*; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.scalar.db.api.Result; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.ResultImpl; import com.scalar.db.common.error.CoreError; import com.scalar.db.dataloader.core.ColumnInfo; +import com.scalar.db.dataloader.core.UnitTestUtils; +import com.scalar.db.dataloader.core.exception.Base64Exception; import com.scalar.db.dataloader.core.exception.ColumnParsingException; import com.scalar.db.io.BigIntColumn; import com.scalar.db.io.BlobColumn; @@ -16,7 +22,10 @@ import com.scalar.db.io.TextColumn; import java.nio.charset.StandardCharsets; import java.util.Base64; +import java.util.List; +import java.util.Map; import java.util.stream.Stream; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -25,6 +34,10 @@ class ColumnUtilsTest { private static final float FLOAT_VALUE = 2.78f; + private static final TableMetadata mockMetadata = UnitTestUtils.createTestTableMetadata(); + private static final ObjectNode sourceRecord = UnitTestUtils.getOutputDataWithMetadata(); + private static final Map> values = UnitTestUtils.createTestValues(); + private static final Result scalarDBResult = new ResultImpl(values, mockMetadata); private static Stream provideColumnsForCreateColumnFromValue() { return Stream.of( @@ -105,4 +118,12 @@ void createColumnFromValue_invalidBase64_throwsBase64Exception() { columnName, "table", "ns"), exception.getMessage()); } + + @Test + void getColumnsFromResult_withValidData_shouldReturnColumns() + throws Base64Exception, ColumnParsingException { + List> columns = + ColumnUtils.getColumnsFromResult(scalarDBResult, sourceRecord, false, mockMetadata); + Assertions.assertEquals(7, columns.size()); + } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/KeyUtilsTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/KeyUtilsTest.java index 
f2fe680490..5c1a04cc22 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/KeyUtilsTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/KeyUtilsTest.java @@ -3,10 +3,12 @@ import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.scalar.db.api.TableMetadata; import com.scalar.db.common.error.CoreError; import com.scalar.db.dataloader.core.ColumnInfo; import com.scalar.db.dataloader.core.ColumnKeyValue; +import com.scalar.db.dataloader.core.UnitTestUtils; import com.scalar.db.dataloader.core.exception.KeyParsingException; import com.scalar.db.io.BigIntColumn; import com.scalar.db.io.BlobColumn; @@ -18,7 +20,8 @@ import com.scalar.db.io.Key; import com.scalar.db.io.TextColumn; import java.nio.charset.StandardCharsets; -import java.util.Base64; +import java.util.*; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -28,6 +31,8 @@ class KeyUtilsTest { @Mock private TableMetadata tableMetadata; + private static final Map dataTypeByColumnName = UnitTestUtils.getColumnData(); + private static final ObjectNode sourceRecord = UnitTestUtils.getOutputDataWithMetadata(); @Test void parseKeyValue_nullKeyValue_returnsNull() throws KeyParsingException { @@ -146,4 +151,42 @@ void createKey_invalidBase64_throwsBase64Exception() { assertThrows( KeyParsingException.class, () -> KeyUtils.createKey(DataType.BLOB, columnInfo, value)); } + + @Test + void createClusteringKeyFromSource_withEmptyClusteringKeySet_shouldReturnEmpty() { + Optional key = KeyUtils.createClusteringKeyFromSource(Collections.EMPTY_SET, null, null); + Assertions.assertEquals(Optional.empty(), key); + } + + @Test + void createClusteringKeyFromSource_withValidClusteringKeySet_shouldReturnValidKey() { + Set clusterKeySet = new HashSet<>(); + clusterKeySet.add(UnitTestUtils.TEST_COLUMN_2_CK); + clusterKeySet.add(UnitTestUtils.TEST_COLUMN_3_CK); + Optional key = + KeyUtils.createClusteringKeyFromSource(clusterKeySet, dataTypeByColumnName, sourceRecord); + Assertions.assertEquals( + "Optional[Key{IntColumn{name=col2, value=2147483647, hasNullValue=false}, BooleanColumn{name=col3, value=true, hasNullValue=false}}]", + key.toString()); + } + + @Test + void createPartitionKeyFromSource_withInvalidData_shouldReturnEmpty() { + Set partitionKeySet = new HashSet<>(); + partitionKeySet.add("id1"); + Optional key = + KeyUtils.createPartitionKeyFromSource(partitionKeySet, dataTypeByColumnName, sourceRecord); + Assertions.assertEquals(Optional.empty(), key); + } + + @Test + void createPartitionKeyFromSource_withValidData_shouldReturnValidKey() { + Set partitionKeySet = new HashSet<>(); + partitionKeySet.add(UnitTestUtils.TEST_COLUMN_1_PK); + Optional key = + KeyUtils.createPartitionKeyFromSource(partitionKeySet, dataTypeByColumnName, sourceRecord); + Assertions.assertEquals( + "Optional[Key{BigIntColumn{name=col1, value=9007199254740992, hasNullValue=false}}]", + key.toString()); + } } diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 23254eb3ab..bab1669d82 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -37,7 +37,7 @@ - +
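Usage sketch for the new KeyUtils source-record helpers exercised by KeyUtilsTest above. This is illustrative only and not part of the patch; it assumes only the signatures visible in this diff (createPartitionKeyFromSource(Set<String>, Map<String, DataType>, JsonNode) returning Optional<Key>, with Optional.empty() signalling a missing field or unknown column). The column name "id" and the BIGINT type are made-up examples.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.scalar.db.dataloader.core.util.KeyUtils;
import com.scalar.db.io.DataType;
import com.scalar.db.io.Key;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class KeyUtilsUsageSketch {
  public static void main(String[] args) {
    // Hypothetical schema: "id" is the single partition key column of the target table.
    Set<String> partitionKeyNames = Collections.singleton("id");
    Map<String, DataType> dataTypeByColumnName = new HashMap<>();
    dataTypeByColumnName.put("id", DataType.BIGINT);

    // A source record as it would arrive from a JSON/JSONL import file.
    ObjectNode sourceRecord = new ObjectMapper().createObjectNode();
    sourceRecord.put("id", 42L);

    // An empty Optional means a required field was missing or could not be parsed,
    // mirroring the behavior asserted in KeyUtilsTest above.
    Optional<Key> partitionKey =
        KeyUtils.createPartitionKeyFromSource(partitionKeyNames, dataTypeByColumnName, sourceRecord);
    partitionKey.ifPresent(key -> System.out.println("Partition key: " + key));
  }
}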