Skip to content

Commit 4b13240

Browse files
committed
feat(graph-size-per-shard): UT for Integrating MultiTableReadWithUniformPartitions
1 parent 5225598 commit 4b13240

28 files changed

+3507
-711
lines changed

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraIOWrapperHelperTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,11 +244,12 @@ public void testTableReaders() {
244244
dataSource,
245245
cassandraSchemaReference,
246246
ImmutableList.of(BASIC_TEST_TABLE, PRIMITIVE_TYPES_TABLE));
247-
ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>> tableReraders =
248-
CassandraIOWrapperHelper.getTableReaders(dataSource, sourceSchema);
247+
ImmutableMap<ImmutableList<SourceTableReference>, PTransform<PBegin, PCollection<SourceRow>>>
248+
tableReraders = CassandraIOWrapperHelper.getTableReaders(dataSource, sourceSchema);
249+
tableReraders.keySet().stream().forEach(tableList -> assertThat(tableList.size()).isEqualTo(1));
249250
assertThat(
250251
tableReraders.keySet().stream()
251-
.map(t -> t.sourceTableName())
252+
.map(t -> t.get(0).sourceTableName())
252253
.collect(Collectors.toList()))
253254
.isEqualTo(List.of(BASIC_TEST_TABLE, PRIMITIVE_TYPES_TABLE));
254255
}

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraIoWrapperTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,9 @@ public void testCassandraIoWrapperBasic() {
8686
SourceSchema mockSourceSchema = Mockito.mock(SourceSchema.class);
8787
SourceTableReference mockSourceTableReference = Mockito.mock(SourceTableReference.class);
8888
CassandraIO.Read<SourceRow> mockTableReader = Mockito.mock(CassandraIO.Read.class);
89-
ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>>
90-
mockTableReaders = ImmutableMap.of(mockSourceTableReference, mockTableReader);
89+
ImmutableMap<ImmutableList<SourceTableReference>, PTransform<PBegin, PCollection<SourceRow>>>
90+
mockTableReaders =
91+
ImmutableMap.of(ImmutableList.of(mockSourceTableReference), mockTableReader);
9192

9293
try (MockedStatic mockCassandraIoWrapperHelper = mockStatic(CassandraIOWrapperHelper.class)) {
9394
mockCassandraIoWrapperHelper

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java

Lines changed: 176 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import com.google.common.collect.ImmutableMap;
4747
import java.math.BigDecimal;
4848
import java.sql.SQLException;
49+
import java.util.stream.Collectors;
4950
import org.apache.beam.sdk.io.jdbc.JdbcIO;
5051
import org.apache.beam.sdk.transforms.PTransform;
5152
import org.apache.beam.sdk.values.PBegin;
@@ -125,9 +126,8 @@ public void testJdbcIoWrapperBasic() throws RetriableSchemaDiscoveryException {
125126
assertThat(tableSchema.tableName()).isEqualTo("testTable");
126127
assertThat(tableSchema.sourceColumnNameToSourceColumnType())
127128
.isEqualTo(ImmutableMap.of(testCol, testColType));
128-
assertThat(tableSchema.primaryKeyColumns()).isEqualTo(ImmutableList.of(testCol));
129-
ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>> tableReaders =
130-
jdbcIoWrapper.getTableReaders();
129+
ImmutableMap<ImmutableList<SourceTableReference>, PTransform<PBegin, PCollection<SourceRow>>>
130+
tableReaders = jdbcIoWrapper.getTableReaders();
131131
assertThat(tableReaders.size()).isEqualTo(1);
132132
}
133133

@@ -179,8 +179,8 @@ public void testJdbcIoWrapperWithoutInference() throws RetriableSchemaDiscoveryE
179179
assertThat(tableSchema.sourceColumnNameToSourceColumnType())
180180
.isEqualTo(ImmutableMap.of(testCol, testColType));
181181
assertThat(tableSchema.primaryKeyColumns()).isEqualTo(ImmutableList.of(testCol));
182-
ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>> tableReaders =
183-
jdbcIoWrapper.getTableReaders();
182+
ImmutableMap<ImmutableList<SourceTableReference>, PTransform<PBegin, PCollection<SourceRow>>>
183+
tableReaders = jdbcIoWrapper.getTableReaders();
184184
assertThat(tableReaders.size()).isEqualTo(1);
185185
}
186186

@@ -336,8 +336,8 @@ public void testJdbcIoWrapperDifferentTables() throws RetriableSchemaDiscoveryEx
336336
.setDialectAdapter(mockDialectAdapter)
337337
.setTables(ImmutableList.of("spanner_table"))
338338
.build());
339-
ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>> tableReaders =
340-
jdbcIoWrapper.getTableReaders();
339+
ImmutableMap<ImmutableList<SourceTableReference>, PTransform<PBegin, PCollection<SourceRow>>>
340+
tableReaders = jdbcIoWrapper.getTableReaders();
341341
assertThat(tableReaders.size()).isEqualTo(0);
342342
}
343343

