44import static com .scalar .db .dataloader .cli .util .CommandLineInputUtils .validatePositiveValue ;
55
66import com .fasterxml .jackson .databind .ObjectMapper ;
7+ import com .scalar .db .api .DistributedTransaction ;
78import com .scalar .db .api .DistributedTransactionAdmin ;
89import com .scalar .db .api .TableMetadata ;
910import com .scalar .db .dataloader .core .DataLoaderError ;
1011import com .scalar .db .dataloader .core .FileFormat ;
12+ import com .scalar .db .dataloader .core .ScalarDbMode ;
1113import com .scalar .db .dataloader .core .dataimport .ImportManager ;
1214import com .scalar .db .dataloader .core .dataimport .ImportOptions ;
1315import com .scalar .db .dataloader .core .dataimport .controlfile .ControlFile ;
@@ -67,20 +69,32 @@ public Integer call() throws Exception {
6769 spec .commandLine (), dataChunkQueueSize , DataLoaderError .INVALID_DATA_CHUNK_QUEUE_SIZE );
6870 ControlFile controlFile = parseControlFileFromPath (controlFilePath ).orElse (null );
6971 ImportOptions importOptions = createImportOptions (controlFile );
70- ImportLoggerConfig config =
72+ ImportLoggerConfig importLoggerConfig =
7173 ImportLoggerConfig .builder ()
7274 .logDirectoryPath (logDirectory )
7375 .isLogRawSourceRecordsEnabled (importOptions .isLogRawRecord ())
7476 .isLogSuccessRecordsEnabled (importOptions .isLogSuccessRecords ())
7577 .prettyPrint (prettyPrint )
7678 .build ();
77- LogWriterFactory logWriterFactory = createLogWriterFactory (config );
79+ LogWriterFactory logWriterFactory = createLogWriterFactory (importLoggerConfig );
80+ File configFile = new File (configFilePath );
81+ TransactionFactory transactionFactory = TransactionFactory .create (configFile );
82+
83+ // Validate transaction mode configuration before proceeding
84+ validateTransactionMode (transactionFactory );
85+
7886 Map <String , TableMetadata > tableMetadataMap =
79- createTableMetadataMap (controlFile , namespace , tableName );
87+ createTableMetadataMap (controlFile , namespace , tableName , transactionFactory );
8088 try (BufferedReader reader =
8189 Files .newBufferedReader (Paths .get (sourceFilePath ), Charset .defaultCharset ())) {
8290 ImportManager importManager =
83- createImportManager (importOptions , tableMetadataMap , reader , logWriterFactory , config );
91+ createImportManager (
92+ importOptions ,
93+ tableMetadataMap ,
94+ reader ,
95+ importLoggerConfig ,
96+ logWriterFactory ,
97+ transactionFactory );
8498 importManager .startImport ();
8599 }
86100 return 0 ;
@@ -101,14 +115,16 @@ private LogWriterFactory createLogWriterFactory(ImportLoggerConfig config) {
101115 * @param controlFile control file
102116 * @param namespace Namespace
103117 * @param tableName Single table name
118+ * @param transactionFactory transaction factory to use
104119 * @return {@code Map<String, TableMetadata>} a table metadata map
105120 * @throws ParameterException if one of the argument values is wrong
106121 */
107122 private Map <String , TableMetadata > createTableMetadataMap (
108- ControlFile controlFile , String namespace , String tableName )
109- throws IOException , TableMetadataException {
110- File configFile = new File (configFilePath );
111- TransactionFactory transactionFactory = TransactionFactory .create (configFile );
123+ ControlFile controlFile ,
124+ String namespace ,
125+ String tableName ,
126+ TransactionFactory transactionFactory )
127+ throws TableMetadataException {
112128 try (DistributedTransactionAdmin transactionAdmin = transactionFactory .getTransactionAdmin ()) {
113129 TableMetadataService tableMetadataService = new TableMetadataService (transactionAdmin );
114130 Map <String , TableMetadata > tableMetadataMap = new HashMap <>();
@@ -133,18 +149,19 @@ private Map<String, TableMetadata> createTableMetadataMap(
133149 * @param importOptions import options
134150 * @param tableMetadataMap table metadata map
135151 * @param reader buffered reader with source data
152+ * @param importLoggerConfig import logging config
136153 * @param logWriterFactory log writer factory object
137- * @param config import logging config
154+ * @param transactionFactory transaction factory to use
138155 * @return ImportManager object
139156 */
140157 private ImportManager createImportManager (
141158 ImportOptions importOptions ,
142159 Map <String , TableMetadata > tableMetadataMap ,
143160 BufferedReader reader ,
161+ ImportLoggerConfig importLoggerConfig ,
144162 LogWriterFactory logWriterFactory ,
145- ImportLoggerConfig config )
163+ TransactionFactory transactionFactory )
146164 throws IOException {
147- File configFile = new File (configFilePath );
148165 ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory ();
149166 ImportManager importManager =
150167 new ImportManager (
@@ -153,11 +170,12 @@ private ImportManager createImportManager(
153170 importOptions ,
154171 importProcessorFactory ,
155172 scalarDbMode ,
156- TransactionFactory . create ( configFile ) .getTransactionManager ());
173+ transactionFactory .getTransactionManager ());
157174 if (importOptions .getLogMode ().equals (LogMode .SPLIT_BY_DATA_CHUNK )) {
158- importManager .addListener (new SplitByDataChunkImportLogger (config , logWriterFactory ));
175+ importManager .addListener (
176+ new SplitByDataChunkImportLogger (importLoggerConfig , logWriterFactory ));
159177 } else {
160- importManager .addListener (new SingleFileImportLogger (config , logWriterFactory ));
178+ importManager .addListener (new SingleFileImportLogger (importLoggerConfig , logWriterFactory ));
161179 }
162180 return importManager ;
163181 }
@@ -236,6 +254,48 @@ private void validateLogDirectory(String logDirectory) throws ParameterException
236254 }
237255 }
238256
257+ /**
258+ * Validate transaction mode configuration by attempting to start and abort a transaction
259+ *
260+ * @param transactionFactory transaction factory to test
261+ * @throws ParameterException if transaction mode is incompatible with the configured transaction
262+ * manager
263+ */
264+ void validateTransactionMode (TransactionFactory transactionFactory ) {
265+ // Only validate when in TRANSACTION mode
266+ if (scalarDbMode != ScalarDbMode .TRANSACTION ) {
267+ return ;
268+ }
269+
270+ DistributedTransaction transaction = null ;
271+ try {
272+ // Try to start a read only transaction to verify the transaction manager is properly
273+ // configured
274+ transaction = transactionFactory .getTransactionManager ().startReadOnly ();
275+ } catch (UnsupportedOperationException e ) {
276+ // Transaction mode is not supported by the configured transaction manager
277+ throw new ParameterException (
278+ spec .commandLine (),
279+ DataLoaderError .INVALID_TRANSACTION_MODE .buildMessage (e .getMessage ()),
280+ e );
281+ } catch (Exception e ) {
282+ // Other exceptions - configuration or runtime error
283+ throw new ParameterException (
284+ spec .commandLine (),
285+ DataLoaderError .TRANSACTION_MODE_VALIDATION_FAILED .buildMessage (e .getMessage ()),
286+ e );
287+ } finally {
288+ // Ensure transaction is aborted
289+ if (transaction != null ) {
290+ try {
291+ transaction .abort ();
292+ } catch (Exception ignored ) {
293+ // Ignore errors during cleanup
294+ }
295+ }
296+ }
297+ }
298+
239299 /**
240300 * Generate control file from a valid control file path
241301 *
0 commit comments