Skip to content

Commit 3d2e15b

Browse files
authored
Merge pull request #35505 from Abacn/pr-35414
Fix Debezium xlang
2 parents 4fc87df + a1b7437 commit 3d2e15b

File tree

6 files changed

+47
-31
lines changed

6 files changed

+47
-31
lines changed

.github/workflows/beam_PostCommit_Python.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ jobs:
7878
- name: Setup environment
7979
uses: ./.github/actions/setup-environment-action
8080
with:
81+
java-version: |
82+
21
83+
11
8184
python-version: ${{ matrix.python_version }}
8285
- name: Install docker compose
8386
run: |
@@ -94,6 +97,7 @@ jobs:
9497
with:
9598
gradle-command: :python${{steps.set_py_ver_clean.outputs.py_ver_clean}}PostCommit
9699
arguments: |
100+
-Pjava21Home=$JAVA_HOME_21_X64 \
97101
-PuseWheelDistribution \
98102
-PpythonVersion=${{ matrix.python_version }} \
99103
env:

sdks/java/io/debezium/build.gradle

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ dependencies {
3636
implementation project(path: ":sdks:java:core", configuration: "shadow")
3737
implementation library.java.slf4j_api
3838
implementation library.java.joda_time
39+
// Explicitly declare Jackson dependencies
40+
// and align with the 2.17.1 version.
41+
implementation 'com.fasterxml.jackson.core:jackson-core:2.17.1'
42+
implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.1'
43+
permitUnusedDeclared 'com.fasterxml.jackson.core:jackson-core:2.17.1'
44+
permitUnusedDeclared 'com.fasterxml.jackson.core:jackson-databind:2.17.1'
3945
provided library.java.jackson_dataformat_csv
4046
permitUnusedDeclared library.java.jackson_dataformat_csv
4147

@@ -66,26 +72,21 @@ dependencies {
6672
testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '3.1.1.Final'
6773
}
6874

69-
// TODO: Remove pin after Beam has unpinned it (PR #35231)
70-
// Pin the Antlr version
75+
// Pin the Antlr version to 4.10
76+
// and force Jackson versions to 2.17.1
77+
// TODO: Remove pin after upgrading Beam's Jackson version
78+
// This overrides the global 2.15.4 forced by BeamModulePlugin.
7179
configurations.all {
7280
resolutionStrategy {
73-
force 'org.antlr:antlr4:4.10', 'org.antlr:antlr4-runtime:4.10'
81+
force 'org.antlr:antlr4:4.10',
82+
'org.antlr:antlr4-runtime:4.10',
83+
'com.fasterxml.jackson.core:jackson-core:2.17.1',
84+
'com.fasterxml.jackson.core:jackson-annotations:2.17.1',
85+
'com.fasterxml.jackson.core:jackson-databind:2.17.1',
86+
'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.1'
7487
}
7588
}
7689

77-
// TODO: Remove pin after upgrading Beam's Jackson version
78-
// Force Jackson versions for the test runtime classpath
79-
configurations.named("testRuntimeClasspath") {
80-
resolutionStrategy.force (
81-
'com.fasterxml.jackson.core:jackson-core:2.17.1',
82-
'com.fasterxml.jackson.core:jackson-annotations:2.17.1',
83-
'com.fasterxml.jackson.core:jackson-databind:2.17.1',
84-
'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.1',
85-
'com.fasterxml.jackson.module:jackson-module-afterburner:2.17.1'
86-
)
87-
}
88-
8990
def configureTestJvmArgs(Task task) {
9091
List<String> currentJvmArgs = task.jvmArgs ? new ArrayList<>(task.jvmArgs) : new ArrayList<>()
9192

sdks/java/io/debezium/expansion-service/build.gradle

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,21 @@ dependencies {
4646
runtimeOnly group: 'io.debezium', name: 'debezium-connector-oracle', version: debezium_version
4747
runtimeOnly group: 'io.debezium', name: 'debezium-connector-db2', version: debezium_version
4848
}
49+
50+
// sync with debezium/build.gradle depenency pins
51+
configurations.all {
52+
resolutionStrategy {
53+
force 'org.antlr:antlr4:4.10',
54+
'org.antlr:antlr4-runtime:4.10',
55+
'com.fasterxml.jackson.core:jackson-core:2.17.1',
56+
'com.fasterxml.jackson.core:jackson-annotations:2.17.1',
57+
'com.fasterxml.jackson.core:jackson-databind:2.17.1',
58+
'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.1'
59+
}
60+
}
61+
62+
shadowJar {
63+
manifest {
64+
attributes(["Multi-Release": true])
65+
}
66+
}

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public PTransform<PBegin, PCollection<String>> buildExternal(Configuration confi
102102
String[] parts = connectionProperty.split("=", -1);
103103
String key = parts[0];
104104
String value = parts[1];
105-
connectorConfiguration.withConnectionProperty(key, value);
105+
connectorConfiguration = connectorConfiguration.withConnectionProperty(key, value);
106106
}
107107
}
108108

sdks/python/apache_beam/io/debezium.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@
8484
from typing import NamedTuple
8585
from typing import Optional
8686

87-
from apache_beam.transforms import DoFn
88-
from apache_beam.transforms import ParDo
87+
from apache_beam.transforms import Map
8988
from apache_beam.transforms import PTransform
9089
from apache_beam.transforms.external import BeamJarExpansionService
9190
from apache_beam.transforms.external import ExternalTransform
@@ -113,13 +112,6 @@ class DriverClassName(Enum):
113112
('connection_properties', List[str])])
114113

