Skip to content

Commit be4fb97

Browse files
authored
Support managed jdbc io (Postgres) (#36034)
* Add postgres read to managed io
* Add postgres write to managed io
* Add integration tests for both managed and unmanaged postgres read and write.
* Fix error in analyzeClassesDependencies gradle task
* Fix spotless failure.
* Fix python lint
* Add schema transform translation for postgres read and write.
* Add test for postgres schema transform translation.
* Address reviewer's feedback.
1 parent be002c7 commit be4fb97

File tree

12 files changed

+642
-5
lines changed

12 files changed

+642
-5
lines changed

model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ message ManagedTransforms {
7676
"beam:schematransform:org.apache.beam:bigquery_write:v1"];
7777
ICEBERG_CDC_READ = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
7878
"beam:schematransform:org.apache.beam:iceberg_cdc_read:v1"];
79+
POSTGRES_READ = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) =
80+
"beam:schematransform:org.apache.beam:postgres_read:v1"];
81+
POSTGRES_WRITE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
82+
"beam:schematransform:org.apache.beam:postgres_write:v1"];
7983
}
8084
}
8185

sdks/java/io/jdbc/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ ext.summary = "IO to read and write on JDBC datasource."
2929
dependencies {
3030
implementation library.java.vendored_guava_32_1_2_jre
3131
implementation project(path: ":sdks:java:core", configuration: "shadow")
32+
implementation project(path: ":model:pipeline", configuration: "shadow")
3233
implementation library.java.dbcp2
3334
implementation library.java.joda_time
3435
implementation "org.apache.commons:commons-pool2:2.11.1"
@@ -39,8 +40,10 @@ dependencies {
3940
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
4041
testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration")
4142
testImplementation project(path: ":sdks:java:io:common")
43+
testImplementation project(path: ":sdks:java:managed")
4244
testImplementation project(path: ":sdks:java:testing:test-utils")
4345
testImplementation library.java.junit
46+
testImplementation library.java.mockito_inline
4447
testImplementation library.java.slf4j_api
4548
testImplementation library.java.postgres
4649

sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import java.util.Objects;
2828
import javax.annotation.Nullable;
2929
import org.apache.beam.sdk.schemas.AutoValueSchema;
30+
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
31+
import org.apache.beam.sdk.schemas.SchemaRegistry;
3032
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
3133
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
3234
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -265,6 +267,20 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
265267
}
266268
return PCollectionRowTuple.of("output", input.getPipeline().apply(readRows));
267269
}
270+
271+
public Row getConfigurationRow() {
272+
try {
273+
// To stay consistent with our SchemaTransform configuration naming conventions,
274+
// we sort lexicographically
275+
return SchemaRegistry.createDefault()
276+
.getToRowFunction(JdbcReadSchemaTransformConfiguration.class)
277+
.apply(config)
278+
.sorted()
279+
.toSnakeCase();
280+
} catch (NoSuchSchemaException e) {
281+
throw new RuntimeException(e);
282+
}
283+
}
268284
}
269285

270286
@Override
@@ -401,6 +417,8 @@ public static Builder builder() {
401417
.Builder();
402418
}
403419

420+
public abstract Builder toBuilder();
421+
404422
@AutoValue.Builder
405423
public abstract static class Builder {
406424
public abstract Builder setDriverClassName(String value);

sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import java.util.Objects;
2828
import javax.annotation.Nullable;
2929
import org.apache.beam.sdk.schemas.AutoValueSchema;
30+
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
3031
import org.apache.beam.sdk.schemas.Schema;
32+
import org.apache.beam.sdk.schemas.SchemaRegistry;
3133
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
3234
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
3335
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -265,6 +267,20 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
265267
.setRowSchema(Schema.of());
266268
return PCollectionRowTuple.of("post_write", postWrite);
267269
}
270+
271+
public Row getConfigurationRow() {
272+
try {
273+
// To stay consistent with our SchemaTransform configuration naming conventions,
274+
// we sort lexicographically
275+
return SchemaRegistry.createDefault()
276+
.getToRowFunction(JdbcWriteSchemaTransformConfiguration.class)
277+
.apply(config)
278+
.sorted()
279+
.toSnakeCase();
280+
} catch (NoSuchSchemaException e) {
281+
throw new RuntimeException(e);
282+
}
283+
}
268284
}
269285

