Skip to content

Commit 355920e

Browse files
Differentiate between initial and reconnect RCS connections (#134415)
Adds an initial connection attempt boolean to RemoteConnectionStrategy, with debug logging on connection success and warning logging on connection failure. This change will be used in a follow up PR where we will increment either an initial connection failure metric or a reconnection attempt failure metric. Resolves: ES-12694
1 parent 39503bf commit 355920e

File tree

5 files changed

+178
-5
lines changed

5 files changed

+178
-5
lines changed

docs/changelog/134415.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 134415
2+
summary: Differentiate between initial and reconnect RCS connections
3+
area: Network
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,6 @@ public void onFailure(Exception e) {
163163
exceptions.put(new Tuple<>(e.getClass(), e.getMessage()), e);
164164
if (countDown.countDown()) {
165165
if (attemptNumber >= MAX_CONNECT_ATTEMPTS_PER_RUN && connectionManager.size() == 0) {
166-
logger.warn(() -> "failed to open any proxy connections to cluster [" + clusterAlias + "]", e);
167166
if (exceptions.values().stream().allMatch(RemoteConnectionStrategy::isRetryableException)) {
168167
finished.onFailure(getNoSeedNodeLeftException(exceptions.values()));
169168
} else {

server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.lucene.store.AlreadyClosedException;
1515
import org.elasticsearch.action.ActionListener;
1616
import org.elasticsearch.action.support.ContextPreservingActionListener;
17+
import org.elasticsearch.cluster.metadata.ProjectId;
1718
import org.elasticsearch.cluster.node.DiscoveryNode;
1819
import org.elasticsearch.common.io.stream.Writeable;
1920
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -76,12 +77,17 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
7677
private final AtomicBoolean closed = new AtomicBoolean(false);
7778
private final Object mutex = new Object();
7879
private List<ActionListener<Void>> listeners = new ArrayList<>();
80+
private final AtomicBoolean initialConnectionAttempted = new AtomicBoolean(false);
7981

8082
protected final TransportService transportService;
8183
protected final RemoteConnectionManager connectionManager;
84+
protected final ProjectId originProjectId;
85+
protected final ProjectId linkedProjectId;
8286
protected final String clusterAlias;
8387

8488
RemoteConnectionStrategy(LinkedProjectConfig config, TransportService transportService, RemoteConnectionManager connectionManager) {
89+
this.originProjectId = config.originProjectId();
90+
this.linkedProjectId = config.linkedProjectId();
8591
this.clusterAlias = config.linkedProjectAlias();
8692
this.transportService = transportService;
8793
this.connectionManager = connectionManager;
@@ -190,11 +196,13 @@ protected void doRun() {
190196
connectImpl(new ActionListener<>() {
191197
@Override
192198
public void onResponse(Void aVoid) {
199+
connectionAttemptCompleted(null);
193200
ActionListener.onResponse(getAndClearListeners(), aVoid);
194201
}
195202

196203
@Override
197204
public void onFailure(Exception e) {
205+
connectionAttemptCompleted(e);
198206
ActionListener.onFailure(getAndClearListeners(), e);
199207
}
200208
});
@@ -203,6 +211,24 @@ public void onFailure(Exception e) {
203211
}
204212
}
205213

214+
private void connectionAttemptCompleted(@Nullable Exception e) {
215+
final boolean isInitialAttempt = initialConnectionAttempted.compareAndSet(false, true);
216+
final org.apache.logging.log4j.util.Supplier<String> msgSupplier = () -> format(
217+
"Origin project [%s] %s linked project [%s] with alias [%s] on %s attempt",
218+
originProjectId,
219+
e == null ? "successfully connected to" : "failed to connect to",
220+
linkedProjectId,
221+
clusterAlias,
222+
isInitialAttempt ? "the initial connection" : "a reconnection"
223+
);
224+
if (e == null) {
225+
logger.debug(msgSupplier);
226+
} else {
227+
logger.warn(msgSupplier, e);
228+
// TODO: ES-12695: Increment either the initial or retry connection failure metric.
229+
}
230+
}
231+
206232
boolean shouldRebuildConnection(LinkedProjectConfig config) {
207233
return config.connectionStrategy().equals(strategyType()) == false
208234
|| connectionProfileChanged(config)

server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodesSuppl
144144
logger.debug(() -> "fetching nodes from external cluster [" + clusterAlias + "] failed moving to next seed node", e);
145145
collectRemoteNodes(seedNodesSuppliers, listener);
146146
} else {
147-
logger.warn(() -> "fetching nodes from external cluster [" + clusterAlias + "] failed", e);
148147
listener.onFailure(e);
149148
}
150149
};

server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java

Lines changed: 147 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,26 @@
99

1010
package org.elasticsearch.transport;
1111

12+
import org.apache.logging.log4j.Level;
13+
import org.elasticsearch.TransportVersion;
1214
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.support.PlainActionFuture;
16+
import org.elasticsearch.cluster.ClusterName;
17+
import org.elasticsearch.cluster.metadata.ProjectId;
18+
import org.elasticsearch.cluster.node.VersionInformation;
1319
import org.elasticsearch.common.settings.Settings;
1420
import org.elasticsearch.common.util.concurrent.ThreadContext;
21+
import org.elasticsearch.core.Strings;
1522
import org.elasticsearch.core.TimeValue;
1623
import org.elasticsearch.test.ESTestCase;
1724
import org.elasticsearch.test.EnumSerializationTestUtils;
25+
import org.elasticsearch.test.MockLog;
26+
import org.elasticsearch.test.junit.annotations.TestLogging;
27+
import org.elasticsearch.test.transport.MockTransportService;
28+
import org.elasticsearch.threadpool.TestThreadPool;
29+
import org.elasticsearch.threadpool.ThreadPool;
1830

31+
import static org.elasticsearch.test.MockLog.assertThatLogger;
1932
import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS;
2033
import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_CONNECTION_MODE;
2134
import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS;
@@ -171,21 +184,148 @@ public void testConnectionStrategySerialization() {
171184
);
172185
}
173186

187+
@TestLogging(
188+
value = "org.elasticsearch.transport.RemoteConnectionStrategyTests.FakeConnectionStrategy:DEBUG",
189+
reason = "logging verification"
190+
)
191+
public void testConnectionAttemptLogging() {
192+
final var originProjectId = randomUniqueProjectId();
193+
final var linkedProjectId = randomUniqueProjectId();
194+
final var alias = randomAlphanumericOfLength(10);
195+
196+
try (
197+
var threadPool = new TestThreadPool(getClass().getName());
198+
var transportService = startTransport(threadPool);
199+
var connectionManager = new RemoteConnectionManager(
200+
alias,
201+
RemoteClusterCredentialsManager.EMPTY,
202+
new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class), threadContext)
203+
)
204+
) {
205+
for (boolean shouldConnectFail : new boolean[] { true, false }) {
206+
for (boolean isIntialConnectAttempt : new boolean[] { true, false }) {
207+
final var strategy = new FakeConnectionStrategy(
208+
originProjectId,
209+
linkedProjectId,
210+
alias,
211+
transportService,
212+
connectionManager
213+
);
214+
if (isIntialConnectAttempt == false) {
215+
waitForConnect(strategy);
216+
}
217+
strategy.setShouldConnectFail(shouldConnectFail);
218+
final var expectedLogLevel = shouldConnectFail ? Level.WARN : Level.DEBUG;
219+
final var expectedLogMessage = Strings.format(
220+
"Origin project [%s] %s to linked project [%s] with alias [%s] on %s attempt",
221+
originProjectId,
222+
shouldConnectFail ? "failed to connect" : "successfully connected",
223+
linkedProjectId,
224+
alias,
225+
isIntialConnectAttempt ? "the initial connection" : "a reconnection"
226+
);
227+
assertThatLogger(() -> {
228+
if (shouldConnectFail) {
229+
assertThrows(RuntimeException.class, () -> waitForConnect(strategy));
230+
} else {
231+
waitForConnect(strategy);
232+
}
233+
},
234+
strategy.getClass(),
235+
new MockLog.SeenEventExpectation(
236+
"connection strategy should log at "
237+
+ expectedLogLevel
238+
+ " after a "
239+
+ (shouldConnectFail ? "failed" : "successful")
240+
+ (isIntialConnectAttempt ? " initial connection attempt" : " reconnection attempt"),
241+
strategy.getClass().getCanonicalName(),
242+
expectedLogLevel,
243+
expectedLogMessage
244+
)
245+
);
246+
}
247+
}
248+
}
249+
}
250+
251+
private MockTransportService startTransport(ThreadPool threadPool) {
252+
boolean success = false;
253+
final Settings s = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "cluster1").put("node.name", "node1").build();
254+
MockTransportService newService = MockTransportService.createNewService(
255+
s,
256+
VersionInformation.CURRENT,
257+
TransportVersion.current(),
258+
threadPool
259+
);
260+
try {
261+
newService.start();
262+
newService.acceptIncomingRequests();
263+
success = true;
264+
return newService;
265+
} finally {
266+
if (success == false) {
267+
newService.close();
268+
}
269+
}
270+
}
271+
272+
private static void waitForConnect(RemoteConnectionStrategy strategy) {
273+
PlainActionFuture<Void> connectFuture = new PlainActionFuture<>();
274+
strategy.connect(connectFuture);
275+
connectFuture.actionGet();
276+
}
277+
174278
private static class FakeConnectionStrategy extends RemoteConnectionStrategy {
175279

176280
private final ConnectionStrategy strategy;
281+
private boolean shouldConnectFail;
177282

178283
FakeConnectionStrategy(
284+
ProjectId originProjectId,
285+
ProjectId linkedProjectId,
286+
String clusterAlias,
287+
TransportService transportService,
288+
RemoteConnectionManager connectionManager
289+
) {
290+
this(
291+
originProjectId,
292+
linkedProjectId,
293+
clusterAlias,
294+
transportService,
295+
connectionManager,
296+
randomFrom(RemoteConnectionStrategy.ConnectionStrategy.values())
297+
);
298+
}
299+
300+
FakeConnectionStrategy(
301+
String clusterAlias,
302+
TransportService transportService,
303+
RemoteConnectionManager connectionManager,
304+
RemoteConnectionStrategy.ConnectionStrategy strategy
305+
) {
306+
this(ProjectId.DEFAULT, ProjectId.DEFAULT, clusterAlias, transportService, connectionManager, strategy);
307+
}
308+
309+
FakeConnectionStrategy(
310+
ProjectId originProjectId,
311+
ProjectId linkedProjectId,
179312
String clusterAlias,
180313
TransportService transportService,
181314
RemoteConnectionManager connectionManager,
182315
RemoteConnectionStrategy.ConnectionStrategy strategy
183316
) {
184317
super(switch (strategy) {
185-
case PROXY -> new LinkedProjectConfig.ProxyLinkedProjectConfigBuilder(clusterAlias).build();
186-
case SNIFF -> new LinkedProjectConfig.SniffLinkedProjectConfigBuilder(clusterAlias).build();
318+
case PROXY -> new LinkedProjectConfig.ProxyLinkedProjectConfigBuilder(originProjectId, linkedProjectId, clusterAlias)
319+
.build();
320+
case SNIFF -> new LinkedProjectConfig.SniffLinkedProjectConfigBuilder(originProjectId, linkedProjectId, clusterAlias)
321+
.build();
187322
}, transportService, connectionManager);
188323
this.strategy = strategy;
324+
this.shouldConnectFail = false;
325+
}
326+
327+
void setShouldConnectFail(boolean shouldConnectFail) {
328+
this.shouldConnectFail = shouldConnectFail;
189329
}
190330

191331
@Override
@@ -205,7 +345,11 @@ protected boolean shouldOpenMoreConnections() {
205345

206346
@Override
207347
protected void connectImpl(ActionListener<Void> listener) {
208-
348+
if (shouldConnectFail) {
349+
listener.onFailure(new RuntimeException("simulated failure"));
350+
} else {
351+
listener.onResponse(null);
352+
}
209353
}
210354

211355
@Override

0 commit comments

Comments
 (0)