Skip to content

Commit 261d0d5

Browse files
committed
Revert "[fix][broker] Geo Replication lost messages or frequently fails due to Deduplication is not appropriate for Geo-Replication (apache#23697)"
This reverts commit d794dd4.
1 parent 561c833 commit 261d0d5

File tree

19 files changed

+47
-1552
lines changed

19 files changed

+47
-1552
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ public void startProducer() {
177177
prepareCreateProducer().thenCompose(ignore -> {
178178
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
179179
builderImpl.getConf().setNonPartitionedTopicExpected(true);
180-
builderImpl.getConf().setReplProducer(true);
181180
return producerBuilder.createAsync().thenAccept(producer -> {
182181
setProducerAndTriggerReadEntries(producer);
183182
});

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java

Lines changed: 7 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020

2121
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
2222
import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER;
23-
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER;
24-
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION;
2523
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
2624
import static org.apache.pulsar.common.protocol.Commands.readChecksum;
2725
import com.google.common.annotations.VisibleForTesting;
@@ -89,7 +87,6 @@ public class Producer {
8987

9088
private final PublisherStatsImpl stats;
9189
private final boolean isRemote;
92-
private final boolean isRemoteOrShadow;
9390
private final String remoteCluster;
9491
private final boolean isNonPersistentTopic;
9592
private final boolean isShadowTopic;
@@ -151,7 +148,6 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
151148

152149
String replicatorPrefix = serviceConf.getReplicatorPrefix() + ".";
153150
this.isRemote = producerName.startsWith(replicatorPrefix);
154-
this.isRemoteOrShadow = isRemoteOrShadow(producerName, serviceConf.getReplicatorPrefix());
155151
this.remoteCluster = parseRemoteClusterName(producerName, isRemote, replicatorPrefix);
156152

157153
this.isEncrypted = isEncrypted;
@@ -164,13 +160,6 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
164160
this.brokerInterceptor = cnx.getBrokerService().getInterceptor();
165161
}
166162

167-
/**
168-
* Difference with "isRemote" is whether the prefix string is end with a dot.
169-
*/
170-
public static boolean isRemoteOrShadow(String producerName, String replicatorPrefix) {
171-
return producerName != null && producerName.startsWith(replicatorPrefix);
172-
}
173-
174163
/**
175164
* Producer name for replicator is in format.
176165
* "replicatorPrefix.localCluster" (old)
@@ -292,16 +281,11 @@ private boolean checkCanProduceTxnOnTopic(long sequenceId, ByteBuf headersAndPay
292281
return true;
293282
}
294283

295-
private boolean isSupportsReplDedupByLidAndEid() {
296-
// Non-Persistent topic does not have ledger id or entry id, so it does not support.
297-
return cnx.isClientSupportsReplDedupByLidAndEid() && topic.isPersistent();
298-
}
299-
300284
private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked,
301285
boolean isMarker, Position position) {
302286
MessagePublishContext messagePublishContext =
303287
MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(),
304-
batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
288+
batchSize, isChunked, System.nanoTime(), isMarker, position);
305289
if (brokerInterceptor != null) {
306290
brokerInterceptor
307291
.onMessagePublish(this, headersAndPayload, messagePublishContext);
@@ -313,7 +297,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenc
313297
long batchSize, boolean isChunked, boolean isMarker, Position position) {
314298
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
315299
highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
316-
isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
300+
isChunked, System.nanoTime(), isMarker, position);
317301
if (brokerInterceptor != null) {
318302
brokerInterceptor
319303
.onMessagePublish(this, headersAndPayload, messagePublishContext);
@@ -409,7 +393,6 @@ private static final class MessagePublishContext implements PublishContext, Runn
409393
private long batchSize;
410394
private boolean chunked;
411395
private boolean isMarker;
412-
private boolean supportsReplDedupByLidAndEid;
413396

414397
private long startTimeNs;
415398

@@ -500,11 +483,6 @@ public long getOriginalSequenceId() {
500483
return originalSequenceId;
501484
}
502485

503-
@Override
504-
public boolean supportsReplDedupByLidAndEid() {
505-
return supportsReplDedupByLidAndEid;
506-
}
507-
508486
@Override
509487
public void setOriginalHighestSequenceId(long originalHighestSequenceId) {
510488
this.originalHighestSequenceId = originalHighestSequenceId;
@@ -572,12 +550,8 @@ public void run() {
572550
// stats
573551
rateIn.recordMultipleEvents(batchSize, msgSize);
574552
producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
575-
if (producer.isRemoteOrShadow && producer.isSupportsReplDedupByLidAndEid()) {
576-
sendSendReceiptResponseRepl();
577-
} else {
578-
// Repl V1 is the same as normal for this handling.
579-
sendSendReceiptResponseNormal();
580-
}
553+
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId,
554+
ledgerId, entryId);
581555
producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);
582556
if (this.chunked) {
583557
producer.chunkedMessageRate.recordEvent();
@@ -590,46 +564,8 @@ public void run() {
590564
recycle();
591565
}
592566

593-
private void sendSendReceiptResponseRepl() {
594-
// Case-1: is a repl marker.
595-
boolean isReplMarker = getProperty(MSG_PROP_IS_REPL_MARKER) != null;
596-
if (isReplMarker) {
597-
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, Long.MIN_VALUE,
598-
ledgerId, entryId);
599-
600-
return;
601-
}
602-
// Case-2: is a repl message.
603-
Object positionPairObj = getProperty(MSG_PROP_REPL_SOURCE_POSITION);
604-
if (positionPairObj == null || !(positionPairObj instanceof long[])
605-
|| ((long[]) positionPairObj).length < 2) {
606-
log.error("[{}] Message can not determine whether the message is duplicated due to the acquired"
607-
+ " messages props were are invalid. producer={}. supportsReplDedupByLidAndEid: {},"
608-
+ " sequence-id {}, prop-{}: not in expected format",
609-
producer.topic.getName(), producer.producerName,
610-
supportsReplDedupByLidAndEid(), getSequenceId(),
611-
MSG_PROP_REPL_SOURCE_POSITION);
612-
producer.cnx.getCommandSender().sendSendError(producer.producerId,
613-
Math.max(highestSequenceId, sequenceId),
614-
ServerError.PersistenceError, "Message can not determine whether the message is"
615-
+ " duplicated due to the acquired messages props were are invalid");
616-
return;
617-
}
618-
long[] positionPair = (long[]) positionPairObj;
619-
long replSequenceLId = positionPair[0];
620-
long replSequenceEId = positionPair[1];
621-
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, replSequenceLId,
622-
replSequenceEId, ledgerId, entryId);
623-
}
624-
625-
private void sendSendReceiptResponseNormal() {
626-
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId,
627-
ledgerId, entryId);
628-
}
629-
630567
static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
631-
long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position,
632-
boolean supportsReplDedupByLidAndEid) {
568+
long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) {
633569
MessagePublishContext callback = RECYCLER.get();
634570
callback.producer = producer;
635571
callback.sequenceId = sequenceId;
@@ -641,7 +577,6 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn
641577
callback.originalSequenceId = -1L;
642578
callback.startTimeNs = startTimeNs;
643579
callback.isMarker = isMarker;
644-
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
645580
callback.ledgerId = position == null ? -1 : position.getLedgerId();
646581
callback.entryId = position == null ? -1 : position.getEntryId();
647582
if (callback.propertyMap != null) {
@@ -651,8 +586,7 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn
651586
}
652587

653588
static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
654-
int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position,
655-
boolean supportsReplDedupByLidAndEid) {
589+
int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) {
656590
MessagePublishContext callback = RECYCLER.get();
657591
callback.producer = producer;
658592
callback.sequenceId = lowestSequenceId;
@@ -665,7 +599,6 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long
665599
callback.startTimeNs = startTimeNs;
666600
callback.chunked = chunked;
667601
callback.isMarker = isMarker;
668-
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
669602
callback.ledgerId = position == null ? -1 : position.getLedgerId();
670603
callback.entryId = position == null ? -1 : position.getEntryId();
671604
if (callback.propertyMap != null) {
@@ -896,8 +829,7 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon
896829
}
897830
MessagePublishContext messagePublishContext =
898831
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
899-
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null,
900-
cnx.isClientSupportsReplDedupByLidAndEid());
832+
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null);
901833
if (brokerInterceptor != null) {
902834
brokerInterceptor
903835
.onMessagePublish(this, headersAndPayload, messagePublishContext);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import javax.net.ssl.SSLSession;
6565
import javax.ws.rs.WebApplicationException;
6666
import javax.ws.rs.core.Response;
67-
import lombok.Getter;
6867
import org.apache.bookkeeper.mledger.AsyncCallbacks;
6968
import org.apache.bookkeeper.mledger.Entry;
7069
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -236,7 +235,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
236235

237236
// Flag to manage throttling-rate by atomically enable/disable read-channel.
238237
private volatile boolean autoReadDisabledRateLimiting = false;
239-
@Getter
240238
private FeatureFlags features;
241239

242240
private PulsarCommandSender commandSender;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,6 @@ default long getEntryTimestamp() {
127127
default void setEntryTimestamp(long entryTimestamp) {
128128

129129
}
130-
131-
default boolean supportsReplDedupByLidAndEid() {
132-
return false;
133-
}
134130
}
135131

136132
CompletableFuture<Void> initialize();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.net.SocketAddress;
2424
import java.util.concurrent.CompletableFuture;
2525
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
26-
import org.apache.pulsar.common.api.proto.FeatureFlags;
2726

2827
public interface TransportCnx {
2928

@@ -91,11 +90,4 @@ public interface TransportCnx {
9190
* is null if the connection liveness check is disabled.
9291
*/
9392
CompletableFuture<Boolean> checkConnectionLiveness();
94-
95-
FeatureFlags getFeatures();
96-
97-
default boolean isClientSupportsReplDedupByLidAndEid() {
98-
return getFeatures() != null && getFeatures().hasSupportsReplDedupByLidAndEid()
99-
&& getFeatures().isSupportsReplDedupByLidAndEid();
100-
}
10193
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.pulsar.broker.service.persistent;
2020

21-
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION;
2221
import io.netty.buffer.ByteBuf;
2322
import java.util.List;
2423
import java.util.concurrent.CompletableFuture;
@@ -196,15 +195,9 @@ protected boolean replicateEntries(List<Entry> entries) {
196195
msg.setSchemaInfoForReplicator(schemaFuture.get());
197196
msg.getMessageBuilder().clearTxnidMostBits();
198197
msg.getMessageBuilder().clearTxnidLeastBits();
199-
// Add props for sequence checking.
200-
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION)
201-
.setValue(String.format("%s:%s", entry.getLedgerId(), entry.getEntryId()));
202198
msgOut.recordEvent(headersAndPayload.readableBytes());
203199
// Increment pending messages for messages produced locally
204200
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
205-
if (log.isDebugEnabled()) {
206-
log.debug("[{}] Publishing {}:{}", replicatorId, entry.getLedgerId(), entry.getEntryId());
207-
}
208201
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
209202
atLeastOneMessageSentForReplication = true;
210203
}

0 commit comments

Comments
 (0)