4040import java .util .List ;
4141import java .util .Map ;
4242import java .util .Objects ;
43+ import java .util .Optional ;
4344import java .util .UUID ;
4445import java .util .concurrent .ConcurrentHashMap ;
4546import java .util .concurrent .ExecutionException ;
@@ -82,6 +83,17 @@ public AwsCapaConfiguration(CapaObjectSerializer objectSerializer) {
8283 serializerProcessor = new SerializerProcessor (objectSerializer );
8384 }
8485
86+ @ Override
87+ public String stopSubscribe () {
88+ AwsCapaConfigurationScheduler .INSTANCE .configSubscribePollingScheduler .dispose ();
89+ return "success" ;
90+ }
91+
92+ @ Override
93+ public void close () {
94+ //no need
95+ }
96+
8597 @ Override
8698 protected void doInit (StoreConfig storeConfig ) {
8799 //no need
@@ -93,10 +105,8 @@ protected <T> Mono<List<ConfigurationItem<T>>> doGet(String appId, String group,
93105 if (CollectionUtils .isNullOrEmpty (keys )) {
94106 return Mono .error (new IllegalArgumentException ("keys is null or empty" ));
95107 }
96-
97108 //todo:need to get the specific env from system properties
98109 String applicationName = appId + "_FAT" ;
99-
100110 String configurationName = keys .get (0 );
101111 String clientConfigurationVersion = getCurVersion (applicationName , configurationName );
102112
@@ -116,7 +126,10 @@ protected <T> Mono<List<ConfigurationItem<T>>> doGet(String appId, String group,
116126 items .add ((ConfigurationItem <T >) getCurConfigurationItem (applicationName , configurationName ));
117127 } else {
118128 //if version changes,update versionMap and return
119- items .add (updateConfigurationItem (applicationName , configurationName , type , resp .content (), resp .configurationVersion ()));
129+ Configuration <T > tConfiguration = updateConfigurationItem (applicationName , configurationName , type , resp .content (), resp .configurationVersion ());
130+ if (tConfiguration != null ) {
131+ items .add (tConfiguration .getConfigurationItem ());
132+ }
120133 }
121134 return items ;
122135 });
@@ -126,12 +139,20 @@ protected <T> Mono<List<ConfigurationItem<T>>> doGet(String appId, String group,
126139 protected <T > Flux <SubscribeResp <T >> doSubscribe (String appId , String group , String label , List <String > keys , Map <String , String > metadata , TypeRef <T > type ) {
127140 //todo:need to get the specific env from system properties
128141 String applicationName = appId + "_FAT" ;
129-
130142 String configurationName = keys .get (0 );
131143
132- return Flux .create (fluxSink -> {
133- AwsCapaConfigurationScheduler .INSTANCE .configSubscribePollingScheduler
134- .schedulePeriodically (() -> {
144+ initSubscribe (applicationName , configurationName , group , label , metadata , type );
145+ return doSub (applicationName , configurationName , group , label , metadata , type , appId );
146+ }
147+
148+ private synchronized <T > Mono <Boolean > initConfig (String applicationName , String configurationName , String group , String label , Map <String , String > metadata , TypeRef <T > type ) {
149+ //double check whether has been initialized
150+ if (isInitialized (applicationName , configurationName )) {
151+ return Mono .just (true );
152+ }
153+ return Mono .create (monoSink -> {
154+ AwsCapaConfigurationScheduler .INSTANCE .configInitScheduler
155+ .schedule (() -> {
135156 String version = getCurVersion (applicationName , configurationName );
136157
137158 GetConfigurationRequest request = GetConfigurationRequest .builder ()
@@ -149,45 +170,90 @@ protected <T> Flux<SubscribeResp<T>> doSubscribe(String appId, String group, Str
149170 LOGGER .error ("error occurs when getConfiguration,configurationName:{},version:{}" , request .configuration (), request .clientConfigurationVersion (), e );
150171 }
151172 if (resp != null && !Objects .equals (resp .configurationVersion (), version )) {
152- /*
153- the reason why not use publisher scheduler to update configuration item is that switch thread needs time,
154- when the polling frequency is high,the second polling request may happens before the first request has been
155- update successfully by publisher thread. In that case,the subscriber may receive several signals for one actual
156- change event.
157- */
158- ConfigurationItem <T > configurationItem = updateConfigurationItem (applicationName , configurationName , type , resp .content (), resp .configurationVersion ());
159- SubscribeResp <T > subscribeResp = convertToSubscribeResp (configurationItem , appId );
160- fluxSink .next (subscribeResp );
173+ initConfigurationItem (applicationName , configurationName , type , resp .content (), resp .configurationVersion ());
174+ monoSink .success (true );
161175 }
162- }, 0 , 1 , TimeUnit .SECONDS );
176+ }, 0 , TimeUnit .SECONDS );
163177 });
164178 }
165179
166- private <T > SubscribeResp <T > convertToSubscribeResp (ConfigurationItem <T > item , String appId ) {
167- SubscribeResp <T > resp = new SubscribeResp <>();
168- resp .setStoreName (AWS_APP_CONFIG_NAME );
169- resp .setAppId (appId );
170- resp .setItems (Lists .newArrayList (item ));
171- return resp ;
180+ private <T > void initSubscribe (String applicationName , String configurationName , String group , String label , Map <String , String > metadata , TypeRef <T > type ) {
181+ if (!isInitialized (applicationName , configurationName )) {
182+ initConfig (applicationName , configurationName , group , label , metadata , type ).block ();
183+ }
184+ if (!isSubscribed (applicationName , configurationName )) {
185+ createSubscribe (applicationName , configurationName , type );
186+ }
172187 }
173188
174- @ Override
175- public String stopSubscribe () {
176- AwsCapaConfigurationScheduler .INSTANCE .configSubscribePollingScheduler .dispose ();
177- return "success" ;
189+ private synchronized <T > void createSubscribe (String applicationName , String configurationName , TypeRef <T > type ) {
190+ if (isSubscribed (applicationName , configurationName )) {
191+ return ;
192+ }
193+ Flux .create (fluxSink -> {
194+ AwsCapaConfigurationScheduler .INSTANCE .configSubscribePollingScheduler
195+ .schedulePeriodically (() -> {
196+ String version = getCurVersion (applicationName , configurationName );
197+
198+ GetConfigurationRequest request = GetConfigurationRequest .builder ()
199+ .application (applicationName )
200+ .clientId (UUID .randomUUID ().toString ())
201+ .configuration (configurationName )
202+ .clientConfigurationVersion (version )
203+ .environment (DEFAULT_ENV )
204+ .build ();
205+
206+ GetConfigurationResponse resp = null ;
207+ try {
208+ resp = appConfigAsyncClient .getConfiguration (request ).get ();
209+ } catch (InterruptedException | ExecutionException e ) {
210+ LOGGER .error ("error occurs when getConfiguration,configurationName:{},version:{}" , request .configuration (), request .clientConfigurationVersion (), e );
211+ }
212+ //update subscribed status if needs
213+ getConfiguration (applicationName , configurationName ).getSubscribed ().compareAndSet (false , true );
214+
215+ if (resp != null && !Objects .equals (resp .configurationVersion (), version )) {
216+ fluxSink .next (resp );
217+ }
218+ //todo:make the polling frequency configurable
219+ }, 0 , 1 , TimeUnit .SECONDS );
220+ })
221+ .publishOn (AwsCapaConfigurationScheduler .INSTANCE .configPublisherScheduler )
222+ .map (origin -> {
223+ GetConfigurationResponse resp = (GetConfigurationResponse ) origin ;
224+ Configuration configuration = updateConfigurationItem (applicationName , configurationName , type , resp .content (), resp .configurationVersion ());
225+ return configuration == null ? Configuration .EMPTY : configuration ;
226+ }).filter (resp -> resp != Configuration .EMPTY )
227+ .subscribe (resp -> {
228+ resp .triggers (resp .getConfigurationItem ());
229+ });
178230 }
179231
180- @ Override
181- public void close () {
182- //no need
232+ private <T > Flux <SubscribeResp <T >> doSub (String applicationName , String configurationName , String group , String label , Map <String , String > metadata , TypeRef <T > type , String appId ) {
233+ Configuration <?> configuration = getConfiguration (applicationName , configurationName );
234+ return Flux .create (fluxSink -> {
235+ configuration .addListener (configurationItem -> {
236+ fluxSink .next (configurationItem );
237+ });
238+ })
239+ .map (resp -> (ConfigurationItem <T >) resp )
240+ .map (resp -> convert (resp , appId ));
241+ }
242+
243+ private <T > SubscribeResp <T > convert (ConfigurationItem <T > conf , String appId ) {
244+ SubscribeResp <T > subscribeResp = new SubscribeResp <>();
245+ subscribeResp .setItems (Lists .newArrayList (conf ));
246+ subscribeResp .setAppId (appId );
247+ subscribeResp .setStoreName (AWS_APP_CONFIG_NAME );
248+ return subscribeResp ;
183249 }
184250
185251 /**
186252 * get current version
187253 * ps:version can be null
188254 *
189- * @param applicationName
190- * @param configurationName
255+ * @param applicationName applicationName
256+ * @param configurationName configurationName
191257 * @return current version
192258 */
193259 private String getCurVersion (String applicationName , String configurationName ) {
@@ -208,34 +274,81 @@ private ConfigurationItem<?> getCurConfigurationItem(String applicationName, Str
208274 return configurationItem ;
209275 }
210276
211- private synchronized <T > ConfigurationItem <T > updateConfigurationItem (String applicationName , String configurationName , TypeRef <T > type , SdkBytes contentSdkBytes , String version ) {
277+ /**
278+ * @param applicationName applicationName
279+ * @param configurationName configurationName
280+ * @param type type of content
281+ * @param contentSdkBytes content value with SdkBytes type
282+ * @param version new version
283+ * @param <T> T
284+ * @return return the new value or null if not actually update
285+ */
286+ private <T > Configuration <T > updateConfigurationItem (String applicationName , String configurationName , TypeRef <T > type , SdkBytes contentSdkBytes , String version ) {
212287 ConcurrentHashMap <String , Configuration <?>> configMap = versionMap .get (applicationName );
213- if (configMap == null ) {
214- configMap = new ConcurrentHashMap <>();
215- T content = serializerProcessor .deserialize (contentSdkBytes , type , configurationName );
216288
217- Configuration <T > configuration = new Configuration <>();
289+ //in fact,configMap.get(configurationName) is always not null, as it has been initialized in initialization process
290+ Configuration <T > configuration = (Configuration <T >) configMap .get (configurationName );
291+
292+ synchronized (configuration .lock ) {
293+ //check whether content has been updated by other thread
294+ if (configMap .containsKey (configurationName ) && Objects .equals (configMap .get (configurationName ).getClientConfigurationVersion (), version )) {
295+ return null ;
296+ }
297+ //do need to update
298+ T content = serializerProcessor .deserialize (contentSdkBytes , type , configurationName );
218299 configuration .setClientConfigurationVersion (version );
219300
220- ConfigurationItem <T > configurationItem = new ConfigurationItem <>();
221- configurationItem .setKey (configurationName );
301+ ConfigurationItem <T > configurationItem = Optional .ofNullable (configuration .getConfigurationItem ()).orElse (new ConfigurationItem <>());
222302 configurationItem .setContent (content );
223303 configuration .setConfigurationItem (configurationItem );
224304
225305 configMap .put (configurationName , configuration );
306+ return configuration ;
307+ }
308+ }
309+
310+ private <T > Configuration <T > initConfigurationItem (String applicationName , String configurationName , TypeRef <T > type , SdkBytes contentSdkBytes , String version ) {
311+ ConcurrentHashMap <String , Configuration <?>> configMap = versionMap .get (applicationName );
312+ boolean initApplication = false ;
313+ if (configMap == null ) {
314+ configMap = new ConcurrentHashMap <>();
315+ initApplication = true ;
316+ }
317+
318+ Configuration <T > configuration = new Configuration <>();
319+ configuration .setClientConfigurationVersion (version );
320+ configuration .getInitialized ().compareAndSet (false , true );
321+
322+ ConfigurationItem <T > configurationItem = new ConfigurationItem <>();
323+ configurationItem .setKey (configurationName );
324+ T content = serializerProcessor .deserialize (contentSdkBytes , type , configurationName );
325+ configurationItem .setContent (content );
326+ configuration .setConfigurationItem (configurationItem );
327+
328+ configMap .put (configurationName , configuration );
329+
330+ if (initApplication ) {
226331 versionMap .put (applicationName , configMap );
227- return configurationItem ;
228- } else {
229- T content = serializerProcessor .deserialize (contentSdkBytes , type , configurationName );
230- Configuration <T > configuration = new Configuration <>();
231- configuration .setClientConfigurationVersion (version );
332+ }
333+ return configuration ;
334+ }
232335
233- ConfigurationItem <T > configurationItem = new ConfigurationItem <>();
234- configurationItem .setKey (configurationName );
235- configurationItem .setContent (content );
236- configuration .setConfigurationItem (configurationItem );
237- configMap .put (configurationName , configuration );
238- return configurationItem ;
336+ private boolean isInitialized (String applicationName , String configurationName ) {
337+ ConcurrentHashMap <String , Configuration <?>> configMap = versionMap .get (applicationName );
338+ return configMap != null && configMap .containsKey (configurationName ) && configMap .get (configurationName ).getInitialized ().get ();
339+ }
340+
341+ private boolean isSubscribed (String applicationName , String configurationName ) {
342+ ConcurrentHashMap <String , Configuration <?>> configMap = versionMap .get (applicationName );
343+ return configMap != null && configMap .containsKey (configurationName ) && configMap .get (configurationName ).getInitialized ().get () && configMap .get (configurationName ).getSubscribed ().get ();
344+ }
345+
346+ private Configuration <?> getConfiguration (String applicationName , String configurationName ) {
347+ ConcurrentHashMap <String , Configuration <?>> configMap = versionMap .get (applicationName );
348+ if (configMap != null && configMap .containsKey (configurationName )) {
349+ return configMap .get (configurationName );
239350 }
351+ return Configuration .EMPTY ;
240352 }
353+
241354}
0 commit comments