Skip to content

Commit acbaa75

Browse files
committed
filebeat/input/{v2,logv2}: add Redirector interface for general input migration
Add an optional Redirector interface to the v2 input system so the Loader can transparently redirect one input type to another. This replaces the bespoke approach where logv2 directly imported and instantiated the filestream plugin. The Loader checks for Redirector before calling Create. If Redirect returns a non-empty target type, the Loader resolves the target from its plugin registry and calls its Create with the translated config. Only one redirect hop is allowed. Delete follows the same redirect path so CheckConfig cleanup operates on the correct target. Refactor the logv2 manager to implement Redirector instead of holding a direct reference to the filestream manager. This removes logv2's import of the filestream package, eliminates a duplicate filestream manager instance (and its redundant Init call), and makes the pattern reusable for future V2-to-V2 migrations such as httpjson to cel.
1 parent 11d1fe7 commit acbaa75

File tree

9 files changed

+444
-62
lines changed

9 files changed

+444
-62
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
kind: enhancement
2+
summary: Add input redirection support through a new Redirector mechanism.
3+
component: filebeat

filebeat/input/default-inputs/inputs.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func genericInputs(log *logp.Logger, components statestore.States) []v2.Plugin {
4545
tcp.Plugin(),
4646
udp.Plugin(),
4747
unix.Plugin(),
48-
logv2.LogPluginV2(log, components),
49-
logv2.ContainerPluginV2(log, components),
48+
logv2.LogPluginV2(log),
49+
logv2.ContainerPluginV2(log),
5050
}
5151
}

filebeat/input/logv2/README.md

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,33 @@
11
# Log v2
22
The LogV2 input is used as an entrypoint to select whether to run the
3-
Log input or (as normally done) the Filestream input as part of
4-
the effort to fully deprecate and remove the Log input.
3+
Log or Container input as Filestream as part of the effort to fully
4+
deprecate and remove the Log input.
55

6-
Currently there are two ways to run the Filestream input in place of
7-
the Log input:
6+
The logv2 manager implements the `v2.Redirector` interface. When a
7+
redirect is active the Loader resolves the filestream plugin from its
8+
own registry and calls its `Create` with the translated config. The
9+
logv2 package does not import or instantiate filestream directly.
10+
11+
When no redirect is needed, `Create` returns `v2.ErrUnknownInput` and
12+
`compat.Combine` falls through to the V1 log input as before.
13+
14+
Currently there are two ways to enable the redirect:
815
- When Filebeat is running under Elastic Agent and
916
`run_as_filestream: true` is added to the input configuration. This
10-
allows to control the migration at the input level.
17+
allows control of the migration at the input level.
1118
- When Filebeat is running standalone and the feature flag
1219
`log_input_run_as_filestream` is enabled (this is done by setting
1320
`features.log_input_run_as_filestream.enabled: true`). This forces
1421
*all* Log inputs on Filebeat to run as Filestream.
1522

23+
Both the Log and Container input types are supported.
24+
1625
## Limitations
17-
Regarding of how the migration is enabled, to run the Log input as
18-
Filestream all conditions listed below need to be met:
19-
- The Log input configuration must contain an unique ID, this ID will
20-
become the Filestream input ID.
21-
- The Container input is not supported
26+
Regardless of how the migration is enabled, to run the Log input as
27+
Filestream all conditions listed below need to be met:
28+
- The input configuration must contain a unique ID, which becomes the
29+
Filestream input ID.
2230
- Only states of files being actively harvested are migrated.
2331

