Skip to content

Commit eceaa8c

Browse files
committed
try only pin dependency
1 parent a18b64f commit eceaa8c

File tree

6 files changed

+38
-67
lines changed

6 files changed

+38
-67
lines changed

sdks/java/io/debezium/build.gradle

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ dependencies {
4040
// and align with the 2.17.1 version.
4141
implementation 'com.fasterxml.jackson.core:jackson-core:2.17.1'
4242
implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.1'
43-
43+
permitUnusedDeclared 'com.fasterxml.jackson.core:jackson-core:2.17.1'
44+
permitUnusedDeclared 'com.fasterxml.jackson.core:jackson-databind:2.17.1'
4445
provided library.java.jackson_dataformat_csv
4546
permitUnusedDeclared library.java.jackson_dataformat_csv
4647

@@ -78,11 +79,11 @@ dependencies {
7879
configurations.all {
7980
resolutionStrategy {
8081
force 'org.antlr:antlr4:4.10',
81-
'org.antlr:antlr4-runtime:4.10',
82-
'com.fasterxml.jackson.core:jackson-core:2.17.1',
83-
'com.fasterxml.jackson.core:jackson-annotations:2.17.1',
84-
'com.fasterxml.jackson.core:jackson-databind:2.17.1',
85-
'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.1'
82+
'org.antlr:antlr4-runtime:4.10',
83+
'com.fasterxml.jackson.core:jackson-core:2.17.1',
84+
'com.fasterxml.jackson.core:jackson-annotations:2.17.1',
85+
'com.fasterxml.jackson.core:jackson-databind:2.17.1',
86+
'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.1'
8687
}
8788
}
8889

sdks/java/io/debezium/expansion-service/build.gradle

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,21 @@ dependencies {
4646
runtimeOnly group: 'io.debezium', name: 'debezium-connector-oracle', version: debezium_version
4747
runtimeOnly group: 'io.debezium', name: 'debezium-connector-db2', version: debezium_version
4848
}
49+
50+
// sync with debezium/build.gradle depenency pins
51+
configurations.all {
52+
resolutionStrategy {
53+
force 'org.antlr:antlr4:4.10',
54+
'org.antlr:antlr4-runtime:4.10',
55+
'com.fasterxml.jackson.core:jackson-core:2.17.1',
56+
'com.fasterxml.jackson.core:jackson-annotations:2.17.1',
57+
'com.fasterxml.jackson.core:jackson-databind:2.17.1',
58+
'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.1'
59+
}
60+
}
61+
62+
shadowJar {
63+
manifest {
64+
attributes(["Multi-Release": true])
65+
}
66+
}

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
*/
1818
package org.apache.beam.io.debezium;
1919

20-
import com.fasterxml.jackson.core.type.TypeReference;
21-
import com.fasterxml.jackson.databind.ObjectMapper;
2220
import com.google.auto.service.AutoService;
2321
import com.google.auto.value.AutoValue;
2422
import java.util.Arrays;
@@ -123,23 +121,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
123121
configuration.getTable().substring(0, configuration.getTable().indexOf(".")));
124122
}
125123

