Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/integration-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,22 @@ jobs:
distribution: 'adopt'
java-version: 17

- name: install org.apache.pulsar.tests:integration:jar:tests:4.0.1
- name: install org.apache.pulsar.tests:integration:jar:tests:4.0.2
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: |
cd ~
git clone --depth 50 --single-branch --branch v4.0.1 https://github.com/apache/pulsar
git clone --depth 50 --single-branch --branch v4.0.2 https://github.com/apache/pulsar
cd pulsar
mvn -B -ntp -f tests/pom.xml -pl org.apache.pulsar.tests:tests-parent,org.apache.pulsar.tests:integration install

- name: build apachepulsar/pulsar-test-latest-version:latest
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: |
cd ~/pulsar
docker pull apachepulsar/pulsar-all:4.0.1
docker pull apachepulsar/pulsar:4.0.1
docker tag apachepulsar/pulsar-all:4.0.1 apachepulsar/pulsar-all:4.0.1-$(git rev-parse --short=7 HEAD)
docker tag apachepulsar/pulsar:4.0.1 apachepulsar/pulsar:4.0.1-$(git rev-parse --short=7 HEAD)
docker pull apachepulsar/pulsar-all:4.0.2
docker pull apachepulsar/pulsar:4.0.2
docker tag apachepulsar/pulsar-all:4.0.2 apachepulsar/pulsar-all:4.0.2-$(git rev-parse --short=7 HEAD)
docker tag apachepulsar/pulsar:4.0.2 apachepulsar/pulsar:4.0.2-$(git rev-parse --short=7 HEAD)
mvn -B -ntp -f tests/docker-images/pom.xml install -pl org.apache.pulsar.tests:latest-version-image -am -Pdocker,-main -DskipTests

- name: run integration tests
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ mvn install
In order to build this repository the linked Pulsar release must be released to Maven Central
other wise you have to build it locally.

For instance if this code depends on Pulsar 4.0.1 you have to build Pulsar 4.0.1 locally
For instance if this code depends on Pulsar 4.0.2 you have to build Pulsar 4.0.2 locally

```
git clone https://github.com/apache/pulsar
git checkout v4.0.1
git checkout v4.0.2
mvn clean install -DskipTests
```

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
</issueManagement>

<properties>
<pulsar.version>4.0.1</pulsar.version>
<pulsar.version>4.0.2</pulsar.version>
<kafka-client.version>2.7.2</kafka-client.version>
<storm.version>2.0.0</storm.version>
<kafka_0_8.version>0.8.1.1</kafka_0_8.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,21 @@ public void ack(Object msgId) {
}
}

public void negativeAck(Object msgId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be rename as msg

Copy link
Author

@AnuragReddy2000 AnuragReddy2000 Feb 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just following the same approach used in the ack and fail methods in the same class, where the method argument is called msgId and has the type Object. But inside the method, there is a check on whether msgId is an instance of the Message class and then is type cast into the Message type and assigned to a variable called msg.

Do you still want me to rename the argument to some other name?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdhabalia Any comments on this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, let's rename it to msg as it's not msgId

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed it to msg

if (msgId instanceof Message) {
Message<?> msg = (Message<?>) msgId;
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Received negative ack for message {}", spoutId, msg.getMessageId());
}
consumer.negativeAcknowledge(msg);
pendingMessageRetries.remove(msg.getMessageId());
// we should also remove message from failedMessages but it will be
// eventually removed while emitting next
// tuple
--pendingAcks;
}
}

@Override
public void fail(Object msgId) {
if (msgId instanceof Message) {
Expand All @@ -183,8 +198,13 @@ public void fail(Object msgId) {
--pendingAcks;
messagesFailed++;
} else {
LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id);
ack(msg);
if(pulsarSpoutConf.shouldNegativeAckFailedMessages()){
LOG.warn("[{}] Number of retries limit reached, negative acking the message {}", spoutId, id);
negativeAck(msg);
} else {
LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id);
ack(msg);
}
}
}

