Skip to content

Commit 3306214

Browse files
JeremyDahlgrenKubik42
authored andcommitted
Add connection failure metrics in RemoteConnectionStrategy (elastic#137406)
This change registers a counter to track initial and reconnect attempt failures. The change also required minor refactoring to make the metrics registry available from the TransportService that is passed to the RemoteClusterService constructor. This change builds on the work done in elastic#134415. Resolves: ES-12695
1 parent a4ec365 commit 3306214

File tree

8 files changed

+104
-10
lines changed

8 files changed

+104
-10
lines changed

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1154,7 +1154,7 @@ public Map<String, String> queryFields() {
11541154
localNodeFactory,
11551155
settingsModule.getClusterSettings(),
11561156
taskManager,
1157-
telemetryProvider.getTracer(),
1157+
telemetryProvider,
11581158
nodeEnvironment.nodeId(),
11591159
linkedProjectConfigService,
11601160
projectResolver

server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.search.SearchService;
4141
import org.elasticsearch.search.fetch.FetchPhase;
4242
import org.elasticsearch.tasks.TaskManager;
43+
import org.elasticsearch.telemetry.TelemetryProvider;
4344
import org.elasticsearch.telemetry.tracing.Tracer;
4445
import org.elasticsearch.threadpool.ThreadPool;
4546
import org.elasticsearch.transport.ClusterConnectionManager;
@@ -120,7 +121,7 @@ TransportService newTransportService(
120121
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
121122
ClusterSettings clusterSettings,
122123
TaskManager taskManager,
123-
Tracer tracer,
124+
TelemetryProvider telemetryProvider,
124125
String nodeId,
125126
LinkedProjectConfigService linkedProjectConfigService,
126127
ProjectResolver projectResolver
@@ -135,6 +136,7 @@ TransportService newTransportService(
135136
new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()),
136137
taskManager,
137138
linkedProjectConfigService,
139+
telemetryProvider,
138140
projectResolver
139141
);
140142
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public final class RemoteClusterService extends RemoteClusterAware
6565
private static final Logger logger = LogManager.getLogger(RemoteClusterService.class);
6666

6767
public static final String REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME = "cluster:internal/remote_cluster/handshake";
68+
public static final String CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME = "es.projects.linked.connections.error.total";
6869

6970
private final boolean isRemoteClusterClient;
7071
private final boolean isSearchNode;
@@ -101,6 +102,11 @@ public boolean isRemoteClusterServerEnabled() {
101102
* the functionality to do it the right way is not yet ready -- replace this code when it's ready.
102103
*/
103104
this.crossProjectEnabled = settings.getAsBoolean("serverless.cross_project.enabled", false);
105+
if (transportService != null) {
106+
transportService.getTelemetryProvider()
107+
.getMeterRegistry()
108+
.registerLongCounter(CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME, "linked project connection attempt failure count", "count");
109+
}
104110
}
105111

106112
public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() {

server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2121
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2222
import org.elasticsearch.core.Nullable;
23+
import org.elasticsearch.telemetry.TelemetryProvider;
24+
import org.elasticsearch.telemetry.metric.LongCounter;
2325
import org.elasticsearch.threadpool.ThreadPool;
2426

2527
import java.io.Closeable;
@@ -30,6 +32,7 @@
3032
import java.util.ArrayList;
3133
import java.util.Collections;
3234
import java.util.List;
35+
import java.util.Map;
3336
import java.util.Objects;
3437
import java.util.concurrent.ExecutorService;
3538
import java.util.concurrent.atomic.AtomicBoolean;
@@ -70,6 +73,11 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
7073
}
7174
}
7275

76+
enum ConnectionAttempt {
77+
initial,
78+
reconnect
79+
}
80+
7381
private final int maxPendingConnectionListeners;
7482

7583
protected final Logger logger = LogManager.getLogger(getClass());
@@ -78,6 +86,7 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
7886
private final Object mutex = new Object();
7987
private List<ActionListener<Void>> listeners = new ArrayList<>();
8088
private final AtomicBoolean initialConnectionAttempted = new AtomicBoolean(false);
89+
private final LongCounter connectionAttemptFailures;
8190

8291
protected final TransportService transportService;
8392
protected final RemoteConnectionManager connectionManager;
@@ -92,9 +101,15 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
92101
this.transportService = transportService;
93102
this.connectionManager = connectionManager;
94103
this.maxPendingConnectionListeners = config.maxPendingConnectionListeners();
104+
this.connectionAttemptFailures = lookupConnectionFailureMetric(transportService.getTelemetryProvider());
95105
connectionManager.addListener(this);
96106
}
97107

