Skip to content

Commit 001fc43

Browse files
committed
fluentbit: enable multiple FluentbitAgents for a single logging resource; use the FluentbitAgent resource's name as the default hostPath for volumes; fix the for loop variable with a deepcopy
Signed-off-by: Peter Wilcsinszky <[email protected]>
1 parent e537603 commit 001fc43

File tree

7 files changed

+56
-35
lines changed

7 files changed

+56
-35
lines changed

controllers/logging/logging_controller.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -186,35 +186,38 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
186186
}
187187

188188
switch len(loggingResources.Fluentbits) {
189-
case 1:
190-
if logging.Spec.FluentbitSpec != nil {
191-
return ctrl.Result{}, errors.New("fluentbit has to be removed from the logging resource before the new FluentbitAgent can be reconciled")
192-
}
193-
reconcilers = append(reconcilers, fluentbit.New(
194-
r.Client,
195-
r.Log,
196-
&logging,
197-
reconcilerOpts,
198-
&loggingResources.Fluentbits[0].Spec,
199-
loggingDataProvider,
200-
loggingv1beta1.NewStandaloneFluentbitNameProvider(&loggingResources.Fluentbits[0]),
201-
).Reconcile)
202189
case 0:
203190
// check for legacy definition
204191
log.Info("WARNING fluentbit definition inside the Logging resource is deprecated and will be removed in the next major release")
205192
if logging.Spec.FluentbitSpec != nil {
193+
nameProvider := loggingv1beta1.NewLegacyFluentbitNameProvider(&logging)
206194
reconcilers = append(reconcilers, fluentbit.New(
207195
r.Client,
208-
r.Log,
196+
r.Log.WithName(nameProvider.ComponentName("logging-legacy")),
209197
&logging,
210198
reconcilerOpts,
211199
logging.Spec.FluentbitSpec,
212200
loggingDataProvider,
213-
loggingv1beta1.NewLegacyFluentbitNameProvider(&logging),
201+
nameProvider,
214202
).Reconcile)
215203
}
216204
default:
217-
return ctrl.Result{}, errors.New("cannot handle more than one FluentbitAgent for the same Logging resource")
205+
if logging.Spec.FluentbitSpec != nil {
206+
return ctrl.Result{}, errors.New("fluentbit has to be removed from the logging resource before the new FluentbitAgent can be reconciled")
207+
}
208+
for _, f := range loggingResources.Fluentbits {
209+
f := f.DeepCopy()
210+
nameProvider := loggingv1beta1.NewStandaloneFluentbitNameProvider(f)
211+
reconcilers = append(reconcilers, fluentbit.New(
212+
r.Client,
213+
r.Log.WithName(nameProvider.ComponentName("fluentbit-agent")),
214+
&logging,
215+
reconcilerOpts,
216+
&f.Spec,
217+
loggingDataProvider,
218+
nameProvider,
219+
).Reconcile)
220+
}
218221
}
219222

220223
if len(logging.Spec.NodeAgents) > 0 || len(loggingResources.NodeAgents) > 0 {

pkg/resources/fluentbit/configsecret.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,7 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er
246246
// If MultilineParser is set, remove other parser fields
247247
// See https://docs.fluentbit.io/manual/pipeline/inputs/tail#multiline-core-v1.8
248248

249-
log.Log.Info("Notice: MultilineParser is enabled. Disabling other parser options",
250-
"logging", r.Logging.Name)
249+
r.logger.Info("Notice: MultilineParser is enabled. Disabling other parser options")
251250

252251
inputTail.Parser = ""
253252
inputTail.ParserFirstline = ""
@@ -305,25 +304,25 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er
305304
input.FluentForwardOutput.Network = newFluentbitNetwork(*r.fluentbitSpec.Network)
306305
}
307306

308-
fluentdReplicas, err := r.loggingDataProvider.GetReplicaCount(context.TODO())
307+
aggregatorReplicas, err := r.loggingDataProvider.GetReplicaCount(context.TODO())
309308
if err != nil {
310309
return nil, nil, errors.WrapIf(err, "getting replica count for fluentd")
311310
}
312311

