Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/beam_PostCommit_Python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ jobs:
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
java-version: |
21
11
python-version: ${{ matrix.python_version }}
- name: Install docker compose
run: |
Expand All @@ -94,6 +97,7 @@ jobs:
with:
gradle-command: :python${{steps.set_py_ver_clean.outputs.py_ver_clean}}PostCommit
arguments: |
-Pjava21Home=$JAVA_HOME_21_X64 \
-PuseWheelDistribution \
-PpythonVersion=${{ matrix.python_version }} \
env:
Expand Down
31 changes: 16 additions & 15 deletions sdks/java/io/debezium/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.slf4j_api
implementation library.java.joda_time
// Explicitly declare Jackson dependencies
// and align with the 2.17.1 version.
implementation 'com.fasterxml.jackson.core:jackson-core:2.17.1'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.1'
permitUnusedDeclared 'com.fasterxml.jackson.core:jackson-core:2.17.1'
permitUnusedDeclared 'com.fasterxml.jackson.core:jackson-databind:2.17.1'
provided library.java.jackson_dataformat_csv
permitUnusedDeclared library.java.jackson_dataformat_csv

Expand Down Expand Up @@ -66,26 +72,21 @@ dependencies {
testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '3.1.1.Final'
}

// TODO: Remove pin after Beam has unpinned it (PR #35231)
// Pin the Antlr version
// Pin the Antlr version to 4.10
// and force Jackson versions to 2.17.1
// TODO: Remove pin after upgrading Beam's Jackson version
// This overrides the global 2.15.4 forced by BeamModulePlugin.
configurations.all {
resolutionStrategy {
force 'org.antlr:antlr4:4.10', 'org.antlr:antlr4-runtime:4.10'
force 'org.antlr:antlr4:4.10',
'org.antlr:antlr4-runtime:4.10',
'com.fasterxml.jackson.core:jackson-core:2.17.1',
'com.fasterxml.jackson.core:jackson-annotations:2.17.1',
'com.fasterxml.jackson.core:jackson-databind:2.17.1',
'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.1'
}
}

// TODO: Remove pin after upgrading Beam's Jackson version
// Force Jackson versions for the test runtime classpath
configurations.named("testRuntimeClasspath") {
resolutionStrategy.force (
'com.fasterxml.jackson.core:jackson-core:2.17.1',
'com.fasterxml.jackson.core:jackson-annotations:2.17.1',
'com.fasterxml.jackson.core:jackson-databind:2.17.1',
'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.1',
'com.fasterxml.jackson.module:jackson-module-afterburner:2.17.1'
)
}

def configureTestJvmArgs(Task task) {
List<String> currentJvmArgs = task.jvmArgs ? new ArrayList<>(task.jvmArgs) : new ArrayList<>()

Expand Down
18 changes: 18 additions & 0 deletions sdks/java/io/debezium/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,21 @@ dependencies {
runtimeOnly group: 'io.debezium', name: 'debezium-connector-oracle', version: debezium_version
runtimeOnly group: 'io.debezium', name: 'debezium-connector-db2', version: debezium_version
}

// sync with debezium/build.gradle depenency pins
configurations.all {
resolutionStrategy {
force 'org.antlr:antlr4:4.10',
'org.antlr:antlr4-runtime:4.10',
'com.fasterxml.jackson.core:jackson-core:2.17.1',
'com.fasterxml.jackson.core:jackson-annotations:2.17.1',
'com.fasterxml.jackson.core:jackson-databind:2.17.1',
'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.1'
}
}

shadowJar {
manifest {
attributes(["Multi-Release": true])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public PTransform<PBegin, PCollection<String>> buildExternal(Configuration confi
String[] parts = connectionProperty.split("=", -1);
String key = parts[0];
String value = parts[1];
connectorConfiguration.withConnectionProperty(key, value);
connectorConfiguration = connectorConfiguration.withConnectionProperty(key, value);
}
}

Expand Down
16 changes: 5 additions & 11 deletions sdks/python/apache_beam/io/debezium.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@
from typing import NamedTuple
from typing import Optional

from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
from apache_beam.transforms import Map
from apache_beam.transforms import PTransform
from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external import ExternalTransform
Expand Down Expand Up @@ -113,13 +112,6 @@ class DriverClassName(Enum):
('connection_properties', List[str])])


class _JsonStringToDictionaries(DoFn):
""" A DoFn that consumes a JSON string and yields a python dictionary """
def process(self, json_string):
obj = json.loads(json_string)
yield obj


class ReadFromDebezium(PTransform):
"""
An external PTransform which reads from Debezium and returns
Expand Down Expand Up @@ -152,11 +144,12 @@ def __init__(
to be fetched before stop.
:param connection_properties: properties of the debezium
connection passed as string
with format
with with format
[propertyName=property;]*
:param expansion_service: The address (host:port)
of the ExpansionService.
"""

self.params = ReadFromDebeziumSchema(
connector_class=connector_class.value,
username=username,
Expand All @@ -173,4 +166,5 @@ def expand(self, pbegin):
self.URN,
NamedTupleBasedPayloadBuilder(self.params),
self.expansion_service,
) | ParDo(_JsonStringToDictionaries()))
).with_output_types(str)
| 'JsonToDict' >> Map(json.loads).with_output_types(dict))
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ def setUp(self):
self.connection_properties = [
"database.dbname=inventory",
"database.server.name=dbserver1",
"database.include.list=inventory",
"include.schema.changes=false"
]

Expand All @@ -90,8 +89,8 @@ def test_xlang_debezium_read(self):
expected_response = [{
"metadata": {
"connector": "postgresql",
"version": "1.3.1.Final",
"name": "dbserver1",
"version": "3.1.1.Final",
"name": "beam-debezium-connector",
"database": "inventory",
"schema": "inventory",
"table": "customers"
Expand Down Expand Up @@ -130,7 +129,7 @@ def start_db_container(self, retries):
try:
self.db = PostgresContainer(
'quay.io/debezium/example-postgres:latest',
user=self.username,
username=self.username,
password=self.password,
dbname=self.database)
self.db.start()
Expand Down
Loading