Skip to content

Commit 03b672c

Browse files
authored
OpAMP Polling interval (#770)
* add opamp centrally configurable polling interval * debug log when a http opamp request is made * stop duplication in task execution
1 parent 4724c32 commit 03b672c

File tree

6 files changed

+106
-24
lines changed

6 files changed

+106
-24
lines changed

custom/src/main/java/co/elastic/otel/dynamicconfig/CentralConfig.java

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
package co.elastic.otel.dynamicconfig;
2020

2121
import co.elastic.opamp.client.CentralConfigurationManager;
22+
import co.elastic.opamp.client.CentralConfigurationManagerImpl;
2223
import co.elastic.opamp.client.CentralConfigurationProcessor;
2324
import co.elastic.otel.logging.AgentLog;
2425
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
2526
import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder;
2627
import java.text.MessageFormat;
2728
import java.time.Duration;
29+
import java.time.format.DateTimeParseException;
2830
import java.util.HashSet;
2931
import java.util.Map;
3032
import java.util.Set;
@@ -69,7 +71,7 @@ public static void init(SdkTracerProviderBuilder providerBuilder, ConfigProperti
6971
centralConfigurationManager.start(
7072
configuration -> {
7173
logger.fine("Received configuration: " + configuration);
72-
Configs.applyConfigurations(configuration);
74+
Configs.applyConfigurations(configuration, centralConfigurationManager);
7375
return CentralConfigurationProcessor.Result.SUCCESS;
7476
});
7577

@@ -121,37 +123,46 @@ public static class Configs {
121123
new SendTraces(),
122124
new DeactivateAllInstrumentations(),
123125
new DeactivateInstrumentations(),
124-
new LoggingLevel())
126+
new LoggingLevel(),
127+
new PollingInterval())
125128
.collect(Collectors.toMap(ConfigOption::getConfigName, option -> option));
126129
}
127130

