diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java index 73f40b0577..884a0a535b 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.scalar.db.api.DistributedStorageAdmin; import com.scalar.db.api.TableMetadata; +import com.scalar.db.config.DatabaseConfig; import com.scalar.db.dataloader.core.DataLoaderError; import com.scalar.db.dataloader.core.FileFormat; import com.scalar.db.dataloader.core.ScalarDbMode; @@ -28,6 +29,7 @@ import com.scalar.db.dataloader.core.util.TableMetadataUtil; import com.scalar.db.service.StorageFactory; import com.scalar.db.service.TransactionFactory; +import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager; import java.io.BufferedReader; import java.io.File; import java.io.IOException; @@ -159,8 +161,9 @@ private ImportManager createImportManager( null, scalarDbTransactionManager.getDistributedTransactionManager()); } else { + DatabaseConfig databaseConfig = new DatabaseConfig(configFile); ScalarDbStorageManager scalarDbStorageManager = - new ScalarDbStorageManager(StorageFactory.create(configFile)); + new ScalarDbStorageManager(new SingleCrudOperationTransactionManager(databaseConfig)); importManager = new ImportManager( tableMetadataMap, @@ -168,7 +171,7 @@ private ImportManager createImportManager( importOptions, importProcessorFactory, ScalarDbMode.STORAGE, - scalarDbStorageManager.getDistributedStorage(), + scalarDbStorageManager.getSingleCrudOperationTransactionManager(), null); } if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java index 07ef2dd756..ee06e4e8c3 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java @@ -1,6 +1,5 @@ package com.scalar.db.dataloader.core.dataimport; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.ScalarDbMode; @@ -13,6 +12,7 @@ import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; +import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager; import java.io.BufferedReader; import java.util.ArrayList; import java.util.List; @@ -44,7 +44,7 @@ public class ImportManager implements ImportEventListener { private final ImportProcessorFactory importProcessorFactory; private final List listeners = new ArrayList<>(); private final ScalarDbMode scalarDbMode; - private final DistributedStorage distributedStorage; + private final SingleCrudOperationTransactionManager singleCrudOperationTransactionManager; private final DistributedTransactionManager distributedTransactionManager; /** @@ -62,7 +62,7 @@ public void startImport() { .tableMetadataByTableName(tableMetadata) .dao(new ScalarDbDao()) .distributedTransactionManager(distributedTransactionManager) - .distributedStorage(distributedStorage) + .singleCrudOperationTransactionManager(singleCrudOperationTransactionManager) .tableColumnDataTypes(getTableColumnDataTypes()) .build(); ImportProcessor processor = importProcessorFactory.createImportProcessor(params); @@ -169,8 +169,8 @@ public void onAllDataChunksCompleted() { /** Close resources properly once the process is completed */ public void closeResources() { try { - if (distributedStorage != null) { - distributedStorage.close(); + if (singleCrudOperationTransactionManager != null) { + singleCrudOperationTransactionManager.close(); } else if (distributedTransactionManager != null) { distributedTransactionManager.close(); } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java index afd7b124af..ab7879283a 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java @@ -2,6 +2,7 @@ import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Get; import com.scalar.db.api.GetBuilder; import com.scalar.db.api.Put; @@ -10,12 +11,14 @@ import com.scalar.db.api.Scan; import com.scalar.db.api.ScanBuilder; import com.scalar.db.api.Scanner; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.dataloader.core.DataLoaderError; import com.scalar.db.dataloader.core.ScanRange; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.CrudException; import com.scalar.db.io.Column; import com.scalar.db.io.Key; +import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -56,6 +59,36 @@ public Optional get( } } + /** + * Retrieve record from ScalarDB instance in storage mode + * + * @param namespace Namespace name + * @param table Table name + * @param partitionKey Partition key + * @param clusteringKey Optional clustering key for get + * @param manager SingleCrudOperationTransactionManager object + * @return Optional get result + * @throws ScalarDbDaoException if something goes wrong while reading the data + */ + public Optional get( + String namespace, + String table, + Key partitionKey, + Key clusteringKey, + SingleCrudOperationTransactionManager manager) + throws ScalarDbDaoException { + + // Retrieving the key data for logging + String loggingKey = keysToString(partitionKey, clusteringKey); + + try { + Get get = createGetWith(namespace, table, partitionKey, clusteringKey); + return manager.get(get); + } catch (CrudException e) { + throw new ScalarDbDaoException("error GET " + loggingKey, e); + } + } + /** * Retrieve record from ScalarDB instance in transaction mode * @@ -114,6 +147,34 @@ public void put( } } + /** + * Save record in ScalarDB instance + * + * @param namespace Namespace name + * @param table Table name + * @param partitionKey Partition key + * @param clusteringKey Optional clustering key + * @param columns List of column values to be inserted or updated + * @param manager SingleCrudOperationTransactionManager object + * @throws ScalarDbDaoException if something goes wrong while executing the transaction + */ + public void put( + String namespace, + String table, + Key partitionKey, + Key clusteringKey, + List> columns, + SingleCrudOperationTransactionManager manager) + throws ScalarDbDaoException { + Put put = createPutWith(namespace, table, partitionKey, clusteringKey, columns); + try { + manager.put(put); + } catch (CrudException e) { + throw new ScalarDbDaoException( + DataLoaderError.ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e); + } + } + /** * Save record in ScalarDB instance * @@ -179,6 +240,44 @@ public List scan( } } + /** + * Scan a ScalarDB table + * + * @param namespace ScalarDB namespace + * @param table ScalarDB table name + * @param partitionKey Partition key used in ScalarDB scan + * @param range Optional range to set ScalarDB scan start and end values + * @param sorts Optional scan clustering key sorting values + * @param projections List of column projection to use during scan + * @param limit Scan limit value + * @param manager SingleCrudOperationTransactionManager object + * @return List of ScalarDB scan results + * @throws ScalarDbDaoException if scan fails + */ + public List scan( + String namespace, + String table, + Key partitionKey, + ScanRange range, + List sorts, + List projections, + int limit, + SingleCrudOperationTransactionManager manager) + throws ScalarDbDaoException { + + // Create scan + Scan scan = createScan(namespace, table, partitionKey, range, sorts, projections, limit); + + // scan data + try { + return manager.scan(scan); + } catch (CrudException | NoSuchElementException e) { + // No such element Exception is thrown when the scan is done in transaction mode but + // ScalarDB is running in storage mode + throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e); + } + } + /** * Scan a ScalarDB table * @@ -245,6 +344,60 @@ public Scanner createScanner( } } + /** + * Create a ScalarDB scanner instance + * + * @param namespace ScalarDB namespace + * @param table ScalarDB table name + * @param projectionColumns List of column projection to use during scan + * @param limit Scan limit value + * @param transaction Distributed transaction object + * @return ScalarDB Scanner object + * @throws ScalarDbDaoException if scan fails + */ + public TransactionManagerCrudOperable.Scanner createScanner( + String namespace, + String table, + List projectionColumns, + int limit, + DistributedTransactionManager transaction) + throws ScalarDbDaoException { + Scan scan = + createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit); + try { + return transaction.getScanner(scan); + } catch (CrudException e) { + throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e); + } + } + + /** + * Create a ScalarDB scanner instance + * + * @param namespace ScalarDB namespace + * @param table ScalarDB table name + * @param projectionColumns List of column projection to use during scan + * @param limit Scan limit value + * @param manager SingleCrudOperationTransactionManager object + * @return ScalarDB Scanner object + * @throws ScalarDbDaoException if scan fails + */ + public TransactionManagerCrudOperable.Scanner createScanner( + String namespace, + String table, + List projectionColumns, + int limit, + SingleCrudOperationTransactionManager manager) + throws ScalarDbDaoException { + Scan scan = + createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit); + try { + return manager.getScanner(scan); + } catch (CrudException e) { + throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e); + } + } + /** * Create a ScalarDB scanner instance * @@ -278,6 +431,71 @@ public Scanner createScanner( } } + /** + * Create a ScalarDB scanner instance + * + * @param namespace ScalarDB namespace + * @param table ScalarDB table name + * @param partitionKey Partition key used in ScalarDB scan + * @param scanRange Optional range to set ScalarDB scan start and end values + * @param sortOrders Optional scan clustering key sorting values + * @param projectionColumns List of column projection to use during scan + * @param limit Scan limit value + * @param manager SingleCrudOperationTransactionManager object + * @return ScalarDB Scanner object + * @throws ScalarDbDaoException if scan fails + */ + public TransactionManagerCrudOperable.Scanner createScanner( + String namespace, + String table, + @Nullable Key partitionKey, + @Nullable ScanRange scanRange, + @Nullable List sortOrders, + @Nullable List projectionColumns, + int limit, + SingleCrudOperationTransactionManager manager) + throws ScalarDbDaoException { + Scan scan = + createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit); + try { + return manager.getScanner(scan); + } catch (CrudException e) { + throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e); + } + } + + /** + * Create a ScalarDB scanner instance + * + * @param namespace ScalarDB namespace + * @param table ScalarDB table name + * @param partitionKey Partition key used in ScalarDB scan + * @param scanRange Optional range to set ScalarDB scan start and end values + * @param sortOrders Optional scan clustering key sorting values + * @param projectionColumns List of column projection to use during scan + * @param limit Scan limit value + * @param transaction Distributed transaction object + * @return ScalarDB Scanner object + */ + public TransactionManagerCrudOperable.Scanner createScanner( + String namespace, + String table, + @Nullable Key partitionKey, + @Nullable ScanRange scanRange, + @Nullable List sortOrders, + @Nullable List projectionColumns, + int limit, + DistributedTransactionManager transaction) + throws ScalarDbDaoException { + Scan scan = + createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit); + try { + return transaction.getScanner(scan); + } catch (CrudException e) { + throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e); + } + } + /** * Create ScalarDB scan instance * diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbStorageManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbStorageManager.java index 54185b9b3a..3662339269 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbStorageManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbStorageManager.java @@ -1,49 +1,29 @@ package com.scalar.db.dataloader.core.dataimport.dao; -import com.scalar.db.api.DistributedStorage; -import com.scalar.db.api.DistributedStorageAdmin; -import com.scalar.db.service.StorageFactory; -import javax.annotation.Nullable; +import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager; +import lombok.Getter; /** - * A manager class for handling ScalarDB operations in storage mode. + * A manager class for handling ScalarDB operations in single CRUD operation mode. * - *

Provides access to {@link DistributedStorage} for data operations and {@link - * DistributedStorageAdmin} for administrative operations such as schema management. + *

Provides access to {@link SingleCrudOperationTransactionManager} for executing individual CRUD + * operations in a lightweight, non-distributed manner. * - *

This class is typically used when interacting with ScalarDB in a non-transactional, - * storage-only configuration. + *

This class is typically used when ScalarDB is configured in single CRUD operation mode, + * allowing direct operations without the overhead of distributed transactions. */ public class ScalarDbStorageManager { - @Nullable private final DistributedStorage storage; - private final DistributedStorageAdmin storageAdmin; + @Getter private final SingleCrudOperationTransactionManager singleCrudOperationTransactionManager; /** - * Constructs a {@code ScalarDbStorageManager} using the provided {@link StorageFactory}. + * Constructs a {@code ScalarDbStorageManager} with the provided {@link + * SingleCrudOperationTransactionManager}. * - * @param storageFactory the factory used to create the ScalarDB storage and admin instances + * @param manager the {@code SingleCrudOperationTransactionManager} instance to be used for + * performing storage operations */ - public ScalarDbStorageManager(StorageFactory storageFactory) { - storage = storageFactory.getStorage(); - storageAdmin = storageFactory.getStorageAdmin(); - } - - /** - * Returns distributed storage for ScalarDB connection that is running in storage mode - * - * @return distributed storage object - */ - public DistributedStorage getDistributedStorage() { - return storage; - } - - /** - * Returns distributed storage admin for ScalarDB admin operations - * - * @return distributed storage admin object - */ - public DistributedStorageAdmin getDistributedStorageAdmin() { - return storageAdmin; + public ScalarDbStorageManager(SingleCrudOperationTransactionManager manager) { + singleCrudOperationTransactionManager = manager; } } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java index 81daf9646e..51674b04a1 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -360,7 +360,8 @@ private ImportTaskResult processStorageRecord(int dataChunkId, ImportRow importR .dao(params.getDao()) .build(); ImportTaskResult importRecordResult = - new ImportStorageTask(taskParams, params.getDistributedStorage()).execute(); + new ImportStorageTask(taskParams, params.getSingleCrudOperationTransactionManager()) + .execute(); ImportTaskResult modifiedTaskResult = ImportTaskResult.builder() diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java index 2d85325044..1f2f2da7f3 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java @@ -1,11 +1,11 @@ package com.scalar.db.dataloader.core.dataimport.processor; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.ScalarDbMode; import com.scalar.db.dataloader.core.dataimport.ImportOptions; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; +import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager; import java.util.Map; import lombok.Builder; import lombok.Value; @@ -36,8 +36,8 @@ public class ImportProcessorParams { /** Data Access Object for ScalarDB operations. */ ScalarDbDao dao; - /** Storage interface for non-transactional operations. */ - DistributedStorage distributedStorage; + /** Transaction manager for handling single crud operations. */ + SingleCrudOperationTransactionManager singleCrudOperationTransactionManager; /** Transaction manager for handling transactional operations. */ DistributedTransactionManager distributedTransactionManager; diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java index e847cc3a34..928860705a 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java @@ -1,84 +1,86 @@ package com.scalar.db.dataloader.core.dataimport.task; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.Result; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; import com.scalar.db.io.Column; import com.scalar.db.io.Key; +import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager; import java.util.List; import java.util.Optional; /** - * An import task that interacts with a {@link DistributedStorage} for data retrieval and storage - * operations. + * An import task that performs data retrieval and storage operations using a {@link + * SingleCrudOperationTransactionManager}. * *

This class extends {@link ImportTask} and provides concrete implementations for fetching and - * storing records using a {@link DistributedStorage} instance. It acts as a bridge between the - * import process and the underlying distributed storage system. + * saving records through the associated DAO using the {@link + * SingleCrudOperationTransactionManager}. It serves as a bridge between the data import process and + * the underlying single CRUD operation layer of ScalarDB. * - *

The task handles both read and write operations: + *

The task handles two types of operations: * *

    - *
  • Reading existing records using partition and clustering keys - *
  • Storing new or updated records with their associated columns + *
  • Reading existing records using partition and clustering keys. + *
  • Writing or updating records with their corresponding columns. *
* - *

All storage operations are performed through the provided {@link DistributedStorage} instance, - * which must be properly initialized before creating this task. + *

All operations are executed via the configured DAO, which internally uses the provided {@link + * SingleCrudOperationTransactionManager} instance. */ public class ImportStorageTask extends ImportTask { - private final DistributedStorage storage; + private final SingleCrudOperationTransactionManager manager; /** - * Constructs an {@code ImportStorageTask} with the specified parameters and storage. + * Constructs an {@code ImportStorageTask} with the specified parameters and {@link + * SingleCrudOperationTransactionManager}. * - * @param params the import task parameters containing configuration and DAO objects - * @param storage the distributed storage instance to be used for data operations - * @throws NullPointerException if either params or storage is null + * @param params the import task parameters containing configuration and DAO instances + * @param manager the {@code SingleCrudOperationTransactionManager} used for data operations + * @throws NullPointerException if {@code params} or {@code manager} is {@code null} */ - public ImportStorageTask(ImportTaskParams params, DistributedStorage storage) { + public ImportStorageTask(ImportTaskParams params, SingleCrudOperationTransactionManager manager) { super(params); - this.storage = storage; + this.manager = manager; } /** - * Retrieves a data record from the distributed storage using the specified keys. + * Retrieves a data record using the provided keys. * - *

This method attempts to fetch a single record from the specified table using both partition - * and clustering keys. The operation is performed through the configured DAO using the associated - * storage instance. + *

This method attempts to fetch a single record from the specified table using the given + * partition and clustering keys. The operation is performed through the configured DAO using the + * associated {@link SingleCrudOperationTransactionManager}. * * @param namespace the namespace of the table to query * @param tableName the name of the table to query * @param partitionKey the partition key identifying the record's partition - * @param clusteringKey the clustering key for further record identification within the partition - * @return an {@link Optional} containing the {@link Result} if the record exists, otherwise an + * @param clusteringKey the clustering key identifying the record within the partition + * @return an {@link Optional} containing the {@link Result} if the record exists; otherwise, an * empty {@link Optional} * @throws ScalarDbDaoException if an error occurs during the retrieval operation, such as - * connection issues or invalid table/namespace + * connectivity issues or invalid schema references */ @Override protected Optional getDataRecord( String namespace, String tableName, Key partitionKey, Key clusteringKey) throws ScalarDbDaoException { - return params.getDao().get(namespace, tableName, partitionKey, clusteringKey, this.storage); + return params.getDao().get(namespace, tableName, partitionKey, clusteringKey, this.manager); } /** - * Saves a record into the distributed storage with the specified keys and columns. + * Saves or updates a record in the specified table using the provided keys and column values. * - *

This method writes or updates a record in the specified table using the provided keys and - * column values. The operation is performed through the configured DAO using the associated - * storage instance. + *

This method inserts or updates a record in the target table using the {@link + * SingleCrudOperationTransactionManager}. It is typically invoked during the data import process + * to persist new or modified records. * * @param namespace the namespace of the target table * @param tableName the name of the target table - * @param partitionKey the partition key determining where the record will be stored - * @param clusteringKey the clustering key for organizing records within the partition - * @param columns the list of columns containing the record's data to be saved + * @param partitionKey the partition key identifying where the record is stored + * @param clusteringKey the clustering key specifying the record's ordering within the partition + * @param columns the list of columns representing the record data to save * @throws ScalarDbDaoException if an error occurs during the save operation, such as connection - * issues, invalid data types, or constraint violations + * issues or constraint violations */ @Override protected void saveRecord( @@ -88,6 +90,6 @@ protected void saveRecord( Key clusteringKey, List> columns) throws ScalarDbDaoException { - params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, this.storage); + params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, this.manager); } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/ImportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/ImportManagerTest.java index bf348c5321..da1deca390 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/ImportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/ImportManagerTest.java @@ -7,11 +7,11 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.ScalarDbMode; import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory; +import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager; import java.io.BufferedReader; import java.util.HashMap; import java.util.Map; @@ -23,8 +23,8 @@ public class ImportManagerTest { private ImportManager importManager; private ImportEventListener listener1; private ImportEventListener listener2; - private DistributedStorage distributedStorage; private DistributedTransactionManager distributedTransactionManager; + private SingleCrudOperationTransactionManager singleCrudOperationTransactionManager; @BeforeEach void setUp() { @@ -35,8 +35,8 @@ void setUp() { listener1 = mock(ImportEventListener.class); listener2 = mock(ImportEventListener.class); - distributedStorage = mock(DistributedStorage.class); distributedTransactionManager = mock(DistributedTransactionManager.class); + singleCrudOperationTransactionManager = mock(SingleCrudOperationTransactionManager.class); importManager = new ImportManager( @@ -45,7 +45,7 @@ void setUp() { options, processorFactory, ScalarDbMode.STORAGE, - distributedStorage, + singleCrudOperationTransactionManager, null); // Only one resource present importManager.addListener(listener1); importManager.addListener(listener2); @@ -57,7 +57,7 @@ void onAllDataChunksCompleted_shouldNotifyListenersAndCloseStorage() { verify(listener1).onAllDataChunksCompleted(); verify(listener2).onAllDataChunksCompleted(); - verify(distributedStorage).close(); + verify(singleCrudOperationTransactionManager).close(); } @Test @@ -69,7 +69,7 @@ void onAllDataChunksCompleted_shouldAggregateListenerExceptionAndStillCloseResou assertTrue(thrown.getMessage().contains("Error during completion")); assertEquals("Listener1 failed", thrown.getCause().getMessage()); - verify(distributedStorage).close(); + verify(singleCrudOperationTransactionManager).close(); } @Test @@ -90,7 +90,9 @@ void closeResources_shouldCloseTransactionManagerIfStorageIsNull() { @Test void closeResources_shouldThrowIfResourceCloseFails() { - doThrow(new RuntimeException("Close failed")).when(distributedStorage).close(); + doThrow(new RuntimeException("Close failed")) + .when(singleCrudOperationTransactionManager) + .close(); RuntimeException ex = assertThrows(RuntimeException.class, () -> importManager.closeResources()); diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDaoTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDaoTest.java index 58a62203e8..4d04e1cfc5 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDaoTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDaoTest.java @@ -10,25 +10,44 @@ import static com.scalar.db.dataloader.core.UnitTestUtils.TEST_VALUE_INT; import static com.scalar.db.dataloader.core.UnitTestUtils.TEST_VALUE_LONG; import static org.assertj.core.api.Assertions.assertThat; - +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Scan; import com.scalar.db.api.ScanBuilder; +import com.scalar.db.api.Scanner; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.dataloader.core.ScanRange; +import com.scalar.db.exception.storage.ExecutionException; +import com.scalar.db.exception.transaction.CrudException; import com.scalar.db.io.Key; +import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; class ScalarDbDaoTest { private static final int TEST_VALUE_INT_MIN = 1; private ScalarDbDao dao; + private DistributedTransactionManager manager; + private DistributedStorage distributedStorage; + private SingleCrudOperationTransactionManager singleCrudOperationTransactionManager; @BeforeEach void setUp() { this.dao = new ScalarDbDao(); + this.distributedStorage = mock(DistributedStorage.class); + this.manager = mock(DistributedTransactionManager.class); + this.singleCrudOperationTransactionManager = mock(SingleCrudOperationTransactionManager.class); } @Test @@ -161,6 +180,86 @@ void createScan_scanAllWithLimitAndProjection_shouldCreateScanAllObjectWithLimit assertThat(scan.toString()).isEqualTo(expectedResult.toString()); } + @Test + void createScanner_withTransactionManager_ShouldCreateScannerObject() + throws CrudException, ScalarDbDaoException { + // Create Scan Object + TransactionManagerCrudOperable.Scanner mockScanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); + when(manager.getScanner(any())).thenReturn(mockScanner); + TransactionManagerCrudOperable.Scanner result = + this.dao.createScanner( + TEST_NAMESPACE, + TEST_TABLE_NAME, + null, + new ScanRange(null, null, false, false), + new ArrayList<>(), + new ArrayList<>(), + 0, + manager); + // Assert + assertNotNull(result); + assertEquals(mockScanner, result); + result = this.dao.createScanner(TEST_NAMESPACE, TEST_TABLE_NAME, null, 0, manager); + // Assert + assertNotNull(result); + assertEquals(mockScanner, result); + } + + @Test + void createScanner_withSingleCrudTransactionManager_ShouldCreateScannerObject() + throws CrudException, ScalarDbDaoException { + // Create Scan Object + TransactionManagerCrudOperable.Scanner mockScanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); + when(singleCrudOperationTransactionManager.getScanner(any())).thenReturn(mockScanner); + TransactionManagerCrudOperable.Scanner result = + this.dao.createScanner( + TEST_NAMESPACE, + TEST_TABLE_NAME, + null, + new ScanRange(null, null, false, false), + new ArrayList<>(), + new ArrayList<>(), + 0, + singleCrudOperationTransactionManager); + // Assert + assertNotNull(result); + assertEquals(mockScanner, result); + result = + this.dao.createScanner( + TEST_NAMESPACE, TEST_TABLE_NAME, null, 0, singleCrudOperationTransactionManager); + // Assert + assertNotNull(result); + assertEquals(mockScanner, result); + } + + @Test + void createScanner_withStorage_ShouldCreateScannerObject() + throws CrudException, ExecutionException, ScalarDbDaoException { + // Create Scan Object + Scanner mockScanner = mock(Scanner.class); + when(distributedStorage.scan(any())).thenReturn(mockScanner); + Scanner result = + this.dao.createScanner( + TEST_NAMESPACE, + TEST_TABLE_NAME, + null, + new ScanRange(null, null, false, false), + new ArrayList<>(), + new ArrayList<>(), + 0, + distributedStorage); + // Assert + assertNotNull(result); + assertEquals(mockScanner, result); + + result = this.dao.createScanner(TEST_NAMESPACE, TEST_TABLE_NAME, null, 0, distributedStorage); + // Assert + assertNotNull(result); + assertEquals(mockScanner, result); + } + /** * Create Scan Object * diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java index 5457bc51d1..ccf78098d5 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java @@ -3,7 +3,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; @@ -17,6 +16,7 @@ import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; import com.scalar.db.dataloader.core.dataimport.log.LogMode; import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager; import java.util.HashMap; import java.util.Map; import org.junit.jupiter.api.Assertions; @@ -32,13 +32,14 @@ class CsvImportProcessorTest { @Mock TableColumnDataTypes tableColumnDataTypes; ScalarDbDao dao; - @Mock DistributedStorage distributedStorage; + SingleCrudOperationTransactionManager singleCrudOperationTransactionManager; DistributedTransactionManager distributedTransactionManager; CsvImportProcessor csvImportProcessor; @BeforeEach void setup() throws ScalarDbDaoException, TransactionException { dao = Mockito.mock(ScalarDbDao.class); + singleCrudOperationTransactionManager = mock(SingleCrudOperationTransactionManager.class); distributedTransactionManager = mock(DistributedTransactionManager.class); DistributedTransaction distributedTransaction = mock(DistributedTransaction.class); when(distributedTransactionManager.start()).thenReturn(distributedTransaction); @@ -65,7 +66,7 @@ void setup() throws ScalarDbDaoException, TransactionException { "table", UnitTestUtils.getPartitionKey(1), UnitTestUtils.getClusteringKey(), - distributedStorage)) + singleCrudOperationTransactionManager)) .thenReturn(UnitTestUtils.getResult(1)); Mockito.when( dao.get( @@ -84,7 +85,7 @@ void test_importProcessWithStorage() { .scalarDbMode(ScalarDbMode.STORAGE) .importOptions(importOptions) .dao(dao) - .distributedStorage(distributedStorage) + .singleCrudOperationTransactionManager(singleCrudOperationTransactionManager) .distributedTransactionManager(distributedTransactionManager) .tableColumnDataTypes(tableColumnDataTypes) .tableMetadataByTableName(tableMetadataByTableName) @@ -103,7 +104,7 @@ void test_importProcessWithTransaction() { .scalarDbMode(ScalarDbMode.TRANSACTION) .importOptions(importOptions) .dao(dao) - .distributedStorage(distributedStorage) + .singleCrudOperationTransactionManager(singleCrudOperationTransactionManager) .distributedTransactionManager(distributedTransactionManager) .tableColumnDataTypes(tableColumnDataTypes) .tableMetadataByTableName(tableMetadataByTableName) diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java index d60ebecb00..fd8d1e5dde 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java @@ -10,7 +10,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; @@ -26,6 +25,7 @@ import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager; import java.io.BufferedReader; import java.io.IOException; import java.io.StringReader; @@ -62,7 +62,7 @@ class ImportProcessorTest { @Mock private ImportProcessorParams params; @Mock private ImportOptions importOptions; @Mock private ScalarDbDao dao; - @Mock private DistributedStorage distributedStorage; + @Mock private SingleCrudOperationTransactionManager singleCrudOperationTransactionManager; @Mock private DistributedTransactionManager distributedTransactionManager; @Mock private DistributedTransaction distributedTransaction; @Mock private TableColumnDataTypes tableColumnDataTypes; @@ -87,7 +87,8 @@ void process_withStorageMode_shouldProcessAllDataChunks() { BufferedReader reader = new BufferedReader(new StringReader("test data")); when(params.getScalarDbMode()).thenReturn(ScalarDbMode.STORAGE); when(params.getDao()).thenReturn(dao); - when(params.getDistributedStorage()).thenReturn(distributedStorage); + when(params.getSingleCrudOperationTransactionManager()) + .thenReturn(singleCrudOperationTransactionManager); when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes); TestImportProcessor processor = new TestImportProcessor(params); @@ -150,7 +151,8 @@ void process_withMultipleDataChunks_shouldUseThreadPool() { final int maxThreads = 4; when(importOptions.getMaxThreads()).thenReturn(maxThreads); when(params.getDao()).thenReturn(dao); - when(params.getDistributedStorage()).thenReturn(distributedStorage); + when(params.getSingleCrudOperationTransactionManager()) + .thenReturn(singleCrudOperationTransactionManager); when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes); when(params.getTableMetadataByTableName()).thenReturn(tableMetadataByTableName); @@ -205,7 +207,8 @@ void process_withLargeNumberOfTasks_shouldWaitForAllTasksToComplete() { final int maxThreads = 2; when(importOptions.getMaxThreads()).thenReturn(maxThreads); when(params.getDao()).thenReturn(dao); - when(params.getDistributedStorage()).thenReturn(distributedStorage); + when(params.getSingleCrudOperationTransactionManager()) + .thenReturn(singleCrudOperationTransactionManager); when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes); when(params.getTableMetadataByTableName()).thenReturn(tableMetadataByTableName); @@ -235,7 +238,8 @@ void process_withShutdown_shouldShutdownExecutorsGracefully() { // Arrange when(params.getScalarDbMode()).thenReturn(ScalarDbMode.STORAGE); when(params.getDao()).thenReturn(dao); - when(params.getDistributedStorage()).thenReturn(distributedStorage); + when(params.getSingleCrudOperationTransactionManager()) + .thenReturn(singleCrudOperationTransactionManager); when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes); when(params.getTableMetadataByTableName()).thenReturn(tableMetadataByTableName); diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java index a5705d3684..b1901887a8 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java @@ -3,7 +3,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; @@ -17,6 +16,7 @@ import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; import com.scalar.db.dataloader.core.dataimport.log.LogMode; import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager; import java.util.HashMap; import java.util.Map; import org.junit.jupiter.api.Assertions; @@ -32,7 +32,7 @@ class JsonImportProcessorTest { @Mock TableColumnDataTypes tableColumnDataTypes; ScalarDbDao dao; - @Mock DistributedStorage distributedStorage; + SingleCrudOperationTransactionManager singleCrudOperationTransactionManager; DistributedTransactionManager distributedTransactionManager; JsonImportProcessor jsonImportProcessor; @@ -40,6 +40,7 @@ class JsonImportProcessorTest { void setup() throws ScalarDbDaoException, TransactionException { dao = Mockito.mock(ScalarDbDao.class); distributedTransactionManager = mock(DistributedTransactionManager.class); + singleCrudOperationTransactionManager = mock(SingleCrudOperationTransactionManager.class); DistributedTransaction distributedTransaction = mock(DistributedTransaction.class); when(distributedTransactionManager.start()).thenReturn(distributedTransaction); tableMetadataByTableName = new HashMap<>(); @@ -65,7 +66,7 @@ void setup() throws ScalarDbDaoException, TransactionException { "table", UnitTestUtils.getPartitionKey(1), UnitTestUtils.getClusteringKey(), - distributedStorage)) + singleCrudOperationTransactionManager)) .thenReturn(UnitTestUtils.getResult(1)); Mockito.when( dao.get( @@ -84,7 +85,7 @@ void test_importProcessWithStorage() { .scalarDbMode(ScalarDbMode.STORAGE) .importOptions(importOptions) .dao(dao) - .distributedStorage(distributedStorage) + .singleCrudOperationTransactionManager(singleCrudOperationTransactionManager) .distributedTransactionManager(distributedTransactionManager) .tableColumnDataTypes(tableColumnDataTypes) .tableMetadataByTableName(tableMetadataByTableName) @@ -103,7 +104,7 @@ void test_importProcessWithTransaction() { .scalarDbMode(ScalarDbMode.TRANSACTION) .importOptions(importOptions) .dao(dao) - .distributedStorage(distributedStorage) + .singleCrudOperationTransactionManager(singleCrudOperationTransactionManager) .distributedTransactionManager(distributedTransactionManager) .tableColumnDataTypes(tableColumnDataTypes) .tableMetadataByTableName(tableMetadataByTableName) diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java index 30992f1d35..214d6ab8e2 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java @@ -3,7 +3,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; @@ -17,6 +16,7 @@ import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; import com.scalar.db.dataloader.core.dataimport.log.LogMode; import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager; import java.util.HashMap; import java.util.Map; import org.junit.jupiter.api.Assertions; @@ -32,7 +32,7 @@ class JsonLinesImportProcessorTest { @Mock TableColumnDataTypes tableColumnDataTypes; ScalarDbDao dao; - @Mock DistributedStorage distributedStorage; + SingleCrudOperationTransactionManager singleCrudOperationTransactionManager; DistributedTransactionManager distributedTransactionManager; JsonLinesImportProcessor jsonLinesImportProcessor; @@ -40,6 +40,7 @@ class JsonLinesImportProcessorTest { void setup() throws ScalarDbDaoException, TransactionException { dao = Mockito.mock(ScalarDbDao.class); distributedTransactionManager = mock(DistributedTransactionManager.class); + singleCrudOperationTransactionManager = mock(SingleCrudOperationTransactionManager.class); DistributedTransaction distributedTransaction = mock(DistributedTransaction.class); when(distributedTransactionManager.start()).thenReturn(distributedTransaction); tableMetadataByTableName = new HashMap<>(); @@ -65,7 +66,7 @@ void setup() throws ScalarDbDaoException, TransactionException { "table", UnitTestUtils.getPartitionKey(1), UnitTestUtils.getClusteringKey(), - distributedStorage)) + singleCrudOperationTransactionManager)) .thenReturn(UnitTestUtils.getResult(1)); Mockito.when( dao.get( @@ -84,7 +85,7 @@ void test_importProcessWithStorage() { .scalarDbMode(ScalarDbMode.STORAGE) .importOptions(importOptions) .dao(dao) - .distributedStorage(distributedStorage) + .singleCrudOperationTransactionManager(singleCrudOperationTransactionManager) .distributedTransactionManager(distributedTransactionManager) .tableColumnDataTypes(tableColumnDataTypes) .tableMetadataByTableName(tableMetadataByTableName) @@ -103,7 +104,7 @@ void test_importProcessWithTransaction() { .scalarDbMode(ScalarDbMode.TRANSACTION) .importOptions(importOptions) .dao(dao) - .distributedStorage(distributedStorage) + .singleCrudOperationTransactionManager(singleCrudOperationTransactionManager) .distributedTransactionManager(distributedTransactionManager) .tableColumnDataTypes(tableColumnDataTypes) .tableMetadataByTableName(tableMetadataByTableName)