Skip to content

Commit 573d69b

Browse files
committed
Add committed offset to stream stats
References rabbitmq/rabbitmq-server#15525
1 parent 8a6f237 commit 573d69b

File tree

6 files changed

+69
-19
lines changed

6 files changed

+69
-19
lines changed

.github/workflows/test-pr.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ jobs:
3030
cache: 'maven'
3131
- name: Start broker
3232
run: ci/start-broker.sh
33+
env:
34+
RABBITMQ_IMAGE: pivotalrabbitmq/rabbitmq:pr-15225-otp28
3335
- name: Test
3436
run: |
3537
./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \

src/main/java/com/rabbitmq/stream/StreamStats.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2026 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -46,4 +46,6 @@ public interface StreamStats {
4646
* @throws NoOffsetException if there is no committed chunk yet
4747
*/
4848
long committedChunkId();
49+
50+
long committedOffset();
4951
}

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2026 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -557,10 +557,14 @@ public StreamStats queryStreamStats(String stream) {
557557
};
558558
LongSupplier firstOffsetSupplier =
559559
offsetSupplierLogic.apply("first_chunk_id", "No first offset for stream " + stream);
560-
LongSupplier committedOffsetSupplier =
560+
LongSupplier committedChunkIdSupplier =
561561
offsetSupplierLogic.apply(
562562
"committed_chunk_id", "No committed chunk ID for stream " + stream);
563-
return new DefaultStreamStats(firstOffsetSupplier, committedOffsetSupplier);
563+
LongSupplier committedOffsetSupplier =
564+
offsetSupplierLogic.apply("committed_offset", "No committed offset for stream " + stream);
565+
566+
return new DefaultStreamStats(
567+
firstOffsetSupplier, committedChunkIdSupplier, committedOffsetSupplier);
564568
} else {
565569
throw convertCodeToException(
566570
response.getResponseCode(),
@@ -629,31 +633,43 @@ public boolean streamExists(String stream) {
629633

630634
private static class DefaultStreamStats implements StreamStats {
631635

632-
private final LongSupplier firstOffsetSupplier, committedOffsetSupplier;
636+
private final LongSupplier firstOffsetSupplier,
637+
committedChunkIdSupplier,
638+
committedOffsetSupplier;
633639

634640
private DefaultStreamStats(
635-
LongSupplier firstOffsetSupplier, LongSupplier committedOffsetSupplier) {
641+
LongSupplier firstOffsetSupplier,
642+
LongSupplier committedChunkIdSupplier,
643+
LongSupplier committedOffsetSupplier) {
636644
this.firstOffsetSupplier = firstOffsetSupplier;
645+
this.committedChunkIdSupplier = committedChunkIdSupplier;
637646
this.committedOffsetSupplier = committedOffsetSupplier;
638647
}
639648

640649
@Override
641650
public long firstOffset() {
642-
return firstOffsetSupplier.getAsLong();
651+
return this.firstOffsetSupplier.getAsLong();
643652
}
644653

645654
@Override
646655
public long committedChunkId() {
647-
return committedOffsetSupplier.getAsLong();
656+
return this.committedChunkIdSupplier.getAsLong();
657+
}
658+
659+
@Override
660+
public long committedOffset() {
661+
return this.committedOffsetSupplier.getAsLong();
648662
}
649663

650664
@Override
651665
public String toString() {
652666
return "StreamStats{"
653667
+ "firstOffset="
654668
+ firstOffset()
655-
+ ", committedOffset="
669+
+ ", committedChunkId="
656670
+ committedChunkId()
671+
+ ", committedOffset="
672+
+ committedOffset()
657673
+ '}';
658674
}
659675
}

src/test/java/com/rabbitmq/stream/impl/ClientTest.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2026 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -14,6 +14,7 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17+
import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_4_3_0;
1718
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ko;
1819
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ok;
1920
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.responseCode;
@@ -94,6 +95,7 @@ public class ClientTest {
9495

9596
String stream;
9697
TestUtils.ClientFactory cf;
98+
String brokerVersion;
9799

98100
static boolean await(CountDownLatch latch, Duration timeout) {
99101
try {
@@ -907,10 +909,10 @@ void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
907909
int publishCount = 20_000;
908910
CountDownLatch latch = new CountDownLatch(publishCount);
909911

910-
AtomicLong committedOffset = new AtomicLong();
912+
AtomicLong committedChunkId = new AtomicLong();
911913
Client.MessageListener messageListener =
912-
(corr, offset, chkTimestamp, committedOfft, chunkContext, message) -> {
913-
committedOffset.set(committedOfft);
914+
(corr, offset, chkTimestamp, committedChkId, chunkContext, message) -> {
915+
committedChunkId.set(committedChkId);
914916
latch.countDown();
915917
};
916918
Client client =
@@ -920,11 +922,22 @@ void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
920922
.messageListener(messageListener));
921923
StreamStatsResponse response = client.streamStats(stream);
922924
assertThat(response.getInfo()).containsEntry("first_chunk_id", -1L);
925+
assertThat(response.getInfo()).containsEntry("last_chunk_id", -1L);
923926
assertThat(response.getInfo()).containsEntry("committed_chunk_id", -1L);
927+
928+
if (brokerVersion43Ormore()) {
929+
assertThat(response.getInfo()).containsEntry("committed_offset", -1L);
930+
}
931+
924932
TestUtils.publishAndWaitForConfirms(cf, publishCount, stream);
925933
response = client.streamStats(stream);
926934
assertThat(response.getInfo()).containsEntry("first_chunk_id", 0L);
927-
assertThat(response.getInfo().get("committed_chunk_id")).isNotEqualTo(-1L);
935+
assertThat(response.getInfo().get("last_chunk_id")).isPositive();
936+
assertThat(response.getInfo().get("committed_chunk_id")).isPositive();
937+
938+
if (brokerVersion43Ormore()) {
939+
assertThat(response.getInfo()).containsEntry("committed_offset", (long) (publishCount - 1));
940+
}
928941

929942
client.exchangeCommandVersions();
930943

@@ -933,8 +946,8 @@ void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
933946
assertThat(subscribeResponse.isOk()).isTrue();
934947

935948
assertThat(latch.await(10, SECONDS)).isTrue();
936-
assertThat(committedOffset.get()).isPositive();
937-
assertThat(committedOffset).hasValue(response.getInfo().get("committed_chunk_id"));
949+
assertThat(committedChunkId.get()).isPositive();
950+
assertThat(committedChunkId).hasValue(response.getInfo().get("committed_chunk_id"));
938951
}
939952

940953
@Test
@@ -1117,4 +1130,8 @@ public int fragmentLength(Object obj) {
11171130
// we should get messages only from the "second" part of the stream
11181131
assertThat(consumedMessageCount).hasValueLessThan(messageCount * 2);
11191132
}
1133+
1134+
private boolean brokerVersion43Ormore() {
1135+
return TestUtils.atLeastVersion(RABBITMQ_4_3_0.version(), brokerVersion);
1136+
}
11201137
}

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2026 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -16,6 +16,7 @@
1616

1717
import static com.rabbitmq.stream.Cli.listLocatorConnections;
1818
import static com.rabbitmq.stream.impl.Assertions.assertThat;
19+
import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_4_3_0;
1920
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed;
2021
import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode;
2122
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
@@ -111,6 +112,7 @@ public class StreamEnvironmentTest {
111112
String stream;
112113
TestUtils.ClientFactory cf;
113114
EventLoopGroup eventLoopGroup;
115+
String brokerVersion;
114116

115117
@BeforeEach
116118
void init() {
@@ -636,13 +638,19 @@ void queryStreamStatsShouldReturnFirstOffsetAndCommittedOffset(boolean lazyInit)
636638
StreamStats stats = env.queryStreamStats(stream);
637639
assertThatThrownBy(stats::firstOffset).isInstanceOf(NoOffsetException.class);
638640
assertThatThrownBy(stats::committedChunkId).isInstanceOf(NoOffsetException.class);
641+
assertThatThrownBy(stats::committedOffset).isInstanceOf(NoOffsetException.class);
639642

640643
int publishCount = 20_000;
641644
TestUtils.publishAndWaitForConfirms(cf, publishCount, stream);
642645

643646
StreamStats stats2 = env.queryStreamStats(stream);
644647
assertThat(stats2.firstOffset()).isZero();
645648
assertThat(stats2.committedChunkId()).isPositive();
649+
if (brokerVersion43Ormore()) {
650+
assertThat(stats2.committedOffset()).isEqualTo(publishCount - 1);
651+
} else {
652+
assertThatThrownBy(stats::committedOffset).isInstanceOf(NoOffsetException.class);
653+
}
646654

647655
CountDownLatch latch = new CountDownLatch(publishCount);
648656
AtomicLong committedChunkId = new AtomicLong();
@@ -880,4 +888,8 @@ private void nativeIo(IoHandlerFactory ioHandlerFactory, Class<? extends Channel
880888
epollEventLoopGroup.shutdownGracefully(0, 0, SECONDS);
881889
}
882890
}
891+
892+
private boolean brokerVersion43Ormore() {
893+
return TestUtils.atLeastVersion(RABBITMQ_4_3_0.version(), brokerVersion);
894+
}
883895
}

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2026 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -1115,7 +1115,8 @@ public enum BrokerVersion {
11151115
RABBITMQ_4_0_0("4.0.0"),
11161116
RABBITMQ_4_1_2("4.1.2"),
11171117
RABBITMQ_4_1_4("4.1.4"),
1118-
RABBITMQ_4_2_0("4.2.0");
1118+
RABBITMQ_4_2_0("4.2.0"),
1119+
RABBITMQ_4_3_0("4.3.0");
11191120

11201121
final String value;
11211122

0 commit comments

Comments
 (0)