Skip to content

Commit 4e10b22

Browse files
dao-junsrinath-ctds
authored andcommitted
[fix][broker][admin] Fix cannot update properties on NonDurable subscription. (apache#22411)
(cherry picked from commit 902728e) # Conflicts: # pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java (cherry picked from commit 7cd0924)
1 parent d18f429 commit 4e10b22

File tree

2 files changed

+45
-3
lines changed

2 files changed

+45
-3
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,15 +350,19 @@ private CompletableFuture<Void> computeCursorProperties(
350350
final Function<Map<String, String>, Map<String, String>> updateFunction) {
351351
CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
352352

353-
final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
354-
355353
Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
354+
if (!isDurable()) {
355+
this.cursorProperties = Collections.unmodifiableMap(newProperties);
356+
updateCursorPropertiesResult.complete(null);
357+
return updateCursorPropertiesResult;
358+
}
359+
356360
ManagedCursorInfo copy = ManagedCursorInfo
357361
.newBuilder(ManagedCursorImpl.this.managedCursorInfo)
358362
.clearCursorProperties()
359363
.addAllCursorProperties(buildStringPropertiesMap(newProperties))
360364
.build();
361-
365+
final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
362366
ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
363367
name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
364368
@Override

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import static org.mockito.Mockito.verify;
3333
import static org.mockito.Mockito.when;
3434
import static org.testng.Assert.assertEquals;
35+
import static org.testng.Assert.assertNotNull;
3536
import static org.testng.Assert.assertSame;
3637
import static org.testng.Assert.assertTrue;
3738
import java.lang.reflect.Field;
@@ -53,6 +54,8 @@
5354
import javax.ws.rs.core.UriInfo;
5455
import lombok.Cleanup;
5556
import lombok.extern.slf4j.Slf4j;
57+
import org.apache.bookkeeper.mledger.ManagedCursor;
58+
import org.apache.commons.collections4.MapUtils;
5659
import org.apache.pulsar.broker.BrokerTestUtil;
5760
import org.apache.pulsar.broker.admin.v2.ExtPersistentTopics;
5861
import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
@@ -65,6 +68,8 @@
6568
import org.apache.pulsar.broker.resources.TopicResources;
6669
import org.apache.pulsar.broker.service.BrokerService;
6770
import org.apache.pulsar.broker.service.Topic;
71+
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
72+
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
6873
import org.apache.pulsar.broker.web.PulsarWebResource;
6974
import org.apache.pulsar.broker.web.RestException;
7075
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -76,6 +81,7 @@
7681
import org.apache.pulsar.client.api.Message;
7782
import org.apache.pulsar.client.api.MessageId;
7883
import org.apache.pulsar.client.api.Producer;
84+
import org.apache.pulsar.client.api.Reader;
7985
import org.apache.pulsar.client.api.Schema;
8086
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
8187
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -1773,4 +1779,36 @@ public void testNamespaceResources() throws Exception {
17731779
assertTrue(namespaces.contains(ns1V2));
17741780
assertTrue(namespaces.contains(ns1V1));
17751781
}
1782+
1783+
@Test
1784+
public void testUpdatePropertiesOnNonDurableSub() throws Exception {
1785+
String topic = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testUpdatePropertiesOnNonDurableSub";
1786+
String subscription = "sub";
1787+
admin.topics().createNonPartitionedTopic(topic);
1788+
1789+
@Cleanup
1790+
Reader<String> __ = pulsarClient.newReader(Schema.STRING)
1791+
.startMessageId(MessageId.earliest)
1792+
.subscriptionName(subscription)
1793+
.topic(topic)
1794+
.create();
1795+
1796+
PersistentTopic persistentTopic =
1797+
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get();
1798+
PersistentSubscription subscription1 = persistentTopic.getSubscriptions().get(subscription);
1799+
assertNotNull(subscription1);
1800+
ManagedCursor cursor = subscription1.getCursor();
1801+
1802+
Map<String, String> properties = admin.topics().getSubscriptionProperties(topic, subscription);
1803+
assertEquals(properties.size(), 0);
1804+
assertTrue(MapUtils.isEmpty(cursor.getCursorProperties()));
1805+
1806+
admin.topics().updateSubscriptionProperties(topic, subscription, Map.of("foo", "bar"));
1807+
properties = admin.topics().getSubscriptionProperties(topic, subscription);
1808+
assertEquals(properties.size(), 1);
1809+
assertEquals(properties.get("foo"), "bar");
1810+
1811+
assertEquals(cursor.getCursorProperties().size(), 1);
1812+
assertEquals(cursor.getCursorProperties().get("foo"), "bar");
1813+
}
17761814
}

0 commit comments

Comments
 (0)