Skip to content

Commit 835e7ca

Browse files
committed
Allow store offset on consumer closing
When flushing with auto tracking strategy.
1 parent b96a87d commit 835e7ca

File tree

2 files changed

+12
-5
lines changed

2 files changed

+12
-5
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ void start() {
352352

353353
@Override
354354
public void store(long offset) {
355-
checkOpen();
355+
checkNotClosed();
356356
trackingCallback.accept(offset);
357357
if (canTrack()) {
358358
if (offsetBefore(this.lastRequestedStoredOffset, offset)
@@ -444,8 +444,8 @@ boolean sacActive() {
444444
}
445445

446446
boolean canTrack() {
447-
// FIXME check the condition to be able to track
448-
return ((this.state() == OPENING || this.state() == OPEN)
447+
// closing is OK e.g. when flushing on closing
448+
return ((this.state() == OPENING || this.state() == OPEN || this.state() == CLOSING)
449449
|| (this.trackingClient == null && this.state() == RECOVERING))
450450
&& this.name != null;
451451
}
@@ -526,7 +526,7 @@ void running() {
526526
}
527527

528528
long storedOffset(Supplier<Client> clientSupplier) {
529-
checkOpen();
529+
checkNotClosed();
530530
if (canTrack()) {
531531
return OffsetTrackingUtils.storedOffset(clientSupplier, this.name, this.stream);
532532
} else if (this.name == null) {
@@ -607,4 +607,11 @@ void markRecovering() {
607607
void markOpen() {
608608
state(OPEN);
609609
}
610+
611+
private void checkNotClosed() {
612+
if (state() == CLOSED) {
613+
// will throw the appropriate exception
614+
checkOpen();
615+
}
616+
}
610617
}

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -971,7 +971,7 @@ void methodsShouldThrowExceptionWhenConsumerIsClosed() {
971971
ThrowingCallable[] calls =
972972
new ThrowingCallable[] {() -> consumer.store(1), () -> consumer.storedOffset()};
973973
Arrays.stream(calls)
974-
.forEach(call -> assertThatThrownBy(call).isInstanceOf(IllegalStateException.class));
974+
.forEach(call -> assertThatThrownBy(call).isInstanceOf(ResourceClosedException.class));
975975
}
976976

977977
@Test

0 commit comments

Comments
 (0)