108+
private LongCounter lookupConnectionFailureMetric(TelemetryProvider telemetryProvider) {
109+
final var meterRegistry = telemetryProvider == null ? null : telemetryProvider.getMeterRegistry();
110+
return meterRegistry == null ? null : meterRegistry.getLongCounter(RemoteClusterService.CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME);
111+
}
112+
98113
static ConnectionProfile buildConnectionProfile(LinkedProjectConfig config, String transportProfile) {
99114
ConnectionProfile.Builder builder = new ConnectionProfile.Builder().setConnectTimeout(config.transportConnectTimeout())
100115
.setHandshakeTimeout(config.transportConnectTimeout())
@@ -221,7 +236,21 @@ private void connectionAttemptCompleted(@Nullable Exception e) {
221236
logger.debug(msgSupplier);
222237
} else {
223238
logger.warn(msgSupplier, e);
224-
// TODO: ES-12695: Increment either the initial or retry connection failure metric.
239+
if (connectionAttemptFailures != null) {
240+
connectionAttemptFailures.incrementBy(
241+
1,
242+
Map.of(
243+
"linked_project_id",
244+
linkedProjectId.toString(),
245+
"linked_project_alias",
246+
clusterAlias,
247+
"attempt",
248+
(isInitialAttempt ? ConnectionAttempt.initial : ConnectionAttempt.reconnect).toString(),
249+
"strategy",
250+
strategyType().toString()
251+
)
252+
);
253+
}
225254
}
226255
}
227256

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.elasticsearch.node.ReportingService;
5151
import org.elasticsearch.tasks.Task;
5252
import org.elasticsearch.tasks.TaskManager;
53+
import org.elasticsearch.telemetry.TelemetryProvider;
5354
import org.elasticsearch.threadpool.Scheduler;
5455
import org.elasticsearch.threadpool.ThreadPool;
5556

