2424import com .google .cloud .teleport .v2 .source .reader .io .jdbc .iowrapper .config .TableConfig ;
2525import com .google .cloud .teleport .v2 .source .reader .io .jdbc .rowmapper .JdbcSourceRowMapper ;
2626import com .google .cloud .teleport .v2 .source .reader .io .jdbc .uniformsplitter .range .PartitionColumn ;
27+ import com .google .cloud .teleport .v2 .source .reader .io .jdbc .uniformsplitter .range .TableIdentifier ;
28+ import com .google .cloud .teleport .v2 .source .reader .io .jdbc .uniformsplitter .range .TableReadSpecification ;
29+ import com .google .cloud .teleport .v2 .source .reader .io .jdbc .uniformsplitter .range .TableSplitSpecification ;
2730import com .google .cloud .teleport .v2 .source .reader .io .jdbc .uniformsplitter .transforms .ReadWithUniformPartitions ;
2831import com .google .cloud .teleport .v2 .source .reader .io .row .SourceRow ;
2932import com .google .cloud .teleport .v2 .source .reader .io .schema .SchemaDiscovery ;
6265public final class JdbcIoWrapper implements IoWrapper {
6366 private static final Logger LOG = LoggerFactory .getLogger (JdbcIoWrapper .class );
6467
65- private final ImmutableMap <SourceTableReference , PTransform <PBegin , PCollection <SourceRow >>>
68+ private final ImmutableMap <
69+ ImmutableList <SourceTableReference >, PTransform <PBegin , PCollection <SourceRow >>>
6670 tableReaders ;
6771 private final SourceSchema sourceSchema ;
6872
@@ -90,8 +94,9 @@ public static JdbcIoWrapper of(JdbcIOWrapperConfig config) throws SuitableIndexN
9094 autoInferTableConfigs (config , schemaDiscovery , DataSource .ofJdbc (dataSource ));
9195 SourceSchema sourceSchema =
9296 getSourceSchema (config , schemaDiscovery , DataSource .ofJdbc (dataSource ), tableConfigs );
93- ImmutableMap <SourceTableReference , PTransform <PBegin , PCollection <SourceRow >>> tableReaders =
94- buildTableReaders (config , tableConfigs , dataSourceConfiguration , sourceSchema );
97+ ImmutableMap <ImmutableList <SourceTableReference >, PTransform <PBegin , PCollection <SourceRow >>>
98+ tableReaders =
99+ buildTableReaders (config , tableConfigs , dataSourceConfiguration , sourceSchema );
95100 return new JdbcIoWrapper (tableReaders , sourceSchema );
96101 }
97102
@@ -155,7 +160,8 @@ private static void setConnectionProperty(
155160 * @return Read transforms.
156161 */
157162 @ Override
158- public ImmutableMap <SourceTableReference , PTransform <PBegin , PCollection <SourceRow >>>
163+ public ImmutableMap <
164+ ImmutableList <SourceTableReference >, PTransform <PBegin , PCollection <SourceRow >>>
159165 getTableReaders () {
160166 return this .tableReaders ;
161167 }
@@ -170,50 +176,55 @@ public SourceSchema discoverTableSchema() {
170176 return this .sourceSchema ;
171177 }
172178
173- static ImmutableMap <SourceTableReference , PTransform <PBegin , PCollection <SourceRow >>>
179+ static ImmutableMap <
180+ ImmutableList <SourceTableReference >, PTransform <PBegin , PCollection <SourceRow >>>
174181 buildTableReaders (
175182 JdbcIOWrapperConfig config ,
176183 ImmutableList <TableConfig > tableConfigs ,
177184 DataSourceConfiguration dataSourceConfiguration ,
178185 SourceSchema sourceSchema ) {
186+ if (config .readWithUniformPartitionsFeatureEnabled () && !tableConfigs .isEmpty ()) {
187+ return getMultiTableReadWithUniformPartitionIO (
188+ config ,
189+ dataSourceConfiguration ,
190+ sourceSchema .schemaReference (),
191+ tableConfigs ,
192+ sourceSchema );
193+ }
179194 return tableConfigs .stream ()
180195 .map (
181196 tableConfig -> {
182197 SourceTableSchema sourceTableSchema =
183198 findSourceTableSchema (sourceSchema , tableConfig );
184- long estimatedRowSize = sourceTableSchema .estimatedRowSize ();
185- Integer calculatedFetchSize =
186- FetchSizeCalculator .getFetchSize (
187- tableConfig ,
188- estimatedRowSize ,
189- config .workerMemoryBytes (),
190- config .workerCores ());
191- int fetchSize = calculatedFetchSize ;
192- return Map .entry (
199+ int fetchSize = getFetchSize (config , tableConfig , sourceTableSchema );
200+ SourceTableReference sourceTableReference =
193201 SourceTableReference .builder ()
194202 .setSourceSchemaReference (sourceSchema .schemaReference ())
195203 .setSourceTableName (delimitIdentifier (sourceTableSchema .tableName ()))
196204 .setSourceTableSchemaUUID (sourceTableSchema .tableSchemaUUID ())
197- .build (),
198- (config .readWithUniformPartitionsFeatureEnabled ())
199- ? getReadWithUniformPartitionIO (
200- config ,
201- dataSourceConfiguration ,
202- sourceSchema .schemaReference (),
203- tableConfig ,
204- sourceTableSchema ,
205- fetchSize )
206- : getJdbcIO (
207- config ,
208- dataSourceConfiguration ,
209- sourceSchema .schemaReference (),
210- tableConfig ,
211- sourceTableSchema ,
212- fetchSize ));
205+ .build ();
206+ return Map .entry (
207+ ImmutableList .of (sourceTableReference ),
208+ getJdbcIO (
209+ config ,
210+ dataSourceConfiguration ,
211+ sourceSchema .schemaReference (),
212+ tableConfig ,
213+ sourceTableSchema ,
214+ fetchSize ));
213215 })
214216 .collect (ImmutableMap .toImmutableMap (Map .Entry ::getKey , Map .Entry ::getValue ));
215217 }
216218
219+ private static int getFetchSize (
220+ JdbcIOWrapperConfig config , TableConfig tableConfig , SourceTableSchema sourceTableSchema ) {
221+ return FetchSizeCalculator .getFetchSize (
222+ tableConfig ,
223+ sourceTableSchema .estimatedRowSize (),
224+ config .workerMemoryBytes (),
225+ config .workerCores ());
226+ }
227+
217228 static SourceTableSchema findSourceTableSchema (
218229 SourceSchema sourceSchema , TableConfig tableConfig ) {
219230 return sourceSchema .tableSchemas ().stream ()
@@ -468,54 +479,98 @@ private static PTransform<PBegin, PCollection<SourceRow>> getJdbcIO(
468479 }
469480
470481 /**
471- * Private helper to construct {@link ReadWithUniformPartitions} as per the reader configuration.
482+ * Private helper to construct {@link ReadWithUniformPartitions} for multiple tables as per the
483+ * reader configuration.
472484 *
473485 * @param config Configuration.
474486 * @param dataSourceConfiguration dataSourceConfiguration (which is derived earlier from the
475487 * reader configuration)
476- * @param tableConfig discovered table configurations.
477- * @param sourceTableSchema schema of the source table.
478- * @return
488+ * @param sourceSchemaReference reference for the source schema.
489+ * @param tableConfigs list of discovered table configurations.
490+ * @param sourceSchema schema of the source.
491+ * @return a map with a single entry where the key is a list of all table references and the value
492+ * is the multi-table reader transform.
479493 */
480- private static PTransform <PBegin , PCollection <SourceRow >> getReadWithUniformPartitionIO (
481- JdbcIOWrapperConfig config ,
482- DataSourceConfiguration dataSourceConfiguration ,
483- SourceSchemaReference sourceSchemaReference ,
484- TableConfig tableConfig ,
485- SourceTableSchema sourceTableSchema ,
486- int fetchSize ) {
494+ private static ImmutableMap <
495+ ImmutableList <SourceTableReference >, PTransform <PBegin , PCollection <SourceRow >>>
496+ getMultiTableReadWithUniformPartitionIO (
497+ JdbcIOWrapperConfig config ,
498+ DataSourceConfiguration dataSourceConfiguration ,
499+ SourceSchemaReference sourceSchemaReference ,
500+ ImmutableList <TableConfig > tableConfigs ,
501+ SourceSchema sourceSchema ) {
502+
503+ ImmutableList .Builder <TableSplitSpecification > splitSpecsBuilder = ImmutableList .builder ();
504+ ImmutableMap .Builder <TableIdentifier , TableReadSpecification <SourceRow >> readSpecsBuilder =
505+ ImmutableMap .builder ();
506+ ImmutableList .Builder <SourceTableReference > tableReferencesBuilder = ImmutableList .builder ();
507+
508+ for (TableConfig tableConfig : tableConfigs ) {
509+ SourceTableSchema sourceTableSchema = findSourceTableSchema (sourceSchema , tableConfig );
510+ int fetchSize = getFetchSize (config , tableConfig , sourceTableSchema );
511+ TableIdentifier tableIdentifier =
512+ TableIdentifier .builder ()
513+ .setTableName (delimitIdentifier (tableConfig .tableName ()))
514+ .build ();
515+
516+ TableSplitSpecification .Builder tableSplitSpecificationBuilder =
517+ TableSplitSpecification .builder ()
518+ .setTableIdentifier (tableIdentifier )
519+ .setPartitionColumns (tableConfig .partitionColumns ())
520+ .setApproxRowCount (tableConfig .approxRowCount ());
521+ if (tableConfig .maxPartitions () != null ) {
522+ tableSplitSpecificationBuilder =
523+ tableSplitSpecificationBuilder .setMaxPartitionsHint ((long ) tableConfig .maxPartitions ());
524+ }
525+ if (config .splitStageCountHint () >= 0 ) {
526+ tableSplitSpecificationBuilder =
527+ tableSplitSpecificationBuilder .setSplitStagesCount ((long ) config .splitStageCountHint ());
528+ }
529+ splitSpecsBuilder .add (tableSplitSpecificationBuilder .build ());
530+
531+ TableReadSpecification .Builder <SourceRow > tableReadSpecificationBuilder =
532+ TableReadSpecification .<SourceRow >builder ()
533+ .setFetchSize (fetchSize )
534+ .setTableIdentifier (tableIdentifier )
535+ .setRowMapper (
536+ new JdbcSourceRowMapper (
537+ config .valueMappingsProvider (),
538+ sourceSchemaReference ,
539+ sourceTableSchema ,
540+ config .shardID ()));
541+ if (config .maxFetchSize () != null ) {
542+ tableReadSpecificationBuilder =
543+ tableReadSpecificationBuilder .setFetchSize (config .maxFetchSize ());
544+ }
545+ readSpecsBuilder .put (tableIdentifier , tableReadSpecificationBuilder .build ());
546+
547+ tableReferencesBuilder .add (
548+ SourceTableReference .builder ()
549+ .setSourceSchemaReference (sourceSchemaReference )
550+ .setSourceTableName (delimitIdentifier (sourceTableSchema .tableName ()))
551+ .setSourceTableSchemaUUID (sourceTableSchema .tableSchemaUUID ())
552+ .build ());
553+ }
487554
488- ReadWithUniformPartitions . Builder <SourceRow > readWithUniformPartitionsBuilder =
555+ ReadWithUniformPartitions <SourceRow > readWithUniformPartitions =
489556 ReadWithUniformPartitions .<SourceRow >builder ()
490- .setTableName ( delimitIdentifier ( tableConfig . tableName () ))
491- .setPartitionColumns ( tableConfig . partitionColumns ())
557+ .setTableSplitSpecifications ( splitSpecsBuilder . build ( ))
558+ .setTableReadSpecifications ( readSpecsBuilder . build ())
492559 .setDataSourceProviderFn (JdbcIO .PoolableDataSourceProvider .of (dataSourceConfiguration ))
493560 .setDbAdapter (config .dialectAdapter ())
494- .setApproxTotalRowCount (tableConfig .approxRowCount ())
495- .setFetchSize (fetchSize )
496- .setRowMapper (
497- new JdbcSourceRowMapper (
498- config .valueMappingsProvider (),
499- sourceSchemaReference ,
500- sourceTableSchema ,
501- config .shardID ()))
502561 .setWaitOn (config .waitOn ())
503562 .setDbParallelizationForSplitProcess (config .dbParallelizationForSplitProcess ())
504563 .setDbParallelizationForReads (config .dbParallelizationForReads ())
505- .setAdditionalOperationsOnRanges (config .additionalOperationsOnRanges ());
564+ .setAdditionalOperationsOnRanges (config .additionalOperationsOnRanges ())
565+ .build ();
506566
507- if (config .splitStageCountHint () >= 0 ) {
508- readWithUniformPartitionsBuilder =
509- readWithUniformPartitionsBuilder .setSplitStageCountHint (config .splitStageCountHint ());
510- }
567+ LOG .info (
568+ "Configured Multi-Table ReadWithUniformPartitions {} for tables {} with config {}" ,
569+ readWithUniformPartitions ,
570+ tableConfigs .stream ().map (TableConfig ::tableName ).collect (Collectors .toList ()),
571+ config );
511572
512- if (tableConfig .maxPartitions () != null ) {
513- readWithUniformPartitionsBuilder =
514- readWithUniformPartitionsBuilder .setMaxPartitionsHint ((long ) tableConfig .maxPartitions ());
515- }
516- ReadWithUniformPartitions readWithUniformPartitions = readWithUniformPartitionsBuilder .build ();
517- LOG .info ("Configured ReadWithUniformPartitions {} for {}" , readWithUniformPartitions , config );
518- return readWithUniformPartitions ;
573+ return ImmutableMap .of (tableReferencesBuilder .build (), readWithUniformPartitions );
519574 }
520575
521576 /**
@@ -542,7 +597,8 @@ private static DataSourceConfiguration getDataSourceConfiguration(JdbcIOWrapperC
542597 * Beam classes like {@link JdbcIO}
543598 */
544599 private JdbcIoWrapper (
545- ImmutableMap <SourceTableReference , PTransform <PBegin , PCollection <SourceRow >>> tableReaders ,
600+ ImmutableMap <ImmutableList <SourceTableReference >, PTransform <PBegin , PCollection <SourceRow >>>
601+ tableReaders ,
546602 SourceSchema sourceSchema ) {
547603 this .tableReaders = tableReaders ;
548604 this .sourceSchema = sourceSchema ;
0 commit comments