@@ -435,6 +435,175 @@ public void testReadWithUniformPartitionFeatureFlag() throws RetriableSchemaDisc
435435
.hasSize(1);
436436
}
437437

438+
@Test
439+
public void testReadWithUniformPartitionMultiTable() throws RetriableSchemaDiscoveryException {
440+
SourceSchemaReference testSourceSchemaReference =
441+
SourceSchemaReference.ofJdbc(JdbcSchemaReference.builder().setDbName("testDB").build());
442+
String testCol = "ID";
443+
SourceColumnType testColType = new SourceColumnType("INTEGER", new Long[] {}, null);
444+
445+
when(mockDialectAdapter.discoverTables(any(), (SourceSchemaReference) any()))
446+
.thenReturn(ImmutableList.of("table1", "table2"));
447+
when(mockDialectAdapter.discoverTableIndexes(any(), (SourceSchemaReference) any(), any()))
448+
.thenReturn(
449+
ImmutableMap.of(
450+
"table1",
451+
ImmutableList.of(
452+
SourceColumnIndexInfo.builder()
453+
.setIndexType(IndexType.NUMERIC)
454+
.setIndexName("PRIMARY")
455+
.setIsPrimary(true)
456+
.setCardinality(42L)
457+
.setColumnName(testCol)
458+
.setIsUnique(true)
459+
.setOrdinalPosition(1)
460+
.build()),
461+
"table2",
462+
ImmutableList.of(
463+
SourceColumnIndexInfo.builder()
464+
.setIndexType(IndexType.NUMERIC)
465+
.setIndexName("PRIMARY")
466+
.setIsPrimary(true)
467+
.setCardinality(100L)
468+
.setColumnName(testCol)
469+
.setIsUnique(true)
470+
.setOrdinalPosition(1)
471+
.build())));
472+
when(mockDialectAdapter.discoverTableSchema(any(), (SourceSchemaReference) any(), any()))
473+
.thenReturn(
474+
ImmutableMap.of(
475+
"table1", ImmutableMap.of(testCol, testColType),
476+
"table2", ImmutableMap.of(testCol, testColType)));
477+
478+
JdbcIOWrapperConfig config =
479+
JdbcIOWrapperConfig.builderWithMySqlDefaults()
480+
.setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true")
481+
.setSourceSchemaReference(testSourceSchemaReference)
482+
.setShardID("test")
483+
.setReadWithUniformPartitionsFeatureEnabled(true)
484+
.setDbAuth(
485+
LocalCredentialsProvider.builder()
486+
.setUserName("testUser")
487+
.setPassword("testPassword")
488+
.build())
489+
.setJdbcDriverJars("")
490+
.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver")
491+
.setDialectAdapter(mockDialectAdapter)
492+
.setTables(ImmutableList.of("table1", "table2"))
493+
.build();
494+
495+
JdbcIoWrapper jdbcIoWrapper = JdbcIoWrapper.of(config);
496+
ImmutableMap<ImmutableList<SourceTableReference>, PTransform<PBegin, PCollection<SourceRow>>>
497+
tableReaders = jdbcIoWrapper.getTableReaders();
498+
499+
assertThat(tableReaders).hasSize(1);
500+
ImmutableList<SourceTableReference> key = tableReaders.keySet().iterator().next();
501+
assertThat(key).hasSize(2);
502+
assertThat(key.stream().map(SourceTableReference::sourceTableName).collect(Collectors.toList()))
503+
.containsExactly("\"table1\"", "\"table2\"");
504+
assertThat(tableReaders.values().iterator().next())
505+
.isInstanceOf(ReadWithUniformPartitions.class);
506+
}
507+
508+
@Test
509+
public void testReadWithUniformPartitionConfigOverrides()
510+
throws RetriableSchemaDiscoveryException {
511+
SourceSchemaReference testSourceSchemaReference =
512+
SourceSchemaReference.ofJdbc(JdbcSchemaReference.builder().setDbName("testDB").build());
513+
String testCol = "ID";
514+
SourceColumnType testColType = new SourceColumnType("INTEGER", new Long[] {}, null);
515+
516+
when(mockDialectAdapter.discoverTables(any(), (SourceSchemaReference) any()))
517+
.thenReturn(ImmutableList.of("table1"));
518+
when(mockDialectAdapter.discoverTableIndexes(any(), (SourceSchemaReference) any(), any()))
519+
.thenReturn(
520+
ImmutableMap.of(
521+
"table1",
522+
ImmutableList.of(
523+
SourceColumnIndexInfo.builder()
524+
.setIndexType(IndexType.NUMERIC)
525+
.setIndexName("PRIMARY")
526+
.setIsPrimary(true)
527+
.setCardinality(100L)
528+
.setColumnName(testCol)
529+
.setIsUnique(true)
530+
.setOrdinalPosition(1)
531+
.build())));
532+
when(mockDialectAdapter.discoverTableSchema(any(), (SourceSchemaReference) any(), any()))
533+
.thenReturn(ImmutableMap.of("table1", ImmutableMap.of(testCol, testColType)));
534+
535+
JdbcIOWrapperConfig config =
536+
JdbcIOWrapperConfig.builderWithMySqlDefaults()
537+
.setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true")
538+
.setSourceSchemaReference(testSourceSchemaReference)
539+
.setShardID("test")
540+
.setReadWithUniformPartitionsFeatureEnabled(true)
541+
.setMaxPartitions(10) // Triggers line 487 in getMultiTableReadWithUniformPartitionIO
542+
.setSplitStageCountHint(
543+
5L) // Triggers line 492 in getMultiTableReadWithUniformPartitionIO
544+
.setDbAuth(
545+
LocalCredentialsProvider.builder()
546+
.setUserName("testUser")
547+
.setPassword("testPassword")
548+
.build())
549+
.setJdbcDriverJars("")
550+
.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver")
551+
.setDialectAdapter(mockDialectAdapter)
552+
.build();
553+
554+
JdbcIoWrapper jdbcIoWrapper = JdbcIoWrapper.of(config);
555+
assertThat(jdbcIoWrapper.getTableReaders()).hasSize(1);
556+
}
557+
558+
@Test
559+
public void testGetJdbcIOWithMaxPartitions() throws RetriableSchemaDiscoveryException {
560+
SourceSchemaReference testSourceSchemaReference =
561+
SourceSchemaReference.ofJdbc(JdbcSchemaReference.builder().setDbName("testDB").build());
562+
String testCol = "ID";
563+
SourceColumnType testColType = new SourceColumnType("INTEGER", new Long[] {}, null);
564+
565+
when(mockDialectAdapter.discoverTables(any(), (SourceSchemaReference) any()))
566+
.thenReturn(ImmutableList.of("testTable"));
567+
when(mockDialectAdapter.discoverTableIndexes(any(), (SourceSchemaReference) any(), any()))
568+
.thenReturn(
569+
ImmutableMap.of(
570+
"testTable",
571+
ImmutableList.of(
572+
SourceColumnIndexInfo.builder()
573+
.setIndexType(IndexType.NUMERIC)
574+
.setIndexName("PRIMARY")
575+
.setIsPrimary(true)
576+
.setCardinality(42L)
577+
.setColumnName(testCol)
578+
.setIsUnique(true)
579+
.setOrdinalPosition(1)
580+
.build())));
581+
when(mockDialectAdapter.discoverTableSchema(any(), (SourceSchemaReference) any(), any()))
582+
.thenReturn(ImmutableMap.of("testTable", ImmutableMap.of(testCol, testColType)));
583+
584+
JdbcIOWrapperConfig config =
585+
JdbcIOWrapperConfig.builderWithMySqlDefaults()
586+
.setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true")
587+
.setSourceSchemaReference(testSourceSchemaReference)
588+
.setShardID("test")
589+
.setReadWithUniformPartitionsFeatureEnabled(false)
590+
.setMaxPartitions(10) // Triggers line 439 in getJdbcIO
591+
.setDbAuth(
592+
LocalCredentialsProvider.builder()
593+
.setUserName("testUser")
594+
.setPassword("testPassword")
595+
.build())
596+
.setJdbcDriverJars("")
597+
.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver")
598+
.setDialectAdapter(mockDialectAdapter)
599+
.build();
600+
601+
JdbcIoWrapper jdbcIoWrapper = JdbcIoWrapper.of(config);
602+
assertThat(jdbcIoWrapper.getTableReaders()).hasSize(1);
603+
assertThat(jdbcIoWrapper.getTableReaders().values().iterator().next())
604+
.isInstanceOf(JdbcIO.ReadWithPartitions.class);
605+
}
606+
438607
@Test
439608
public void testLoginTimeout() throws RetriableSchemaDiscoveryException {
440609

0 commit comments

Comments
 (0)