18
18
*/
19
19
package co .elastic .otel .dynamicconfig ;
20
20
21
- import co .elastic .opamp .client .CentralConfigurationManager ;
22
- import co .elastic .opamp .client .CentralConfigurationManagerImpl ;
23
- import co .elastic .opamp .client .CentralConfigurationProcessor ;
21
+ import co .elastic .otel .dynamicconfig .internal .OpampManager ;
24
22
import co .elastic .otel .logging .AgentLog ;
25
23
import io .opentelemetry .sdk .autoconfigure .spi .ConfigProperties ;
26
24
import io .opentelemetry .sdk .trace .SdkTracerProviderBuilder ;
25
+ import java .io .IOException ;
27
26
import java .text .MessageFormat ;
28
27
import java .time .Duration ;
29
28
import java .time .format .DateTimeParseException ;
30
29
import java .util .HashSet ;
31
30
import java .util .Map ;
32
31
import java .util .Set ;
32
+ import java .util .logging .Level ;
33
33
import java .util .logging .Logger ;
34
34
import java .util .stream .Collectors ;
35
35
import java .util .stream .Stream ;
@@ -60,27 +60,31 @@ public static void init(SdkTracerProviderBuilder providerBuilder, ConfigProperti
60
60
logger .info ("Starting OpAmp client for: " + serviceName + " on endpoint " + endpoint );
61
61
DynamicInstrumentation .setTracerConfigurator (
62
62
providerBuilder , DynamicConfiguration .UpdatableConfigurator .INSTANCE );
63
- CentralConfigurationManager centralConfigurationManager =
64
- CentralConfigurationManager .builder ()
63
+ OpampManager opampManager =
64
+ OpampManager .builder ()
65
65
.setServiceName (serviceName )
66
66
.setPollingInterval (Duration .ofSeconds (30 ))
67
67
.setConfigurationEndpoint (endpoint )
68
68
.setServiceEnvironment (environment )
69
69
.build ();
70
70
71
- centralConfigurationManager .start (
71
+ opampManager .start (
72
72
configuration -> {
73
73
logger .fine ("Received configuration: " + configuration );
74
- Configs .applyConfigurations (configuration , centralConfigurationManager );
75
- return CentralConfigurationProcessor .Result .SUCCESS ;
74
+ Configs .applyConfigurations (configuration , opampManager );
75
+ return OpampManager . CentralConfigurationProcessor .Result .SUCCESS ;
76
76
});
77
77
78
78
Runtime .getRuntime ()
79
79
.addShutdownHook (
80
80
new Thread (
81
81
() -> {
82
- logger .info ("=========== Shutting down OpAmp client for: " + serviceName );
83
- centralConfigurationManager .stop ();
82
+ logger .info ("=========== Shutting down OpAMP client for: " + serviceName );
83
+ try {
84
+ opampManager .close ();
85
+ } catch (IOException e ) {
86
+ logger .log (Level .SEVERE , "Error during OpAMP shutdown" , e );
87
+ }
84
88
}));
85
89
}
86
90
@@ -129,40 +133,35 @@ public static class Configs {
129
133
}
130
134
131
135
public static synchronized void applyConfigurations (
132
- Map <String , String > configuration ,
133
- CentralConfigurationManager centralConfigurationManager ) {
136
+ Map <String , String > configuration , OpampManager opampManager ) {
134
137
Set <String > copyOfCurrentNonDefaultConfigsApplied =
135
138
new HashSet <>(currentNonDefaultConfigsApplied );
136
139
configuration .forEach (
137
140
(configurationName , configurationValue ) -> {
138
141
copyOfCurrentNonDefaultConfigsApplied .remove (configurationName );
139
- applyConfiguration (configurationName , configurationValue , centralConfigurationManager );
142
+ applyConfiguration (configurationName , configurationValue , opampManager );
140
143
currentNonDefaultConfigsApplied .add (configurationName );
141
144
});
142
145
if (!copyOfCurrentNonDefaultConfigsApplied .isEmpty ()) {
143
146
// We have configs that were applied previously but have now been set back to default and
144
147
// have been removed from the configs being sent - so for all of these we need to set the
145
148
// config back to default
146
149
for (String configurationName : copyOfCurrentNonDefaultConfigsApplied ) {
147
- applyDefaultConfiguration (configurationName , centralConfigurationManager );
150
+ applyDefaultConfiguration (configurationName , opampManager );
148
151
currentNonDefaultConfigsApplied .remove (configurationName );
149
152
}
150
153
}
151
154
}
152
155
153
156
public static void applyDefaultConfiguration (
154
- String configurationName , CentralConfigurationManager centralConfigurationManager ) {
155
- configNameToConfig .get (configurationName ).updateToDefault (centralConfigurationManager );
157
+ String configurationName , OpampManager opampManager ) {
158
+ configNameToConfig .get (configurationName ).updateToDefault (opampManager );
156
159
}
157
160
158
161
public static void applyConfiguration (
159
- String configurationName ,
160
- String configurationValue ,
161
- CentralConfigurationManager centralConfigurationManager ) {
162
+ String configurationName , String configurationValue , OpampManager opampManager ) {
162
163
if (configNameToConfig .containsKey (configurationName )) {
163
- configNameToConfig
164
- .get (configurationName )
165
- .updateOrLog (configurationValue , centralConfigurationManager );
164
+ configNameToConfig .get (configurationName ).updateOrLog (configurationValue , opampManager );
166
165
} else {
167
166
logger .warning (
168
167
"Ignoring unknown confguration option: '"
@@ -204,21 +203,19 @@ protected boolean getBoolean(String configurationValue, String error) {
204
203
}
205
204
}
206
205
207
- public void updateOrLog (
208
- String configurationValue , CentralConfigurationManager centralConfigurationManager ) {
206
+ public void updateOrLog (String configurationValue , OpampManager opampManager ) {
209
207
try {
210
- update (configurationValue , centralConfigurationManager );
208
+ update (configurationValue , opampManager );
211
209
} catch (IllegalArgumentException e ) {
212
210
logger .warning (e .getMessage ());
213
211
}
214
212
}
215
213
216
- abstract void update (
217
- String configurationValue , CentralConfigurationManager centralConfigurationManager )
214
+ abstract void update (String configurationValue , OpampManager opampManager )
218
215
throws IllegalArgumentException ;
219
216
220
- public void updateToDefault (CentralConfigurationManager centralConfigurationManager ) {
221
- update (defaultConfigStringValue , centralConfigurationManager );
217
+ public void updateToDefault (OpampManager opampManager ) {
218
+ update (defaultConfigStringValue , opampManager );
222
219
}
223
220
224
221
protected DynamicConfiguration config () {
@@ -232,7 +229,7 @@ public static final class SendLogs extends ConfigOption {
232
229
}
233
230
234
231
@ Override
235
- void update (String configurationValue , CentralConfigurationManager centralConfigurationManager )
232
+ void update (String configurationValue , OpampManager opampManager )
236
233
throws IllegalArgumentException {
237
234
config ().setSendingLogs (getBoolean (configurationValue ));
238
235
}
@@ -244,7 +241,7 @@ public static final class SendMetrics extends ConfigOption {
244
241
}
245
242
246
243
@ Override
247
- void update (String configurationValue , CentralConfigurationManager centralConfigurationManager )
244
+ void update (String configurationValue , OpampManager opampManager )
248
245
throws IllegalArgumentException {
249
246
config ().setSendingMetrics (getBoolean (configurationValue ));
250
247
}
@@ -256,7 +253,7 @@ public static final class SendTraces extends ConfigOption {
256
253
}
257
254
258
255
@ Override
259
- void update (String configurationValue , CentralConfigurationManager centralConfigurationManager )
256
+ void update (String configurationValue , OpampManager opampManager )
260
257
throws IllegalArgumentException {
261
258
config ().setSendingSpans (getBoolean (configurationValue ));
262
259
}
@@ -268,7 +265,7 @@ public static final class DeactivateAllInstrumentations extends ConfigOption {
268
265
}
269
266
270
267
@ Override
271
- void update (String configurationValue , CentralConfigurationManager centralConfigurationManager )
268
+ void update (String configurationValue , OpampManager opampManager )
272
269
throws IllegalArgumentException {
273
270
if (getBoolean (configurationValue )) {
274
271
config ().deactivateAllInstrumentations ();
@@ -284,7 +281,7 @@ public static final class DeactivateInstrumentations extends ConfigOption {
284
281
}
285
282
286
283
@ Override
287
- void update (String configurationValue , CentralConfigurationManager centralConfigurationManager )
284
+ void update (String configurationValue , OpampManager opampManager )
288
285
throws IllegalArgumentException {
289
286
config ().deactivateInstrumentations (configurationValue );
290
287
}
@@ -296,7 +293,7 @@ public static final class LoggingLevel extends ConfigOption {
296
293
}
297
294
298
295
@ Override
299
- void update (String configurationValue , CentralConfigurationManager centralConfigurationManager )
296
+ void update (String configurationValue , OpampManager opampManager )
300
297
throws IllegalArgumentException {
301
298
AgentLog .setLevel (configurationValue );
302
299
}
@@ -308,17 +305,14 @@ public static final class PollingInterval extends ConfigOption {
308
305
}
309
306
310
307
@ Override
311
- void update (String configurationValue , CentralConfigurationManager centralConfigurationManager )
308
+ void update (String configurationValue , OpampManager opampManager )
312
309
throws IllegalArgumentException {
313
- if (centralConfigurationManager instanceof CentralConfigurationManagerImpl ) {
314
- try {
315
- Duration duration = Duration .parse ("PT" + configurationValue );
316
- ((CentralConfigurationManagerImpl ) centralConfigurationManager )
317
- .resetPeriodicDelay (duration );
318
- } catch (DateTimeParseException e ) {
319
- logger .warning (
320
- "Failed to update the polling interval, value passed was invalid: " + e .getMessage ());
321
- }
310
+ try {
311
+ Duration duration = Duration .parse ("PT" + configurationValue );
312
+ opampManager .setPollingDelay (duration );
313
+ } catch (DateTimeParseException e ) {
314
+ logger .warning (
315
+ "Failed to update the polling interval, value passed was invalid: " + e .getMessage ());
322
316
}
323
317
}
324
318
}
0 commit comments