2432
## Next steps
25-
- [ ] Support migrating the Container input
2633
- [ ] [Implement manual fallback mechanism for Filestream running as Log input under Elastic Agent](https://github.com/elastic/beats/issues/47747)

filebeat/input/logv2/input.go

Lines changed: 38 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,11 @@ import (
2323

2424
"github.com/elastic/beats/v7/filebeat/channel"
2525
v1 "github.com/elastic/beats/v7/filebeat/input"
26-
"github.com/elastic/beats/v7/filebeat/input/filestream"
2726
loginput "github.com/elastic/beats/v7/filebeat/input/log"
2827
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
2928
"github.com/elastic/beats/v7/libbeat/feature"
3029
"github.com/elastic/beats/v7/libbeat/features"
3130
"github.com/elastic/beats/v7/libbeat/management"
32-
"github.com/elastic/beats/v7/libbeat/statestore"
3331
"github.com/elastic/elastic-agent-libs/config"
3432
"github.com/elastic/elastic-agent-libs/logp"
3533
"github.com/elastic/go-concert/unison"
@@ -126,76 +124,71 @@ func NewV1Input(
126124
return inp, err
127125
}
128126

129-
// LogPluginV2 returns a v2.Plugin with a manager that can convert
130-
// the Log input configuration to Filestream and run the Filestream
131-
// input instead of the Log input.
132-
func LogPluginV2(logger *logp.Logger, store statestore.States) v2.Plugin {
133-
return pluginV2(logger, store, logPluginName)
127+
// LogPluginV2 returns a v2.Plugin with a manager that can redirect
128+
// a Log input configuration to Filestream via the Redirector interface.
129+
// The Loader resolves the filestream plugin from its own registry;
130+
// this package no longer imports or instantiates filestream directly.
131+
func LogPluginV2(logger *logp.Logger) v2.Plugin {
132+
return pluginV2(logger, logPluginName)
134133
}
135134

136-
// ContainerPluginV2 returns a v2.Plugin with a manager that can convert
137-
// the Container input configuration to Filestream and run the Filestream
138-
// input instead of the Log input.
139-
func ContainerPluginV2(logger *logp.Logger, store statestore.States) v2.Plugin {
140-
return pluginV2(logger, store, containerPluginName)
135+
// ContainerPluginV2 returns a v2.Plugin with a manager that can redirect
136+
// a Container input configuration to Filestream via the Redirector interface.
137+
func ContainerPluginV2(logger *logp.Logger) v2.Plugin {
138+
return pluginV2(logger, containerPluginName)
141139
}
142140

143-
// pluginV2 returns a v2.Plugin with a manager that can convert
144-
// the Log/Container input configuration to Filestream and run the Filestream
145-
// input instead of the Log or Container input.
146-
func pluginV2(logger *logp.Logger, store statestore.States, pluginName string) v2.Plugin {
147-
// The InputManager for Filestream input is from an internal package, so we
148-
// cannot instantiate it directly here. To circumvent that, we instantiate
149-
// the whole Filestream Plugin and get its manager.
150-
filestreamPlugin := filestream.Plugin(logger, store)
151-
152-
m := manager{
153-
next: filestreamPlugin.Manager,
154-
logger: logger,
155-
}
156-
157-
p := v2.Plugin{
141+
// pluginV2 builds a v2.Plugin whose manager implements Redirector
142+
// for translating log/container configs to filestream.
143+
func pluginV2(logger *logp.Logger, pluginName string) v2.Plugin {
144+
return v2.Plugin{
158145
Name: pluginName,
159146
Stability: feature.Stable,
160147
Info: "log input running filestream",
161148
Doc: "Log input running Filestream input",
162-
Manager: m,
149+
Manager: manager{logger: logger},
163150
}
164-
return p
165151
}
166152

153+
// manager implements v2.InputManager and v2.Redirector for the
154+
// log and container input types. It delegates to filestream via
155+
// the Loader's plugin registry rather than importing filestream.
167156
type manager struct {
168-
next v2.InputManager
169157
logger *logp.Logger
170158
}
171159

172-
func (m manager) Init(grp unison.Group) error {
173-
return m.next.Init(grp)
160+
// Init is a no-op. The filestream manager is initialised by the
161+
// Loader from its own registry; this manager has no resources.
162+
func (m manager) Init(_ unison.Group) error { return nil }
163+
164+
// Create unconditionally returns ErrUnknownInput. When a redirect is
165+
// needed, the Loader handles it via the Redirector interface before
166+
// reaching Create. When no redirect is needed, compat.Combine falls
167+
// through to the V1 log input.
168+
func (m manager) Create(_ *config.C) (v2.Input, error) {
169+
return nil, v2.ErrUnknownInput
174170
}
175171

176-
// Create first checks whether the config is supposed to run as Filestream
177-
// and creates the Filestream input if needed.
178-
// If the configuration is not supposed to run as Filestream,
179-
// v2.ErrUnknownInput is returned.
180-
func (m manager) Create(cfg *config.C) (v2.Input, error) {
172+
// Redirect implements v2.Redirector. It checks whether the config
173+
// should run as filestream and, if so, translates the config and
174+
// returns the target type. The Loader resolves the filestream plugin
175+
// and calls its Create with the translated config.
176+
func (m manager) Redirect(cfg *config.C) (string, *config.C, error) {
181177
asFilestream, err := runAsFilestream(m.logger, cfg)
182178
if err != nil {
183-
return nil, err
179+
return "", nil, err
184180
}
185-
186181
if !asFilestream {
187-
return nil, v2.ErrUnknownInput
182+
return "", nil, nil
188183
}
189184

190185
newCfg, err := convertConfig(m.logger, cfg)
191186
if err != nil {
192-
return nil, fmt.Errorf("cannot translate log config to filestream: %w", err)
187+
return "", nil, fmt.Errorf("cannot translate log config to filestream: %w", err)
193188
}
194189

195-
// We know 'id' exists in the config and can be retrieved because
196-
// 'runAsFilestream' has already validated it, hence it is safe to
197-
// ignore the error.
190+
// runAsFilestream validated that id exists when redirect is active.
198191
id, _ := cfg.String("id", -1)
199192
m.logger.Infow("Log input (deprecated) running as Filestream input", "id", id)
200-
return m.next.Create(newCfg)
193+
return "filestream", newCfg, nil
201194
}

filebeat/input/logv2/input_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ package logv2
2020
import (
2121
"testing"
2222

23+
"github.com/stretchr/testify/require"
24+
2325
"github.com/elastic/beats/v7/libbeat/management"
2426
"github.com/elastic/elastic-agent-libs/config"
2527
"github.com/elastic/elastic-agent-libs/logp"
@@ -89,3 +91,56 @@ func TestRunAsFilestream(t *testing.T) {
8991
})
9092
}
9193
}
94+
95+
func TestManagerRedirect(t *testing.T) {
96+
setUnderAgent := func(t *testing.T, v bool) {
97+
t.Helper()
98+
prev := management.UnderAgent()
99+
t.Cleanup(func() { management.SetUnderAgent(prev) })
100+
management.SetUnderAgent(v)
101+
}
102+
103+
t.Run("redirects_to_filestream", func(t *testing.T) {
104+
setUnderAgent(t, true)
105+
m := manager{logger: logp.NewNopLogger()}
106+
cfg := config.MustNewConfigFrom(map[string]any{
107+
"type": "log",
108+
"id": "test-id",
109+
"paths": []string{"/var/log.log"},
110+
"run_as_filestream": true,
111+
})
112+
113+
target, translated, err := m.Redirect(cfg)
114+
require.NoError(t, err)
115+
require.Equal(t, "filestream", target)
116+
require.NotNil(t, translated)
117+
118+
typ, err := translated.String("type", -1)
119+
require.NoError(t, err)
120+
require.Equal(t, "filestream", typ)
121+
})
122+
123+
t.Run("no_redirect_when_flag_is_absent", func(t *testing.T) {
124+
setUnderAgent(t, true)
125+
m := manager{logger: logp.NewNopLogger()}
126+
cfg := config.MustNewConfigFrom(map[string]any{
127+
"type": "log",
128+
"paths": []string{"/var/log.log"},
129+
})
130+
131+
target, translated, err := m.Redirect(cfg)
132+
require.NoError(t, err)
133+
require.Empty(t, target)
134+
require.Nil(t, translated)
135+
})
136+
137+
t.Run("error_on_invalid_config", func(t *testing.T) {
138+
m := manager{logger: logp.NewNopLogger()}
139+
cfg := config.NewConfig()
140+
141+
target, translated, err := m.Redirect(cfg)
142+
require.Error(t, err)
143+
require.Empty(t, target)
144+
require.Nil(t, translated)
145+
})
146+
}

filebeat/input/v2/loader.go

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ func (l *Loader) Configure(cfg *conf.C) (Input, error) {
101101
return nil, fmt.Errorf("running a FIPS-capable distribution but input [%s] is not FIPS capable", name)
102102
}
103103

104-
return p.Manager.Create(cfg)
104+
targetPlugin, targetCfg, err := l.resolveRedirect(name, p, cfg)
105+
if err != nil {
106+
return nil, err
107+
}
108+
return targetPlugin.Manager.Create(targetCfg)
105109
}
106110

107111
func (l *Loader) loadFromCfg(cfg *conf.C) (string, Plugin, error) {
@@ -123,20 +127,59 @@ func (l *Loader) loadFromCfg(cfg *conf.C) (string, Plugin, error) {
123127
return name, p, nil
124128
}
125129

130+
// Delete removes any resources associated with an input configuration.
131+
// If the plugin's InputManager implements Redirector, Delete follows
132+
// the redirect and calls the target's Delete with the translated config.
126133
func (l *Loader) Delete(cfg *conf.C) error {
127-
_, p, err := l.loadFromCfg(cfg)
134+
name, p, err := l.loadFromCfg(cfg)
135+
if err != nil {
136+
return err
137+
}
138+
139+
targetPlugin, targetCfg, err := l.resolveRedirect(name, p, cfg)
128140
if err != nil {
129141
return err
130142
}
131143

132-
pp, ok := p.Manager.(interface{ Delete(cfg *conf.C) error })
144+
pp, ok := targetPlugin.Manager.(interface{ Delete(cfg *conf.C) error })
133145
if ok {
134-
return pp.Delete(cfg)
146+
return pp.Delete(targetCfg)
135147
}
136148

137149
return nil
138150
}
139151

152+
// resolveRedirect checks whether the plugin's InputManager implements
153+
// Redirector and, if so, resolves the redirect target from the registry.
154+
// Only one redirect hop is allowed; the target's Redirector is not consulted.
155+
// If no redirect is needed, the original plugin and config are returned.
156+
func (l *Loader) resolveRedirect(name string, p Plugin, cfg *conf.C) (Plugin, *conf.C, error) {
157+
r, ok := p.Manager.(Redirector)
158+
if !ok {
159+
return p, cfg, nil
160+
}
161+
162+
targetType, translatedCfg, err := r.Redirect(cfg)
163+
if err != nil {
164+
return Plugin{}, nil, fmt.Errorf("input %q redirect failed: %w", name, err)
165+
}
166+
if targetType == "" {
167+
return p, cfg, nil
168+
}
169+
170+
target, exists := l.registry[targetType]
171+
if !exists {
172+
return Plugin{}, nil, &LoadError{
173+
Name: targetType,
174+
Reason: ErrUnknownInput,
175+
Message: fmt.Sprintf("redirect target %q from %q not found", targetType, name),
176+
}
177+
}
178+
179+
l.log.Infof("Input %q redirecting to %q", name, targetType)
180+
return target, translatedCfg, nil
181+
}
182+
140183
// validatePlugins checks if there are multiple plugins with the same name in
141184
// the registry.
142185
func validatePlugins(plugins []Plugin) []error {

0 commit comments

Comments
 (0)