313-
if r.fluentbitSpec.Network == nil && utils.PointerToInt32(fluentdReplicas) > 1 {
312+
if r.fluentbitSpec.Network == nil && utils.PointerToInt32(aggregatorReplicas) > 1 {
314313
input.FluentForwardOutput.Network.KeepaliveSet = true
315314
input.FluentForwardOutput.Network.Keepalive = true
316315
input.FluentForwardOutput.Network.KeepaliveIdleTimeoutSet = true
317316
input.FluentForwardOutput.Network.KeepaliveIdleTimeout = 30
318317
input.FluentForwardOutput.Network.KeepaliveMaxRecycleSet = true
319318
input.FluentForwardOutput.Network.KeepaliveMaxRecycle = 100
320-
log.Log.Info("Notice: Because the Fluentd statefulset has been scaled, we've made some changes in the fluentbit network config too. We advice to revise these default configurations.")
319+
log.Log.Info("Notice: fluentbit `network` settings have been configured automatically to adapt to multiple aggregator replicas. Configure it manually to avoid this notice.")
321320
}
322321

323322
if r.fluentbitSpec.EnableUpstream {
324323
input.FluentForwardOutput.Upstream.Enabled = true
325324
input.FluentForwardOutput.Upstream.Config.Name = "fluentd-upstream"
326-
for i := int32(0); i < utils.PointerToInt32(fluentdReplicas); i++ {
325+
for i := int32(0); i < utils.PointerToInt32(aggregatorReplicas); i++ {
327326
input.FluentForwardOutput.Upstream.Config.Nodes = append(input.FluentForwardOutput.Upstream.Config.Nodes, r.generateUpstreamNode(i))
328327
}
329328
}

pkg/resources/fluentbit/daemonset.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,9 @@ func (r *Reconciler) daemonSet() (runtime.Object, reconciler.DesiredState, error
9494
}
9595

9696
r.fluentbitSpec.PositionDB.WithDefaultHostPath(
97-
fmt.Sprintf(v1beta1.HostPath, r.Logging.Name, TailPositionVolume))
97+
fmt.Sprintf(v1beta1.HostPath, r.nameProvider.Name(), TailPositionVolume))
9898
r.fluentbitSpec.BufferStorageVolume.WithDefaultHostPath(
99-
fmt.Sprintf(v1beta1.HostPath, r.Logging.Name, BufferStorageVolume))
99+
fmt.Sprintf(v1beta1.HostPath, r.nameProvider.Name(), BufferStorageVolume))
100100