Expand Down Expand Up @@ -325,7 +345,13 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {

private boolean mapToValueAndEmit(Message<byte[]> msg) {
if (msg != null) {
Values values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
Values values;
try{
values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
} catch (Exception e){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change doesn't seem related to this PR? did we see any issue with the mapper failure? can we create a separate PR with the details if you have come across with such issue?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this change is not related to this PR. Let me remove it here and create a separate one for it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is removed.

LOG.error("[{}] Error mapping message to values", msg.getMessageId(), e);
return false;
}
++pendingAcks;
if (values == null) {
// since the mapper returned null, we can drop the message and
Expand Down Expand Up @@ -447,6 +473,11 @@ public void acknowledgeAsync(Message<?> msg) {
consumer.acknowledgeAsync(msg);
}

@Override
public void negativeAcknowledge(Message<?> msg) {
consumer.negativeAcknowledge(msg);
}

@Override
public void close() throws PulsarClientException {
consumer.close();
Expand Down Expand Up @@ -477,6 +508,11 @@ public void acknowledgeAsync(Message<?> msg) {
// No-op
}

@Override
public void negativeAcknowledge(Message<?> msg) {
// No-op
}

@Override
public void close() throws PulsarClientException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
private boolean autoUnsubscribe = false;
private boolean durableSubscription = true;
// read position if non-durable subscription is enabled : default oldest message available in topic
private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest;
private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest;
private boolean negativeAckFailedMessages = false;




/**
* @return the subscription name for the consumer in the spout
*/
Expand Down Expand Up @@ -192,4 +194,21 @@ public MessageId getNonDurableSubscriptionReadPosition() {
public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) {
this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition;
}

/**
*
* @return whether the consumer will negative ack the failed messages
*/
public boolean shouldNegativeAckFailedMessages(){
return this.negativeAckFailedMessages;
}

/**
* Sets whether the consumer will negative ack the failed messages. <i>(default: false)</i>
*
* @param negativeAckFailedMessages
*/
public void setNegativeAckFailedMessages(boolean negativeAckFailedMessages){
this.negativeAckFailedMessages = negativeAckFailedMessages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ public interface PulsarSpoutConsumer {
*/
void acknowledgeAsync(Message<?> msg);

/**
* Negative ack the message.
*
* @param msg
*/
void negativeAcknowledge(Message<?> msg);

/**
* unsubscribe the consumer
* @throws PulsarClientException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,7 @@
*/
package org.apache.pulsar.storm;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

Expand All @@ -38,13 +29,11 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
import org.apache.storm.spout.SpoutOutputCollector;
Expand Down Expand Up @@ -98,6 +87,71 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
verify(consumer, atLeast(1)).receive(anyInt(), any());
}

@Test
public void testFailedMessageNegativeAck() throws Exception {
testFailedMessageRetryExhausted(true);
}

@Test
public void testFailedMessageAckAndDrop() throws Exception {
testFailedMessageRetryExhausted(false);
}


public void testFailedMessageRetryExhausted(boolean negativeAckFailedMessages) throws Exception {

PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
conf.setServiceUrl("http://localhost:8080");
conf.setSubscriptionName("sub1");
conf.setTopic("persistent://prop/ns1/topic1");
conf.setSubscriptionType(SubscriptionType.Exclusive);
conf.setMaxFailedRetries(1);
conf.setNegativeAckFailedMessages(negativeAckFailedMessages);
conf.setMessageToValuesMapper(new MessageToValuesMapper() {
@Override
public Values toValues(Message<byte[]> msg) {
return null;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

});

DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
.maxRedeliverCount(1)
.deadLetterTopic("persistent://prop/ns1/dl-topic-1")
.build();
ConsumerConfigurationData<byte[]> consumerConfig = new ConsumerConfigurationData<>();
consumerConfig.setDeadLetterPolicy(deadLetterPolicy);

ClientConfigurationData clientConfigurationData = spy(new ClientBuilderImpl()).getClientConfigurationData().clone();
PulsarSpout spout = spy(new PulsarSpout(conf, clientConfigurationData, consumerConfig));

Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(),
new byte[0], Schema.BYTES, new MessageMetadata());
Consumer<byte[]> consumer = mock(Consumer.class);
SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
doNothing().when(consumer).negativeAcknowledge(msg);

Field consField = PulsarSpout.class.getDeclaredField("consumer");
consField.setAccessible(true);
consField.set(spout, spoutConsumer);

spout.fail(msg);
spout.fail(msg);

if(negativeAckFailedMessages){
verify(consumer, atLeast(1)).negativeAcknowledge(msg);
verify(consumer, never()).acknowledgeAsync(msg);
} else {
verify(consumer, never()).negativeAcknowledge(msg);
verify(consumer, atLeast(1)).acknowledgeAsync(msg);
}

}

@Test
public void testPulsarTuple() throws Exception {
testPulsarSpout(true);
Expand Down