Skip to content

Commit 44baa62

Browse files
committed
Make hosts with same Endpoint share same ConvictionPolicy instance
Because `Connection` relies on `EndPoint` to obtain its `ConvictionPolicy` instance, the same connection can end up addressing different `ConvictionPolicy` instances throughout its lifecycle. As a result, a connection can increment `openConnections` on one `ConvictionPolicy` instance and decrement it on another. When that happens, an assertion failure occurs: ``` java.lang.AssertionError at com.datastax.driver.core.ConvictionPolicy$DefaultConvictionPolicy.signalConnectionClosed(ConvictionPolicy.java:90) at com.datastax.driver.core.Connection.closeAsync(Connection.java:953) at com.datastax.driver.core.HostConnectionPool.discardAvailableConnections(HostConnectionPool.java:882) at com.datastax.driver.core.HostConnectionPool.closeAsync(HostConnectionPool.java:843) ``` To fix this, the `ConvictionPolicy` instance must be shared between different hosts that have the same `EndPoint`. It is not the best solution, but given that a lot of code uses `EndPoint` as the unique host identifier, it is good enough.
1 parent cd0b08e commit 44baa62

File tree

6 files changed

+57
-33
lines changed

6 files changed

+57
-33
lines changed

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1611,8 +1611,7 @@ class Manager implements Connection.DefaultResponseHandler {
16111611
Connection.Factory connectionFactory;
16121612
ControlConnection controlConnection;
16131613

1614-
final ConvictionPolicy.Factory convictionPolicyFactory =
1615-
new ConvictionPolicy.DefaultConvictionPolicy.Factory();
1614+
final ConvictionPolicy.Factory convictionPolicyFactory;
16161615

16171616
ListeningExecutorService executor;
16181617
ListeningExecutorService blockingExecutor;
@@ -1676,8 +1675,11 @@ private Manager(
16761675
.withNettyOptions(configuration.getNettyOptions())
16771676
.withCodecRegistry(configuration.getCodecRegistry())
16781677
.build();
1678+
this.convictionPolicyFactory =
1679+
new ConvictionPolicy.DefaultConvictionPolicy.Factory(policies.getReconnectionPolicy());
16791680
} else {
16801681
this.configuration = configuration;
1682+
this.convictionPolicyFactory = new ConvictionPolicy.DefaultConvictionPolicy.Factory(null);
16811683
}
16821684
this.contactPoints = contactPoints;
16831685
this.listeners = new CopyOnWriteArraySet<Host.StateListener>(listeners);

driver-core/src/main/java/com/datastax/driver/core/Connection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,9 @@ else if (Host.statesLogger.isDebugEnabled())
809809
// we still want to signal the error to the conviction policy.
810810
boolean decrement = signaled.compareAndSet(false, true);
811811

812-
boolean hostDown = host.convictionPolicy.signalConnectionFailure(this, decrement);
812+
boolean hostDown =
813+
host.convictionPolicy.signalConnectionFailure(
814+
this, decrement, host.state == Host.State.DOWN);
813815
if (hostDown) {
814816
factory.manager.signalHostDown(host, host.wasJustAdded());
815817
} else {

driver-core/src/main/java/com/datastax/driver/core/ConvictionPolicy.java

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static java.util.concurrent.TimeUnit.NANOSECONDS;
2020

2121
import com.datastax.driver.core.policies.ReconnectionPolicy;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.ConcurrentMap;
2224
import java.util.concurrent.atomic.AtomicInteger;
2325

2426
/**
@@ -45,7 +47,8 @@ abstract class ConvictionPolicy {
4547
*
4648
* @return whether the host should be considered down.
4749
*/
48-
abstract boolean signalConnectionFailure(Connection connection, boolean decrement);
50+
abstract boolean signalConnectionFailure(
51+
Connection connection, boolean decrement, boolean hostIsDown);
4952

5053
abstract boolean canReconnectNow();
5154

@@ -57,92 +60,111 @@ interface Factory {
5760
/**
5861
* Creates a new ConvictionPolicy instance for {@code host}.
5962
*
60-
* @param host the host this policy applies to
63+
* @param endpoint the host this policy applies to
6164
* @return the newly created {@link ConvictionPolicy} instance.
6265
*/
63-
ConvictionPolicy create(Host host, ReconnectionPolicy reconnectionPolicy);
66+
ConvictionPolicy create(EndPoint endpoint);
6467
}
6568

6669
static class DefaultConvictionPolicy extends ConvictionPolicy {
67-
private final Host host;
70+
private final EndPoint endpoint;
6871
private final ReconnectionPolicy reconnectionPolicy;
6972
private final AtomicInteger openConnections = new AtomicInteger();
7073

7174
private volatile long nextReconnectionTime = Long.MIN_VALUE;
7275
private ReconnectionPolicy.ReconnectionSchedule reconnectionSchedule;
7376

74-
private DefaultConvictionPolicy(Host host, ReconnectionPolicy reconnectionPolicy) {
75-
this.host = host;
77+
private DefaultConvictionPolicy(EndPoint endpoint, ReconnectionPolicy reconnectionPolicy) {
78+
this.endpoint = endpoint;
7679
this.reconnectionPolicy = reconnectionPolicy;
7780
}
7881

7982
@Override
8083
void signalConnectionsOpening(int count) {
81-
int newTotal = openConnections.addAndGet(count);
84+
int newTotal = this.openConnections.addAndGet(count);
8285
Host.statesLogger.debug(
83-
"[{}] preparing to open {} new connections, total = {}", host, count, newTotal);
86+
"[{}] preparing to open {} new connections, total = {}", endpoint, count, newTotal);
8487
resetReconnectionTime();
8588
}
8689

8790
@Override
8891
void signalConnectionClosed(Connection connection) {
89-
int remaining = openConnections.decrementAndGet();
92+
int remaining = this.openConnections.decrementAndGet();
9093
assert remaining >= 0;
91-
Host.statesLogger.debug("[{}] {} closed, remaining = {}", host, connection, remaining);
94+
Host.statesLogger.debug("[{}] {} closed, remaining = {}", endpoint, connection, remaining);
9295
}
9396

9497
@Override
95-
boolean signalConnectionFailure(Connection connection, boolean decrement) {
98+
boolean signalConnectionFailure(Connection connection, boolean decrement, boolean hostIsDown) {
9699
int remaining;
97100
if (decrement) {
98-
if (host.state != Host.State.DOWN) updateReconnectionTime();
101+
if (!hostIsDown) updateReconnectionTime();
99102

100-
remaining = openConnections.decrementAndGet();
103+
remaining = this.openConnections.decrementAndGet();
101104
assert remaining >= 0;
102-
Host.statesLogger.debug("[{}] {} failed, remaining = {}", host, connection, remaining);
105+
Host.statesLogger.debug("[{}] {} failed, remaining = {}", endpoint, connection, remaining);
103106
} else {
104-
remaining = openConnections.get();
107+
remaining = this.openConnections.get();
105108
}
106109
return remaining == 0;
107110
}
108111

109112
private synchronized void updateReconnectionTime() {
110113
long now = System.nanoTime();
111-
if (nextReconnectionTime > now)
114+
if (this.nextReconnectionTime > now)
112115
// Someone else updated the time before us
113116
return;
114117

115-
if (reconnectionSchedule == null) reconnectionSchedule = reconnectionPolicy.newSchedule();
118+
if (this.reconnectionSchedule == null)
119+
this.reconnectionSchedule = reconnectionPolicy.newSchedule();
116120

117-
long nextDelayMs = reconnectionSchedule.nextDelayMs();
121+
long nextDelayMs = this.reconnectionSchedule.nextDelayMs();
118122
Host.statesLogger.debug(
119-
"[{}] preventing new connections for the next {} ms", host, nextDelayMs);
120-
nextReconnectionTime = now + NANOSECONDS.convert(nextDelayMs, MILLISECONDS);
123+
"[{}] preventing new connections for the next {} ms", endpoint, nextDelayMs);
124+
this.nextReconnectionTime = now + NANOSECONDS.convert(nextDelayMs, MILLISECONDS);
121125
}
122126

123127
private synchronized void resetReconnectionTime() {
124-
reconnectionSchedule = null;
125-
nextReconnectionTime = Long.MIN_VALUE;
128+
this.reconnectionSchedule = null;
129+
this.nextReconnectionTime = Long.MIN_VALUE;
126130
}
127131

128132
@Override
129133
boolean canReconnectNow() {
130134
boolean canReconnectNow =
131-
nextReconnectionTime == Long.MIN_VALUE || System.nanoTime() >= nextReconnectionTime;
135+
this.nextReconnectionTime == Long.MIN_VALUE
136+
|| System.nanoTime() >= this.nextReconnectionTime;
132137
Host.statesLogger.trace("canReconnectNow={}", canReconnectNow);
133138
return canReconnectNow;
134139
}
135140

141+
private static class ConvictionState {}
142+
136143
@Override
137144
boolean hasActiveConnections() {
138-
return openConnections.get() > 0;
145+
return this.openConnections.get() > 0;
139146
}
140147

141148
static class Factory implements ConvictionPolicy.Factory {
149+
private final ConcurrentMap<EndPoint, ConvictionPolicy> policyCache =
150+
new ConcurrentHashMap<>();
151+
private final ReconnectionPolicy reconnectionPolicy;
152+
153+
public Factory(ReconnectionPolicy reconnectionPolicy) {
154+
this.reconnectionPolicy = reconnectionPolicy;
155+
}
142156

143157
@Override
144-
public ConvictionPolicy create(Host host, ReconnectionPolicy reconnectionPolicy) {
145-
return new DefaultConvictionPolicy(host, reconnectionPolicy);
158+
public ConvictionPolicy create(EndPoint endpoint) {
159+
ConvictionPolicy convictionPolicy = policyCache.get(endpoint);
160+
if (convictionPolicy == null) {
161+
convictionPolicy = new DefaultConvictionPolicy(endpoint, reconnectionPolicy);
162+
ConvictionPolicy alreadyExisting = policyCache.putIfAbsent(endpoint, convictionPolicy);
163+
if (alreadyExisting != null) {
164+
return alreadyExisting;
165+
}
166+
}
167+
return convictionPolicy;
146168
}
147169
}
148170
}

driver-core/src/main/java/com/datastax/driver/core/DelegatingCluster.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@ protected DelegatingCluster() {
4444
// accidentally calling a
4545
// parent method could initialize the parent state.
4646

47-
// Construct parent class with dummy parameters that will never get used (since super.init() is
48-
// never called).
4947
super("delegating_cluster", Collections.<EndPoint>emptyList(), null);
5048

5149
// Immediately close the parent class's internal Manager, to make sure that it will fail fast if

driver-core/src/main/java/com/datastax/driver/core/Host.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ enum State {
109109
if (endPoint == null || convictionPolicyFactory == null) throw new NullPointerException();
110110

111111
this.endPoint = endPoint;
112-
this.convictionPolicy = convictionPolicyFactory.create(this, manager.reconnectionPolicy());
112+
this.convictionPolicy = convictionPolicyFactory.create(this.endPoint);
113113
this.manager = manager;
114114
this.defaultExecutionInfo = new ExecutionInfo(this);
115115
this.state = State.ADDED;

driver-core/src/test/java/com/datastax/driver/core/AbstractReplicationStrategyTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ protected static class HostMock extends Host {
4545
private HostMock(InetSocketAddress address, Cluster.Manager manager) {
4646
super(
4747
EndPoints.forAddress(address),
48-
new ConvictionPolicy.DefaultConvictionPolicy.Factory(),
48+
new ConvictionPolicy.DefaultConvictionPolicy.Factory(null),
4949
manager);
5050
this.address = address;
5151
}

0 commit comments

Comments
 (0)