Skip to content

Commit 76c6f6a

Browse files
authored
[fix][io] Acknowledge RabbitMQ message after processing the message successfully (#24354)
1 parent 12c96f6 commit 76c6f6a

File tree

1 file changed

+27
-4
lines changed

1 file changed

+27
-4
lines changed

pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,17 +96,40 @@ public RabbitMQConsumer(RabbitMQSource source, Channel channel) {
9696
@Override
9797
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
9898
throws IOException {
99-
source.consume(new RabbitMQRecord(Optional.ofNullable(envelope.getRoutingKey()), body));
10099
long deliveryTag = envelope.getDeliveryTag();
101-
// positively acknowledge all deliveries up to this delivery tag to reduce network traffic
102-
// since manual message acknowledgments are turned on by default
103-
this.getChannel().basicAck(deliveryTag, true);
100+
source.consume(new RabbitMQRecord(Optional.ofNullable(envelope.getRoutingKey()), body, () -> {
101+
// acknowledge this delivery tag to RabbitMQ
102+
try {
103+
this.getChannel().basicAck(deliveryTag, false);
104+
} catch (IOException e) {
105+
logger.error("Error while acknowledging envelope {}.", envelope, e);
106+
}
107+
}, () -> {
108+
// negatively acknowledge this delivery tag to RabbitMQ
109+
try {
110+
this.getChannel().basicNack(deliveryTag, false, true);
111+
} catch (IOException e) {
112+
logger.error("Error while negatively acknowledging envelope {}.", envelope, e);
113+
}
114+
}));
104115
}
105116
}
106117

107118
@Data
108119
private static class RabbitMQRecord implements Record<byte[]> {
109120
private final Optional<String> key;
110121
private final byte[] value;
122+
private final Runnable ackFunction;
123+
private final Runnable nackFunction;
124+
125+
@Override
126+
public void ack() {
127+
ackFunction.run();
128+
}
129+
130+
@Override
131+
public void fail() {
132+
nackFunction.run();
133+
}
111134
}
112135
}

0 commit comments

Comments
 (0)