11package io .flamingock .examples .inventory ;
22
33import com .mongodb .client .MongoClient ;
4+ import io .confluent .kafka .schemaregistry .client .CachedSchemaRegistryClient ;
5+ import io .confluent .kafka .schemaregistry .client .SchemaRegistryClient ;
46import io .flamingock .community .mongodb .sync .driver .MongoDBSyncAuditStore ;
7+ import io .flamingock .examples .inventory .util .KafkaSchemaManager ;
8+ import io .flamingock .examples .inventory .util .LaunchDarklyClient ;
59import io .flamingock .examples .inventory .util .MongoDBUtil ;
610import io .flamingock .internal .core .store .CommunityAuditStore ;
711import io .flamingock .targetsystem .nontransactional .NonTransactionalTargetSystem ;
812import io .flamingock .targetystem .mongodb .sync .MongoDBSyncTargetSystem ;
13+ import org .apache .kafka .clients .admin .AdminClient ;
14+ import org .apache .kafka .clients .admin .AdminClientConfig ;
15+ import org .springframework .beans .factory .annotation .Value ;
916import org .springframework .context .annotation .Bean ;
1017import org .springframework .context .annotation .Configuration ;
1118
19+ import jakarta .annotation .PreDestroy ;
20+ import java .time .Duration ;
21+ import java .util .Collections ;
22+ import java .util .Properties ;
23+
1224@ Configuration
1325public class FlamingockConfig {
1426
15- @ Bean
27+ @ Value ("${mongodb.uri:mongodb://localhost:27017/}" )
28+ private String mongodbUri ;
29+
30+ @ Value ("${kafka.bootstrap-servers:localhost:9092}" )
31+ private String kafkaBootstrapServers ;
32+
33+ @ Value ("${kafka.schema-registry-url:http://localhost:8081}" )
34+ private String schemaRegistryUrl ;
35+
36+ @ Value ("${launchdarkly.api-url:http://localhost:8765/api/v2}" )
37+ private String launchDarklyApiUrl ;
38+
39+ private AdminClient kafkaAdminClient ;
40+
41+ @ Bean (destroyMethod = "close" )
1642 public MongoClient mongoClient () {
17- return MongoDBUtil .getMongoClient ("mongodb://localhost:27017/" );
43+ return MongoDBUtil .getMongoClient (mongodbUri );
1844 }
1945
2046 @ Bean
@@ -24,12 +50,29 @@ public MongoDBSyncTargetSystem mongoDBSyncTargetSystem(MongoClient mongoClient)
2450
2551 @ Bean
2652 public NonTransactionalTargetSystem kafkaTargetSystem () throws Exception {
27- return TargetSystems .kafkaTargetSystem ();
53+ SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient (
54+ Collections .singletonList (schemaRegistryUrl ),
55+ 100
56+ );
57+
58+ Properties kafkaProps = new Properties ();
59+ kafkaProps .put (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , kafkaBootstrapServers );
60+ this .kafkaAdminClient = AdminClient .create (kafkaProps );
61+
62+ KafkaSchemaManager schemaManager = new KafkaSchemaManager (schemaRegistryClient , kafkaAdminClient );
63+ schemaManager .createTopicIfNotExists ("order-created" , 3 , (short ) 1 );
64+ return new NonTransactionalTargetSystem (TargetSystems .KAFKA_TARGET_SYSTEM ).addDependency (schemaManager );
2865 }
2966
3067 @ Bean
3168 public NonTransactionalTargetSystem toggleTargetSystem () {
32- return TargetSystems .toggleTargetSystem ();
69+ LaunchDarklyClient launchDarklyClient = new LaunchDarklyClient (
70+ "demo-token" ,
71+ "inventory-service" ,
72+ "production" ,
73+ launchDarklyApiUrl
74+ );
75+ return new NonTransactionalTargetSystem (TargetSystems .FEATURE_FLAG_TARGET_SYSTEM ).addDependency (launchDarklyClient );
3376 }
3477
3578
@@ -38,4 +81,11 @@ public NonTransactionalTargetSystem toggleTargetSystem() {
3881 public CommunityAuditStore auditStore (MongoDBSyncTargetSystem mongoDBSyncTargetSystem ) {
3982 return MongoDBSyncAuditStore .from (mongoDBSyncTargetSystem );
4083 }
84+
85+ @ PreDestroy
86+ public void cleanup () {
87+ if (kafkaAdminClient != null ) {
88+ kafkaAdminClient .close (Duration .ofSeconds (2 ));
89+ }
90+ }
4191}
0 commit comments