Skip to content

Commit 773cb47

Browse files
authored
Fix compile issue caused by Pulsar refactored Backoff implementation (#1793)
1 parent 4264587 commit 773cb47

File tree

2 files changed

+24
-17
lines changed

2 files changed

+24
-17
lines changed

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
1717

1818
import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange;
19+
import java.time.Duration;
1920
import java.util.ArrayList;
2021
import java.util.List;
2122
import java.util.Map;
@@ -55,8 +56,11 @@ public abstract class AmqpExchangeReplicator implements AsyncCallbacks.ReadEntri
5556
private ManagedCursor cursor;
5657
private ScheduledExecutorService scheduledExecutorService;
5758

58-
protected final Backoff backOff = new Backoff(
59-
100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
59+
protected final Backoff backOff = Backoff.builder()
60+
.initialDelay(Duration.ZERO.ofMillis(100))
61+
.maxBackoff(Duration.ofMinutes(1))
62+
.mandatoryStop(Duration.ofMillis(0))
63+
.build();
6064

6165
private static final AtomicReferenceFieldUpdater<AmqpExchangeReplicator, State> STATE_UPDATER =
6266
AtomicReferenceFieldUpdater.newUpdater(AmqpExchangeReplicator.class, State.class, "state");
@@ -83,8 +87,11 @@ protected enum State {
8387
AtomicIntegerFieldUpdater.newUpdater(AmqpExchangeReplicator.class, "havePendingRead");
8488
private volatile int havePendingRead = FALSE;
8589

86-
private final Backoff readFailureBackoff = new Backoff(
87-
1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
90+
private final Backoff readFailureBackoff = Backoff.builder()
91+
.initialDelay(Duration.ofSeconds(1))
92+
.maxBackoff(Duration.ofMinutes(1))
93+
.mandatoryStop(Duration.ofMillis(0))
94+
.build();
8895

8996
private final ExecutorService routeExecutor;
9097

@@ -114,7 +121,7 @@ private void initMaxRouteQueueSize(int maxRouteQueueSize) {
114121

115122
public void startReplicate() {
116123
if (STATE_UPDATER.get(AmqpExchangeReplicator.this).equals(AmqpExchangeReplicator.State.Stopping)) {
117-
long waitTimeMs = backOff.next();
124+
long waitTimeMs = backOff.next().toMillis();
118125
if (log.isDebugEnabled()) {
119126
log.debug("{} Waiting for producer close before attempting reconnect, retrying in {} s",
120127
name, waitTimeMs / 1000);
@@ -153,7 +160,7 @@ public void openCursorFailed(ManagedLedgerException e, Object o) {
153160

154161
private void retryStartReplicator(Throwable ex) {
155162
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
156-
long waitTimeMs = backOff.next();
163+
long waitTimeMs = backOff.next().toMillis();
157164
if (log.isDebugEnabled()) {
158165
log.debug("{} Failed to start replicator, errorMsg: {}, retrying in {} s.",
159166
name, ex.getMessage(), waitTimeMs / 1000);
@@ -242,7 +249,7 @@ public void readEntriesComplete(List<Entry> list, Object o) {
242249
}
243250
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
244251
if (CollectionUtils.isEmpty(list)) {
245-
long delay = readFailureBackoff.next();
252+
long delay = readFailureBackoff.next().toMillis();
246253
log.warn("{} The read entry list is empty, will retry in {} ms. ReadPosition: {}, LAC: {}.",
247254
name, delay, cursor.getReadPosition(), topic.getManagedLedger().getLastConfirmedEntry());
248255
scheduledExecutorService.schedule(this::readMoreEntries, delay, TimeUnit.MILLISECONDS);
@@ -329,7 +336,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object o) {
329336
return;
330337
}
331338

332-
long waitTimeMs = readFailureBackoff.next();
339+
long waitTimeMs = readFailureBackoff.next().toMillis();
333340
if (log.isDebugEnabled()) {
334341
log.debug("{} Read entries from bookie failed, retrying in {} s", name, waitTimeMs / 1000, exception);
335342
}
@@ -380,7 +387,7 @@ public void closeFailed(ManagedLedgerException e, Object o) {
380387
STATE_UPDATER.set(AmqpExchangeReplicator.this, State.Stopped);
381388
return;
382389
}
383-
long waitTimeMs = backOff.next();
390+
long waitTimeMs = backOff.next().toMillis();
384391
log.error("[{}] AMQP Exchange Replicator stop failed. retrying in {} s",
385392
name, waitTimeMs / 1000, e);
386393
AmqpExchangeReplicator.this.scheduledExecutorService.schedule(

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarConsumer.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.streamnative.pulsar.handlers.amqp;
1515

1616
import io.streamnative.pulsar.handlers.amqp.utils.MessageConvertUtils;
17+
import java.time.Duration;
1718
import java.util.List;
1819
import java.util.concurrent.ScheduledExecutorService;
1920
import java.util.concurrent.TimeUnit;
@@ -25,7 +26,6 @@
2526
import org.apache.pulsar.client.api.PulsarClientException;
2627
import org.apache.pulsar.client.impl.MessageIdImpl;
2728
import org.apache.pulsar.common.util.Backoff;
28-
import org.apache.pulsar.common.util.BackoffBuilder;
2929
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
3030

3131
/**
@@ -49,11 +49,11 @@ public AmqpPulsarConsumer(String consumerTag, Consumer<byte[]> consumer, boolean
4949
this.autoAck = autoAck;
5050
this.amqpChannel = amqpChannel;
5151
this.executorService = executorService;
52-
this.consumeBackoff = new BackoffBuilder()
53-
.setInitialTime(1, TimeUnit.MILLISECONDS)
54-
.setMax(1, TimeUnit.SECONDS)
55-
.setMandatoryStop(0, TimeUnit.SECONDS)
56-
.create();
52+
this.consumeBackoff = Backoff.builder()
53+
.initialDelay(Duration.ofMillis(1))
54+
.maxBackoff(Duration.ofSeconds(1))
55+
.mandatoryStop(Duration.ofSeconds(0))
56+
.build();
5757
}
5858

5959
public void startConsume() {
@@ -69,7 +69,7 @@ private void consume() {
6969
try {
7070
message = this.consumer.receive(0, TimeUnit.SECONDS);
7171
if (message == null) {
72-
this.executorService.schedule(this::consume, consumeBackoff.next(), TimeUnit.MILLISECONDS);
72+
this.executorService.schedule(this::consume, consumeBackoff.next().toMillis(), TimeUnit.MILLISECONDS);
7373
return;
7474
}
7575

@@ -95,7 +95,7 @@ private void consume() {
9595
consumeBackoff.reset();
9696
this.consume();
9797
} catch (Exception e) {
98-
long backoff = consumeBackoff.next();
98+
long backoff = consumeBackoff.next().toMillis();
9999
log.error("Failed to receive message and send to client, retry in {} ms.", backoff, e);
100100
this.executorService.schedule(this::consume, backoff, TimeUnit.MILLISECONDS);
101101
}

0 commit comments

Comments
 (0)