Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import co.elastic.otel.dynamicconfig.BlockableLogRecordExporter;
import co.elastic.otel.dynamicconfig.BlockableMetricExporter;
import co.elastic.otel.dynamicconfig.BlockableSpanExporter;
import co.elastic.otel.dynamicconfig.DynamicConfiguration;
import co.elastic.otel.dynamicconfig.DynamicInstrumentation;
import co.elastic.otel.dynamicconfig.CentralConfig;
import com.google.auto.service.AutoService;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer;
Expand Down Expand Up @@ -68,13 +67,13 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {

autoConfiguration.addPropertiesCustomizer(
ElasticAutoConfigurationCustomizerProvider::propertiesCustomizer);
autoConfiguration.addResourceCustomizer(resourceProviders());
// make sure this comes after anything that might set the service name
autoConfiguration.addTracerProviderCustomizer(
(providerBuilder, properties) -> {
DynamicInstrumentation.setTracerConfigurator(
providerBuilder, DynamicConfiguration.UpdatableConfigurator.INSTANCE);
CentralConfig.init(providerBuilder, properties);
return providerBuilder;
});
autoConfiguration.addResourceCustomizer(resourceProviders());
}

private void configureExporterUserAgentHeaders(AutoConfigurationCustomizer autoConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package co.elastic.otel.dynamicconfig;

import co.elastic.opamp.client.CentralConfigurationManager;
import co.elastic.opamp.client.CentralConfigurationProcessor;
import co.elastic.otel.logging.AgentLog;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CentralConfig {
private static final Logger logger = Logger.getLogger(CentralConfig.class.getName());

static {
DynamicConfigurationPropertyChecker.startCheckerThread();
}

public static void init(SdkTracerProviderBuilder providerBuilder, ConfigProperties properties) {
// TODO flip default when EDOT collector supports op amp
boolean startOpAmp = properties.getBoolean("elastic.otel.opamp.start", false);
if (!startOpAmp) {
return;
}
String serviceName = getServiceName(properties);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't take service names derived from ResourceProviders into account, will it?
If not, we should probably add a TODO here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will if the resource provider sets the resource "service.name" entry before this executes (which should be the case) - is there another case that needs handling? I'm not sure about whether the resource providers can set it elsewhere

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was asking because getServiceName(properties) only reads from the ConfigurationProperties and not potential changes from ResourceProviders which don't write those, but directly set the Resource, e.g. AppServiceNameProvider.

But maybe there is some magic behind the curtains making this work? So if you tested it this is fine, otherwise we should add a TODO

// TODO agree on polling interval property name
int pollingInterval = properties.getInt("elastic.otel.opamp.polling.interval_in_seconds", 30);
// TODO derive default endpoint from main endpoint when EDOT collector endpoint is stable
String endpoint = properties.getString("elastic.otel.opamp.endpoint", "http://localhost:4320");
if (!endpoint.endsWith("v1/opamp")) {
if (endpoint.endsWith("/")) {
endpoint += "v1/opamp";
} else {
endpoint += "/v1/opamp";
}
}
logger.info("============= Starting OpAmp client for: " + serviceName);
DynamicInstrumentation.setTracerConfigurator(
providerBuilder, DynamicConfiguration.UpdatableConfigurator.INSTANCE);
CentralConfigurationManager centralConfigurationManager =
CentralConfigurationManager.builder()
.setServiceName(serviceName)
.setPollingInterval(Duration.ofSeconds(pollingInterval))
.setConfigurationEndpoint(endpoint)
.build();

centralConfigurationManager.start(
configuration -> {
logger.fine("Received configuration: " + configuration);
Configs.applyConfigurations(configuration);
return CentralConfigurationProcessor.Result.SUCCESS;
});

Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
logger.info("=========== Shutting down OpAmp client for: " + serviceName);
centralConfigurationManager.stop();
}));
}