128-
public static synchronized void applyConfigurations(Map<String, String> configuration) {
131+
public static synchronized void applyConfigurations(
132+
Map<String, String> configuration,
133+
CentralConfigurationManager centralConfigurationManager) {
129134
Set<String> copyOfCurrentNonDefaultConfigsApplied =
130135
new HashSet<>(currentNonDefaultConfigsApplied);
131136
configuration.forEach(
132137
(configurationName, configurationValue) -> {
133138
copyOfCurrentNonDefaultConfigsApplied.remove(configurationName);
134-
applyConfiguration(configurationName, configurationValue);
139+
applyConfiguration(configurationName, configurationValue, centralConfigurationManager);
135140
currentNonDefaultConfigsApplied.add(configurationName);
136141
});
137142
if (!copyOfCurrentNonDefaultConfigsApplied.isEmpty()) {
138143
// We have configs that were applied previously but have now been set back to default and
139144
// have been removed from the configs being sent - so for all of these we need to set the
140145
// config back to default
141146
for (String configurationName : copyOfCurrentNonDefaultConfigsApplied) {
142-
applyDefaultConfiguration(configurationName);
147+
applyDefaultConfiguration(configurationName, centralConfigurationManager);
143148
currentNonDefaultConfigsApplied.remove(configurationName);
144149
}
145150
}
146151
}
147152

148-
public static void applyDefaultConfiguration(String configurationName) {
149-
configNameToConfig.get(configurationName).updateToDefault();
153+
public static void applyDefaultConfiguration(
154+
String configurationName, CentralConfigurationManager centralConfigurationManager) {
155+
configNameToConfig.get(configurationName).updateToDefault(centralConfigurationManager);
150156
}
151157

152-
public static void applyConfiguration(String configurationName, String configurationValue) {
158+
public static void applyConfiguration(
159+
String configurationName,
160+
String configurationValue,
161+
CentralConfigurationManager centralConfigurationManager) {
153162
if (configNameToConfig.containsKey(configurationName)) {
154-
configNameToConfig.get(configurationName).updateOrLog(configurationValue);
163+
configNameToConfig
164+
.get(configurationName)
165+
.updateOrLog(configurationValue, centralConfigurationManager);
155166
} else {
156167
logger.warning(
157168
"Ignoring unknown confguration option: '"
@@ -193,18 +204,21 @@ protected boolean getBoolean(String configurationValue, String error) {
193204
}
194205
}
195206

196-
public void updateOrLog(String configurationValue) {
207+
public void updateOrLog(
208+
String configurationValue, CentralConfigurationManager centralConfigurationManager) {
197209
try {
198-
update(configurationValue);
210+
update(configurationValue, centralConfigurationManager);
199211
} catch (IllegalArgumentException e) {
200212
logger.warning(e.getMessage());
201213
}
202214
}
203215

204-
abstract void update(String configurationValue) throws IllegalArgumentException;
216+
abstract void update(
217+
String configurationValue, CentralConfigurationManager centralConfigurationManager)
218+
throws IllegalArgumentException;
205219

206-
public void updateToDefault() {
207-
update(defaultConfigStringValue);
220+
public void updateToDefault(CentralConfigurationManager centralConfigurationManager) {
221+
update(defaultConfigStringValue, centralConfigurationManager);
208222
}
209223

210224
protected DynamicConfiguration config() {
@@ -218,7 +232,8 @@ public static final class SendLogs extends ConfigOption {
218232
}
219233

220234
@Override
221-
void update(String configurationValue) throws IllegalArgumentException {
235+
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
236+
throws IllegalArgumentException {
222237
config().setSendingLogs(getBoolean(configurationValue));
223238
}
224239
}
@@ -229,7 +244,8 @@ public static final class SendMetrics extends ConfigOption {
229244
}
230245

231246
@Override
232-
void update(String configurationValue) throws IllegalArgumentException {
247+
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
248+
throws IllegalArgumentException {
233249
config().setSendingMetrics(getBoolean(configurationValue));
234250
}
235251
}
@@ -240,7 +256,8 @@ public static final class SendTraces extends ConfigOption {
240256
}
241257

242258
@Override
243-
void update(String configurationValue) throws IllegalArgumentException {
259+
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
260+
throws IllegalArgumentException {
244261
config().setSendingSpans(getBoolean(configurationValue));
245262
}
246263
}
@@ -251,7 +268,8 @@ public static final class DeactivateAllInstrumentations extends ConfigOption {
251268
}
252269

253270
@Override
254-
void update(String configurationValue) throws IllegalArgumentException {
271+
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
272+
throws IllegalArgumentException {
255273
if (getBoolean(configurationValue)) {
256274
config().deactivateAllInstrumentations();
257275
} else {
@@ -266,7 +284,8 @@ public static final class DeactivateInstrumentations extends ConfigOption {
266284
}
267285

268286
@Override
269-
void update(String configurationValue) throws IllegalArgumentException {
287+
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
288+
throws IllegalArgumentException {
270289
config().deactivateInstrumentations(configurationValue);
271290
}
272291
}
@@ -277,8 +296,30 @@ public static final class LoggingLevel extends ConfigOption {
277296
}
278297

279298
@Override
280-
void update(String configurationValue) throws IllegalArgumentException {
299+
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
300+
throws IllegalArgumentException {
281301
AgentLog.setLevel(configurationValue);
282302
}
283303
}
304+
305+
public static final class PollingInterval extends ConfigOption {
306+
PollingInterval() {
307+
super("polling_interval", "30s");
308+
}
309+
310+
@Override
311+
void update(String configurationValue, CentralConfigurationManager centralConfigurationManager)
312+
throws IllegalArgumentException {
313+
if (centralConfigurationManager instanceof CentralConfigurationManagerImpl) {
314+
try {
315+
Duration duration = Duration.parse(configurationValue);
316+
((CentralConfigurationManagerImpl) centralConfigurationManager)
317+
.resetPeriodicDelay(duration);
318+
} catch (DateTimeParseException e) {
319+
logger.warning(
320+
"Failed to update the polling interval, value passed was invalid: " + e.getMessage());
321+
}
322+
}
323+
}
324+
}
284325
}

opamp/src/main/java/co/elastic/opamp/client/CentralConfigurationManagerImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ public void onErrorResponse(
148148
}
149149
}
150150

151+
public synchronized void resetPeriodicDelay(Duration duration) {
152+
client.resetPeriodicDelay(duration);
153+
}
154+
151155
public static class Builder {
152156
private String serviceName;
153157
private String serviceNamespace;
@@ -192,7 +196,9 @@ public CentralConfigurationManagerImpl build() {
192196
OpampClientBuilder builder = OpampClient.builder();
193197
builder.enableRemoteConfig();
194198
OkHttpSender httpSender = OkHttpSender.create("http://localhost:4320/v1/opamp");
195-
PeriodicDelay pollingDelay = HttpRequestService.DEFAULT_DELAY_BETWEEN_REQUESTS;
199+
PeriodicDelay pollingDelay =
200+
PeriodicDelay.ofVariableDuration(
201+
HttpRequestService.DEFAULT_DELAY_BETWEEN_REQUESTS.getNextDelay());
196202
PeriodicDelay retryDelay = PeriodicDelay.ofVariableDuration(pollingDelay.getNextDelay());
197203
if (serviceName != null) {
198204
builder.setServiceName(serviceName);
@@ -210,7 +216,7 @@ public CentralConfigurationManagerImpl build() {
210216
httpSender = OkHttpSender.create(configurationEndpoint);
211217
}
212218
if (pollingInterval != null) {
213-
pollingDelay = PeriodicDelay.ofFixedDuration(pollingInterval);
219+
pollingDelay = PeriodicDelay.ofVariableDuration(pollingInterval);
214220
retryDelay = PeriodicDelay.ofVariableDuration(pollingInterval);
215221
}
216222
builder.setRequestService(HttpRequestService.create(httpSender, pollingDelay, retryDelay));

opamp/src/main/java/co/elastic/opamp/client/OpampClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ static OpampClientBuilder builder() {
5959
*/
6060
void setRemoteConfigStatus(Opamp.RemoteConfigStatus remoteConfigStatus);
6161

62+
void resetPeriodicDelay(Duration duration);
63+
6264
interface Callback {
6365
/**
6466
* Called when the connection is successfully established to the Server. May be called after

opamp/src/main/java/co/elastic/opamp/client/internal/OpampClientImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import co.elastic.opamp.client.internal.request.fields.recipe.RequestRecipe;
3636
import co.elastic.opamp.client.internal.state.OpampClientState;
3737
import co.elastic.opamp.client.request.Request;
38+
import co.elastic.opamp.client.request.service.HttpRequestService;
3839
import co.elastic.opamp.client.request.service.RequestService;
3940
import co.elastic.opamp.client.response.MessageData;
4041
import co.elastic.opamp.client.response.Response;
@@ -141,6 +142,13 @@ public void setRemoteConfigStatus(Opamp.RemoteConfigStatus remoteConfigStatus) {
141142
state.remoteConfigStatusState.set(remoteConfigStatus);
142143
}
143144

145+
@Override
146+
public void resetPeriodicDelay(Duration duration) {
147+
if (requestService instanceof HttpRequestService) {
148+
((HttpRequestService) requestService).resetPeriodicDelay(duration);
149+
}
150+
}
151+
144152
@Override
145153
public void onConnectionSuccess() {
146154
callback.onConnect(this);

opamp/src/main/java/co/elastic/opamp/client/internal/periodictask/PeriodicTaskExecutor.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public final class PeriodicTaskExecutor {
3232
private PeriodicDelay periodicDelay;
3333
private ScheduledFuture<?> scheduledFuture;
3434
private Runnable periodicTask;
35+
private PeriodicRunner runnerInstance;
3536

3637
public static PeriodicTaskExecutor create(PeriodicDelay initialPeriodicDelay) {
3738
return new PeriodicTaskExecutor(
@@ -46,6 +47,10 @@ public static PeriodicTaskExecutor create(PeriodicDelay initialPeriodicDelay) {
4647

4748
public void start(Runnable periodicTask) {
4849
this.periodicTask = periodicTask;
50+
if (runnerInstance != null) {
51+
runnerInstance.stop = true;
52+
}
53+
runnerInstance = new PeriodicRunner();
4954
scheduleNext();
5055
}
5156

@@ -74,19 +79,27 @@ public void stop() {
7479
private void scheduleNext() {
7580
delaySetLock.lock();
7681
try {
82+
if (runnerInstance != null) {
83+
runnerInstance.stop = true;
84+
}
85+
runnerInstance = new PeriodicRunner();
7786
scheduledFuture =
7887
executorService.schedule(
79-
new PeriodicRunner(), periodicDelay.getNextDelay().toNanos(), TimeUnit.NANOSECONDS);
88+
runnerInstance, periodicDelay.getNextDelay().toNanos(), TimeUnit.NANOSECONDS);
8089
} finally {
8190
delaySetLock.unlock();
8291
}
8392
}
8493

8594
private class PeriodicRunner implements Runnable {
95+
volatile boolean stop = false;
96+
8697
@Override
8798
public void run() {
8899
periodicTask.run();
89-
scheduleNext();
100+
if (!stop) {
101+
scheduleNext();
102+
}
90103
}
91104
}
92105
}

opamp/src/main/java/co/elastic/opamp/client/request/service/HttpRequestService.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import java.util.concurrent.atomic.AtomicBoolean;
3333
import java.util.function.Consumer;
3434
import java.util.function.Supplier;
35+
import java.util.logging.Level;
36+
import java.util.logging.Logger;
3537
import opamp.proto.Opamp;
3638

3739
public final class HttpRequestService implements RequestService, Runnable {
@@ -48,6 +50,7 @@ public final class HttpRequestService implements RequestService, Runnable {
4850
private int exponentialBackoffSkips;
4951
public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_REQUESTS =
5052
PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30));
53+
private static final Logger logger = Logger.getLogger(HttpRequestService.class.getName());
5154

5255
/**
5356
* Creates an {@link HttpRequestService}.
@@ -135,6 +138,12 @@ private void enableRetryMode(Duration suggestedDelay) {
135138
}
136139
}
137140

141+
public void resetPeriodicDelay(Duration suggestedDelay) {
142+
((AcceptsDelaySuggestion) periodicRequestDelay).suggestDelay(suggestedDelay);
143+
((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(suggestedDelay);
144+
executor.setPeriodicDelay(periodicRequestDelay);
145+
}
146+
138147
private void disableRetryMode() {
139148
if (retryModeEnabled.compareAndSet(true, false)) {
140149
executor.setPeriodicDelay(periodicRequestDelay);
@@ -156,6 +165,9 @@ public void run() {
156165
private void doSendRequest() {
157166
try {
158167
Opamp.AgentToServer agentToServer = requestSupplier.get().getAgentToServer();
168+
if (logger.isLoggable(Level.FINE)) {
169+
logger.fine(agentToServer.toString().replace('\n', '/'));
170+
}
159171

160172
try (HttpSender.Response response =
161173
requestSender

0 commit comments

Comments
 (0)