From 05bab92ee3c42960a24d2d52d73668362e9f48a9 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Fri, 31 Oct 2025 15:52:05 +0200 Subject: [PATCH 1/2] point kafka factories to extensions modules & fixed null pointer exception --- build.gradle.kts | 2 +- sdks/java/extensions/kafka-factories/build.gradle | 5 ++++- .../src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 7 ++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 836ff29db3e0..456425af0e14 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -353,7 +353,7 @@ tasks.register("javaioPreCommit") { dependsOn(":sdks:java:io:jms:build") dependsOn(":sdks:java:io:kafka:build") dependsOn(":sdks:java:io:kafka:upgrade:build") - dependsOn(":sdks:java:io:kafka:file-aware-factories:build") + dependsOn(":sdks:java:extensions:kafka-factories:build") dependsOn(":sdks:java:io:kudu:build") dependsOn(":sdks:java:io:mongodb:build") dependsOn(":sdks:java:io:mqtt:build") diff --git a/sdks/java/extensions/kafka-factories/build.gradle b/sdks/java/extensions/kafka-factories/build.gradle index 30c5d3fd6642..070ffc4b1c97 100644 --- a/sdks/java/extensions/kafka-factories/build.gradle +++ b/sdks/java/extensions/kafka-factories/build.gradle @@ -29,7 +29,10 @@ dependencies { // ------------------------- CORE DEPENDENCIES ------------------------- implementation project(path: ":sdks:java:core", configuration: "shadow") provided library.java.kafka_clients - implementation 'com.google.cloud:google-cloud-secretmanager:2.72.0' + implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + implementation library.java.google_cloud_secret_manager + implementation library.java.proto_google_cloud_secret_manager_v1 + implementation library.java.protobuf_java implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre implementation project(path: ":sdks:java:extensions:google-cloud-platform-core") diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 2a7cd62d33d2..cb5148b599af 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -667,11 +667,8 @@ public boolean advance() throws IOException { try { Message message; synchronized (this) { - if (receiveTimeoutMillis == 0L) { - message = this.consumer.receiveNoWait(); - } else { - message = this.consumer.receive(receiveTimeoutMillis); - } + long timeout = (receiveTimeoutMillis == 0L) ? 1L : receiveTimeoutMillis; + message = this.consumer.receive(timeout); // put add in synchronized to make sure all messages in preparer are in same session if (message != null) { checkpointMarkPreparer.add(message); From 1aad8831891f7b8abc68028172171d992e42f8c2 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Fri, 31 Oct 2025 19:47:33 +0200 Subject: [PATCH 2/2] Revert JmsIO change and add kafka factories to workflow paths --- .github/workflows/beam_PreCommit_Java_IOs_Direct.yml | 2 ++ .../src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 7 +++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/.github/workflows/beam_PreCommit_Java_IOs_Direct.yml b/.github/workflows/beam_PreCommit_Java_IOs_Direct.yml index 03ff102861c7..9d4a347b336a 100644 --- a/.github/workflows/beam_PreCommit_Java_IOs_Direct.yml +++ b/.github/workflows/beam_PreCommit_Java_IOs_Direct.yml @@ -22,6 +22,7 @@ on: paths: - "sdks/java/io/common/**" - "sdks/java/core/src/main/**" + - "sdks/java/extensions/kafka-factories/**" - "buildSrc/**" - ".github/workflows/beam_PreCommit_Java_IOs_Direct.yml" pull_request_target: @@ -29,6 +30,7 @@ on: paths: - "sdks/java/io/common/**" - "sdks/java/core/src/main/**" + - "sdks/java/extensions/kafka-factories/**" - 'release/trigger_all_tests.json' - '.github/trigger_files/beam_PreCommit_Java_IOs_Direct.json' issue_comment: diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index cb5148b599af..2a7cd62d33d2 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -667,8 +667,11 @@ public boolean advance() throws IOException { try { Message message; synchronized (this) { - long timeout = (receiveTimeoutMillis == 0L) ? 1L : receiveTimeoutMillis; - message = this.consumer.receive(timeout); + if (receiveTimeoutMillis == 0L) { + message = this.consumer.receiveNoWait(); + } else { + message = this.consumer.receive(receiveTimeoutMillis); + } // put add in synchronized to make sure all messages in preparer are in same session if (message != null) { checkpointMarkPreparer.add(message);