1616 */
1717package group .rxcloud .capa .spi .aws .config ;
1818
19- import group . rxcloud . capa . component . configstore . CapaConfigStore ;
19+ import com . google . common . collect . Lists ;
2020import group .rxcloud .capa .component .configstore .ConfigurationItem ;
21- import group .rxcloud .capa .component .configstore .GetRequest ;
2221import group .rxcloud .capa .component .configstore .StoreConfig ;
23- import group .rxcloud .capa .component .configstore .SubscribeReq ;
2422import group .rxcloud .capa .component .configstore .SubscribeResp ;
2523import group .rxcloud .capa .infrastructure .serializer .CapaObjectSerializer ;
24+ import group .rxcloud .capa .spi .aws .config .common .serializer .SerializerProcessor ;
25+ import group .rxcloud .capa .spi .aws .config .entity .Configuration ;
26+ import group .rxcloud .capa .spi .aws .config .scheduler .AwsCapaConfigurationScheduler ;
27+ import group .rxcloud .capa .spi .configstore .CapaConfigStoreSpi ;
2628import group .rxcloud .cloudruntimes .utils .TypeRef ;
29+ import org .slf4j .Logger ;
30+ import org .slf4j .LoggerFactory ;
2731import reactor .core .publisher .Flux ;
2832import reactor .core .publisher .Mono ;
33+ import software .amazon .awssdk .core .SdkBytes ;
2934import software .amazon .awssdk .services .appconfig .AppConfigAsyncClient ;
35+ import software .amazon .awssdk .services .appconfig .model .GetConfigurationRequest ;
36+ import software .amazon .awssdk .services .appconfig .model .GetConfigurationResponse ;
37+ import software .amazon .awssdk .utils .CollectionUtils ;
3038
39+ import java .util .ArrayList ;
3140import java .util .List ;
41+ import java .util .Map ;
42+ import java .util .Objects ;
43+ import java .util .UUID ;
44+ import java .util .concurrent .ConcurrentHashMap ;
45+ import java .util .concurrent .ExecutionException ;
46+ import java .util .concurrent .TimeUnit ;
47+
48+ import static group .rxcloud .capa .spi .aws .config .common .constant .CapaAWSConstants .AWS_APP_CONFIG_NAME ;
49+ import static group .rxcloud .capa .spi .aws .config .common .constant .CapaAWSConstants .DEFAULT_ENV ;
3250
3351/**
34- * TODO load aws client from spi
52+ * @author Reckless Xu
3553 */
36- public class AwsCapaConfiguration extends CapaConfigStore {
54+ public class AwsCapaConfiguration extends CapaConfigStoreSpi {
55+
56+ private static final Logger LOGGER = LoggerFactory .getLogger (AwsCapaConfiguration .class );
3757
3858 private AppConfigAsyncClient appConfigAsyncClient ;
3959
60+ private SerializerProcessor serializerProcessor ;
61+
62+ /**
63+ * key of versionMap--applicationName,format:appid_ENV,e.g:"12345_FAT"
64+ * value of versionMap--configurationMap of this application,which contains all the configuration info for this application
65+ * <p>
66+ * key of configurationMap--configurationName;
67+ * value of configurationMap--configuration content and version of this configuration
68+ * <p>
69+ * ps:currentHashMap may not be necessary, as update in synchronized method
70+ */
71+ private static Map <String , ConcurrentHashMap <String , Configuration <?>>> versionMap ;
72+
4073 /**
4174 * Instantiates a new Capa configuration.
4275 *
@@ -45,40 +78,164 @@ public class AwsCapaConfiguration extends CapaConfigStore {
4578 public AwsCapaConfiguration (CapaObjectSerializer objectSerializer ) {
4679 super (objectSerializer );
4780 appConfigAsyncClient = AppConfigAsyncClient .create ();
81+ versionMap = new ConcurrentHashMap <>();
82+ serializerProcessor = new SerializerProcessor (objectSerializer );
4883 }
4984
5085 @ Override
5186 protected void doInit (StoreConfig storeConfig ) {
52-
87+ //no need
5388 }
5489
5590 @ Override
56- public <T > Mono <List <ConfigurationItem <T >>> get (GetRequest getRequest , TypeRef <T > typeRef ) {
57- return null ;
91+ protected <T > Mono <List <ConfigurationItem <T >>> doGet (String appId , String group , String label , List <String > keys , Map <String , String > metadata , TypeRef <T > type ) {
92+ List <ConfigurationItem <T >> items = new ArrayList <>();
93+ if (CollectionUtils .isNullOrEmpty (keys )) {
94+ return Mono .error (new IllegalArgumentException ("keys is null or empty" ));
95+ }
96+
97+ //todo:need to get the specific env from system properties
98+ String applicationName = appId + "_FAT" ;
99+
100+ String configurationName = keys .get (0 );
101+ String clientConfigurationVersion = getCurVersion (applicationName , configurationName );
102+
103+ GetConfigurationRequest request = GetConfigurationRequest .builder ()
104+ .application (applicationName )
105+ .clientId (UUID .randomUUID ().toString ())
106+ .configuration (configurationName )
107+ .clientConfigurationVersion (clientConfigurationVersion )
108+ .environment (DEFAULT_ENV )
109+ .build ();
110+
111+ return Mono .fromFuture (() -> appConfigAsyncClient .getConfiguration (request ))
112+ .publishOn (AwsCapaConfigurationScheduler .INSTANCE .configPublisherScheduler )
113+ .map (resp -> {
114+ //if version doesn't change, get from versionMap
115+ if (Objects .equals (clientConfigurationVersion , resp .configurationVersion ())) {
116+ items .add ((ConfigurationItem <T >) getCurConfigurationItem (applicationName , configurationName ));
117+ } else {
118+ //if version changes,update versionMap and return
119+ items .add (updateConfigurationItem (applicationName , configurationName , type , resp .content (), resp .configurationVersion ()));
120+ }
121+ return items ;
122+ });
58123 }
59124
60125 @ Override
61- public <T > Flux <SubscribeResp <T >> subscribe (SubscribeReq subscribeReq , TypeRef <T > typeRef ) {
62- return null ;
126+ protected <T > Flux <SubscribeResp <T >> doSubscribe (String appId , String group , String label , List <String > keys , Map <String , String > metadata , TypeRef <T > type ) {
127+ //todo:need to get the specific env from system properties
128+ String applicationName = appId + "_FAT" ;
129+
130+ String configurationName = keys .get (0 );
131+
132+ return Flux .create (fluxSink -> {
133+ AwsCapaConfigurationScheduler .INSTANCE .configSubscribePollingScheduler
134+ .schedulePeriodically (() -> {
135+ String version = getCurVersion (applicationName , configurationName );
136+
137+ GetConfigurationRequest request = GetConfigurationRequest .builder ()
138+ .application (applicationName )
139+ .clientId (UUID .randomUUID ().toString ())
140+ .configuration (configurationName )
141+ .clientConfigurationVersion (version )
142+ .environment (DEFAULT_ENV )
143+ .build ();
144+
145+ GetConfigurationResponse resp = null ;
146+ try {
147+ resp = appConfigAsyncClient .getConfiguration (request ).get ();
148+ } catch (InterruptedException | ExecutionException e ) {
149+ LOGGER .error ("error occurs when getConfiguration,configurationName:{},version:{}" , request .configuration (), request .clientConfigurationVersion (), e );
150+ }
151+ if (resp != null && !Objects .equals (resp .configurationVersion (), version )) {
152+ /*
153+ the reason why not use publisher scheduler to update configuration item is that switch thread needs time,
154+ when the polling frequency is high,the second polling request may happens before the first request has been
155+ update successfully by publisher thread. In that case,the subscriber may receive several signals for one actual
156+ change event.
157+ */
158+ ConfigurationItem <T > configurationItem = updateConfigurationItem (applicationName , configurationName , type , resp .content (), resp .configurationVersion ());
159+ SubscribeResp <T > subscribeResp = convertToSubscribeResp (configurationItem , appId );
160+ fluxSink .next (subscribeResp );
161+ }
162+ }, 0 , 1 , TimeUnit .SECONDS );
163+ });
164+ }
165+
166+ private <T > SubscribeResp <T > convertToSubscribeResp (ConfigurationItem <T > item , String appId ) {
167+ SubscribeResp <T > resp = new SubscribeResp <>();
168+ resp .setStoreName (AWS_APP_CONFIG_NAME );
169+ resp .setAppId (appId );
170+ resp .setItems (Lists .newArrayList (item ));
171+ return resp ;
63172 }
64173
65174 @ Override
66175 public String stopSubscribe () {
67- return null ;
176+ AwsCapaConfigurationScheduler .INSTANCE .configSubscribePollingScheduler .dispose ();
177+ return "success" ;
68178 }
69179
70180 @ Override
71- public String getDefaultGroup () {
72- return null ;
181+ public void close () {
182+ //no need
73183 }
74184
75- @ Override
76- public String getDefaultLabel () {
77- return null ;
185+ /**
186+ * get current version
187+ * ps:version can be null
188+ *
189+ * @param applicationName
190+ * @param configurationName
191+ * @return current version
192+ */
193+ private String getCurVersion (String applicationName , String configurationName ) {
194+ String version = null ;
195+ ConcurrentHashMap <String , Configuration <?>> configVersionMap = versionMap .get (applicationName );
196+ if (configVersionMap != null && configVersionMap .containsKey (configurationName )) {
197+ version = configVersionMap .get (configurationName ).getClientConfigurationVersion ();
198+ }
199+ return version ;
78200 }
79201
80- @ Override
81- public void close () throws Exception {
202+ private ConfigurationItem <?> getCurConfigurationItem (String applicationName , String configuration ) {
203+ ConfigurationItem <?> configurationItem = null ;
204+ ConcurrentHashMap <String , Configuration <?>> configMap = versionMap .get (applicationName );
205+ if (configMap != null && configMap .containsKey (configuration )) {
206+ configurationItem = configMap .get (configuration ).getConfigurationItem ();
207+ }
208+ return configurationItem ;
209+ }
210+
211+ private synchronized <T > ConfigurationItem <T > updateConfigurationItem (String applicationName , String configurationName , TypeRef <T > type , SdkBytes contentSdkBytes , String version ) {
212+ ConcurrentHashMap <String , Configuration <?>> configMap = versionMap .get (applicationName );
213+ if (configMap == null ) {
214+ configMap = new ConcurrentHashMap <>();
215+ T content = serializerProcessor .deserialize (contentSdkBytes , type , configurationName );
216+
217+ Configuration <T > configuration = new Configuration <>();
218+ configuration .setClientConfigurationVersion (version );
219+
220+ ConfigurationItem <T > configurationItem = new ConfigurationItem <>();
221+ configurationItem .setKey (configurationName );
222+ configurationItem .setContent (content );
223+ configuration .setConfigurationItem (configurationItem );
224+
225+ configMap .put (configurationName , configuration );
226+ versionMap .put (applicationName , configMap );
227+ return configurationItem ;
228+ } else {
229+ T content = serializerProcessor .deserialize (contentSdkBytes , type , configurationName );
230+ Configuration <T > configuration = new Configuration <>();
231+ configuration .setClientConfigurationVersion (version );
82232
233+ ConfigurationItem <T > configurationItem = new ConfigurationItem <>();
234+ configurationItem .setKey (configurationName );
235+ configurationItem .setContent (content );
236+ configuration .setConfigurationItem (configurationItem );
237+ configMap .put (configurationName , configuration );
238+ return configurationItem ;
239+ }
83240 }
84241}
0 commit comments