diff --git a/README.md b/README.md
index c732d04..5e97b2b 100644
--- a/README.md
+++ b/README.md
@@ -10,13 +10,13 @@ Add a dependency using Maven
com.sproutsocial
nsq-j
- 1.4.6
+ 1.4.8
```
or Gradle
```
dependencies {
- compile 'com.sproutsocial:nsq-j:1.4.6'
+ compile 'com.sproutsocial:nsq-j:1.4.8'
}
```
diff --git a/pom.xml b/pom.xml
index aabe16b..92a3375 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
com.sproutsocial
nsq-j
- 1.4.7
+ 1.4.8
jar
nsq-j
diff --git a/src/main/java/com/sproutsocial/nsq/Message.java b/src/main/java/com/sproutsocial/nsq/Message.java
index 955a7bf..11cf5c1 100644
--- a/src/main/java/com/sproutsocial/nsq/Message.java
+++ b/src/main/java/com/sproutsocial/nsq/Message.java
@@ -22,4 +22,8 @@ public interface Message {
void forceFlush();
+ default boolean hasResponded() {
+ return false;
+ }
+
}
diff --git a/src/main/java/com/sproutsocial/nsq/NSQMessage.java b/src/main/java/com/sproutsocial/nsq/NSQMessage.java
index f84729e..be3049d 100644
--- a/src/main/java/com/sproutsocial/nsq/NSQMessage.java
+++ b/src/main/java/com/sproutsocial/nsq/NSQMessage.java
@@ -1,6 +1,7 @@
package com.sproutsocial.nsq;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
class NSQMessage implements Message {
@@ -10,6 +11,7 @@ class NSQMessage implements Message {
private final byte[] data;
private final String topic;
private final SubConnection connection;
+ private final AtomicBoolean responded = new AtomicBoolean();
NSQMessage(long timestamp, int attempts, String id, byte[] data, String topic, SubConnection connection) {
this.timestamp = timestamp;
@@ -47,21 +49,28 @@ public String getTopic() {
@Override
public void finish() {
- connection.finish(id);
+ if (responded.compareAndSet(false, true)) {
+ connection.finish(id);
+ }
}
@Override
public void requeue() {
- connection.requeue(id);
+ requeue(0);
}
@Override
public void requeue(int delayMillis) {
- connection.requeue(id, delayMillis);
+ if (responded.compareAndSet(false, true)) {
+ connection.requeue(id, delayMillis);
+ }
}
@Override
public void touch() {
+ if (responded.get()) {
+ return;
+ }
connection.touch(id);
}
@@ -74,6 +83,11 @@ public void forceFlush() {
}
}
+ @Override
+ public boolean hasResponded() {
+ return responded.get();
+ }
+
SubConnection getConnection() {
return connection;
}
diff --git a/src/test/java/com/sproutsocial/nsq/NSQMessageTest.java b/src/test/java/com/sproutsocial/nsq/NSQMessageTest.java
new file mode 100644
index 0000000..5f40fdf
--- /dev/null
+++ b/src/test/java/com/sproutsocial/nsq/NSQMessageTest.java
@@ -0,0 +1,53 @@
+package com.sproutsocial.nsq;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class NSQMessageTest {
+
+ private final SubConnection connection = Mockito.mock(SubConnection.class);
+
+ @Before
+ public void resetTest() {
+ Mockito.reset(connection);
+ }
+
+ @Test
+ public void shouldMarkRespondedOnFinish() {
+ final NSQMessage message = new NSQMessage(0L, 0, "id", null, "topic", connection);
+ assertFalse(message.hasResponded());
+ message.finish();
+ assertTrue(message.hasResponded());
+ Mockito.verify(connection).finish("id");
+ Mockito.verifyNoMoreInteractions(connection);
+ }
+
+ @Test
+ public void shouldMarkRespondedOnRequeue() {
+ final NSQMessage message = new NSQMessage(0L, 0, "id", null, "topic", connection);
+ assertFalse(message.hasResponded());
+ message.requeue();
+ assertTrue(message.hasResponded());
+ Mockito.verify(connection).requeue("id", 0);
+ Mockito.verifyNoMoreInteractions(connection);
+ }
+
+ @Test
+ public void shouldNotTouchWhenResponded() {
+ final NSQMessage message = new NSQMessage(0L, 0, "id", null, "topic", connection);
+ assertFalse(message.hasResponded());
+ message.touch();
+ Mockito.verify(connection).touch("id");
+ message.requeue();
+ assertTrue(message.hasResponded());
+ // reset mock
+ Mockito.reset(connection);
+ message.touch();
+ Mockito.verifyZeroInteractions(connection);
+ }
+
+}
\ No newline at end of file