Skip to content

Commit d794dd4

Browse files
poorbarcodemukesh-ctds
authored andcommitted
[fix][broker] Geo Replication lost messages or frequently fails due to Deduplication is not appropriate for Geo-Replication (apache#23697)
(cherry picked from commit 4ac4f3c) (cherry picked from commit 307b5c9)
1 parent 2a8931e commit d794dd4

File tree

19 files changed

+1552
-47
lines changed

19 files changed

+1552
-47
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ public void startProducer() {
177177
prepareCreateProducer().thenCompose(ignore -> {
178178
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
179179
builderImpl.getConf().setNonPartitionedTopicExpected(true);
180+
builderImpl.getConf().setReplProducer(true);
180181
return producerBuilder.createAsync().thenAccept(producer -> {
181182
setProducerAndTriggerReadEntries(producer);
182183
});

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java

Lines changed: 75 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
2222
import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER;
23+
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER;
24+
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION;
2325
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
2426
import static org.apache.pulsar.common.protocol.Commands.readChecksum;
2527
import com.google.common.annotations.VisibleForTesting;
@@ -87,6 +89,7 @@ public class Producer {
8789

8890
private final PublisherStatsImpl stats;
8991
private final boolean isRemote;
92+
private final boolean isRemoteOrShadow;
9093
private final String remoteCluster;
9194
private final boolean isNonPersistentTopic;
9295
private final boolean isShadowTopic;
@@ -148,6 +151,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
148151

149152
String replicatorPrefix = serviceConf.getReplicatorPrefix() + ".";
150153
this.isRemote = producerName.startsWith(replicatorPrefix);
154+
this.isRemoteOrShadow = isRemoteOrShadow(producerName, serviceConf.getReplicatorPrefix());
151155
this.remoteCluster = parseRemoteClusterName(producerName, isRemote, replicatorPrefix);
152156

153157
this.isEncrypted = isEncrypted;
@@ -160,6 +164,13 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
160164
this.brokerInterceptor = cnx.getBrokerService().getInterceptor();
161165
}
162166

167+
/**
168+
* Difference with "isRemote" is whether the prefix string is end with a dot.
169+
*/
170+
public static boolean isRemoteOrShadow(String producerName, String replicatorPrefix) {
171+
return producerName != null && producerName.startsWith(replicatorPrefix);
172+
}
173+
163174
/**
164175
* Producer name for replicator is in format.
165176
* "replicatorPrefix.localCluster" (old)
@@ -281,11 +292,16 @@ private boolean checkCanProduceTxnOnTopic(long sequenceId, ByteBuf headersAndPay
281292
return true;
282293
}
283294

295+
private boolean isSupportsReplDedupByLidAndEid() {
296+
// Non-Persistent topic does not have ledger id or entry id, so it does not support.
297+
return cnx.isClientSupportsReplDedupByLidAndEid() && topic.isPersistent();
298+
}
299+
284300
private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked,
285301
boolean isMarker, Position position) {
286302
MessagePublishContext messagePublishContext =
287303
MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(),
288-
batchSize, isChunked, System.nanoTime(), isMarker, position);
304+
batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
289305
if (brokerInterceptor != null) {
290306
brokerInterceptor
291307
.onMessagePublish(this, headersAndPayload, messagePublishContext);
@@ -297,7 +313,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenc
297313
long batchSize, boolean isChunked, boolean isMarker, Position position) {
298314
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
299315
highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
300-
isChunked, System.nanoTime(), isMarker, position);
316+
isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
301317
if (brokerInterceptor != null) {
302318
brokerInterceptor
303319
.onMessagePublish(this, headersAndPayload, messagePublishContext);
@@ -393,6 +409,7 @@ private static final class MessagePublishContext implements PublishContext, Runn
393409
private long batchSize;
394410
private boolean chunked;
395411
private boolean isMarker;
412+
private boolean supportsReplDedupByLidAndEid;
396413

397414
private long startTimeNs;
398415

@@ -483,6 +500,11 @@ public long getOriginalSequenceId() {
483500
return originalSequenceId;
484501
}
485502

503+
@Override
504+
public boolean supportsReplDedupByLidAndEid() {
505+
return supportsReplDedupByLidAndEid;
506+
}
507+
486508
@Override
487509
public void setOriginalHighestSequenceId(long originalHighestSequenceId) {
488510
this.originalHighestSequenceId = originalHighestSequenceId;
@@ -550,8 +572,12 @@ public void run() {
550572
// stats
551573
rateIn.recordMultipleEvents(batchSize, msgSize);
552574
producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
553-
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId,
554-
ledgerId, entryId);
575+
if (producer.isRemoteOrShadow && producer.isSupportsReplDedupByLidAndEid()) {
576+
sendSendReceiptResponseRepl();
577+
} else {
578+
// Repl V1 is the same as normal for this handling.
579+
sendSendReceiptResponseNormal();
580+
}
555581
producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);
556582
if (this.chunked) {
557583
producer.chunkedMessageRate.recordEvent();
@@ -564,8 +590,46 @@ public void run() {
564590
recycle();
565591
}
566592

593+
private void sendSendReceiptResponseRepl() {
594+
// Case-1: is a repl marker.
595+
boolean isReplMarker = getProperty(MSG_PROP_IS_REPL_MARKER) != null;
596+
if (isReplMarker) {
597+
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, Long.MIN_VALUE,
598+
ledgerId, entryId);
599+
600+
return;
601+
}
602+
// Case-2: is a repl message.
603+
Object positionPairObj = getProperty(MSG_PROP_REPL_SOURCE_POSITION);
604+
if (positionPairObj == null || !(positionPairObj instanceof long[])
605+
|| ((long[]) positionPairObj).length < 2) {
606+
log.error("[{}] Message can not determine whether the message is duplicated due to the acquired"
607+
+ " messages props were are invalid. producer={}. supportsReplDedupByLidAndEid: {},"
608+
+ " sequence-id {}, prop-{}: not in expected format",
609+
producer.topic.getName(), producer.producerName,
610+
supportsReplDedupByLidAndEid(), getSequenceId(),
611+
MSG_PROP_REPL_SOURCE_POSITION);
612+
producer.cnx.getCommandSender().sendSendError(producer.producerId,
613+
Math.max(highestSequenceId, sequenceId),
614+
ServerError.PersistenceError, "Message can not determine whether the message is"
615+
+ " duplicated due to the acquired messages props were are invalid");
616+
return;
617+
}
618+
long[] positionPair = (long[]) positionPairObj;
619+
long replSequenceLId = positionPair[0];
620+
long replSequenceEId = positionPair[1];
621+
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, replSequenceLId,
622+
replSequenceEId, ledgerId, entryId);
623+
}
624+
625+
private void sendSendReceiptResponseNormal() {
626+
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId,
627+
ledgerId, entryId);
628+
}
629+
567630
static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
568-
long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) {
631+
long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position,
632+
boolean supportsReplDedupByLidAndEid) {
569633
MessagePublishContext callback = RECYCLER.get();
570634
callback.producer = producer;
571635
callback.sequenceId = sequenceId;
@@ -577,6 +641,7 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn
577641
callback.originalSequenceId = -1L;
578642
callback.startTimeNs = startTimeNs;
579643
callback.isMarker = isMarker;
644+
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
580645
callback.ledgerId = position == null ? -1 : position.getLedgerId();
581646
callback.entryId = position == null ? -1 : position.getEntryId();
582647
if (callback.propertyMap != null) {
@@ -586,7 +651,8 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn
586651
}
587652

588653
static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
589-
int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) {
654+
int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position,
655+
boolean supportsReplDedupByLidAndEid) {
590656
MessagePublishContext callback = RECYCLER.get();
591657
callback.producer = producer;
592658
callback.sequenceId = lowestSequenceId;
@@ -599,6 +665,7 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long
599665
callback.startTimeNs = startTimeNs;
600666
callback.chunked = chunked;
601667
callback.isMarker = isMarker;
668+
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
602669
callback.ledgerId = position == null ? -1 : position.getLedgerId();
603670
callback.entryId = position == null ? -1 : position.getEntryId();
604671
if (callback.propertyMap != null) {
@@ -829,7 +896,8 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon
829896
}
830897
MessagePublishContext messagePublishContext =
831898
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
832-
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null);
899+
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null,
900+
cnx.isClientSupportsReplDedupByLidAndEid());
833901
if (brokerInterceptor != null) {
834902
brokerInterceptor
835903
.onMessagePublish(this, headersAndPayload, messagePublishContext);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import javax.net.ssl.SSLSession;
6565
import javax.ws.rs.WebApplicationException;
6666
import javax.ws.rs.core.Response;
67+
import lombok.Getter;
6768
import org.apache.bookkeeper.mledger.AsyncCallbacks;
6869
import org.apache.bookkeeper.mledger.Entry;
6970
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -235,6 +236,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
235236

236237
// Flag to manage throttling-rate by atomically enable/disable read-channel.
237238
private volatile boolean autoReadDisabledRateLimiting = false;
239+
@Getter
238240
private FeatureFlags features;
239241

240242
private PulsarCommandSender commandSender;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ default long getEntryTimestamp() {
127127
default void setEntryTimestamp(long entryTimestamp) {
128128

129129
}
130+
131+
default boolean supportsReplDedupByLidAndEid() {
132+
return false;
133+
}
130134
}
131135

132136
CompletableFuture<Void> initialize();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.net.SocketAddress;
2424
import java.util.concurrent.CompletableFuture;
2525
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
26+
import org.apache.pulsar.common.api.proto.FeatureFlags;
2627

2728
public interface TransportCnx {
2829

@@ -90,4 +91,11 @@ public interface TransportCnx {
9091
* is null if the connection liveness check is disabled.
9192
*/
9293
CompletableFuture<Boolean> checkConnectionLiveness();
94+
95+
FeatureFlags getFeatures();
96+
97+
default boolean isClientSupportsReplDedupByLidAndEid() {
98+
return getFeatures() != null && getFeatures().hasSupportsReplDedupByLidAndEid()
99+
&& getFeatures().isSupportsReplDedupByLidAndEid();
100+
}
93101
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.service.persistent;
2020

21+
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION;
2122
import io.netty.buffer.ByteBuf;
2223
import java.util.List;
2324
import java.util.concurrent.CompletableFuture;
@@ -195,9 +196,15 @@ protected boolean replicateEntries(List<Entry> entries) {
195196
msg.setSchemaInfoForReplicator(schemaFuture.get());
196197
msg.getMessageBuilder().clearTxnidMostBits();
197198
msg.getMessageBuilder().clearTxnidLeastBits();
199+
// Add props for sequence checking.
200+
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION)
201+
.setValue(String.format("%s:%s", entry.getLedgerId(), entry.getEntryId()));
198202
msgOut.recordEvent(headersAndPayload.readableBytes());
199203
// Increment pending messages for messages produced locally
200204
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
205+
if (log.isDebugEnabled()) {
206+
log.debug("[{}] Publishing {}:{}", replicatorId, entry.getLedgerId(), entry.getEntryId());
207+
}
201208
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
202209
atLeastOneMessageSentForReplication = true;
203210
}

0 commit comments

Comments
 (0)