Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.sproutsocial</groupId>
<artifactId>nsq-j</artifactId>
<version>1.6.0</version>
<version>1.7.0</version>
<packaging>jar</packaging>

<name>nsq-j</name>
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/com/sproutsocial/nsq/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ abstract class Connection extends BasePubSub implements Closeable {
private static final ThreadFactory readThreadFactory = Util.threadFactory("nsq-read");
private static final Set<String> nonFatalErrors = Collections.unmodifiableSet(new HashSet<String>(
Arrays.asList("E_FIN_FAILED", "E_REQ_FAILED", "E_TOUCH_FAILED")));
private static final Set<String> authErrors = Collections.unmodifiableSet(new HashSet<String>(
Arrays.asList("E_AUTH_FAILED", "E_UNAUTHORIZED")));

private static final Logger logger = LoggerFactory.getLogger(Connection.class);

Expand Down Expand Up @@ -242,6 +244,9 @@ else if (frameType == 1) { //error
if (nonFatalErrors.contains(errorCode)) {
logger.warn("non fatal nsqd error:{} probably due to message timeout", error);
}
else if (authErrors.contains(errorCode)) {
throw new NSQException("auth session expired on nsqd:" + error, NSQErrorCode.AUTH_FAILED);
}
else {
throw new NSQException("error from nsqd:" + error);
}
Expand Down Expand Up @@ -279,6 +284,20 @@ else if (response != null) {
close();
}
}
catch (NSQException e) {
if (isReading) {
if (e.getErrorCode() == NSQErrorCode.AUTH_FAILED) {
logger.warn("auth session expired, triggering immediate reconnect. con:{}", toString());
close();
handleAuthFailure();
}
else {
respQueue.offer(e.toString());
close();
logger.error("read thread exception. con:{}", toString(), e);
}
}
}
catch (Exception e) {
if (isReading) {
respQueue.offer(e.toString());
Expand All @@ -289,6 +308,13 @@ else if (response != null) {
logger.debug("read loop done {}", toString());
}

/**
* Called when auth session expires. Override in subclasses to trigger immediate reconnection.
*/
protected void handleAuthFailure() {
// Base implementation does nothing. Overridden by SubConnection.
}

private synchronized void receivedHeartbeat() {
try {
out.write("NOP\n".getBytes(Util.US_ASCII));
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/sproutsocial/nsq/NSQErrorCode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.sproutsocial.nsq;

/**
* Error codes for NSQ exceptions
*/
public enum NSQErrorCode {
/**
* Authentication failure - auth session expired or invalid credentials
*/
AUTH_FAILED,

/**
* General NSQ error (protocol errors, invalid operations, etc.)
*/
GENERAL
}
16 changes: 16 additions & 0 deletions src/main/java/com/sproutsocial/nsq/NSQException.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,28 @@

public class NSQException extends RuntimeException {

private final NSQErrorCode errorCode;

public NSQException(String message) {
this(message, NSQErrorCode.GENERAL);
}

public NSQException(String message, NSQErrorCode errorCode) {
super(message);
this.errorCode = errorCode;
}

public NSQException(String message, Throwable cause) {
this(message, NSQErrorCode.GENERAL, cause);
}

public NSQException(String message, NSQErrorCode errorCode, Throwable cause) {
super(message, cause);
this.errorCode = errorCode;
}

public NSQErrorCode getErrorCode() {
return errorCode;
}

}
6 changes: 6 additions & 0 deletions src/main/java/com/sproutsocial/nsq/SubConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ public void run() {
}
}

@Override
protected void handleAuthFailure() {
// Trigger immediate reconnection when auth session expires
subscription.getSubscriber().immediateCheckConnections(topic);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to make reconnect behavior pluggable. A few different ways a caller might want to handle authentication failures:

  1. Keep existing behavior, let auth failures bubble out of the client.
  2. Exponential / linear retry backoff.

For the case of actual bad credentials, won't an "immediate reconnect" bombard the auth server with infinite retries as fast as it can? At a minimum, we should probably give up after a certain number of authentication failures, since this would happen in the case of legitimate bad credentials.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i considered this early on but is not an issue because checkConnections() :

  1. attempts to create a new connection
  2. via read() will encounter the exception
  3. never adds the connection to Subscription.connectionMap
  4. while (isReading) is not running so there is not infinite loop.
    instead, we will see logger.error("error connecting to:{}", activeHost, e); every minute - this is current behavior.

}

@Override
public void close() {
super.close();
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/com/sproutsocial/nsq/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,21 @@ private synchronized void lookup() {
}
}

/**
* Immediately checks connections for a specific topic. This bypasses the normal periodic
* lookup interval and is used for immediate reconnection when auth failures occur.
*/
synchronized void immediateCheckConnections(String topic) {
if (isStopping) {
return;
}
for (Subscription sub : subscriptions) {
if (sub.getTopic().equals(topic)) {
sub.checkConnections(lookupTopic(topic));
}
}
}

@GuardedBy("this")
protected Set<HostAndPort> lookupTopic(String topic) {
Set<HostAndPort> nsqds = new HashSet<HostAndPort>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package com.sproutsocial.nsq;

import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Integration test for immediate reconnection behavior.
*
* These tests verify that the immediateCheckConnections() method (which is called
* when auth failures occur) works correctly to trigger immediate reconnection
* rather than waiting up to 60 seconds for the periodic checkConnections() call.
*
* Note: Actual E_AUTH_FAILED/E_UNAUTHORIZED error handling is tested in
* ConnectionAuthFailureTest, which verifies those errors throw AuthFailedException
* and trigger the handleAuthFailure() -> immediateCheckConnections() code path.
* These integration tests verify the reconnection mechanism itself works correctly.
*/
public class AuthFailureRecoveryDockerTestIT extends BaseDockerTestIT {
private static final Logger logger = LoggerFactory.getLogger(AuthFailureRecoveryDockerTestIT.class);
private Publisher publisher;
private Subscriber subscriber;

@Override
public void setup() {
super.setup();
publisher = this.backupPublisher();
}

/**
* Tests that the subscriber can successfully establish connections and receive messages.
* This baseline test ensures the immediate reconnection mechanism doesn't break
* normal operation.
*/
@Test
public void testNormalSubscriberOperation() {
TestMessageHandler handler = new TestMessageHandler();
subscriber = new Subscriber(client, 1, 10, cluster.getLookupNode().getHttpHostAndPort().toString());
subscriber.setDefaultMaxInFlight(10);
subscriber.subscribe(topic, "channel", handler);

// Send messages and verify they're received
List<String> messages = messages(20, 40);
send(topic, messages, 0, 0, publisher);

// Verify all messages received
List<NSQMessage> receivedMessages = handler.drainMessagesOrTimeOut(20);
validateReceivedAllMessages(messages, receivedMessages, false);
}

/**
* Tests that subscriber can recover from connection drops.
* This simulates a scenario similar to auth failure where connection is lost
* and needs to be re-established.
*/
@Test
public void testSubscriberReconnectionAfterNetworkDisruption() {
TestMessageHandler handler = new TestMessageHandler();
subscriber = new Subscriber(client, 1, 10, cluster.getLookupNode().getHttpHostAndPort().toString());
subscriber.setDefaultMaxInFlight(10);
subscriber.subscribe(topic, "channel", handler);

// Send first batch of messages
List<String> batch1 = messages(10, 40);
send(topic, batch1, 0, 0, publisher);
List<NSQMessage> received1 = handler.drainMessagesOrTimeOut(10);
validateReceivedAllMessages(batch1, received1, false);

// Simulate connection issue by disconnecting and reconnecting network
logger.info("Disconnecting network for nsqd node to simulate connection drop");
cluster.disconnectNetworkFor(cluster.getNsqdNodes().get(0));
Util.sleepQuietly(2000);

cluster.reconnectNetworkFor(cluster.getNsqdNodes().get(0));
Util.sleepQuietly(2000);

// Send second batch and verify recovery
List<String> batch2 = messages(10, 40);
send(topic, batch2, 0, 0, publisher);
List<NSQMessage> received2 = handler.drainMessagesOrTimeOut(10, 20000);
validateReceivedAllMessages(batch2, received2, false);

logger.info("Successfully received messages after connection recovery");
}

/**
* Tests that the immediateCheckConnections method is accessible and functional.
* This verifies the immediate reconnection path exists.
*/
@Test
public void testImmediateCheckConnectionsMethod() {
TestMessageHandler handler = new TestMessageHandler();
subscriber = new Subscriber(client, 1, 10, cluster.getLookupNode().getHttpHostAndPort().toString());
subscriber.setDefaultMaxInFlight(10);
subscriber.subscribe(topic, "channel", handler);

// Send initial messages to establish connection
List<String> messages = messages(5, 40);
send(topic, messages, 0, 0, publisher);
handler.drainMessagesOrTimeOut(5);

// Call immediateCheckConnections (this is what gets called on auth failure)
subscriber.immediateCheckConnections(topic);

// Verify subscriber still works after immediate check
List<String> moreMessages = messages(5, 40);
send(topic, moreMessages, 0, 0, publisher);
List<NSQMessage> received = handler.drainMessagesOrTimeOut(5);
validateReceivedAllMessages(moreMessages, received, false);

logger.info("immediateCheckConnections method works correctly");
}

/**
* Tests that multiple rapid connection checks don't cause issues.
* This simulates what might happen if multiple connections experience auth failures.
*/
@Test
public void testMultipleRapidConnectionChecks() {
TestMessageHandler handler = new TestMessageHandler();
subscriber = new Subscriber(client, 1, 10, cluster.getLookupNode().getHttpHostAndPort().toString());
subscriber.setDefaultMaxInFlight(10);
subscriber.subscribe(topic, "channel", handler);

// Send initial messages to establish connections
List<String> messages = messages(10, 40);
send(topic, messages, 0, 0, publisher);
handler.drainMessagesOrTimeOut(10);

// Verify connections are established
Assert.assertTrue("Should have connections", subscriber.getConnectionCount() > 0);

// Call immediateCheckConnections multiple times rapidly
// (simulating multiple auth failures in quick succession)
for (int i = 0; i < 5; i++) {
subscriber.immediateCheckConnections(topic);
Util.sleepQuietly(100);
}

// Verify subscriber still works correctly
List<String> moreMessages = messages(10, 40);
send(topic, moreMessages, 0, 0, publisher);
List<NSQMessage> received = handler.drainMessagesOrTimeOut(10);
validateReceivedAllMessages(moreMessages, received, false);

logger.info("Multiple rapid connection checks handled correctly");
}

/**
* Verifies that the subscriber's connection count updates correctly after
* immediate connection checks (which would happen during auth failure recovery).
*/
@Test
public void testConnectionCountAfterImmediateReconnect() {
TestMessageHandler handler = new TestMessageHandler();
subscriber = new Subscriber(client, 1, 10, cluster.getLookupNode().getHttpHostAndPort().toString());
subscriber.setDefaultMaxInFlight(10);
subscriber.subscribe(topic, "channel", handler);

// Send messages to create topic and establish initial connections
List<String> initialMessages = messages(5, 40);
send(topic, initialMessages, 0, 0, publisher);
handler.drainMessagesOrTimeOut(5);

int initialConnectionCount = subscriber.getConnectionCount();
Assert.assertTrue("Should have at least one connection", initialConnectionCount > 0);
logger.info("Initial connection count: {}", initialConnectionCount);

// Trigger immediate check (simulating auth failure recovery)
subscriber.immediateCheckConnections(topic);
Util.sleepQuietly(2000);

int afterCheckCount = subscriber.getConnectionCount();
Assert.assertTrue("Should still have connections after immediate check", afterCheckCount > 0);
logger.info("Connection count after immediate check: {}", afterCheckCount);

// Verify messages can still be processed
List<String> messages = messages(10, 40);
send(topic, messages, 0, 0, publisher);
List<NSQMessage> received = handler.drainMessagesOrTimeOut(10);
Assert.assertEquals("Should receive all messages after reconnect", 10, received.size());
}

/**
* Tests that calling immediateCheckConnections with bad/non-existent hosts doesn't hang.
* This simulates what would happen with genuinely bad credentials where
* connection attempts fail but should complete quickly without infinite loops.
*
* Key behavior being tested:
* - immediateCheckConnections() completes quickly even if connections fail
* - No infinite loops - test completes in reasonable time
* - Failed connections are not added to connectionMap
*/
@Test
public void testBadConnectionsDoNotCauseInfiniteLoop() {
logger.info("Testing that failed connections don't cause infinite retry loops");

// Create subscriber pointing to non-existent lookup server
// This simulates a scenario where auth credentials are bad
subscriber = new Subscriber(client, 300, 10, "127.0.0.1:54321"); // Unreachable port
subscriber.setDefaultMaxInFlight(10);

TestMessageHandler handler = new TestMessageHandler();
subscriber.subscribe(topic, "channel", handler);

// Verify no connections established (lookup fails)
Util.sleepQuietly(1000);
int initialConnectionCount = subscriber.getConnectionCount();
Assert.assertEquals("Should have no connections with bad lookup", 0, initialConnectionCount);

// Call immediateCheckConnections multiple times
// If this caused an infinite loop, the test would hang here
long startTime = System.currentTimeMillis();
for (int i = 0; i < 5; i++) {
subscriber.immediateCheckConnections(topic);
Util.sleepQuietly(100);
}
long elapsedTime = System.currentTimeMillis() - startTime;

// Verify still no connections (failed attempts not added)
int finalConnectionCount = subscriber.getConnectionCount();
Assert.assertEquals("Failed connections should not be in connectionMap", 0, finalConnectionCount);

// Verify test completed quickly (no infinite loop)
Assert.assertTrue("Should complete quickly without infinite loop", elapsedTime < 10000);

logger.info("Successfully verified: Failed connections don't cause infinite retry loops");
logger.info("- {} calls to immediateCheckConnections completed in {}ms", 5, elapsedTime);
logger.info("- Connection count remained at 0 (failed connections not added)");
}

@Override
public void teardown() throws InterruptedException {
if (publisher != null) {
publisher.stop();
}
if (subscriber != null) {
subscriber.stop();
}
super.teardown();
}
}
Loading