115114

116-
class _JsonStringToDictionaries(DoFn):
117-
""" A DoFn that consumes a JSON string and yields a python dictionary """
118-
def process(self, json_string):
119-
obj = json.loads(json_string)
120-
yield obj
121-
122-
123115
class ReadFromDebezium(PTransform):
124116
"""
125117
An external PTransform which reads from Debezium and returns
@@ -152,11 +144,12 @@ def __init__(
152144
to be fetched before stop.
153145
:param connection_properties: properties of the debezium
154146
connection passed as string
155-
with format
147+
with with format
156148
[propertyName=property;]*
157149
:param expansion_service: The address (host:port)
158150
of the ExpansionService.
159151
"""
152+
160153
self.params = ReadFromDebeziumSchema(
161154
connector_class=connector_class.value,
162155
username=username,
@@ -173,4 +166,5 @@ def expand(self, pbegin):
173166
self.URN,
174167
NamedTupleBasedPayloadBuilder(self.params),
175168
self.expansion_service,
176-
) | ParDo(_JsonStringToDictionaries()))
169+
).with_output_types(str)
170+
| 'JsonToDict' >> Map(json.loads).with_output_types(dict))

sdks/python/apache_beam/io/external/xlang_debeziumio_it_test.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ def setUp(self):
7474
self.connection_properties = [
7575
"database.dbname=inventory",
7676
"database.server.name=dbserver1",
77-
"database.include.list=inventory",
7877
"include.schema.changes=false"
7978
]
8079

@@ -90,8 +89,8 @@ def test_xlang_debezium_read(self):
9089
expected_response = [{
9190
"metadata": {
9291
"connector": "postgresql",
93-
"version": "1.3.1.Final",
94-
"name": "dbserver1",
92+
"version": "3.1.1.Final",
93+
"name": "beam-debezium-connector",
9594
"database": "inventory",
9695
"schema": "inventory",
9796
"table": "customers"
@@ -130,7 +129,7 @@ def start_db_container(self, retries):
130129
try:
131130
self.db = PostgresContainer(
132131
'quay.io/debezium/example-postgres:latest',
133-
user=self.username,
132+
username=self.username,
134133
password=self.password,
135134
dbname=self.database)
136135
self.db.start()

0 commit comments

Comments
 (0)