Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
</developers>

<properties>
<stream-client.version>[0.12.0-SNAPSHOT,)</stream-client.version>
<stream-client.version>[1.2.0-SNAPSHOT,)</stream-client.version>
<junit.jupiter.version>5.13.3</junit.jupiter.version>
<assertj.version>3.27.3</assertj.version>
<logback.version>1.2.13</logback.version>
<slf4j.version>2.0.17</slf4j.version>
<logback.version>1.5.18</logback.version>
<maven.compiler.plugin.version>3.14.0</maven.compiler.plugin.version>
<maven-surefire-plugin.version>3.5.3</maven-surefire-plugin.version>
<spotless.version>2.44.5</spotless.version>
<google-java-format.version>1.17.0</google-java-format.version>
<google-java-format.version>1.27.0</google-java-format.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

Expand All @@ -45,6 +46,12 @@
<version>${stream-client.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down Expand Up @@ -73,6 +80,14 @@
<scope>test</scope>
</dependency>

<!-- add explicitly to update automatically with dependabot -->
<dependency>
<groupId>com.google.googlejavaformat</groupId>
<artifactId>google-java-format</artifactId>
<version>${google-java-format.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
//

package com.rabbitmq.stream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
//

package com.rabbitmq.stream;
Expand All @@ -33,7 +34,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -220,16 +220,23 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
executorService.submit(
() -> {
connected.set(false);

try { Thread.sleep(2000); } catch (Exception e) {}
Client locator =
cf.get(new Client.ClientParameters().port(streamPortNode2()));
// wait until there's a new leader
AtomicReference<Client> locator = new AtomicReference<>();
try {
waitAtMost(
Duration.ofSeconds(5),
() -> {
Client.StreamMetadata m = locator.metadata(stream).get(stream);
try {
locator.set(
cf.get(new Client.ClientParameters().port(streamPortNode2())));
return true;
} catch (Exception e) {
return false;
}
});
waitAtMost(
Duration.ofSeconds(5),
() -> {
Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
return m.getLeader() != null
&& m.getLeader().getPort() != streamPortNode1();
});
Expand All @@ -238,7 +245,8 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
return;
}

int newLeaderPort = locator.metadata(stream).get(stream).getLeader().getPort();
int newLeaderPort =
locator.get().metadata(stream).get(stream).getLeader().getPort();
Client newPublisher =
cf.get(
new Client.ClientParameters()
Expand Down Expand Up @@ -468,14 +476,23 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
// avoid long-running task in the IO thread
executorService.submit(
() -> {
try { Thread.sleep(2000); } catch (Exception e) {}
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
int newReplicaPort = m.getReplicas().get(0).getPort();
AtomicInteger newReplicaPort = new AtomicInteger(-1);
waitAtMost(
Duration.ofSeconds(5),
() -> {
try {
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
newReplicaPort.set(m.getReplicas().get(0).getPort());
return true;
} catch (Exception e) {
return false;
}
});

Client newConsumer =
cf.get(
new Client.ClientParameters()
.port(newReplicaPort)
.port(newReplicaPort.get())
.shutdownListener(shutdownListenerReference.get())
.chunkListener(credit())
.messageListener(messageListener));
Expand Down Expand Up @@ -588,7 +605,8 @@ void declarePublisherShouldNotReturnStreamDoesNotExistOnRestart() throws Excepti
}

@Test
void shouldReceiveMetadataUpdateWhenReplicaIsKilledWithPublisherAndConsumerOnSameConnection() throws Exception {
void shouldReceiveMetadataUpdateWhenReplicaIsKilledWithPublisherAndConsumerOnSameConnection()
throws Exception {
Client metadataClient = cf.get(new Client.ClientParameters().port(streamPortNode1()));
Map<String, Client.StreamMetadata> metadata = metadataClient.metadata(stream);
Client.StreamMetadata streamMetadata = metadata.get(stream);
Expand All @@ -602,8 +620,7 @@ void shouldReceiveMetadataUpdateWhenReplicaIsKilledWithPublisherAndConsumerOnSam
assertThat(streamMetadata.getLeader().getPort()).isEqualTo(streamPortNode1());
Client.Broker broker =
streamMetadata.getReplicas().stream()
.filter(
r -> r.getPort() == streamPortNode1() || r.getPort() == streamPortNode2())
.filter(r -> r.getPort() == streamPortNode1() || r.getPort() == streamPortNode2())
.findFirst()
.get();

Expand All @@ -612,8 +629,7 @@ void shouldReceiveMetadataUpdateWhenReplicaIsKilledWithPublisherAndConsumerOnSam
cf.get(
new ClientParameters()
.port(broker.getPort())
.metadataListener(
(stream, code) -> metadataNotifications.incrementAndGet()));
.metadataListener((stream, code) -> metadataNotifications.incrementAndGet()));
client.declarePublisher((byte) 42, null, stream);
client.subscribe((byte) 66, stream, OffsetSpecification.first(), 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
//

package com.rabbitmq.stream;
Expand Down Expand Up @@ -101,7 +102,8 @@ public static String node2name() {
return System.getProperty("node2.name", "rabbit-2@" + hostname());
}

public static Process killStreamLocalMemberProcess(String stream, String nodename) throws IOException {
public static Process killStreamLocalMemberProcess(String stream, String nodename)
throws IOException {
return rabbitmqctl(
"eval 'case rabbit_stream_manager:lookup_local_member(<<\"/\">>, <<\""
+ stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
//

package com.rabbitmq.stream;
Expand All @@ -27,8 +28,8 @@
import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.Client.Response;
import com.rabbitmq.stream.impl.Client.StreamMetadata;
import java.util.Collections;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -58,9 +59,8 @@ void invalidLocatorShouldReturnError() {
void clientLocalLocatorShouldMakeLeaderOnConnectedNode() {
int[] ports = new int[] {TestUtils.streamPortNode1(), TestUtils.streamPortNode2()};
for (int port : ports) {
Client client = cf.get(new Client.ClientParameters()
.port(port)
.rpcTimeout(Duration.ofSeconds(30)));
Client client =
cf.get(new Client.ClientParameters().port(port).rpcTimeout(Duration.ofSeconds(30)));
String s = UUID.randomUUID().toString();
try {
Response response =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
//

package com.rabbitmq.stream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
//

package com.rabbitmq.stream;
Expand Down Expand Up @@ -52,15 +53,20 @@ static void waitUntil(BooleanSupplier condition) throws InterruptedException {
waitAtMost(Duration.ofSeconds(10), condition);
}

static void waitAtMost(Duration duration, BooleanSupplier condition) throws InterruptedException {
static void waitAtMost(Duration duration, BooleanSupplier condition) {
if (condition.getAsBoolean()) {
return;
}
int waitTime = 100;
int waitedTime = 0;
long timeoutInMs = duration.toMillis();
while (waitedTime <= timeoutInMs) {
Thread.sleep(waitTime);
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
if (condition.getAsBoolean()) {
return;
}
Expand Down
21 changes: 18 additions & 3 deletions deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
</developers>

<properties>
<stream-client.version>[0.12.0-SNAPSHOT,)</stream-client.version>
<stream-client.version>[1.2.0-SNAPSHOT,)</stream-client.version>
<junit.jupiter.version>5.13.3</junit.jupiter.version>
<assertj.version>3.27.3</assertj.version>
<logback.version>1.2.13</logback.version>
<slf4j.version>2.0.17</slf4j.version>
<logback.version>1.5.18</logback.version>
<maven.compiler.plugin.version>3.14.0</maven.compiler.plugin.version>
<maven-surefire-plugin.version>3.5.3</maven-surefire-plugin.version>
<spotless.version>2.44.5</spotless.version>
<google-java-format.version>1.18.1</google-java-format.version>
<google-java-format.version>1.27.0</google-java-format.version>
<okhttp.version>5.0.0</okhttp.version>
<gson.version>2.13.1</gson.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand All @@ -47,6 +48,12 @@
<version>${stream-client.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down Expand Up @@ -89,6 +96,14 @@
<scope>test</scope>
</dependency>

<!-- add explicitly to update automatically with dependabot -->
<dependency>
<groupId>com.google.googlejavaformat</groupId>
<artifactId>google-java-format</artifactId>
<version>${google-java-format.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
//

package com.rabbitmq.stream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom
// Inc. and/or its subsidiaries. All rights reserved.
//

package com.rabbitmq.stream;
Expand Down
Loading