@@ -38,94 +38,103 @@ func (o *ServerOptions) Complete() error {
3838 snConfig := o .Options .LoadConfigOrDie ()
3939
4040 // If the key file is provided, use it to authenticate to StreamNative Cloud
41- if snConfig .KeyFile != "" {
42- issuer := snConfig .Auth .Issuer ()
41+ switch {
42+ case snConfig .KeyFile != "" :
43+ {
44+ issuer := snConfig .Auth .Issuer ()
4345
44- // authorize
45- flow , err := o .newClientCredentialsFlow (issuer , o .KeyFile )
46- if err != nil {
47- return errors .Wrap (err , "configuration error: unable to use client credential flow" )
48- }
49- grant , err := flow .Authorize ()
50- if err != nil {
51- return errors .Wrap (err , "activation failed" )
52- }
53-
54- // persist the authorization data
55- if err = o .Options .SaveGrant (issuer .Audience , * grant ); err != nil {
56- return errors .Wrap (err , "Unable to store the authorization data" )
57- }
46+ // authorize
47+ flow , err := o .newClientCredentialsFlow (issuer , o .KeyFile )
48+ if err != nil {
49+ return errors .Wrap (err , "configuration error: unable to use client credential flow" )
50+ }
51+ grant , err := flow .Authorize ()
52+ if err != nil {
53+ return errors .Wrap (err , "activation failed" )
54+ }
5855
59- err = config .InitSNCloudClient (
60- issuer .IssuerEndpoint , issuer .Audience , o .KeyFile , o .Options .Server , 30 * time .Second , o .Options .Store )
61- if err != nil {
62- return errors .Wrap (err , "failed to initialize StreamNative Cloud client" )
63- }
56+ // persist the authorization data
57+ if err = o .Options .SaveGrant (issuer .Audience , * grant ); err != nil {
58+ return errors .Wrap (err , "Unable to store the authorization data" )
59+ }
6460
65- if o . Options . PulsarInstance != "" && o . Options . PulsarCluster != "" {
66- err = mcp . SetContext ( o . Options , o .Options .PulsarInstance , o .Options .PulsarCluster )
61+ err = config . InitSNCloudClient (
62+ issuer . IssuerEndpoint , issuer . Audience , o . KeyFile , o .Options .Server , 30 * time . Second , o .Options .Store )
6763 if err != nil {
68- return errors .Wrap (err , "failed to set StreamNative Cloud context " )
64+ return errors .Wrap (err , "failed to initialize StreamNative Cloud client " )
6965 }
70- }
7166
72- if len (o .Features ) != 0 {
73- requiredFeatures := []mcp.McpFeature {
74- mcp .FeatureStreamNativeCloud ,
67+ if o .Options .PulsarInstance != "" && o .Options .PulsarCluster != "" {
68+ err = mcp .SetContext (o .Options , o .Options .PulsarInstance , o .Options .PulsarCluster )
69+ if err != nil {
70+ return errors .Wrap (err , "failed to set StreamNative Cloud context" )
71+ }
7572 }
76- for _ , feature := range requiredFeatures {
77- if ! slices .Contains (o .Features , string (feature )) {
78- o .Features = append (o .Features , string (feature ))
73+
74+ if len (o .Features ) != 0 {
75+ requiredFeatures := []mcp.Feature {
76+ mcp .FeatureStreamNativeCloud ,
77+ }
78+ for _ , feature := range requiredFeatures {
79+ if ! slices .Contains (o .Features , string (feature )) {
80+ o .Features = append (o .Features , string (feature ))
81+ }
7982 }
83+ } else {
84+ o .Features = []string {string (mcp .FeatureAll )}
8085 }
81- } else {
82- o .Features = []string {string (mcp .FeatureAll )}
8386 }
84- } else if snConfig .ExternalKafka != nil {
85- if len (o .Features ) != 0 {
86- return errors .New ("kafka-only mode does not support additional features" )
87- }
88- o .Features = []string {string (mcp .FeatureKafkaClient ), string (mcp .FeatureKafkaAdmin ), string (mcp .FeatureKafkaAdminSchemaRegistry )}
89- err := kafka .NewCurrentKafkaContext (kafka.KafkaContext {
90- BootstrapServers : snConfig .ExternalKafka .BootstrapServers ,
91- AuthType : snConfig .ExternalKafka .AuthType ,
92- AuthMechanism : snConfig .ExternalKafka .AuthMechanism ,
93- AuthUser : snConfig .ExternalKafka .AuthUser ,
94- AuthPass : snConfig .ExternalKafka .AuthPass ,
95- UseTLS : snConfig .ExternalKafka .UseTLS ,
96- ClientKeyFile : snConfig .ExternalKafka .ClientKeyFile ,
97- ClientCertFile : snConfig .ExternalKafka .ClientCertFile ,
98- CaFile : snConfig .ExternalKafka .CaFile ,
99- SchemaRegistryURL : snConfig .ExternalKafka .SchemaRegistryURL ,
100- SchemaRegistryAuthUser : snConfig .ExternalKafka .SchemaRegistryAuthUser ,
101- SchemaRegistryAuthPass : snConfig .ExternalKafka .SchemaRegistryAuthPass ,
102- SchemaRegistryBearerToken : snConfig .ExternalKafka .SchemaRegistryBearerToken ,
103- })
104- if err != nil {
105- return errors .Wrap (err , "failed to set external Kafka context" )
87+ case snConfig .ExternalKafka != nil :
88+ {
89+ if len (o .Features ) != 0 {
90+ return errors .New ("kafka-only mode does not support additional features" )
91+ }
92+ o .Features = []string {string (mcp .FeatureKafkaClient ), string (mcp .FeatureKafkaAdmin ), string (mcp .FeatureKafkaAdminSchemaRegistry )}
93+ err := kafka .NewCurrentKafkaContext (kafka.KafkaContext {
94+ BootstrapServers : snConfig .ExternalKafka .BootstrapServers ,
95+ AuthType : snConfig .ExternalKafka .AuthType ,
96+ AuthMechanism : snConfig .ExternalKafka .AuthMechanism ,
97+ AuthUser : snConfig .ExternalKafka .AuthUser ,
98+ AuthPass : snConfig .ExternalKafka .AuthPass ,
99+ UseTLS : snConfig .ExternalKafka .UseTLS ,
100+ ClientKeyFile : snConfig .ExternalKafka .ClientKeyFile ,
101+ ClientCertFile : snConfig .ExternalKafka .ClientCertFile ,
102+ CaFile : snConfig .ExternalKafka .CaFile ,
103+ SchemaRegistryURL : snConfig .ExternalKafka .SchemaRegistryURL ,
104+ SchemaRegistryAuthUser : snConfig .ExternalKafka .SchemaRegistryAuthUser ,
105+ SchemaRegistryAuthPass : snConfig .ExternalKafka .SchemaRegistryAuthPass ,
106+ SchemaRegistryBearerToken : snConfig .ExternalKafka .SchemaRegistryBearerToken ,
107+ })
108+ if err != nil {
109+ return errors .Wrap (err , "failed to set external Kafka context" )
110+ }
106111 }
107- } else if snConfig .ExternalPulsar != nil {
108- if len (o .Features ) != 0 {
109- return errors .New ("pulsar-only mode does not support additional features" )
112+ case snConfig .ExternalPulsar != nil :
113+ {
114+ if len (o .Features ) != 0 {
115+ return errors .New ("pulsar-only mode does not support additional features" )
116+ }
117+ o .Features = []string {string (mcp .FeatureAllPulsar )}
118+ err := pulsar .NewCurrentPulsarContext (pulsar.PulsarContext {
119+ WebServiceURL : snConfig .ExternalPulsar .WebServiceURL ,
120+ AuthPlugin : snConfig .ExternalPulsar .AuthPlugin ,
121+ AuthParams : snConfig .ExternalPulsar .AuthParams ,
122+ Token : snConfig .ExternalPulsar .Token ,
123+ TLSAllowInsecureConnection : snConfig .ExternalPulsar .TLSAllowInsecureConnection ,
124+ TLSEnableHostnameVerification : snConfig .ExternalPulsar .TLSEnableHostnameVerification ,
125+ TLSTrustCertsFilePath : snConfig .ExternalPulsar .TLSTrustCertsFilePath ,
126+ TLSCertFile : snConfig .ExternalPulsar .TLSCertFile ,
127+ TLSKeyFile : snConfig .ExternalPulsar .TLSKeyFile ,
128+ })
129+ if err != nil {
130+ return errors .Wrap (err , "failed to set external Pulsar context" )
131+ }
110132 }
111- o .Features = []string {string (mcp .FeatureAllPulsar )}
112- err := pulsar .NewCurrentPulsarContext (pulsar.PulsarContext {
113- WebServiceURL : snConfig .ExternalPulsar .WebServiceURL ,
114- AuthPlugin : snConfig .ExternalPulsar .AuthPlugin ,
115- AuthParams : snConfig .ExternalPulsar .AuthParams ,
116- Token : snConfig .ExternalPulsar .Token ,
117- TLSAllowInsecureConnection : snConfig .ExternalPulsar .TLSAllowInsecureConnection ,
118- TLSEnableHostnameVerification : snConfig .ExternalPulsar .TLSEnableHostnameVerification ,
119- TLSTrustCertsFilePath : snConfig .ExternalPulsar .TLSTrustCertsFilePath ,
120- TLSCertFile : snConfig .ExternalPulsar .TLSCertFile ,
121- TLSKeyFile : snConfig .ExternalPulsar .TLSKeyFile ,
122- })
123- if err != nil {
124- return errors .Wrap (err , "failed to set external Pulsar context" )
133+ default :
134+ {
135+ return errors .New ("no valid configuration found" )
125136 }
126137 }
127-
128- return nil
129138}
130139
131140func (o * ServerOptions ) AddFlags (cmd * cobra.Command ) {
0 commit comments