@@ -135,7 +135,7 @@ protected <T> Mono<List<ConfigurationItem<T>>> doGet(String appId, String group,
135135 .clientId (UUID .randomUUID ().toString ())
136136 .configuration (configurationName )
137137 .clientConfigurationVersion (clientConfigurationVersion )
138- .environment (AwsCapaConfigurationProperties .AppConfigProperties .Settings .getAwsAppConfigEnv ())
138+ .environment (AwsCapaConfigurationProperties .AppConfigProperties .Settings .getConfigAwsAppConfigEnv ())
139139 .build ();
140140
141141 return Mono .fromFuture (() -> appConfigAsyncClient .getConfiguration (request ))
@@ -182,30 +182,30 @@ private synchronized <T> Configuration<T> initConfig(String applicationName, Str
182182 return Configuration .EMPTY ;
183183 }
184184 return Mono .create (monoSink -> {
185- AwsCapaConfigurationScheduler .INSTANCE .configInitScheduler
186- .schedule (() -> {
187- String version = getCurVersion (applicationName , configurationName );
188-
189- GetConfigurationRequest request = GetConfigurationRequest .builder ()
190- .application (applicationName )
191- .clientId (UUID .randomUUID ().toString ())
192- .configuration (configurationName )
193- .clientConfigurationVersion (version )
194- .environment (AwsCapaConfigurationProperties .AppConfigProperties .Settings .getAwsAppConfigEnv ())
195- .build ();
196-
197- GetConfigurationResponse resp = null ;
198- try {
199- resp = appConfigAsyncClient .getConfiguration (request ).get ();
200- } catch (InterruptedException | ExecutionException e ) {
201- LOGGER .error ("error occurs when getConfiguration,configurationName:{},version:{}" , request .configuration (), request .clientConfigurationVersion (), e );
202- }
203- if (resp != null && !Objects .equals (resp .configurationVersion (), version )) {
204- Configuration <T > tConfiguration = initConfigurationItem (applicationName , configurationName , type , resp .content (), resp .configurationVersion ());
205- monoSink .success (tConfiguration );
206- }
207- });
208- })
185+ AwsCapaConfigurationScheduler .INSTANCE .configInitScheduler
186+ .schedule (() -> {
187+ String version = getCurVersion (applicationName , configurationName );
188+
189+ GetConfigurationRequest request = GetConfigurationRequest .builder ()
190+ .application (applicationName )
191+ .clientId (UUID .randomUUID ().toString ())
192+ .configuration (configurationName )
193+ .clientConfigurationVersion (version )
194+ .environment (AwsCapaConfigurationProperties .AppConfigProperties .Settings .getConfigAwsAppConfigEnv ())
195+ .build ();
196+
197+ GetConfigurationResponse resp = null ;
198+ try {
199+ resp = appConfigAsyncClient .getConfiguration (request ).get ();
200+ } catch (InterruptedException | ExecutionException e ) {
201+ LOGGER .error ("error occurs when getConfiguration,configurationName:{},version:{}" , request .configuration (), request .clientConfigurationVersion (), e );
202+ }
203+ if (resp != null && !Objects .equals (resp .configurationVersion (), version )) {
204+ Configuration <T > tConfiguration = initConfigurationItem (applicationName , configurationName , type , resp .content (), resp .configurationVersion ());
205+ monoSink .success (tConfiguration );
206+ }
207+ });
208+ })
209209 .map (resp -> (Configuration <T >) resp )
210210 .block ();
211211 }
@@ -224,33 +224,33 @@ private synchronized <T> void createSubscribe(String applicationName, String con
224224 return ;
225225 }
226226 Flux .create (fluxSink -> {
227- AwsCapaConfigurationScheduler .INSTANCE .configSubscribePollingScheduler
228- .schedulePeriodically (() -> {
229- String version = getCurVersion (applicationName , configurationName );
230-
231- GetConfigurationRequest request = GetConfigurationRequest .builder ()
232- .application (applicationName )
233- .clientId (UUID .randomUUID ().toString ())
234- .configuration (configurationName )
235- .clientConfigurationVersion (version )
236- .environment (AwsCapaConfigurationProperties .AppConfigProperties .Settings .getAwsAppConfigEnv ())
237- .build ();
238-
239- GetConfigurationResponse resp = null ;
240- try {
241- resp = appConfigAsyncClient .getConfiguration (request ).get ();
242- } catch (InterruptedException | ExecutionException e ) {
243- LOGGER .error ("error occurs when getConfiguration,configurationName:{},version:{}" , request .configuration (), request .clientConfigurationVersion (), e );
244- }
245- // update subscribed status if needs
246- getConfiguration (applicationName , configurationName ).getSubscribed ().compareAndSet (false , true );
247-
248- if (resp != null && !Objects .equals (resp .configurationVersion (), version )) {
249- fluxSink .next (resp );
250- }
251- // todo: make the polling frequency configurable
252- }, 0 , 1 , TimeUnit .SECONDS );
253- })
227+ AwsCapaConfigurationScheduler .INSTANCE .configSubscribePollingScheduler
228+ .schedulePeriodically (() -> {
229+ String version = getCurVersion (applicationName , configurationName );
230+
231+ GetConfigurationRequest request = GetConfigurationRequest .builder ()
232+ .application (applicationName )
233+ .clientId (UUID .randomUUID ().toString ())
234+ .configuration (configurationName )
235+ .clientConfigurationVersion (version )
236+ .environment (AwsCapaConfigurationProperties .AppConfigProperties .Settings .getConfigAwsAppConfigEnv ())
237+ .build ();
238+
239+ GetConfigurationResponse resp = null ;
240+ try {
241+ resp = appConfigAsyncClient .getConfiguration (request ).get ();
242+ } catch (InterruptedException | ExecutionException e ) {
243+ LOGGER .error ("error occurs when getConfiguration,configurationName:{},version:{}" , request .configuration (), request .clientConfigurationVersion (), e );
244+ }
245+ // update subscribed status if needs
246+ getConfiguration (applicationName , configurationName ).getSubscribed ().compareAndSet (false , true );
247+
248+ if (resp != null && !Objects .equals (resp .configurationVersion (), version )) {
249+ fluxSink .next (resp );
250+ }
251+ // todo: make the polling frequency configurable
252+ }, 0 , 1 , TimeUnit .SECONDS );
253+ })
254254 .publishOn (AwsCapaConfigurationScheduler .INSTANCE .configPublisherScheduler )
255255 .map (origin -> {
256256 GetConfigurationResponse resp = (GetConfigurationResponse ) origin ;
@@ -268,11 +268,12 @@ private <T> Flux<SubscribeResp<T>> doSub(String applicationName, String configur
268268 if (Objects .equals (configuration , Configuration .EMPTY )) {
269269 return Flux .empty ();
270270 }
271- return Flux .create (fluxSink -> {
272- configuration .addListener (configurationItem -> {
273- fluxSink .next (configurationItem );
274- });
275- })
271+ return Flux
272+ .create (fluxSink -> {
273+ configuration .addListener (configurationItem -> {
274+ fluxSink .next (configurationItem );
275+ });
276+ })
276277 .map (resp -> (ConfigurationItem <T >) resp )
277278 .map (resp -> convert (resp , appId ));
278279 }
0 commit comments