Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ Add a dependency using Maven
<dependency>
<groupId>com.sproutsocial</groupId>
<artifactId>nsq-j</artifactId>
<version>1.4.6</version>
<version>1.4.8</version>
</dependency>
```
or Gradle
```
dependencies {
compile 'com.sproutsocial:nsq-j:1.4.6'
compile 'com.sproutsocial:nsq-j:1.4.8'
}
```

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.sproutsocial</groupId>
<artifactId>nsq-j</artifactId>
<version>1.4.7</version>
<version>1.4.8</version>
<packaging>jar</packaging>

<name>nsq-j</name>
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/sproutsocial/nsq/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ public interface Message {

void forceFlush();

default boolean hasResponded() {
return false;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I like a default for this.

}

}
20 changes: 17 additions & 3 deletions src/main/java/com/sproutsocial/nsq/NSQMessage.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.sproutsocial.nsq;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

class NSQMessage implements Message {

Expand All @@ -10,6 +11,7 @@ class NSQMessage implements Message {
private final byte[] data;
private final String topic;
private final SubConnection connection;
private final AtomicBoolean responded = new AtomicBoolean();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we want more granular information about the final state of the message like was it finished, requeued, etc.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chrisparrinello this is mostly for bringing our library at-par (with respect to this feature) with other ones (go/python)
Adding more granular status, while may be desirable, is a bit out of scope.


NSQMessage(long timestamp, int attempts, String id, byte[] data, String topic, SubConnection connection) {
this.timestamp = timestamp;
Expand Down Expand Up @@ -47,21 +49,28 @@ public String getTopic() {

@Override
public void finish() {
connection.finish(id);
if (responded.compareAndSet(false, true)) {
connection.finish(id);
}
}

@Override
public void requeue() {
connection.requeue(id);
requeue(0);
}

@Override
public void requeue(int delayMillis) {
connection.requeue(id, delayMillis);
if (responded.compareAndSet(false, true)) {
connection.requeue(id, delayMillis);
}
}

@Override
public void touch() {
if (responded.get()) {
return;
}
connection.touch(id);
}

Expand All @@ -74,6 +83,11 @@ public void forceFlush() {
}
}

@Override
public boolean hasResponded() {
return responded.get();
}

SubConnection getConnection() {
return connection;
}
Expand Down
53 changes: 53 additions & 0 deletions src/test/java/com/sproutsocial/nsq/NSQMessageTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.sproutsocial.nsq;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class NSQMessageTest {

private final SubConnection connection = Mockito.mock(SubConnection.class);

@Before
public void resetTest() {
Mockito.reset(connection);
}

@Test
public void shouldMarkRespondedOnFinish() {
final NSQMessage message = new NSQMessage(0L, 0, "id", null, "topic", connection);
assertFalse(message.hasResponded());
message.finish();
assertTrue(message.hasResponded());
Mockito.verify(connection).finish("id");
Mockito.verifyNoMoreInteractions(connection);
}

@Test
public void shouldMarkRespondedOnRequeue() {
final NSQMessage message = new NSQMessage(0L, 0, "id", null, "topic", connection);
assertFalse(message.hasResponded());
message.requeue();
assertTrue(message.hasResponded());
Mockito.verify(connection).requeue("id", 0);
Mockito.verifyNoMoreInteractions(connection);
}

@Test
public void shouldNotTouchWhenResponded() {
final NSQMessage message = new NSQMessage(0L, 0, "id", null, "topic", connection);
assertFalse(message.hasResponded());
message.touch();
Mockito.verify(connection).touch("id");
message.requeue();
assertTrue(message.hasResponded());
// reset mock
Mockito.reset(connection);
message.touch();
Mockito.verifyZeroInteractions(connection);
}

}