private static String getServiceName(ConfigProperties properties) {
String serviceName = properties.getString("otel.service.name");
if (serviceName != null) {
return serviceName;
}
Map<String, String> resourceMap = properties.getMap("otel.resource.attributes");
if (resourceMap != null) {
serviceName = resourceMap.get("service.name");
if (serviceName != null) {
return serviceName;
}
}
return "unknown_service"; // Specified default
}

public static class Configs {
private static final Map<String, ConfigOption> configNameToConfig;
private static final Set<String> currentNonDefaultConfigsApplied = new HashSet<>();

static {
configNameToConfig =
Stream.of(
new SendLogs(),
new SendMetrics(),
new SendTraces(),
new DeactivateAllInstrumentations(),
new DeactivateInstrumentations(),
new LoggingLevel())
.collect(Collectors.toMap(ConfigOption::getConfigName, option -> option));
}

public static synchronized void applyConfigurations(Map<String, String> configuration) {
Set<String> copyOfCurrentNonDefaultConfigsApplied =
new HashSet<>(currentNonDefaultConfigsApplied);
configuration.forEach(
(configurationName, configurationValue) -> {
copyOfCurrentNonDefaultConfigsApplied.remove(configurationName);
applyConfiguration(configurationName, configurationValue);
currentNonDefaultConfigsApplied.add(configurationName);
});
if (!copyOfCurrentNonDefaultConfigsApplied.isEmpty()) {
// We have configs that were applied previously but have now been set back to default and
// have been removed from the configs being sent - so for all of these we need to set the
// config back to default
for (String configurationName : copyOfCurrentNonDefaultConfigsApplied) {
applyDefaultConfiguration(configurationName);
currentNonDefaultConfigsApplied.remove(configurationName);
}
}
}

public static void applyDefaultConfiguration(String configurationName) {
configNameToConfig.get(configurationName).updateToDefault();
}

public static void applyConfiguration(String configurationName, String configurationValue) {
if (configNameToConfig.containsKey(configurationName)) {
configNameToConfig.get(configurationName).updateOrLog(configurationValue);
} else {
logger.warning(
"Ignoring unknown confguration option: '"
+ configurationName
+ "' with value: "
+ configurationValue);
}
}
}

public abstract static class ConfigOption {
protected final String configName;
protected final String defaultConfigStringValue;

protected ConfigOption(String configName1, String defaultConfigStringValue1) {
configName = configName1;
defaultConfigStringValue = defaultConfigStringValue1;
}

public String getConfigName() {
return configName;
}

protected boolean getBoolean(String configurationValue) {
String error =
"'"
+ getConfigName()
+ "' configuration option can only be 'true' or 'false' but is: {0}";
return getBoolean(configurationValue, error);
}

protected boolean getBoolean(String configurationValue, String error) {
if ("true".equalsIgnoreCase(configurationValue)) {
return true;
} else if ("false".equalsIgnoreCase(configurationValue)) {
return false;
} else {
throw new IllegalArgumentException(MessageFormat.format(error, configurationValue));
}
}

public void updateOrLog(String configurationValue) {
try {
update(configurationValue);
} catch (IllegalArgumentException e) {
logger.warning(e.getMessage());
}
}

abstract void update(String configurationValue) throws IllegalArgumentException;

public void updateToDefault() {
update(defaultConfigStringValue);
}

protected DynamicConfiguration config() {
return DynamicConfiguration.getInstance();
}
}

public static final class SendLogs extends ConfigOption {
SendLogs() {
super("send_logs", "true");
}

@Override
void update(String configurationValue) throws IllegalArgumentException {
config().setSendingLogs(getBoolean(configurationValue));
}
}

public static final class SendMetrics extends ConfigOption {
SendMetrics() {
super("send_metrics", "true");
}

@Override
void update(String configurationValue) throws IllegalArgumentException {
config().setSendingMetrics(getBoolean(configurationValue));
}
}

public static final class SendTraces extends ConfigOption {
SendTraces() {
super("send_traces", "true");
}

@Override
void update(String configurationValue) throws IllegalArgumentException {
config().setSendingSpans(getBoolean(configurationValue));
}
}

