Skip to content

Commit 527e821

Browse files
authored
Support managed jdbc io (SQLServer) (#36055)
* Add sqlserver read and write to managed io * Address reviewer's comment.
1 parent da6f7b4 commit 527e821

File tree

7 files changed

+422
-2
lines changed

7 files changed

+422
-2
lines changed

model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ message ManagedTransforms {
8484
"beam:schematransform:org.apache.beam:mysql_read:v1"];
8585
MYSQL_WRITE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
8686
"beam:schematransform:org.apache.beam:mysql_write:v1"];
87+
SQL_SERVER_READ = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
88+
"beam:schematransform:org.apache.beam:sql_server_read:v1"];
89+
SQL_SERVER_WRITE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
90+
"beam:schematransform:org.apache.beam:sql_server_write:v1"];
8791
}
8892
}
8993

sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,30 @@
1818
package org.apache.beam.sdk.io.jdbc.providers;
1919

2020
import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL;
21+
import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
2122

2223
import com.google.auto.service.AutoService;
24+
import java.util.Collections;
25+
import java.util.List;
26+
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
2327
import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider;
28+
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
2429
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
2530
import org.checkerframework.checker.initialization.qual.Initialized;
2631
import org.checkerframework.checker.nullness.qual.NonNull;
2732
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
2835

2936
@AutoService(SchemaTransformProvider.class)
3037
public class ReadFromSqlServerSchemaTransformProvider extends JdbcReadSchemaTransformProvider {
3138

39+
private static final Logger LOG =
40+
LoggerFactory.getLogger(ReadFromSqlServerSchemaTransformProvider.class);
41+
3242
@Override
3343
public @UnknownKeyFor @NonNull @Initialized String identifier() {
34-
return "beam:schematransform:org.apache.beam:sql_server_read:v1";
44+
return getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_READ);
3545
}
3646

3747
@Override
@@ -43,4 +53,35 @@ public String description() {
4353
protected String jdbcType() {
4454
return MSSQL;
4555
}
56+
57+
@Override
58+
public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
59+
JdbcReadSchemaTransformConfiguration configuration) {
60+
String jdbcType = configuration.getJdbcType();
61+
if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) {
62+
LOG.warn(
63+
"Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.",
64+
jdbcType(),
65+
jdbcType,
66+
jdbcType());
67+
configuration = configuration.toBuilder().setJdbcType(jdbcType()).build();
68+
}
69+
70+
List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql =
71+
configuration.getConnectionInitSql();
72+
if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
73+
throw new IllegalArgumentException("SQL Server does not support connectionInitSql.");
74+
}
75+
76+
// Override "connectionInitSql" for sqlserver
77+
configuration = configuration.toBuilder().setConnectionInitSql(Collections.emptyList()).build();
78+
return new SqlServerReadSchemaTransform(configuration);
79+
}
80+
81+
public static class SqlServerReadSchemaTransform extends JdbcReadSchemaTransform {
82+
public SqlServerReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) {
83+
super(config, MSSQL);
84+
config.validate(MSSQL);
85+
}
86+
}
4687
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.jdbc.providers;
19+
20+
import static org.apache.beam.sdk.io.jdbc.providers.ReadFromSqlServerSchemaTransformProvider.SqlServerReadSchemaTransform;
21+
import static org.apache.beam.sdk.io.jdbc.providers.WriteToSqlServerSchemaTransformProvider.SqlServerWriteSchemaTransform;
22+
import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;
23+
24+
import com.google.auto.service.AutoService;
25+
import java.util.Map;
26+
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
27+
import org.apache.beam.sdk.transforms.PTransform;
28+
import org.apache.beam.sdk.util.construction.PTransformTranslation;
29+
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
30+
import org.apache.beam.sdk.values.Row;
31+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
32+
33+
public class SqlServerSchemaTransformTranslation {
34+
static class SqlServerReadSchemaTransformTranslator
35+
extends SchemaTransformPayloadTranslator<SqlServerReadSchemaTransform> {
36+
@Override
37+
public SchemaTransformProvider provider() {
38+
return new ReadFromSqlServerSchemaTransformProvider();
39+
}
40+
41+
@Override
42+
public Row toConfigRow(SqlServerReadSchemaTransform transform) {
43+
return transform.getConfigurationRow();
44+
}
45+
}
46+
47+
@AutoService(TransformPayloadTranslatorRegistrar.class)
48+
public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar {
49+
@Override
50+
@SuppressWarnings({
51+
"rawtypes",
52+
})
53+
public Map<
54+
? extends Class<? extends PTransform>,
55+
? extends PTransformTranslation.TransformPayloadTranslator>
56+
getTransformPayloadTranslators() {
57+
return ImmutableMap
58+
.<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
59+
.put(SqlServerReadSchemaTransform.class, new SqlServerReadSchemaTransformTranslator())
60+
.build();
61+
}
62+
}
63+
64+
static class SqlServerWriteSchemaTransformTranslator
65+
extends SchemaTransformPayloadTranslator<SqlServerWriteSchemaTransform> {
66+
@Override
67+
public SchemaTransformProvider provider() {
68+
return new WriteToSqlServerSchemaTransformProvider();
69+
}
70+
71+
@Override
72+
public Row toConfigRow(SqlServerWriteSchemaTransform transform) {
73+
return transform.getConfigurationRow();
74+
}
75+
}
76+
77+
@AutoService(TransformPayloadTranslatorRegistrar.class)
78+
public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar {
79+
@Override
80+
@SuppressWarnings({
81+
"rawtypes",
82+
})
83+
public Map<
84+
? extends Class<? extends PTransform>,
85+
? extends PTransformTranslation.TransformPayloadTranslator>
86+
getTransformPayloadTranslators() {
87+
return ImmutableMap
88+
.<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
89+
.put(SqlServerWriteSchemaTransform.class, new SqlServerWriteSchemaTransformTranslator())
90+
.build();
91+
}
92+
}
93+
}

sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,30 @@
1818
package org.apache.beam.sdk.io.jdbc.providers;
1919

2020
import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL;
21+
import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
2122

2223
import com.google.auto.service.AutoService;
24+
import java.util.Collections;
25+
import java.util.List;
26+
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
2327
import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider;
28+
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
2429
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
2530
import org.checkerframework.checker.initialization.qual.Initialized;
2631
import org.checkerframework.checker.nullness.qual.NonNull;
2732
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
2835

2936
@AutoService(SchemaTransformProvider.class)
3037
public class WriteToSqlServerSchemaTransformProvider extends JdbcWriteSchemaTransformProvider {
3138

39+
private static final Logger LOG =
40+
LoggerFactory.getLogger(WriteToSqlServerSchemaTransformProvider.class);
41+
3242
@Override
3343
public @UnknownKeyFor @NonNull @Initialized String identifier() {
34-
return "beam:schematransform:org.apache.beam:sql_server_write:v1";
44+
return getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_WRITE);
3545
}
3646

3747
@Override
@@ -43,4 +53,35 @@ public String description() {
4353
protected String jdbcType() {
4454
return MSSQL;
4555
}
56+
57+
@Override
58+
public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
59+
JdbcWriteSchemaTransformConfiguration configuration) {
60+
String jdbcType = configuration.getJdbcType();
61+
if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) {
62+
LOG.warn(
63+
"Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.",
64+
jdbcType(),
65+
jdbcType,
66+
jdbcType());
67+
configuration = configuration.toBuilder().setJdbcType(jdbcType()).build();
68+
}
69+
70+
List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql =
71+
configuration.getConnectionInitSql();
72+
if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
73+
throw new IllegalArgumentException("SQL Server does not support connectionInitSql.");
74+
}
75+
76+
// Override "connectionInitSql" for sqlserver
77+
configuration = configuration.toBuilder().setConnectionInitSql(Collections.emptyList()).build();
78+
return new SqlServerWriteSchemaTransform(configuration);
79+
}
80+
81+
public static class SqlServerWriteSchemaTransform extends JdbcWriteSchemaTransform {
82+
public SqlServerWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) {
83+
super(config, MSSQL);
84+
config.validate(MSSQL);
85+
}
86+
}
4687
}

0 commit comments

Comments
 (0)