Skip to content

Commit b89dcb1

Browse files
Songdoeonrstoyanchev
authored andcommitted
Subscription.unsubscribe() returns Receiptable
See gh-35224 Signed-off-by: Songdoeon <[email protected]>
1 parent 876b7d4 commit b89dcb1

File tree

3 files changed

+92
-10
lines changed

3 files changed

+92
-10
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -345,14 +345,24 @@ public Receiptable acknowledge(StompHeaders headers, boolean consumed) {
345345
return receiptable;
346346
}
347347

348-
private void unsubscribe(String id, @Nullable StompHeaders headers) {
349-
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE);
350-
if (headers != null) {
351-
accessor.addNativeHeaders(headers);
348+
private Receiptable unsubscribe(String id, @Nullable StompHeaders headers) {
349+
Assert.hasText(id, "Subscription id is required");
350+
351+
if (headers == null){
352+
headers = new StompHeaders();
352353
}
354+
355+
String receiptId = checkOrAddReceipt(headers);
356+
Receiptable receiptable = new ReceiptHandler(receiptId);
357+
358+
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE);
359+
accessor.addNativeHeaders(headers);
353360
accessor.setSubscriptionId(id);
361+
354362
Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
355363
execute(message);
364+
365+
return receiptable;
356366
}
357367

358368
@Override
@@ -674,17 +684,19 @@ public StompFrameHandler getHandler() {
674684
}
675685

676686
@Override
677-
public void unsubscribe() {
678-
unsubscribe(null);
687+
public Receiptable unsubscribe() {
688+
return unsubscribe(null);
679689
}
680690

681691
@Override
682-
public void unsubscribe(@Nullable StompHeaders headers) {
692+
public Receiptable unsubscribe(@Nullable StompHeaders headers) {
683693
String id = this.headers.getId();
694+
Receiptable receiptable = new ReceiptHandler(null);
684695
if (id != null) {
685696
DefaultStompSession.this.subscriptions.remove(id);
686-
DefaultStompSession.this.unsubscribe(id, headers);
697+
receiptable = DefaultStompSession.this.unsubscribe(id, headers);
687698
}
699+
return receiptable;
688700
}
689701

690702
@Override

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ interface Subscription extends Receiptable {
183183
/**
184184
* Remove the subscription by sending an UNSUBSCRIBE frame.
185185
*/
186-
void unsubscribe();
186+
Receiptable unsubscribe();
187187

188188
/**
189189
* Alternative to {@link #unsubscribe()} with additional custom headers
@@ -192,7 +192,7 @@ interface Subscription extends Receiptable {
192192
* @param headers the custom headers, if any
193193
* @since 5.0
194194
*/
195-
void unsubscribe(@Nullable StompHeaders headers);
195+
Receiptable unsubscribe(@Nullable StompHeaders headers);
196196
}
197197

198198
}

spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.ScheduledFuture;
25+
import java.util.concurrent.atomic.AtomicBoolean;
2526
import java.util.concurrent.atomic.AtomicReference;
2627

2728
import org.junit.jupiter.api.BeforeEach;
@@ -662,6 +663,75 @@ public void receiptNotReceived() {
662663
verifyNoMoreInteractions(future);
663664
}
664665

666+
@Test
667+
void unsubscribeWithReceipt() {
668+
this.session.afterConnected(this.connection);
669+
assertThat(this.session.isConnected()).isTrue();
670+
Subscription subscription = this.session.subscribe("/topic/foo", mock());
671+
672+
Receiptable receipt = subscription.unsubscribe();
673+
assertThat(receipt).isNotNull();
674+
assertThat(receipt.getReceiptId()).isNull();
675+
676+
Message<byte[]> message = this.messageCaptor.getValue();
677+
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
678+
assertThat(accessor.getCommand()).isEqualTo(StompCommand.UNSUBSCRIBE);
679+
680+
StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
681+
assertThat(stompHeaders).hasSize(1);
682+
assertThat(stompHeaders.getId()).isEqualTo(subscription.getSubscriptionId());
683+
}
684+
685+
@Test
686+
void unsubscribeWithCustomHeaderAndReceipt() {
687+
this.session.afterConnected(this.connection);
688+
this.session.setTaskScheduler(mock());
689+
this.session.setAutoReceipt(true);
690+
691+
StompHeaders subHeaders = new StompHeaders();
692+
subHeaders.setDestination("/topic/foo");
693+
Subscription subscription = this.session.subscribe(subHeaders, mock());
694+
695+
StompHeaders custom = new StompHeaders();
696+
custom.set("x-cust", "value");
697+
698+
Receiptable receipt = subscription.unsubscribe(custom);
699+
assertThat(receipt).isNotNull();
700+
assertThat(receipt.getReceiptId()).isNotNull();
701+
702+
Message<byte[]> message = this.messageCaptor.getValue();
703+
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
704+
assertThat(accessor.getCommand()).isEqualTo(StompCommand.UNSUBSCRIBE);
705+
706+
StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
707+
assertThat(stompHeaders.getId()).isEqualTo(subscription.getSubscriptionId());
708+
assertThat(stompHeaders.get("x-cust")).containsExactly("value");
709+
assertThat(stompHeaders.getReceipt()).isEqualTo(receipt.getReceiptId());
710+
}
711+
712+
@Test
713+
void receiptReceivedOnUnsubscribe() {
714+
this.session.afterConnected(this.connection);
715+
TaskScheduler scheduler = mock();
716+
this.session.setTaskScheduler(scheduler);
717+
this.session.setAutoReceipt(true);
718+
719+
Subscription subscription = this.session.subscribe("/topic/foo", mock());
720+
Receiptable receipt = subscription.unsubscribe();
721+
722+
StompHeaderAccessor ack = StompHeaderAccessor.create(StompCommand.RECEIPT);
723+
ack.setReceiptId(receipt.getReceiptId());
724+
ack.setLeaveMutable(true);
725+
Message<byte[]> receiptMessage = MessageBuilder.createMessage(new byte[0], ack.getMessageHeaders());
726+
727+
AtomicBoolean called = new AtomicBoolean(false);
728+
receipt.addReceiptTask(() -> called.set(true));
729+
730+
this.session.handleMessage(receiptMessage);
731+
732+
assertThat(called.get()).isTrue();
733+
}
734+
665735
@Test
666736
void disconnect() {
667737
this.session.afterConnected(this.connection);

0 commit comments

Comments
 (0)