101101
if err := r.fluentbitSpec.PositionDB.ApplyVolumeForPodSpec(TailPositionVolume, containerName, "/tail-db", &desired.Spec.Template.Spec); err != nil {
102102
return desired, reconciler.StatePresent, err

pkg/resources/fluentbit/fluentbit.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,13 @@ func generateLoggingRefLabels(loggingRef string) map[string]string {
5252
}
5353

5454
func (r *Reconciler) getFluentBitLabels() map[string]string {
55-
return util.MergeLabels(r.fluentbitSpec.Labels, map[string]string{
56-
"app.kubernetes.io/name": "fluentbit"}, generateLoggingRefLabels(r.Logging.GetName()))
55+
return util.MergeLabels(
56+
r.fluentbitSpec.Labels,
57+
map[string]string{
58+
"app.kubernetes.io/instance": r.nameProvider.Name(),
59+
"app.kubernetes.io/name": "fluentbit",
60+
},
61+
generateLoggingRefLabels(r.Logging.GetName()))
5762
}
5863

5964
func (r *Reconciler) getServiceAccount() string {
@@ -64,7 +69,11 @@ func (r *Reconciler) getServiceAccount() string {
6469
}
6570

6671
type NameProvider interface {
72+
// ComponentName provides a qualified name using (Name + "-" + name)
6773
ComponentName(name string) string
74+
// Name returns the name of the resource, that is owning fluentbit
75+
// It is Logging.Name for legacy but the resource's name for FluentbitAgent
76+
Name() string
6877
OwnerRef() v1.OwnerReference
6978
}
7079

@@ -75,7 +84,8 @@ type DesiredObject struct {
7584

7685
// Reconciler holds info what resource to reconcile
7786
type Reconciler struct {
78-
*reconciler.GenericResourceReconciler
87+
resourceReconciler *reconciler.GenericResourceReconciler
88+
logger logr.Logger
7989
Logging *v1beta1.Logging
8090
configs map[string][]byte
8191
fluentbitSpec *v1beta1.FluentbitSpec
@@ -92,11 +102,12 @@ func New(client client.Client,
92102
loggingDataProvider loggingdataprovider.LoggingDataProvider,
93103
nameProvider NameProvider) *Reconciler {
94104
return &Reconciler{
95-
Logging: logging,
96-
GenericResourceReconciler: reconciler.NewGenericReconciler(client, logger, opts),
97-
fluentbitSpec: fluentbitSpec,
98-
loggingDataProvider: loggingDataProvider,
99-
nameProvider: nameProvider,
105+
Logging: logging,
106+
logger: logger,
107+
resourceReconciler: reconciler.NewGenericReconciler(client, logger.WithName("reconciler"), opts),
108+
fluentbitSpec: fluentbitSpec,
109+
loggingDataProvider: loggingDataProvider,
110+
nameProvider: nameProvider,
100111
}
101112
}
102113

@@ -129,7 +140,7 @@ func (r *Reconciler) Reconcile() (*reconcile.Result, error) {
129140
if o == nil {
130141
return nil, errors.Errorf("Reconcile error! Resource %#v returns with nil object", factory)
131142
}
132-
result, err := r.ReconcileResource(o, state)
143+
result, err := r.resourceReconciler.ReconcileResource(o, state)
133144
if err != nil {
134145
return nil, errors.WrapWithDetails(err,
135146
"failed to reconcile resource", "resource", o.GetObjectKind().GroupVersionKind())

pkg/resources/fluentbit/psp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func (r *Reconciler) clusterPodSecurityPolicy() (runtime.Object, reconciler.Desi
5353

5454
if r.fluentbitSpec.PositionDB.HostPath != nil {
5555
r.fluentbitSpec.PositionDB.WithDefaultHostPath(
56-
fmt.Sprintf(v1beta1.HostPath, r.Logging.Name, TailPositionVolume))
56+
fmt.Sprintf(v1beta1.HostPath, r.nameProvider.Name(), TailPositionVolume))
5757

5858
allowedHostPaths = append(allowedHostPaths, policyv1beta1.AllowedHostPath{
5959
PathPrefix: r.fluentbitSpec.PositionDB.HostPath.Path,

pkg/resources/model/repository.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ import (
2121

2222
"emperror.dev/errors"
2323
"github.com/go-logr/logr"
24-
"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
2524
corev1 "k8s.io/api/core/v1"
2625
"sigs.k8s.io/controller-runtime/pkg/client"
26+
27+
"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
2728
)
2829

2930
func NewLoggingResourceRepository(client client.Reader, logger logr.Logger) *LoggingResourceRepository {

pkg/sdk/logging/api/v1beta1/fluentbit_types.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,13 @@ func (l *FluentbitNameProvider) ComponentName(name string) string {
472472
return fmt.Sprintf("%s-%s", l.fluentbit.Name, name)
473473
}
474474

475+
func (l *FluentbitNameProvider) Name() string {
476+
if l.logging != nil {
477+
return l.logging.Name
478+
}
479+
return l.fluentbit.Name
480+
}
481+
475482
func (l *FluentbitNameProvider) OwnerRef() metav1.OwnerReference {
476483
if l.logging != nil {
477484
return metav1.OwnerReference{

0 commit comments

Comments
 (0)