270286
@Override
@@ -382,6 +398,8 @@ public static Builder builder() {
382398
.Builder();
383399
}
384400

401+
public abstract Builder toBuilder();
402+
385403
@AutoValue.Builder
386404
public abstract static class Builder {
387405
public abstract Builder setDriverClassName(String value);
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.jdbc.providers;
19+
20+
import static org.apache.beam.sdk.io.jdbc.providers.ReadFromPostgresSchemaTransformProvider.PostgresReadSchemaTransform;
21+
import static org.apache.beam.sdk.io.jdbc.providers.WriteToPostgresSchemaTransformProvider.PostgresWriteSchemaTransform;
22+
import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;
23+
24+
import com.google.auto.service.AutoService;
25+
import java.util.Map;
26+
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
27+
import org.apache.beam.sdk.transforms.PTransform;
28+
import org.apache.beam.sdk.util.construction.PTransformTranslation;
29+
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
30+
import org.apache.beam.sdk.values.Row;
31+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
32+
33+
public class PostgresSchemaTransformTranslation {
34+
static class PostgresReadSchemaTransformTranslator
35+
extends SchemaTransformPayloadTranslator<PostgresReadSchemaTransform> {
36+
@Override
37+
public SchemaTransformProvider provider() {
38+
return new ReadFromPostgresSchemaTransformProvider();
39+
}
40+
41+
@Override
42+
public Row toConfigRow(PostgresReadSchemaTransform transform) {
43+
return transform.getConfigurationRow();
44+
}
45+
}
46+
47+
@AutoService(TransformPayloadTranslatorRegistrar.class)
48+
public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar {
49+
@Override
50+
@SuppressWarnings({
51+
"rawtypes",
52+
})
53+
public Map<
54+
? extends Class<? extends PTransform>,
55+
? extends PTransformTranslation.TransformPayloadTranslator>
56+
getTransformPayloadTranslators() {
57+
return ImmutableMap
58+
.<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
59+
.put(PostgresReadSchemaTransform.class, new PostgresReadSchemaTransformTranslator())
60+
.build();
61+
}
62+
}
63+
64+
static class PostgresWriteSchemaTransformTranslator
65+
extends SchemaTransformPayloadTranslator<PostgresWriteSchemaTransform> {
66+
@Override
67+
public SchemaTransformProvider provider() {
68+
return new WriteToPostgresSchemaTransformProvider();
69+
}
70+
71+
@Override
72+
public Row toConfigRow(PostgresWriteSchemaTransform transform) {
73+
return transform.getConfigurationRow();
74+
}
75+
}
76+
77+
@AutoService(TransformPayloadTranslatorRegistrar.class)
78+
public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar {
79+
@Override
80+
@SuppressWarnings({
81+
"rawtypes",
82+
})
83+
public Map<
84+
? extends Class<? extends PTransform>,
85+
? extends PTransformTranslation.TransformPayloadTranslator>
86+
getTransformPayloadTranslators() {
87+
return ImmutableMap
88+
.<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
89+
.put(PostgresWriteSchemaTransform.class, new PostgresWriteSchemaTransformTranslator())
90+
.build();
91+
}
92+
}
93+
}

sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,30 @@
1818
package org.apache.beam.sdk.io.jdbc.providers;
1919

2020
import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES;
21+
import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
2122

2223
import com.google.auto.service.AutoService;
24+
import java.util.Collections;
25+
import java.util.List;
26+
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
2327
import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider;
28+
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
2429
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
2530
import org.checkerframework.checker.initialization.qual.Initialized;
2631
import org.checkerframework.checker.nullness.qual.NonNull;
2732
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
2835

2936
@AutoService(SchemaTransformProvider.class)
3037
public class ReadFromPostgresSchemaTransformProvider extends JdbcReadSchemaTransformProvider {
3138

39+
private static final Logger LOG =
40+
LoggerFactory.getLogger(ReadFromPostgresSchemaTransformProvider.class);
41+
3242
@Override
3343
public @UnknownKeyFor @NonNull @Initialized String identifier() {
34-
return "beam:schematransform:org.apache.beam:postgres_read:v1";
44+
return getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ);
3545
}
3646

3747
@Override
@@ -43,4 +53,40 @@ public String description() {
4353
protected String jdbcType() {
4454
return POSTGRES;
4555
}
56+
57+
@Override
58+
public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
59+
JdbcReadSchemaTransformConfiguration configuration) {
60+
String jdbcType = configuration.getJdbcType();
61+
if (jdbcType != null && !jdbcType.equals(jdbcType())) {
62+
throw new IllegalArgumentException(
63+
String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType));
64+
}
65+
66+
List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql =
67+
configuration.getConnectionInitSql();
68+
if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
69+
LOG.warn("Postgres does not support connectionInitSql, ignoring.");
70+
}
71+
72+
Boolean disableAutoCommit = configuration.getDisableAutoCommit();
73+
if (disableAutoCommit != null && !disableAutoCommit) {
74+
LOG.warn("Postgres reads require disableAutoCommit to be true, overriding to true.");
75+
}
76+
77+
// Override "connectionInitSql" and "disableAutoCommit" for postgres
78+
configuration =
79+
configuration
80+
.toBuilder()
81+
.setConnectionInitSql(Collections.emptyList())
82+
.setDisableAutoCommit(true)
83+
.build();
84+
return new PostgresReadSchemaTransform(configuration);
85+
}
86+
87+
public static class PostgresReadSchemaTransform extends JdbcReadSchemaTransform {
88+
public PostgresReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) {
89+
super(config, POSTGRES);
90+
}
91+
}
4692
}

sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,30 @@
1818
package org.apache.beam.sdk.io.jdbc.providers;
1919

2020
import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES;
21+
import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
2122

2223
import com.google.auto.service.AutoService;
24+
import java.util.Collections;
25+
import java.util.List;
26+
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
2327
import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider;
28+
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
2429
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
2530
import org.checkerframework.checker.initialization.qual.Initialized;
2631
import org.checkerframework.checker.nullness.qual.NonNull;
2732
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
2835

2936
@AutoService(SchemaTransformProvider.class)
3037
public class WriteToPostgresSchemaTransformProvider extends JdbcWriteSchemaTransformProvider {
3138

39+
private static final Logger LOG =
40+
LoggerFactory.getLogger(WriteToPostgresSchemaTransformProvider.class);
41+
3242
@Override
3343
public @UnknownKeyFor @NonNull @Initialized String identifier() {
34-
return "beam:schematransform:org.apache.beam:postgres_write:v1";
44+
return getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE);
3545
}
3646

3747
@Override
@@ -43,4 +53,30 @@ public String description() {
4353
protected String jdbcType() {
4454
return POSTGRES;
4555
}
56+
57+
@Override
58+
public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
59+
JdbcWriteSchemaTransformConfiguration configuration) {
60+
String jdbcType = configuration.getJdbcType();
61+
if (jdbcType != null && !jdbcType.equals(jdbcType())) {
62+
throw new IllegalArgumentException(
63+
String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType));
64+
}
65+
66+
List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql =
67+
configuration.getConnectionInitSql();
68+
if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
69+
LOG.warn("Postgres does not support connectionInitSql, ignoring.");
70+
}
71+
72+
// Override "connectionInitSql" for postgres
73+
configuration = configuration.toBuilder().setConnectionInitSql(Collections.emptyList()).build();
74+
return new PostgresWriteSchemaTransform(configuration);
75+
}
76+
77+
public static class PostgresWriteSchemaTransform extends JdbcWriteSchemaTransform {
78+
public PostgresWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) {
79+
super(config, POSTGRES);
80+
}
81+
}
4682
}

0 commit comments

Comments
 (0)