Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ public Map<String, String> queryFields() {
localNodeFactory,
settingsModule.getClusterSettings(),
taskManager,
telemetryProvider.getTracer(),
telemetryProvider,
nodeEnvironment.nodeId(),
linkedProjectConfigService,
projectResolver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ClusterConnectionManager;
Expand Down Expand Up @@ -120,7 +121,7 @@ TransportService newTransportService(
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
TaskManager taskManager,
Tracer tracer,
TelemetryProvider telemetryProvider,
String nodeId,
LinkedProjectConfigService linkedProjectConfigService,
ProjectResolver projectResolver
Expand All @@ -135,6 +136,7 @@ TransportService newTransportService(
new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()),
taskManager,
linkedProjectConfigService,
telemetryProvider,
projectResolver
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
Expand Down Expand Up @@ -79,6 +81,11 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
private List<ActionListener<Void>> listeners = new ArrayList<>();
private final AtomicBoolean initialConnectionAttempted = new AtomicBoolean(false);

static final String INITIAL_CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME = "es.projects.linked.connections.initial.error.total";
static final String RECONNECTION_ATTEMPT_FAILURES_COUNTER_NAME = "es.projects.linked.connections.reconnect.error.total";
private static LongCounter initialConnectionAttemptFailures;
private static LongCounter reconnectAttemptFailures;

protected final TransportService transportService;
protected final RemoteConnectionManager connectionManager;
protected final ProjectId originProjectId;
Expand All @@ -92,9 +99,27 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
this.transportService = transportService;
this.connectionManager = connectionManager;
this.maxPendingConnectionListeners = config.maxPendingConnectionListeners();
registerMetrics(transportService.getTelemetryProvider());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metrics registration should be node wide, not per Java instance. I suggest we register the metrics in RemoteClusterService instead and you can access them here by either passing them directly or use meterRegistry.getMeterRegistry(String name).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started refactoring to registering in RemoteClusterService, but realized the result was turning out the same, a single static instance initialized in a static synchronized method. Also we would be leaking knowledge of the metrics being used down in the strategy class. Aren't the single static instances in RemoteConnectionStrategy already node wide? Perhaps I'm misunderstanding what you mean by 'node wide' and 'per Java instance'. The way it is coded there are only two static counters, registered once.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way it is coded there are only two static counters, registered once.

You are right that they are registered once as two static fields. But we should not need these static fields and the synchronized initialization. It is more idmoatic to register the metrics once at a common place instead of attempting registration by each individual object. For example, we register repository related metrics once in RepositoriesModule instead of inside each individual repository.

Static fields also do not work well with internal cluster tests where multiple nodes are running in the same Java process. We want most objects, including MetricRegistry and its registration, to be per ES node. Static fields break this encapsulation because they are shared by multiple ES nodes in the test cluster.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, makes sense now, thank you for clearing this up Yang, forgive me for it not clicking before. I refactored to register the single metric in RemoteClusterService. I needed to refactor the unit tests for RemoteClusterService since they were creating an unnecessary RemoteClusterService instance, which would cause duplicate metric registration. Since this is a big change and separate from the focus of this PR I split it off into #137647.

connectionManager.addListener(this);
}

private static synchronized void registerMetrics(TelemetryProvider telemetryProvider) {
final var meterRegistry = telemetryProvider == null ? null : telemetryProvider.getMeterRegistry();
if (initialConnectionAttemptFailures != null || meterRegistry == null) {
return;
}
initialConnectionAttemptFailures = meterRegistry.registerLongCounter(
INITIAL_CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME,
"linked project initial connection attempt failure count",
"count"
);
reconnectAttemptFailures = meterRegistry.registerLongCounter(
RECONNECTION_ATTEMPT_FAILURES_COUNTER_NAME,
"linked project reconnection attempt failure count",
"count"
);
}

static ConnectionProfile buildConnectionProfile(LinkedProjectConfig config, String transportProfile) {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder().setConnectTimeout(config.transportConnectTimeout())
.setHandshakeTimeout(config.transportConnectTimeout())
Expand Down Expand Up @@ -221,7 +246,8 @@ private void connectionAttemptCompleted(@Nullable Exception e) {
logger.debug(msgSupplier);
} else {
logger.warn(msgSupplier, e);
// TODO: ES-12695: Increment either the initial or retry connection failure metric.
final var counter = isInitialAttempt ? initialConnectionAttemptFailures : reconnectAttemptFailures;
counter.increment();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should lablel the metric with information like target project alias etc. with counter.incrementBy(long, Map).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you Yang, I added linked_project_id and linked_project_alias. Are there others you had in mind?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We can also have a label (an enum) to differentiate between initial connection and reconnection so that we need only a single APM metric.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use a label for ConnectionStrategy as well. Though CPS only uses proxy, this metric is in a more general place that can be used in other environments.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -140,6 +141,7 @@ protected boolean removeEldestEntry(Map.Entry<Long, TimeoutInfoHolder> eldest) {
volatile String[] tracerLogExclude;

private final LinkedProjectConfigService linkedProjectConfigService;
private final TelemetryProvider telemetryProvider;
private final RemoteClusterService remoteClusterService;

/**
Expand Down Expand Up @@ -277,6 +279,7 @@ public TransportService(
connectionManager,
taskManger,
new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE),
TelemetryProvider.NOOP,
DefaultProjectResolver.INSTANCE
);
}
Expand All @@ -292,6 +295,7 @@ public TransportService(
ConnectionManager connectionManager,
TaskManager taskManger,
LinkedProjectConfigService linkedProjectConfigService,
TelemetryProvider telemetryProvider,
ProjectResolver projectResolver
) {
this.transport = transport;
Expand All @@ -308,6 +312,7 @@ public TransportService(
this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
this.enableStackOverflowAvoidance = ENABLE_STACK_OVERFLOW_AVOIDANCE.get(settings);
this.linkedProjectConfigService = linkedProjectConfigService;
this.telemetryProvider = telemetryProvider;
remoteClusterService = new RemoteClusterService(settings, this, projectResolver);
responseHandlers = transport.getResponseHandlers();
if (clusterSettings != null) {
Expand Down Expand Up @@ -354,6 +359,10 @@ void setTracerLogExclude(List<String> tracerLogExclude) {
this.tracerLogExclude = tracerLogExclude.toArray(Strings.EMPTY_ARRAY);
}

public TelemetryProvider getTelemetryProvider() {
return telemetryProvider;
}

@Override
protected void doStart() {
transport.setMessageListener(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.telemetry.InstrumentType;
import org.elasticsearch.telemetry.RecordingMeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EnumSerializationTestUtils;
import org.elasticsearch.test.MockLog;
Expand All @@ -34,6 +36,8 @@
import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS;
import static org.elasticsearch.transport.RemoteClusterSettings.toConfig;
import static org.elasticsearch.transport.RemoteConnectionStrategy.buildConnectionProfile;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Mockito.mock;

public class RemoteConnectionStrategyTests extends ESTestCase {
Expand Down Expand Up @@ -194,7 +198,7 @@ public void testConnectionStrategySerialization() {
value = "org.elasticsearch.transport.RemoteConnectionStrategyTests.FakeConnectionStrategy:DEBUG",
reason = "logging verification"
)
public void testConnectionAttemptLogging() {
public void testConnectionAttemptMetricsAndLogging() {
final var originProjectId = randomUniqueProjectId();
final var linkedProjectId = randomUniqueProjectId();
final var alias = randomAlphanumericOfLength(10);
Expand All @@ -208,16 +212,21 @@ public void testConnectionAttemptLogging() {
new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class), threadContext)
)
) {
assert transportService.getTelemetryProvider() != null;
final var meterRegistry = transportService.getTelemetryProvider().getMeterRegistry();
assert meterRegistry instanceof RecordingMeterRegistry;
final var metricRecorder = ((RecordingMeterRegistry) meterRegistry).getRecorder();
Comment on lines +216 to +219
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to my other comments about static fields. If we create two TransportService instances representing two nodes and test connection strategies for each of them, the static fields will make RemoteConnectionStrategy objects from different TransportService instance share the same metrics give incorrect results.


for (boolean shouldConnectFail : new boolean[] { true, false }) {
for (boolean isIntialConnectAttempt : new boolean[] { true, false }) {
for (boolean isInitialConnectAttempt : new boolean[] { true, false }) {
final var strategy = new FakeConnectionStrategy(
originProjectId,
linkedProjectId,
alias,
transportService,
connectionManager
);
if (isIntialConnectAttempt == false) {
if (isInitialConnectAttempt == false) {
waitForConnect(strategy);
}
strategy.setShouldConnectFail(shouldConnectFail);
Expand All @@ -228,7 +237,7 @@ public void testConnectionAttemptLogging() {
shouldConnectFail ? "failed to connect" : "successfully connected",
linkedProjectId,
alias,
isIntialConnectAttempt ? "the initial connection" : "a reconnection"
isInitialConnectAttempt ? "the initial connection" : "a reconnection"
);
assertThatLogger(() -> {
if (shouldConnectFail) {
Expand All @@ -243,12 +252,21 @@ public void testConnectionAttemptLogging() {
+ expectedLogLevel
+ " after a "
+ (shouldConnectFail ? "failed" : "successful")
+ (isIntialConnectAttempt ? " initial connection attempt" : " reconnection attempt"),
+ (isInitialConnectAttempt ? " initial connection attempt" : " reconnection attempt"),
strategy.getClass().getCanonicalName(),
expectedLogLevel,
expectedLogMessage
)
);
if (shouldConnectFail) {
metricRecorder.collect();
final var counterName = isInitialConnectAttempt
? RemoteConnectionStrategy.INITIAL_CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME
: RemoteConnectionStrategy.RECONNECTION_ATTEMPT_FAILURES_COUNTER_NAME;
final var measurements = metricRecorder.getMeasurements(InstrumentType.LONG_COUNTER, counterName);
assertThat(measurements, hasSize(1));
assertThat(measurements.getFirst().getLong(), equalTo(1L));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockHttpTransport;
Expand Down Expand Up @@ -174,7 +175,7 @@ protected TransportService newTransportService(
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
TaskManager taskManager,
Tracer tracer,
TelemetryProvider telemetryProvider,
String nodeId,
LinkedProjectConfigService linkedProjectConfigService,
ProjectResolver projectResolver
Expand All @@ -194,7 +195,7 @@ protected TransportService newTransportService(
localNodeFactory,
clusterSettings,
taskManager,
tracer,
telemetryProvider,
nodeId,
linkedProjectConfigService,
projectResolver
Expand All @@ -209,6 +210,7 @@ protected TransportService newTransportService(
clusterSettings,
MockTransportService.createTaskManager(settings, threadPool, taskManager.getTaskHeaders(), Tracer.NOOP, nodeId),
linkedProjectConfigService,
telemetryProvider,
projectResolver
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.RecordingMeterRegistry;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -264,6 +267,19 @@ public MockTransportService(
clusterSettings,
createTaskManager(settings, threadPool, taskHeaders, Tracer.NOOP, nodeId),
new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE),
new TelemetryProvider() {
final MeterRegistry meterRegistry = new RecordingMeterRegistry();

@Override
public Tracer getTracer() {
return Tracer.NOOP;
}

@Override
public MeterRegistry getMeterRegistry() {
return meterRegistry;
}
},
DefaultProjectResolver.INSTANCE
);
}
Expand All @@ -277,6 +293,7 @@ public MockTransportService(
@Nullable ClusterSettings clusterSettings,
TaskManager taskManager,
LinkedProjectConfigService linkedProjectConfigService,
TelemetryProvider telemetryProvider,
ProjectResolver projectResolver
) {
super(
Expand All @@ -289,6 +306,7 @@ public MockTransportService(
new StubbableConnectionManager(new ClusterConnectionManager(settings, transport, threadPool.getThreadContext())),
taskManager,
linkedProjectConfigService,
telemetryProvider,
projectResolver
);
this.original = transport.getDelegate();
Expand Down