15
15
package output
16
16
17
17
import (
18
+ "encoding/json"
19
+
18
20
"github.com/cisco-open/operator-tools/pkg/secret"
19
21
20
22
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/types"
@@ -355,6 +357,21 @@ func (e *KafkaOutputConfig) ToDirective(secretLoader secret.SecretLoader, id str
355
357
} else {
356
358
kafka .Params = params
357
359
}
360
+
361
+ if e .RdkafkaOptions != nil {
362
+ if rdkafkaOptions , err := types .NewStructToStringMapper (secretLoader ).StringsMap (e .RdkafkaOptions ); err != nil {
363
+ return nil , err
364
+ } else {
365
+ if len (rdkafkaOptions ) > 0 {
366
+ marshaledRdkafkaOptions , err := json .Marshal (rdkafkaOptions )
367
+ if err != nil {
368
+ return nil , err
369
+ }
370
+ kafka .Params ["rdkafka_options" ] = string (marshaledRdkafkaOptions )
371
+ }
372
+ }
373
+ }
374
+
358
375
if e .Buffer == nil {
359
376
e .Buffer = & Buffer {}
360
377
}
@@ -363,13 +380,6 @@ func (e *KafkaOutputConfig) ToDirective(secretLoader secret.SecretLoader, id str
363
380
} else {
364
381
kafka .SubDirectives = append (kafka .SubDirectives , buffer )
365
382
}
366
- if e .RdkafkaOptions != nil {
367
- if rdkafkaOptions , err := e .RdkafkaOptions .ToDirective (secretLoader , id ); err != nil {
368
- return nil , err
369
- } else {
370
- kafka .SubDirectives = append (kafka .SubDirectives , rdkafkaOptions )
371
- }
372
- }
373
383
374
384
if e .Format != nil {
375
385
if format , err := e .Format .ToDirective (secretLoader , "" ); err != nil {
@@ -383,9 +393,3 @@ func (e *KafkaOutputConfig) ToDirective(secretLoader secret.SecretLoader, id str
383
393
delete (kafka .Params , "use_rdkafka" )
384
394
return kafka , nil
385
395
}
386
-
387
- func (o * RdkafkaOptions ) ToDirective (secretLoader secret.SecretLoader , id string ) (types.Directive , error ) {
388
- return types .NewFlatDirective (types.PluginMeta {
389
- Directive : "rdkafka_options" ,
390
- }, o , secretLoader )
391
- }
0 commit comments