diff --git a/custom/src/main/java/co/elastic/otel/dynamicconfig/CentralConfig.java b/custom/src/main/java/co/elastic/otel/dynamicconfig/CentralConfig.java index 2b2bf9b1..49a6285d 100644 --- a/custom/src/main/java/co/elastic/otel/dynamicconfig/CentralConfig.java +++ b/custom/src/main/java/co/elastic/otel/dynamicconfig/CentralConfig.java @@ -19,12 +19,14 @@ package co.elastic.otel.dynamicconfig; import co.elastic.opamp.client.CentralConfigurationManager; +import co.elastic.opamp.client.CentralConfigurationManagerImpl; import co.elastic.opamp.client.CentralConfigurationProcessor; import co.elastic.otel.logging.AgentLog; import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder; import java.text.MessageFormat; import java.time.Duration; +import java.time.format.DateTimeParseException; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -69,7 +71,7 @@ public static void init(SdkTracerProviderBuilder providerBuilder, ConfigProperti centralConfigurationManager.start( configuration -> { logger.fine("Received configuration: " + configuration); - Configs.applyConfigurations(configuration); + Configs.applyConfigurations(configuration, centralConfigurationManager); return CentralConfigurationProcessor.Result.SUCCESS; }); @@ -121,17 +123,20 @@ public static class Configs { new SendTraces(), new DeactivateAllInstrumentations(), new DeactivateInstrumentations(), - new LoggingLevel()) + new LoggingLevel(), + new PollingInterval()) .collect(Collectors.toMap(ConfigOption::getConfigName, option -> option)); } - public static synchronized void applyConfigurations(Map configuration) { + public static synchronized void applyConfigurations( + Map configuration, + CentralConfigurationManager centralConfigurationManager) { Set copyOfCurrentNonDefaultConfigsApplied = new HashSet<>(currentNonDefaultConfigsApplied); configuration.forEach( (configurationName, configurationValue) -> { copyOfCurrentNonDefaultConfigsApplied.remove(configurationName); - applyConfiguration(configurationName, configurationValue); + applyConfiguration(configurationName, configurationValue, centralConfigurationManager); currentNonDefaultConfigsApplied.add(configurationName); }); if (!copyOfCurrentNonDefaultConfigsApplied.isEmpty()) { @@ -139,19 +144,25 @@ public static synchronized void applyConfigurations(Map configur // have been removed from the configs being sent - so for all of these we need to set the // config back to default for (String configurationName : copyOfCurrentNonDefaultConfigsApplied) { - applyDefaultConfiguration(configurationName); + applyDefaultConfiguration(configurationName, centralConfigurationManager); currentNonDefaultConfigsApplied.remove(configurationName); } } } - public static void applyDefaultConfiguration(String configurationName) { - configNameToConfig.get(configurationName).updateToDefault(); + public static void applyDefaultConfiguration( + String configurationName, CentralConfigurationManager centralConfigurationManager) { + configNameToConfig.get(configurationName).updateToDefault(centralConfigurationManager); } - public static void applyConfiguration(String configurationName, String configurationValue) { + public static void applyConfiguration( + String configurationName, + String configurationValue, + CentralConfigurationManager centralConfigurationManager) { if (configNameToConfig.containsKey(configurationName)) { - configNameToConfig.get(configurationName).updateOrLog(configurationValue); + configNameToConfig + .get(configurationName) + .updateOrLog(configurationValue, centralConfigurationManager); } else { logger.warning( "Ignoring unknown confguration option: '" @@ -193,18 +204,21 @@ protected boolean getBoolean(String configurationValue, String error) { } } - public void updateOrLog(String configurationValue) { + public void updateOrLog( + String configurationValue, CentralConfigurationManager centralConfigurationManager) { try { - update(configurationValue); + update(configurationValue, centralConfigurationManager); } catch (IllegalArgumentException e) { logger.warning(e.getMessage()); } } - abstract void update(String configurationValue) throws IllegalArgumentException; + abstract void update( + String configurationValue, CentralConfigurationManager centralConfigurationManager) + throws IllegalArgumentException; - public void updateToDefault() { - update(defaultConfigStringValue); + public void updateToDefault(CentralConfigurationManager centralConfigurationManager) { + update(defaultConfigStringValue, centralConfigurationManager); } protected DynamicConfiguration config() { @@ -218,7 +232,8 @@ public static final class SendLogs extends ConfigOption { } @Override - void update(String configurationValue) throws IllegalArgumentException { + void update(String configurationValue, CentralConfigurationManager centralConfigurationManager) + throws IllegalArgumentException { config().setSendingLogs(getBoolean(configurationValue)); } } @@ -229,7 +244,8 @@ public static final class SendMetrics extends ConfigOption { } @Override - void update(String configurationValue) throws IllegalArgumentException { + void update(String configurationValue, CentralConfigurationManager centralConfigurationManager) + throws IllegalArgumentException { config().setSendingMetrics(getBoolean(configurationValue)); } } @@ -240,7 +256,8 @@ public static final class SendTraces extends ConfigOption { } @Override - void update(String configurationValue) throws IllegalArgumentException { + void update(String configurationValue, CentralConfigurationManager centralConfigurationManager) + throws IllegalArgumentException { config().setSendingSpans(getBoolean(configurationValue)); } } @@ -251,7 +268,8 @@ public static final class DeactivateAllInstrumentations extends ConfigOption { } @Override - void update(String configurationValue) throws IllegalArgumentException { + void update(String configurationValue, CentralConfigurationManager centralConfigurationManager) + throws IllegalArgumentException { if (getBoolean(configurationValue)) { config().deactivateAllInstrumentations(); } else { @@ -266,7 +284,8 @@ public static final class DeactivateInstrumentations extends ConfigOption { } @Override - void update(String configurationValue) throws IllegalArgumentException { + void update(String configurationValue, CentralConfigurationManager centralConfigurationManager) + throws IllegalArgumentException { config().deactivateInstrumentations(configurationValue); } } @@ -277,8 +296,30 @@ public static final class LoggingLevel extends ConfigOption { } @Override - void update(String configurationValue) throws IllegalArgumentException { + void update(String configurationValue, CentralConfigurationManager centralConfigurationManager) + throws IllegalArgumentException { AgentLog.setLevel(configurationValue); } } + + public static final class PollingInterval extends ConfigOption { + PollingInterval() { + super("polling_interval", "30s"); + } + + @Override + void update(String configurationValue, CentralConfigurationManager centralConfigurationManager) + throws IllegalArgumentException { + if (centralConfigurationManager instanceof CentralConfigurationManagerImpl) { + try { + Duration duration = Duration.parse(configurationValue); + ((CentralConfigurationManagerImpl) centralConfigurationManager) + .resetPeriodicDelay(duration); + } catch (DateTimeParseException e) { + logger.warning( + "Failed to update the polling interval, value passed was invalid: " + e.getMessage()); + } + } + } + } } diff --git a/opamp/src/main/java/co/elastic/opamp/client/CentralConfigurationManagerImpl.java b/opamp/src/main/java/co/elastic/opamp/client/CentralConfigurationManagerImpl.java index 6452e883..a8952d4d 100644 --- a/opamp/src/main/java/co/elastic/opamp/client/CentralConfigurationManagerImpl.java +++ b/opamp/src/main/java/co/elastic/opamp/client/CentralConfigurationManagerImpl.java @@ -148,6 +148,10 @@ public void onErrorResponse( } } + public synchronized void resetPeriodicDelay(Duration duration) { + client.resetPeriodicDelay(duration); + } + public static class Builder { private String serviceName; private String serviceNamespace; @@ -192,7 +196,9 @@ public CentralConfigurationManagerImpl build() { OpampClientBuilder builder = OpampClient.builder(); builder.enableRemoteConfig(); OkHttpSender httpSender = OkHttpSender.create("http://localhost:4320/v1/opamp"); - PeriodicDelay pollingDelay = HttpRequestService.DEFAULT_DELAY_BETWEEN_REQUESTS; + PeriodicDelay pollingDelay = + PeriodicDelay.ofVariableDuration( + HttpRequestService.DEFAULT_DELAY_BETWEEN_REQUESTS.getNextDelay()); PeriodicDelay retryDelay = PeriodicDelay.ofVariableDuration(pollingDelay.getNextDelay()); if (serviceName != null) { builder.setServiceName(serviceName); @@ -210,7 +216,7 @@ public CentralConfigurationManagerImpl build() { httpSender = OkHttpSender.create(configurationEndpoint); } if (pollingInterval != null) { - pollingDelay = PeriodicDelay.ofFixedDuration(pollingInterval); + pollingDelay = PeriodicDelay.ofVariableDuration(pollingInterval); retryDelay = PeriodicDelay.ofVariableDuration(pollingInterval); } builder.setRequestService(HttpRequestService.create(httpSender, pollingDelay, retryDelay)); diff --git a/opamp/src/main/java/co/elastic/opamp/client/OpampClient.java b/opamp/src/main/java/co/elastic/opamp/client/OpampClient.java index 5418e442..bdbcb0f3 100644 --- a/opamp/src/main/java/co/elastic/opamp/client/OpampClient.java +++ b/opamp/src/main/java/co/elastic/opamp/client/OpampClient.java @@ -59,6 +59,8 @@ static OpampClientBuilder builder() { */ void setRemoteConfigStatus(Opamp.RemoteConfigStatus remoteConfigStatus); + void resetPeriodicDelay(Duration duration); + interface Callback { /** * Called when the connection is successfully established to the Server. May be called after diff --git a/opamp/src/main/java/co/elastic/opamp/client/internal/OpampClientImpl.java b/opamp/src/main/java/co/elastic/opamp/client/internal/OpampClientImpl.java index 02f371b9..53758fe1 100644 --- a/opamp/src/main/java/co/elastic/opamp/client/internal/OpampClientImpl.java +++ b/opamp/src/main/java/co/elastic/opamp/client/internal/OpampClientImpl.java @@ -35,6 +35,7 @@ import co.elastic.opamp.client.internal.request.fields.recipe.RequestRecipe; import co.elastic.opamp.client.internal.state.OpampClientState; import co.elastic.opamp.client.request.Request; +import co.elastic.opamp.client.request.service.HttpRequestService; import co.elastic.opamp.client.request.service.RequestService; import co.elastic.opamp.client.response.MessageData; import co.elastic.opamp.client.response.Response; @@ -141,6 +142,13 @@ public void setRemoteConfigStatus(Opamp.RemoteConfigStatus remoteConfigStatus) { state.remoteConfigStatusState.set(remoteConfigStatus); } + @Override + public void resetPeriodicDelay(Duration duration) { + if (requestService instanceof HttpRequestService) { + ((HttpRequestService) requestService).resetPeriodicDelay(duration); + } + } + @Override public void onConnectionSuccess() { callback.onConnect(this); diff --git a/opamp/src/main/java/co/elastic/opamp/client/internal/periodictask/PeriodicTaskExecutor.java b/opamp/src/main/java/co/elastic/opamp/client/internal/periodictask/PeriodicTaskExecutor.java index b99586af..5d297ebf 100644 --- a/opamp/src/main/java/co/elastic/opamp/client/internal/periodictask/PeriodicTaskExecutor.java +++ b/opamp/src/main/java/co/elastic/opamp/client/internal/periodictask/PeriodicTaskExecutor.java @@ -32,6 +32,7 @@ public final class PeriodicTaskExecutor { private PeriodicDelay periodicDelay; private ScheduledFuture scheduledFuture; private Runnable periodicTask; + private PeriodicRunner runnerInstance; public static PeriodicTaskExecutor create(PeriodicDelay initialPeriodicDelay) { return new PeriodicTaskExecutor( @@ -46,6 +47,10 @@ public static PeriodicTaskExecutor create(PeriodicDelay initialPeriodicDelay) { public void start(Runnable periodicTask) { this.periodicTask = periodicTask; + if (runnerInstance != null) { + runnerInstance.stop = true; + } + runnerInstance = new PeriodicRunner(); scheduleNext(); } @@ -74,19 +79,27 @@ public void stop() { private void scheduleNext() { delaySetLock.lock(); try { + if (runnerInstance != null) { + runnerInstance.stop = true; + } + runnerInstance = new PeriodicRunner(); scheduledFuture = executorService.schedule( - new PeriodicRunner(), periodicDelay.getNextDelay().toNanos(), TimeUnit.NANOSECONDS); + runnerInstance, periodicDelay.getNextDelay().toNanos(), TimeUnit.NANOSECONDS); } finally { delaySetLock.unlock(); } } private class PeriodicRunner implements Runnable { + volatile boolean stop = false; + @Override public void run() { periodicTask.run(); - scheduleNext(); + if (!stop) { + scheduleNext(); + } } } } diff --git a/opamp/src/main/java/co/elastic/opamp/client/request/service/HttpRequestService.java b/opamp/src/main/java/co/elastic/opamp/client/request/service/HttpRequestService.java index b425eb16..fec47119 100644 --- a/opamp/src/main/java/co/elastic/opamp/client/request/service/HttpRequestService.java +++ b/opamp/src/main/java/co/elastic/opamp/client/request/service/HttpRequestService.java @@ -32,6 +32,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; import opamp.proto.Opamp; public final class HttpRequestService implements RequestService, Runnable { @@ -48,6 +50,7 @@ public final class HttpRequestService implements RequestService, Runnable { private int exponentialBackoffSkips; public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_REQUESTS = PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30)); + private static final Logger logger = Logger.getLogger(HttpRequestService.class.getName()); /** * Creates an {@link HttpRequestService}. @@ -135,6 +138,12 @@ private void enableRetryMode(Duration suggestedDelay) { } } + public void resetPeriodicDelay(Duration suggestedDelay) { + ((AcceptsDelaySuggestion) periodicRequestDelay).suggestDelay(suggestedDelay); + ((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(suggestedDelay); + executor.setPeriodicDelay(periodicRequestDelay); + } + private void disableRetryMode() { if (retryModeEnabled.compareAndSet(true, false)) { executor.setPeriodicDelay(periodicRequestDelay); @@ -156,6 +165,9 @@ public void run() { private void doSendRequest() { try { Opamp.AgentToServer agentToServer = requestSupplier.get().getAgentToServer(); + if (logger.isLoggable(Level.FINE)) { + logger.fine(agentToServer.toString().replace('\n', '/')); + } try (HttpSender.Response response = requestSender