Skip to content

Commit 6c6ae87

Browse files
committed
Add tests.
1 parent 791eec6 commit 6c6ae87

File tree

2 files changed

+169
-0
lines changed

2 files changed

+169
-0
lines changed

sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,54 @@ public void testReadWithSchema() {
470470
pipeline.run();
471471
}
472472

473+
@Test
474+
public void testReadRowsWithExplicitSchema() {
475+
Schema customSchema =
476+
Schema.of(
477+
Schema.Field.of("CUSTOMER_NAME", Schema.FieldType.STRING).withNullable(true),
478+
Schema.Field.of("CUSTOMER_ID", Schema.FieldType.INT64).withNullable(true));
479+
480+
PCollection<Row> rows =
481+
pipeline.apply(
482+
JdbcIO.readRows()
483+
.withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
484+
.withQuery(String.format("select name,id from %s where name = ?", READ_TABLE_NAME))
485+
.withStatementPreparator(
486+
preparedStatement -> preparedStatement.setString(1, TestRow.getNameForSeed(1)))
487+
.withSchema(customSchema));
488+
489+
assertEquals(customSchema, rows.getSchema());
490+
491+
PCollection<Row> output = rows.apply(Select.fieldNames("CUSTOMER_NAME", "CUSTOMER_ID"));
492+
PAssert.that(output)
493+
.containsInAnyOrder(
494+
ImmutableList.of(Row.withSchema(customSchema).addValues("Testval1", 1L).build()));
495+
496+
pipeline.run();
497+
}
498+
499+
@Test
500+
@SuppressWarnings({"UnusedVariable"})
501+
public void testIncompatibleSchemaThrowsError() {
502+
Schema incompatibleSchema =
503+
Schema.of(
504+
Schema.Field.of(
505+
"WRONG_TYPE_NAME", Schema.FieldType.INT64), // Wrong type for a string column
506+
Schema.Field.of(
507+
"WRONG_TYPE_ID", Schema.FieldType.STRING) // Wrong type for an integer column
508+
);
509+
510+
Pipeline pipeline = Pipeline.create();
511+
pipeline.apply(
512+
JdbcIO.readRows()
513+
.withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
514+
.withQuery(String.format("select name,id from %s limit 10", READ_TABLE_NAME))
515+
.withSchema(incompatibleSchema));
516+
517+
PipelineExecutionException exception =
518+
assertThrows(PipelineExecutionException.class, () -> pipeline.run().waitUntilFinish());
519+
}
520+
473521
@Test
474522
public void testReadWithPartitions() {
475523
PCollection<TestRow> rows =
@@ -486,6 +534,32 @@ public void testReadWithPartitions() {
486534
pipeline.run();
487535
}
488536

537+
@Test
538+
public void testReadWithPartitionsWithExplicitSchema() {
539+
Schema customSchema =
540+
Schema.of(
541+
Schema.Field.of("CUSTOMER_NAME", Schema.FieldType.STRING).withNullable(true),
542+
Schema.Field.of("CUSTOMER_ID", Schema.FieldType.INT32).withNullable(true));
543+
544+
PCollection<Row> rows =
545+
pipeline.apply(
546+
JdbcIO.<Row>readWithPartitions()
547+
.withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
548+
.withTable(String.format("(select name,id from %s) as subq", READ_TABLE_NAME))
549+
.withNumPartitions(5)
550+
.withPartitionColumn("id")
551+
.withLowerBound(0L)
552+
.withUpperBound(1000L)
553+
.withRowOutput()
554+
.withSchema(customSchema));
555+
556+
assertEquals(customSchema, rows.getSchema());
557+
558+
PAssert.thatSingleton(rows.apply("Count All", Count.globally())).isEqualTo(1000L);
559+
560+
pipeline.run();
561+
}
562+
489563
@Test
490564
public void testReadWithPartitionsBySubqery() {
491565
PCollection<TestRow> rows =

sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.io.jdbc;
1919

2020
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertNotNull;
2122

2223
import java.sql.Connection;
2324
import java.sql.PreparedStatement;
@@ -85,6 +86,100 @@ public void testPartitionedRead() {
8586
pipeline.run();
8687
}
8788

89+
@Test
90+
public void testPartitionedReadWithExplicitSchema() {
91+
JdbcSchemaIOProvider provider = new JdbcSchemaIOProvider();
92+
93+
// Define a custom schema with different field names
94+
Schema customSchema =
95+
Schema.of(
96+
Schema.Field.of("CUSTOMER_NAME", Schema.FieldType.STRING).withNullable(true),
97+
Schema.Field.of("CUSTOMER_ID", Schema.FieldType.INT32).withNullable(true));
98+
99+
Row config =
100+
Row.withSchema(provider.configurationSchema())
101+
.withFieldValue("driverClassName", DATA_SOURCE_CONFIGURATION.getDriverClassName().get())
102+
.withFieldValue("jdbcUrl", DATA_SOURCE_CONFIGURATION.getUrl().get())
103+
.withFieldValue("username", "")
104+
.withFieldValue("password", "")
105+
.withFieldValue("partitionColumn", "id")
106+
.withFieldValue("partitions", (short) 10)
107+
.build();
108+
109+
JdbcSchemaIOProvider.JdbcSchemaIO schemaIO =
110+
provider.from(
111+
String.format("(select name,id from %s) as subq", READ_TABLE_NAME),
112+
config,
113+
customSchema);
114+
115+
PCollection<Row> output = pipeline.apply(schemaIO.buildReader());
116+
117+
// Verify the schema is the one we provided
118+
assertEquals(customSchema, output.getSchema());
119+
120+
// Verify row count
121+
Long expected = Long.valueOf(EXPECTED_ROW_COUNT);
122+
PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expected);
123+
124+
// Verify field names match our custom schema
125+
PAssert.that(output)
126+
.satisfies(
127+
rows -> {
128+
for (Row row : rows) {
129+
assertNotNull(row.getString("CUSTOMER_NAME"));
130+
assertNotNull(row.getInt32("CUSTOMER_ID"));
131+
}
132+
return null;
133+
});
134+
135+
pipeline.run();
136+
}
137+
138+
@Test
139+
public void testReadWithExplicitSchema() {
140+
JdbcSchemaIOProvider provider = new JdbcSchemaIOProvider();
141+
142+
// Define a custom schema with different field names
143+
Schema customSchema =
144+
Schema.of(
145+
Schema.Field.of("CUSTOMER_NAME", Schema.FieldType.STRING).withNullable(true),
146+
Schema.Field.of("CUSTOMER_ID", Schema.FieldType.INT32).withNullable(true));
147+
148+
Row config =
149+
Row.withSchema(provider.configurationSchema())
150+
.withFieldValue("driverClassName", DATA_SOURCE_CONFIGURATION.getDriverClassName().get())
151+
.withFieldValue("jdbcUrl", DATA_SOURCE_CONFIGURATION.getUrl().get())
152+
.withFieldValue("username", "")
153+
.withFieldValue("password", "")
154+
.withFieldValue("readQuery", "SELECT name, id FROM " + READ_TABLE_NAME)
155+
.build();
156+
157+
JdbcSchemaIOProvider.JdbcSchemaIO schemaIO =
158+
provider.from(READ_TABLE_NAME, config, customSchema);
159+
160+
PCollection<Row> output = pipeline.apply(schemaIO.buildReader());
161+
162+
// Verify the schema is the one we provided
163+
assertEquals(customSchema, output.getSchema());
164+
165+
// Verify row count
166+
Long expected = Long.valueOf(EXPECTED_ROW_COUNT);
167+
PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expected);
168+
169+
// Verify field names match our custom schema
170+
PAssert.that(output)
171+
.satisfies(
172+
rows -> {
173+
for (Row row : rows) {
174+
assertNotNull(row.getString("CUSTOMER_NAME"));
175+
assertNotNull(row.getInt32("CUSTOMER_ID"));
176+
}
177+
return null;
178+
});
179+
180+
pipeline.run();
181+
}
182+
88183
// This test shouldn't work because we only support numeric and datetime columns and we are trying
89184
// to use a string column as our partition source.
90185
@Test

0 commit comments

Comments
 (0)