126-
// Logic to handle both JSON string and list of strings from Portability Framework Runners
127-
List<String> debeziumConnectionProperties = configuration.getDebeziumConnectionProperties();
128-
final String jsonProperties = configuration.getDebeziumConnectionPropertiesJson();
129-
130-
// Check for the new JSON properties field first.
131-
if (jsonProperties != null) {
132-
try {
133-
ObjectMapper mapper = new ObjectMapper();
134-
debeziumConnectionProperties =
135-
mapper.readValue(jsonProperties, new TypeReference<List<String>>() {});
136-
} catch (Exception e) {
137-
throw new IllegalArgumentException(
138-
"Unable to parse debeziumConnectionPropertiesJson", e);
139-
}
140-
}
141-
142-
// Fall back to Java list-based properties if JSON is empty
124+
final List<String> debeziumConnectionProperties =
125+
configuration.getDebeziumConnectionProperties();
143126
if (debeziumConnectionProperties != null) {
144127
for (String connectionProperty : debeziumConnectionProperties) {
145128
String[] parts = connectionProperty.split("=", -1);
@@ -207,8 +190,6 @@ public abstract static class DebeziumReadSchemaTransformConfiguration {
207190

208191
public abstract @Nullable List<String> getDebeziumConnectionProperties();
209192

210-
public abstract @Nullable String getDebeziumConnectionPropertiesJson();
211-
212193
public static Builder builder() {
213194
return new AutoValue_DebeziumReadSchemaTransformProvider_DebeziumReadSchemaTransformConfiguration
214195
.Builder();
@@ -231,9 +212,6 @@ public abstract static class Builder {
231212
public abstract Builder setDebeziumConnectionProperties(
232213
List<String> debeziumConnectionProperties);
233214

234-
public abstract Builder setDebeziumConnectionPropertiesJson(
235-
String debeziumConnectionPropertiesJson);
236-
237215
public abstract DebeziumReadSchemaTransformConfiguration build();
238216
}
239217
}

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
*/
1818
package org.apache.beam.io.debezium;
1919

20-
import com.fasterxml.jackson.core.type.TypeReference;
21-
import com.fasterxml.jackson.databind.ObjectMapper;
2220
import com.google.auto.service.AutoService;
23-
import java.io.IOException;
2421
import java.util.List;
2522
import java.util.Map;
2623
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
@@ -79,17 +76,12 @@ public static class ReadBuilder
7976

8077
public static class Configuration extends CrossLanguageConfiguration {
8178
private @Nullable List<String> connectionProperties;
82-
private @Nullable String connectionPropertiesJson;
8379
private @Nullable Long maxNumberOfRecords;
8480

8581
public void setConnectionProperties(@Nullable List<String> connectionProperties) {
8682
this.connectionProperties = connectionProperties;
8783
}
8884

89-
public void setConnectionPropertiesJson(@Nullable String connectionPropertiesJson) {
90-
this.connectionPropertiesJson = connectionPropertiesJson;
91-
}
92-
9385
public void setMaxNumberOfRecords(@Nullable Long maxNumberOfRecords) {
9486
this.maxNumberOfRecords = maxNumberOfRecords;
9587
}
@@ -105,29 +97,12 @@ public PTransform<PBegin, PCollection<String>> buildExternal(Configuration confi
10597
.withPort(configuration.port)
10698
.withConnectorClass(configuration.connectorClass.getConnector());
10799

108-
List<String> propertiesToProcess = null;
109-
if (configuration.connectionPropertiesJson != null) {
110-
// Prioritize the new JSON format if present
111-
try {
112-
ObjectMapper mapper = new ObjectMapper();
113-
propertiesToProcess =
114-
mapper.readValue(
115-
configuration.connectionPropertiesJson, new TypeReference<List<String>>() {});
116-
} catch (IOException e) {
117-
throw new IllegalArgumentException("Error parsing connectionPropertiesJson.", e);
118-
}
119-
} else {
120-
// Fall back to the old list format for backward-compatibility
121-
propertiesToProcess = configuration.connectionProperties;
122-
}
123-
124-
if (propertiesToProcess != null) {
125-
for (String connectionProperty : propertiesToProcess) {
100+
if (configuration.connectionProperties != null) {
101+
for (String connectionProperty : configuration.connectionProperties) {
126102
String[] parts = connectionProperty.split("=", -1);
127-
if (parts.length == 2) {
128-
connectorConfiguration =
129-
connectorConfiguration.withConnectionProperty(parts[0], parts[1]);
130-
}
103+
String key = parts[0];
104+
String value = parts[1];
105+
connectorConfiguration = connectorConfiguration.withConnectionProperty(key, value);
131106
}
132107
}
133108

sdks/python/apache_beam/io/debezium.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080

8181
import json
8282
from enum import Enum
83+
from typing import List
8384
from typing import NamedTuple
8485
from typing import Optional
8586

@@ -108,7 +109,7 @@ class DriverClassName(Enum):
108109
'ReadFromDebeziumSchema',
109110
[('connector_class', str), ('username', str), ('password', str),
110111
('host', str), ('port', str), ('max_number_of_records', Optional[int]),
111-
('connection_properties_json', str)])
112+
('connection_properties', List[str])])
112113

113114

114115
class ReadFromDebezium(PTransform):
@@ -143,13 +144,11 @@ def __init__(
143144
to be fetched before stop.
144145
:param connection_properties: properties of the debezium
145146
connection passed as string
146-
with json format
147-
{"propertyName": "property"}
147+
with with format
148+
[propertyName=property;]*
148149
:param expansion_service: The address (host:port)
149150
of the ExpansionService.
150151
"""
151-
serialized_properties = json.dumps(
152-
connection_properties) if connection_properties else "[]"
153152

154153
self.params = ReadFromDebeziumSchema(
155154
connector_class=connector_class.value,
@@ -158,7 +157,7 @@ def __init__(
158157
host=host,
159158
port=port,
160159
max_number_of_records=max_number_of_records,
161-
connection_properties_json=serialized_properties)
160+
connection_properties=connection_properties)
162161
self.expansion_service = expansion_service or default_io_expansion_service()
163162

164163
def expand(self, pbegin):

sdks/python/apache_beam/io/external/xlang_debeziumio_it_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ def start_db_container(self, retries):
129129
try:
130130
self.db = PostgresContainer(
131131
'quay.io/debezium/example-postgres:latest',
132-
user=self.username,
132+
username=self.username,
133133
password=self.password,
134134
dbname=self.database)
135135
self.db.start()

0 commit comments

Comments
 (0)