Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ tasks.register("javaioPreCommit") {
dependsOn(":sdks:java:io:jms:build")
dependsOn(":sdks:java:io:kafka:build")
dependsOn(":sdks:java:io:kafka:upgrade:build")
dependsOn(":sdks:java:io:kafka:file-aware-factories:build")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to add sdks/java/extensions/kafka-factories to the trigger path of https://github.com/apache/beam/blob/master/.github/workflows/beam_PreCommit_Java_IOs_Direct.yml that's why this wasn't captured

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

dependsOn(":sdks:java:extensions:kafka-factories:build")
dependsOn(":sdks:java:io:kudu:build")
dependsOn(":sdks:java:io:mongodb:build")
dependsOn(":sdks:java:io:mqtt:build")
Expand Down
5 changes: 4 additions & 1 deletion sdks/java/extensions/kafka-factories/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ dependencies {
// ------------------------- CORE DEPENDENCIES -------------------------
implementation project(path: ":sdks:java:core", configuration: "shadow")
provided library.java.kafka_clients
implementation 'com.google.cloud:google-cloud-secretmanager:2.72.0'
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
implementation library.java.google_cloud_secret_manager
implementation library.java.proto_google_cloud_secret_manager_v1
implementation library.java.protobuf_java
implementation library.java.slf4j_api
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:extensions:google-cloud-platform-core")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,11 +667,8 @@ public boolean advance() throws IOException {
try {
Message message;
synchronized (this) {
if (receiveTimeoutMillis == 0L) {
message = this.consumer.receiveNoWait();
} else {
message = this.consumer.receive(receiveTimeoutMillis);
}
long timeout = (receiveTimeoutMillis == 0L) ? 1L : receiveTimeoutMillis;
message = this.consumer.receive(timeout);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why we need this change? It seems like we are no longer respecting the set receive timeout

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah let's revert JmsIO change. This is a long standing flaky test likely due to test infra (in-memory jms server)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change was made to address a NullPointerException in JmsIOTest.testCheckpointMarkSafety. When receiveTimeoutMillis was 0, it used consumer.receiveNoWait(), which can return null immediately if no message is available. In the test environment with in memory jms server this led to a race where the test proxy wrapper returned null, causing that NullPointerExption

The fix replaced receiveNoWait() with receive(1L) when the timeout was 0, ensuring a minimal wait that could allow the message to arrive.
However, as @Abacn noted this masks a test infrastructure issue so I will revert it

// put add in synchronized to make sure all messages in preparer are in same session
if (message != null) {
checkpointMarkPreparer.add(message);
Expand Down
Loading