public static final class DeactivateAllInstrumentations extends ConfigOption {
DeactivateAllInstrumentations() {
super("deactivate_all_instrumentations", "false");
}

@Override
void update(String configurationValue) throws IllegalArgumentException {
if (getBoolean(configurationValue)) {
config().deactivateAllInstrumentations();
} else {
config().reactivateAllInstrumentations();
}
}
}

public static final class DeactivateInstrumentations extends ConfigOption {
DeactivateInstrumentations() {
super("deactivate_instrumentations", "");
}

@Override
void update(String configurationValue) throws IllegalArgumentException {
config().deactivateInstrumentations(configurationValue);
}
}

public static final class LoggingLevel extends ConfigOption {
LoggingLevel() {
super("logging_level", "");
}

@Override
void update(String configurationValue) throws IllegalArgumentException {
AgentLog.setLevel(configurationValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,28 +136,28 @@ public void restartAllSending() {
}
}

public void reenableTracesFor(String instrumentationName) {
private void reactivateInstrumentation(String instrumentationName) {
UpdatableConfigurator.INSTANCE.put(
InstrumentationScopeInfo.create(INSTRUMENTATION_NAME_PREPEND + instrumentationName),
TracerConfig.enabled());
setProviderTracerConfigurator(
GlobalOpenTelemetry.getTracerProvider(), UpdatableConfigurator.INSTANCE);
}

public void disableTracesFor(String instrumentationName) {
private void deactivateInstrumentation(String instrumentationName) {
UpdatableConfigurator.INSTANCE.put(
InstrumentationScopeInfo.create(INSTRUMENTATION_NAME_PREPEND + instrumentationName),
TracerConfig.disabled());
setProviderTracerConfigurator(
GlobalOpenTelemetry.getTracerProvider(), UpdatableConfigurator.INSTANCE);
}

public void disableAllTraces() {
disableTracesFor(ALL_INSTRUMENTATION);
public void deactivateAllInstrumentations() {
deactivateInstrumentation(ALL_INSTRUMENTATION);
}

public void stopDisablingAllTraces() {
reenableTracesFor(ALL_INSTRUMENTATION);
public void reactivateAllInstrumentations() {
reactivateInstrumentation(ALL_INSTRUMENTATION);
}

// okay to synchronize as this should only be called after multi-second intervals and
Expand All @@ -180,7 +180,7 @@ public synchronized void deactivateInstrumentations(String deactivateList) {
// Applying (1) - keySet.remove() is a valid concurrent mutation here within the loop
Set<String> keySet = alreadyDeactivated.keySet();
for (String instrumentation : keySet) {
DynamicConfiguration.getInstance().reenableTracesFor(instrumentation);
DynamicConfiguration.getInstance().reactivateInstrumentation(instrumentation);
keySet.remove(instrumentation);
}
} else {
Expand Down Expand Up @@ -225,11 +225,11 @@ public Deactivations(Set<String> deactivateList, Set<String> alreadyDeactivated)

public void applyDeactivations(ConcurrentMap<String, Boolean> alreadyDeactivated) {
for (String instrumentation : instrumentationsToReactivate) {
DynamicConfiguration.getInstance().reenableTracesFor(instrumentation);
DynamicConfiguration.getInstance().reactivateInstrumentation(instrumentation);
alreadyDeactivated.remove(instrumentation);
}
for (String instrumentation : instrumentationsToDeactivate) {
DynamicConfiguration.getInstance().disableTracesFor(instrumentation);
DynamicConfiguration.getInstance().deactivateInstrumentation(instrumentation);
alreadyDeactivated.put(instrumentation, Boolean.TRUE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,4 @@ public static TracerConfig setProviderTracerConfigurator(
"Expected SdkTracerProvider but got " + provider.getClass().getName());
}
}

static {
// will refactor this when DynamicInstrumentation class becomes mostly empty
DynamicConfigurationPropertyChecker.startCheckerThread();
}
}
Loading
Loading