@@ -140,6 +141,7 @@ protected boolean removeEldestEntry(Map.Entry<Long, TimeoutInfoHolder> eldest) {
140141
volatile String[] tracerLogExclude;
141142

142143
private final LinkedProjectConfigService linkedProjectConfigService;
144+
private final TelemetryProvider telemetryProvider;
143145
private final RemoteClusterService remoteClusterService;
144146

145147
/**
@@ -277,6 +279,7 @@ public TransportService(
277279
connectionManager,
278280
taskManger,
279281
new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE),
282+
TelemetryProvider.NOOP,
280283
DefaultProjectResolver.INSTANCE
281284
);
282285
}
@@ -292,6 +295,7 @@ public TransportService(
292295
ConnectionManager connectionManager,
293296
TaskManager taskManger,
294297
LinkedProjectConfigService linkedProjectConfigService,
298+
TelemetryProvider telemetryProvider,
295299
ProjectResolver projectResolver
296300
) {
297301
this.transport = transport;
@@ -308,6 +312,7 @@ public TransportService(
308312
this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
309313
this.enableStackOverflowAvoidance = ENABLE_STACK_OVERFLOW_AVOIDANCE.get(settings);
310314
this.linkedProjectConfigService = linkedProjectConfigService;
315+
this.telemetryProvider = telemetryProvider;
311316
remoteClusterService = new RemoteClusterService(settings, this, projectResolver);
312317
responseHandlers = transport.getResponseHandlers();
313318
if (clusterSettings != null) {
@@ -354,6 +359,10 @@ void setTracerLogExclude(List<String> tracerLogExclude) {
354359
this.tracerLogExclude = tracerLogExclude.toArray(Strings.EMPTY_ARRAY);
355360
}
356361

362+
public TelemetryProvider getTelemetryProvider() {
363+
return telemetryProvider;
364+
}
365+
357366
@Override
358367
protected void doStart() {
359368
transport.setMessageListener(this);

server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.elasticsearch.common.util.concurrent.ThreadContext;
2121
import org.elasticsearch.core.Strings;
2222
import org.elasticsearch.core.TimeValue;
23+
import org.elasticsearch.telemetry.InstrumentType;
24+
import org.elasticsearch.telemetry.RecordingMeterRegistry;
2325
import org.elasticsearch.test.ESTestCase;
2426
import org.elasticsearch.test.EnumSerializationTestUtils;
2527
import org.elasticsearch.test.MockLog;
@@ -28,12 +30,15 @@
2830
import org.elasticsearch.threadpool.TestThreadPool;
2931
import org.elasticsearch.threadpool.ThreadPool;
3032

33+
import java.util.Set;
34+
3135
import static org.elasticsearch.test.MockLog.assertThatLogger;
3236
import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS;
3337
import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_CONNECTION_MODE;
3438
import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS;
3539
import static org.elasticsearch.transport.RemoteClusterSettings.toConfig;
3640
import static org.elasticsearch.transport.RemoteConnectionStrategy.buildConnectionProfile;
41+
import static org.hamcrest.Matchers.equalTo;
3742
import static org.mockito.Mockito.mock;
3843

3944
public class RemoteConnectionStrategyTests extends ESTestCase {
@@ -194,7 +199,7 @@ public void testConnectionStrategySerialization() {
194199
value = "org.elasticsearch.transport.RemoteConnectionStrategyTests.FakeConnectionStrategy:DEBUG",
195200
reason = "logging verification"
196201
)
197-
public void testConnectionAttemptLogging() {
202+
public void testConnectionAttemptMetricsAndLogging() {
198203
final var originProjectId = randomUniqueProjectId();
199204
final var linkedProjectId = randomUniqueProjectId();
200205
final var alias = randomAlphanumericOfLength(10);
@@ -208,16 +213,21 @@ public void testConnectionAttemptLogging() {
208213
new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class), threadContext)
209214
)
210215
) {
216+
assert transportService.getTelemetryProvider() != null;
217+
final var meterRegistry = transportService.getTelemetryProvider().getMeterRegistry();
218+
assert meterRegistry instanceof RecordingMeterRegistry;
219+
final var metricRecorder = ((RecordingMeterRegistry) meterRegistry).getRecorder();
220+
211221
for (boolean shouldConnectFail : new boolean[] { true, false }) {
212-
for (boolean isIntialConnectAttempt : new boolean[] { true, false }) {
222+
for (boolean isInitialConnectAttempt : new boolean[] { true, false }) {
213223
final var strategy = new FakeConnectionStrategy(
214224
originProjectId,
215225
linkedProjectId,
216226
alias,
217227
transportService,
218228
connectionManager
219229
);
220-
if (isIntialConnectAttempt == false) {
230+
if (isInitialConnectAttempt == false) {
221231
waitForConnect(strategy);
222232
}
223233
strategy.setShouldConnectFail(shouldConnectFail);
@@ -228,7 +238,7 @@ public void testConnectionAttemptLogging() {
228238
shouldConnectFail ? "failed to connect" : "successfully connected",
229239
linkedProjectId,
230240
alias,
231-
isIntialConnectAttempt ? "the initial connection" : "a reconnection"
241+
isInitialConnectAttempt ? "the initial connection" : "a reconnection"
232242
);
233243
assertThatLogger(() -> {
234244
if (shouldConnectFail) {
@@ -243,12 +253,30 @@ public void testConnectionAttemptLogging() {
243253
+ expectedLogLevel
244254
+ " after a "
245255
+ (shouldConnectFail ? "failed" : "successful")
246-
+ (isIntialConnectAttempt ? " initial connection attempt" : " reconnection attempt"),
256+
+ (isInitialConnectAttempt ? " initial connection attempt" : " reconnection attempt"),
247257
strategy.getClass().getCanonicalName(),
248258
expectedLogLevel,
249259
expectedLogMessage
250260
)
251261
);
262+
if (shouldConnectFail) {
263+
metricRecorder.collect();
264+
final var counterName = RemoteClusterService.CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME;
265+
final var measurements = metricRecorder.getMeasurements(InstrumentType.LONG_COUNTER, counterName);
266+
assertFalse(measurements.isEmpty());
267+
final var measurement = measurements.getLast();
268+
assertThat(measurement.getLong(), equalTo(1L));
269+
final var attributes = measurement.attributes();
270+
final var keySet = Set.of("linked_project_id", "linked_project_alias", "attempt", "strategy");
271+
final var expectedAttemptType = isInitialConnectAttempt
272+
? RemoteConnectionStrategy.ConnectionAttempt.initial
273+
: RemoteConnectionStrategy.ConnectionAttempt.reconnect;
274+
assertThat(attributes.keySet(), equalTo(keySet));
275+
assertThat(attributes.get("linked_project_id"), equalTo(linkedProjectId.toString()));
276+
assertThat(attributes.get("linked_project_alias"), equalTo(alias));
277+
assertThat(attributes.get("attempt"), equalTo(expectedAttemptType.toString()));
278+
assertThat(attributes.get("strategy"), equalTo(strategy.strategyType().toString()));
279+
}
252280
}
253281
}
254282
}

test/framework/src/main/java/org/elasticsearch/node/MockNode.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.search.SearchService;
4646
import org.elasticsearch.search.fetch.FetchPhase;
4747
import org.elasticsearch.tasks.TaskManager;
48+
import org.elasticsearch.telemetry.TelemetryProvider;
4849
import org.elasticsearch.telemetry.tracing.Tracer;
4950
import org.elasticsearch.test.ESTestCase;
5051
import org.elasticsearch.test.MockHttpTransport;
@@ -174,7 +175,7 @@ protected TransportService newTransportService(
174175
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
175176
ClusterSettings clusterSettings,
176177
TaskManager taskManager,
177-
Tracer tracer,
178+
TelemetryProvider telemetryProvider,
178179
String nodeId,
179180
LinkedProjectConfigService linkedProjectConfigService,
180181
ProjectResolver projectResolver
@@ -194,7 +195,7 @@ protected TransportService newTransportService(
194195
localNodeFactory,
195196
clusterSettings,
196197
taskManager,
197-
tracer,
198+
telemetryProvider,
198199
nodeId,
199200
linkedProjectConfigService,
200201
projectResolver
@@ -209,6 +210,7 @@ protected TransportService newTransportService(
209210
clusterSettings,
210211
MockTransportService.createTaskManager(settings, threadPool, taskManager.getTaskHeaders(), Tracer.NOOP, nodeId),
211212
linkedProjectConfigService,
213+
telemetryProvider,
212214
projectResolver
213215
);
214216
}

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@
4646
import org.elasticsearch.plugins.Plugin;
4747
import org.elasticsearch.search.SearchModule;
4848
import org.elasticsearch.tasks.TaskManager;
49+
import org.elasticsearch.telemetry.RecordingMeterRegistry;
50+
import org.elasticsearch.telemetry.TelemetryProvider;
51+
import org.elasticsearch.telemetry.metric.MeterRegistry;
4952
import org.elasticsearch.telemetry.tracing.Tracer;
5053
import org.elasticsearch.test.ESIntegTestCase;
5154
import org.elasticsearch.test.ESTestCase;
@@ -264,6 +267,19 @@ public MockTransportService(
264267
clusterSettings,
265268
createTaskManager(settings, threadPool, taskHeaders, Tracer.NOOP, nodeId),
266269
new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE),
270+
new TelemetryProvider() {
271+
final MeterRegistry meterRegistry = new RecordingMeterRegistry();
272+
273+
@Override
274+
public Tracer getTracer() {
275+
return Tracer.NOOP;
276+
}
277+
278+
@Override
279+
public MeterRegistry getMeterRegistry() {
280+
return meterRegistry;
281+
}
282+
},
267283
DefaultProjectResolver.INSTANCE
268284
);
269285
}
@@ -277,6 +293,7 @@ public MockTransportService(
277293
@Nullable ClusterSettings clusterSettings,
278294
TaskManager taskManager,
279295
LinkedProjectConfigService linkedProjectConfigService,
296+
TelemetryProvider telemetryProvider,
280297
ProjectResolver projectResolver
281298
) {
282299
super(
@@ -289,6 +306,7 @@ public MockTransportService(
289306
new StubbableConnectionManager(new ClusterConnectionManager(settings, transport, threadPool.getThreadContext())),
290307
taskManager,
291308
linkedProjectConfigService,
309+
telemetryProvider,
292310
projectResolver
293311
);
294312
this.original = transport.getDelegate();

0 commit comments

Comments
 (0)