Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package co.elastic.otel.dynamicconfig;

import co.elastic.opamp.client.CentralConfigurationManager;
import co.elastic.opamp.client.CentralConfigurationManagerImpl;
import co.elastic.opamp.client.CentralConfigurationProcessor;
import co.elastic.otel.logging.AgentLog;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -69,7 +71,7 @@ public static void init(SdkTracerProviderBuilder providerBuilder, ConfigProperti
centralConfigurationManager.start(
configuration -> {
logger.fine("Received configuration: " + configuration);
Configs.applyConfigurations(configuration);
Configs.applyConfigurations(configuration, centralConfigurationManager);
return CentralConfigurationProcessor.Result.SUCCESS;
});

Expand Down Expand Up @@ -121,37 +123,46 @@ public static class Configs {
new SendTraces(),
new DeactivateAllInstrumentations(),
new DeactivateInstrumentations(),
new LoggingLevel())
new LoggingLevel(),
new PollingInterval())
.collect(Collectors.toMap(ConfigOption::getConfigName, option -> option));
}

public static synchronized void applyConfigurations(Map<String, String> configuration) {
public static synchronized void applyConfigurations(
Map<String, String> configuration,
CentralConfigurationManager centralConfigurationManager) {
Set<String> copyOfCurrentNonDefaultConfigsApplied =
new HashSet<>(currentNonDefaultConfigsApplied);
configuration.forEach(
(configurationName, configurationValue) -> {
copyOfCurrentNonDefaultConfigsApplied.remove(configurationName);
applyConfiguration(configurationName, configurationValue);
applyConfiguration(configurationName, configurationValue, centralConfigurationManager);
currentNonDefaultConfigsApplied.add(configurationName);
});
if (!copyOfCurrentNonDefaultConfigsApplied.isEmpty()) {
// We have configs that were applied previously but have now been set back to default and
// have been removed from the configs being sent - so for all of these we need to set the
// config back to default
for (String configurationName : copyOfCurrentNonDefaultConfigsApplied) {
applyDefaultConfiguration(configurationName);
applyDefaultConfiguration(configurationName, centralConfigurationManager);
currentNonDefaultConfigsApplied.remove(configurationName);
}
}
}

public static void applyDefaultConfiguration(String configurationName) {
configNameToConfig.get(configurationName).updateToDefault();
public static void applyDefaultConfiguration(
String configurationName, CentralConfigurationManager centralConfigurationManager) {
configNameToConfig.get(configurationName).updateToDefault(centralConfigurationManager);
}

public static void applyConfiguration(String configurationName, String configurationValue) {
public static void applyConfiguration(
String configurationName,
String configurationValue,
CentralConfigurationManager centralConfigurationManager) {
if (configNameToConfig.containsKey(configurationName)) {
configNameToConfig.get(configurationName).updateOrLog(configurationValue);
configNameToConfig
.get(configurationName)
.updateOrLog(configurationValue, centralConfigurationManager);
} else {
logger.warning(
"Ignoring unknown confguration option: '"
Expand Down Expand Up @@ -193,18 +204,21 @@ protected boolean getBoolean(String configurationValue, String error) {
}
}

public void updateOrLog(String configurationValue) {
public void updateOrLog(
String configurationValue, CentralConfigurationManager centralConfigurationManager) {
try {
update(configurationValue);
update(configurationValue, centralConfigurationManager);
} catch (IllegalArgumentException e) {
logger.warning(e.getMessage());
}
}

abstract void update(String configurationValue) throws IllegalArgumentException;
abstract void update(
String configurationValue, CentralConfigurationManager centralConfigurationManager)
throws IllegalArgumentException;

public void updateToDefault() {
update(defaultConfigStringValue);
public void updateToDefault(CentralConfigurationManager centralConfigurationManager) {
update(defaultConfigStringValue, centralConfigurationManager);
}

protected DynamicConfiguration config() {
Expand All @@ -218,7 +232,8 @@ public static final class SendLogs extends ConfigOption {
}

@Override
void update(String configurationValue) throws IllegalArgumentException {
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
throws IllegalArgumentException {
config().setSendingLogs(getBoolean(configurationValue));
}
}
Expand All @@ -229,7 +244,8 @@ public static final class SendMetrics extends ConfigOption {
}

@Override
void update(String configurationValue) throws IllegalArgumentException {
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
throws IllegalArgumentException {
config().setSendingMetrics(getBoolean(configurationValue));
}
}
Expand All @@ -240,7 +256,8 @@ public static final class SendTraces extends ConfigOption {
}

@Override
void update(String configurationValue) throws IllegalArgumentException {
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
throws IllegalArgumentException {
config().setSendingSpans(getBoolean(configurationValue));
}
}
Expand All @@ -251,7 +268,8 @@ public static final class DeactivateAllInstrumentations extends ConfigOption {
}

@Override
void update(String configurationValue) throws IllegalArgumentException {
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
throws IllegalArgumentException {
if (getBoolean(configurationValue)) {
config().deactivateAllInstrumentations();
} else {
Expand All @@ -266,7 +284,8 @@ public static final class DeactivateInstrumentations extends ConfigOption {
}

@Override
void update(String configurationValue) throws IllegalArgumentException {
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
throws IllegalArgumentException {
config().deactivateInstrumentations(configurationValue);
}
}
Expand All @@ -277,8 +296,30 @@ public static final class LoggingLevel extends ConfigOption {
}

@Override
void update(String configurationValue) throws IllegalArgumentException {
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
throws IllegalArgumentException {
AgentLog.setLevel(configurationValue);
}
}

public static final class PollingInterval extends ConfigOption {
PollingInterval() {
super("polling_interval", "30s");
}

@Override
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
throws IllegalArgumentException {
if (centralConfigurationManager instanceof CentralConfigurationManagerImpl) {
try {
Duration duration = Duration.parse(configurationValue);
((CentralConfigurationManagerImpl) centralConfigurationManager)
.resetPeriodicDelay(duration);
} catch (DateTimeParseException e) {
logger.warning(
"Failed to update the polling interval, value passed was invalid: " + e.getMessage());
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ public void onErrorResponse(
}
}

public synchronized void resetPeriodicDelay(Duration duration) {
client.resetPeriodicDelay(duration);
}

public static class Builder {
private String serviceName;
private String serviceNamespace;
Expand Down Expand Up @@ -192,7 +196,9 @@ public CentralConfigurationManagerImpl build() {
OpampClientBuilder builder = OpampClient.builder();
builder.enableRemoteConfig();
OkHttpSender httpSender = OkHttpSender.create("http://localhost:4320/v1/opamp");
PeriodicDelay pollingDelay = HttpRequestService.DEFAULT_DELAY_BETWEEN_REQUESTS;
PeriodicDelay pollingDelay =
PeriodicDelay.ofVariableDuration(
HttpRequestService.DEFAULT_DELAY_BETWEEN_REQUESTS.getNextDelay());
PeriodicDelay retryDelay = PeriodicDelay.ofVariableDuration(pollingDelay.getNextDelay());
if (serviceName != null) {
builder.setServiceName(serviceName);
Expand All @@ -210,7 +216,7 @@ public CentralConfigurationManagerImpl build() {
httpSender = OkHttpSender.create(configurationEndpoint);
}
if (pollingInterval != null) {
pollingDelay = PeriodicDelay.ofFixedDuration(pollingInterval);
pollingDelay = PeriodicDelay.ofVariableDuration(pollingInterval);
retryDelay = PeriodicDelay.ofVariableDuration(pollingInterval);
}
builder.setRequestService(HttpRequestService.create(httpSender, pollingDelay, retryDelay));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ static OpampClientBuilder builder() {
*/
void setRemoteConfigStatus(Opamp.RemoteConfigStatus remoteConfigStatus);

void resetPeriodicDelay(Duration duration);

interface Callback {
/**
* Called when the connection is successfully established to the Server. May be called after
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import co.elastic.opamp.client.internal.request.fields.recipe.RequestRecipe;
import co.elastic.opamp.client.internal.state.OpampClientState;
import co.elastic.opamp.client.request.Request;
import co.elastic.opamp.client.request.service.HttpRequestService;
import co.elastic.opamp.client.request.service.RequestService;
import co.elastic.opamp.client.response.MessageData;
import co.elastic.opamp.client.response.Response;
Expand Down Expand Up @@ -141,6 +142,13 @@ public void setRemoteConfigStatus(Opamp.RemoteConfigStatus remoteConfigStatus) {
state.remoteConfigStatusState.set(remoteConfigStatus);
}

@Override
public void resetPeriodicDelay(Duration duration) {
if (requestService instanceof HttpRequestService) {
((HttpRequestService) requestService).resetPeriodicDelay(duration);
}
}

@Override
public void onConnectionSuccess() {
callback.onConnect(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public final class PeriodicTaskExecutor {
private PeriodicDelay periodicDelay;
private ScheduledFuture<?> scheduledFuture;
private Runnable periodicTask;
private PeriodicRunner runnerInstance;

public static PeriodicTaskExecutor create(PeriodicDelay initialPeriodicDelay) {
return new PeriodicTaskExecutor(
Expand All @@ -46,6 +47,10 @@ public static PeriodicTaskExecutor create(PeriodicDelay initialPeriodicDelay) {

public void start(Runnable periodicTask) {
this.periodicTask = periodicTask;
if (runnerInstance != null) {
runnerInstance.stop = true;
}
runnerInstance = new PeriodicRunner();
scheduleNext();
}

Expand Down Expand Up @@ -74,19 +79,27 @@ public void stop() {
private void scheduleNext() {
delaySetLock.lock();
try {
if (runnerInstance != null) {
runnerInstance.stop = true;
}
runnerInstance = new PeriodicRunner();
scheduledFuture =
executorService.schedule(
new PeriodicRunner(), periodicDelay.getNextDelay().toNanos(), TimeUnit.NANOSECONDS);
runnerInstance, periodicDelay.getNextDelay().toNanos(), TimeUnit.NANOSECONDS);
} finally {
delaySetLock.unlock();
}
}

private class PeriodicRunner implements Runnable {
volatile boolean stop = false;

@Override
public void run() {
periodicTask.run();
scheduleNext();
if (!stop) {
scheduleNext();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import opamp.proto.Opamp;

public final class HttpRequestService implements RequestService, Runnable {
Expand All @@ -48,6 +50,7 @@ public final class HttpRequestService implements RequestService, Runnable {
private int exponentialBackoffSkips;
public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_REQUESTS =
PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30));
private static final Logger logger = Logger.getLogger(HttpRequestService.class.getName());

/**
* Creates an {@link HttpRequestService}.
Expand Down Expand Up @@ -135,6 +138,12 @@ private void enableRetryMode(Duration suggestedDelay) {
}
}

public void resetPeriodicDelay(Duration suggestedDelay) {
((AcceptsDelaySuggestion) periodicRequestDelay).suggestDelay(suggestedDelay);
((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(suggestedDelay);
executor.setPeriodicDelay(periodicRequestDelay);
}

private void disableRetryMode() {
if (retryModeEnabled.compareAndSet(true, false)) {
executor.setPeriodicDelay(periodicRequestDelay);
Expand All @@ -156,6 +165,9 @@ public void run() {
private void doSendRequest() {
try {
Opamp.AgentToServer agentToServer = requestSupplier.get().getAgentToServer();
if (logger.isLoggable(Level.FINE)) {
logger.fine(agentToServer.toString().replace('\n', '/'));
}

try (HttpSender.Response response =
requestSender
Expand Down
Loading