4747import opamp .proto .RemoteConfigStatuses ;
4848import opamp .proto .ServerErrorResponse ;
4949
50- public final class OpampManager implements Closeable , OpampClient . Callbacks {
50+ public final class OpampManager implements Closeable {
5151 private final Configuration configuration ;
52- private final DslJson <Object > dslJson = new DslJson <>(new DslJson .Settings <>());
53- private final Logger logger = Logger .getLogger (OpampManager .class .getName ());
5452 private volatile OpampClient client ;
5553 private volatile MutablePeriodicDelay pollingDelay ;
56- private volatile CentralConfigurationProcessor processor ;
5754
5855 private OpampManager (Configuration configuration ) {
5956 this .configuration = configuration ;
6057 }
6158
6259 public void start (CentralConfigurationProcessor processor ) {
63- this .processor = processor ;
6460 pollingDelay = new MutablePeriodicDelay (configuration .pollingInterval );
6561
6662 OpampClientBuilder builder = OpampClient .builder ();
@@ -87,86 +83,100 @@ public void start(CentralConfigurationProcessor processor) {
8783 }
8884 PeriodicDelay retryDelay = RetryPeriodicDelay .create (configuration .pollingInterval );
8985 builder .setRequestService (HttpRequestService .create (httpSender , pollingDelay , retryDelay ));
90- client = builder .build (this );
86+ client = builder .build (new OpampCallbacks ( processor ) );
9187 }
9288
9389 @ Override
9490 public void close () throws IOException {
9591 client .close ();
9692 }
9793
98- @ Override
99- public void onMessage (MessageData messageData ) {
100- logger .log (Level .FINE , "onMessage({0}, {1})" , new Object [] {client , messageData });
101- AgentRemoteConfig remoteConfig = messageData .getRemoteConfig ();
102- if (remoteConfig != null ) {
103- processRemoteConfig (client , remoteConfig );
94+ private static class OpampCallbacks implements OpampClient .Callbacks {
95+ private final Logger logger = Logger .getLogger (OpampCallbacks .class .getName ());
96+ private final CentralConfigurationProcessor processor ;
97+ private final DslJson <Object > dslJson ;
98+
99+ private OpampCallbacks (CentralConfigurationProcessor processor ) {
100+ this .processor = processor ;
101+ this .dslJson = new DslJson <>(new DslJson .Settings <>());
104102 }
105- }
106103
107- private void processRemoteConfig (OpampClient client , AgentRemoteConfig remoteConfig ) {
108- Map <String , AgentConfigFile > configMapMap = remoteConfig .config .config_map ;
109- AgentConfigFile centralConfig = configMapMap .get ("elastic" );
110- if (centralConfig != null ) {
111- Map <String , String > configuration = parseCentralConfiguration (centralConfig .body );
112- RemoteConfigStatuses status ;
113-
114- if (configuration != null ) {
115- CentralConfigurationProcessor .Result result = processor .process (configuration );
116- status =
117- (result == CentralConfigurationProcessor .Result .SUCCESS )
118- ? RemoteConfigStatuses .RemoteConfigStatuses_APPLIED
119- : RemoteConfigStatuses .RemoteConfigStatuses_FAILED ;
120- } else {
121- status = RemoteConfigStatuses .RemoteConfigStatuses_FAILED ;
104+ @ Override
105+ public void onMessage (OpampClient client , MessageData messageData ) {
106+ logger .log (Level .FINE , "onMessage({0}, {1})" , new Object [] {client , messageData });
107+ AgentRemoteConfig remoteConfig = messageData .getRemoteConfig ();
108+ if (remoteConfig != null ) {
109+ processRemoteConfig (client , remoteConfig );
122110 }
111+ }
123112
124- // Note if FAILED is sent, the config change is effectively dropped as the server will not
125- // re-send it
126- client .setRemoteConfigStatus (getRemoteConfigStatus (status , remoteConfig .config_hash ));
113+ private void processRemoteConfig (OpampClient client , AgentRemoteConfig remoteConfig ) {
114+ Map <String , AgentConfigFile > configMapMap = remoteConfig .config .config_map ;
115+ AgentConfigFile centralConfig = configMapMap .get ("elastic" );
116+ if (centralConfig != null ) {
117+ Map <String , String > configuration = parseCentralConfiguration (centralConfig .body );
118+ RemoteConfigStatuses status ;
119+
120+ if (configuration != null ) {
121+ CentralConfigurationProcessor .Result result = processor .process (configuration );
122+ status =
123+ (result == CentralConfigurationProcessor .Result .SUCCESS )
124+ ? RemoteConfigStatuses .RemoteConfigStatuses_APPLIED
125+ : RemoteConfigStatuses .RemoteConfigStatuses_FAILED ;
126+ } else {
127+ status = RemoteConfigStatuses .RemoteConfigStatuses_FAILED ;
128+ }
129+
130+ // Note if FAILED is sent, the config change is effectively dropped as the server will not
131+ // re-send it
132+ client .setRemoteConfigStatus (getRemoteConfigStatus (status , remoteConfig .config_hash ));
133+ }
127134 }
128- }
129135
130- private static RemoteConfigStatus getRemoteConfigStatus (
131- RemoteConfigStatuses status , ByteString hash ) {
132- if (hash != null && status == RemoteConfigStatuses .RemoteConfigStatuses_APPLIED ) {
133- return new RemoteConfigStatus .Builder ().status (status ).last_remote_config_hash (hash ).build ();
134- } else {
135- return new RemoteConfigStatus .Builder ().status (status ).build ();
136+ private static RemoteConfigStatus getRemoteConfigStatus (
137+ RemoteConfigStatuses status , ByteString hash ) {
138+ if (hash != null && status == RemoteConfigStatuses .RemoteConfigStatuses_APPLIED ) {
139+ return new RemoteConfigStatus .Builder ()
140+ .status (status )
141+ .last_remote_config_hash (hash )
142+ .build ();
143+ } else {
144+ return new RemoteConfigStatus .Builder ().status (status ).build ();
145+ }
136146 }
137- }
138147
139- private Map <String , String > parseCentralConfiguration (ByteString centralConfig ) {
140- try {
141- byte [] centralConfigBytes = centralConfig .toByteArray ();
142- if (centralConfigBytes .length == 0 ) {
143- logger .log (
144- Level .WARNING ,
145- "No central configuration returned - is this connected to an EDOT collector above 18.8?" );
148+ private Map <String , String > parseCentralConfiguration (ByteString centralConfig ) {
149+ try {
150+ byte [] centralConfigBytes = centralConfig .toByteArray ();
151+ if (centralConfigBytes .length == 0 ) {
152+ logger .log (
153+ Level .WARNING ,
154+ "No central configuration returned - is this connected to an EDOT collector above 18.8?" );
155+ return null ;
156+ }
157+ JsonReader <Object > reader = dslJson .newReader (centralConfig .toByteArray ());
158+ reader .startObject ();
159+ return Collections .unmodifiableMap (MapConverter .deserialize (reader ));
160+ } catch (IOException e ) {
161+ logger .log (Level .WARNING , "Could not parse central configuration." , e );
146162 return null ;
147163 }
148- JsonReader <Object > reader = dslJson .newReader (centralConfig .toByteArray ());
149- reader .startObject ();
150- return Collections .unmodifiableMap (MapConverter .deserialize (reader ));
151- } catch (IOException e ) {
152- logger .log (Level .WARNING , "Could not parse central configuration." , e );
153- return null ;
154164 }
155- }
156165
157- @ Override
158- public void onConnect () {
159- logger .log (Level .INFO , "onConnect({0})" , client );
160- }
166+ @ Override
167+ public void onConnect (OpampClient client ) {
168+ logger .log (Level .INFO , "onConnect({0})" , client );
169+ }
161170
162- @ Override
163- public void onConnectFailed (@ Nullable Throwable throwable ) {
164- logger .log (Level .INFO , "onConnect({0}, {1})" , new Object [] {client , throwable });
165- }
171+ @ Override
172+ public void onConnectFailed (OpampClient client , @ Nullable Throwable throwable ) {
173+ logger .log (Level .INFO , "onConnect({0}, {1})" , new Object [] {client , throwable });
174+ }
166175
167- @ Override
168- public void onErrorResponse (@ Nonnull ServerErrorResponse errorResponse ) {
169- logger .log (Level .INFO , "onErrorResponse({0}, {1})" , new Object [] {client , errorResponse });
176+ @ Override
177+ public void onErrorResponse (OpampClient client , @ Nonnull ServerErrorResponse errorResponse ) {
178+ logger .log (Level .INFO , "onErrorResponse({0}, {1})" , new Object [] {client , errorResponse });
179+ }
170180 }
171181
172182 public void setPollingDelay (@ Nonnull Duration duration ) {
0 commit comments