Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/fragments/1774300895-id6318-filebeat-director.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: enhancement
summary: Add input redirection support through a new Redirector mechanism.
component: filebeat
4 changes: 2 additions & 2 deletions filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func genericInputs(log *logp.Logger, components statestore.States) []v2.Plugin {
tcp.Plugin(),
udp.Plugin(),
unix.Plugin(),
logv2.LogPluginV2(log, components),
logv2.ContainerPluginV2(log, components),
logv2.LogPluginV2(log),
logv2.ContainerPluginV2(log),
}
}
29 changes: 18 additions & 11 deletions filebeat/input/logv2/README.md
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
# Log v2
The LogV2 input is used as an entrypoint to select whether to run the
Log input or (as normally done) the Filestream input as part of
the effort to fully deprecate and remove the Log input.
Log or Container input as Filestream as part of the effort to fully
deprecate and remove the Log input.

Currently there are two ways to run the Filestream input in place of
the Log input:
The logv2 manager implements the `v2.Redirector` interface. When a
redirect is active the Loader resolves the filestream plugin from its
own registry and calls its `Create` with the translated config. The
logv2 package does not import or instantiate filestream directly.

When no redirect is needed, `Create` returns `v2.ErrUnknownInput` and
`compat.Combine` falls through to the V1 log input as before.

Currently there are two ways to enable the redirect:
- When Filebeat is running under Elastic Agent and
`run_as_filestream: true` is added to the input configuration. This
allows to control the migration at the input level.
allows control of the migration at the input level.
- When Filebeat is running standalone and the feature flag
`log_input_run_as_filestream` is enabled (this is done by setting
`features.log_input_run_as_filestream.enabled: true`). This forces
*all* Log inputs on Filebeat to run as Filestream.

Both the Log and Container input types are supported.

## Limitations
Regarding of how the migration is enabled, to run the Log input as
Filestream all conditions listed below need to be met:
- The Log input configuration must contain an unique ID, this ID will
become the Filestream input ID.
- The Container input is not supported
Regardless of how the migration is enabled, to run the Log input as
Filestream all conditions listed below need to be met:
- The input configuration must contain a unique ID, which becomes the
Filestream input ID.
- Only states of files being actively harvested are migrated.

## Next steps
- [ ] Support migrating the Container input
- [ ] [Implement manual fallback mechanism for Filestream running as Log input under Elastic Agent](https://github.com/elastic/beats/issues/47747)
83 changes: 38 additions & 45 deletions filebeat/input/logv2/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ import (

"github.com/elastic/beats/v7/filebeat/channel"
v1 "github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/input/filestream"
loginput "github.com/elastic/beats/v7/filebeat/input/log"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/features"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/unison"
Expand Down Expand Up @@ -126,76 +124,71 @@ func NewV1Input(
return inp, err
}

// LogPluginV2 returns a v2.Plugin with a manager that can convert
// the Log input configuration to Filestream and run the Filestream
// input instead of the Log input.
func LogPluginV2(logger *logp.Logger, store statestore.States) v2.Plugin {
return pluginV2(logger, store, logPluginName)
// LogPluginV2 returns a v2.Plugin with a manager that can redirect
// a Log input configuration to Filestream via the Redirector interface.
// The Loader resolves the filestream plugin from its own registry;
// this package no longer imports or instantiates filestream directly.
func LogPluginV2(logger *logp.Logger) v2.Plugin {
return pluginV2(logger, logPluginName)
}

// ContainerPluginV2 returns a v2.Plugin with a manager that can convert
// the Container input configuration to Filestream and run the Filestream
// input instead of the Log input.
func ContainerPluginV2(logger *logp.Logger, store statestore.States) v2.Plugin {
return pluginV2(logger, store, containerPluginName)
// ContainerPluginV2 returns a v2.Plugin with a manager that can redirect
// a Container input configuration to Filestream via the Redirector interface.
func ContainerPluginV2(logger *logp.Logger) v2.Plugin {
return pluginV2(logger, containerPluginName)
}

// pluginV2 returns a v2.Plugin with a manager that can convert
// the Log/Container input configuration to Filestream and run the Filestream
// input instead of the Log or Container input.
func pluginV2(logger *logp.Logger, store statestore.States, pluginName string) v2.Plugin {
// The InputManager for Filestream input is from an internal package, so we
// cannot instantiate it directly here. To circumvent that, we instantiate
// the whole Filestream Plugin and get its manager.
filestreamPlugin := filestream.Plugin(logger, store)

m := manager{
next: filestreamPlugin.Manager,
logger: logger,
}

p := v2.Plugin{
// pluginV2 builds a v2.Plugin whose manager implements Redirector
// for translating log/container configs to filestream.
func pluginV2(logger *logp.Logger, pluginName string) v2.Plugin {
return v2.Plugin{
Name: pluginName,
Stability: feature.Stable,
Info: "log input running filestream",
Doc: "Log input running Filestream input",
Manager: m,
Manager: manager{logger: logger},
}
return p
}

// manager implements v2.InputManager and v2.Redirector for the
// log and container input types. It delegates to filestream via
// the Loader's plugin registry rather than importing filestream.
type manager struct {
next v2.InputManager
logger *logp.Logger
}

func (m manager) Init(grp unison.Group) error {
return m.next.Init(grp)
// Init is a no-op. The filestream manager is initialised by the
// Loader from its own registry; this manager has no resources.
func (m manager) Init(_ unison.Group) error { return nil }

// Create unconditionally returns ErrUnknownInput. When a redirect is
// needed, the Loader handles it via the Redirector interface before
// reaching Create. When no redirect is needed, compat.Combine falls
// through to the V1 log input.
func (m manager) Create(_ *config.C) (v2.Input, error) {
return nil, v2.ErrUnknownInput
}

// Create first checks whether the config is supposed to run as Filestream
// and creates the Filestream input if needed.
// If the configuration is not supposed to run as Filestream,
// v2.ErrUnknownInput is returned.
func (m manager) Create(cfg *config.C) (v2.Input, error) {
// Redirect implements v2.Redirector. It checks whether the config
// should run as filestream and, if so, translates the config and
// returns the target type. The Loader resolves the filestream plugin
// and calls its Create with the translated config.
func (m manager) Redirect(cfg *config.C) (string, *config.C, error) {
asFilestream, err := runAsFilestream(m.logger, cfg)
if err != nil {
return nil, err
return "", nil, err
}

if !asFilestream {
return nil, v2.ErrUnknownInput
return "", nil, nil
}

newCfg, err := convertConfig(m.logger, cfg)
if err != nil {
return nil, fmt.Errorf("cannot translate log config to filestream: %w", err)
return "", nil, fmt.Errorf("cannot translate log config to filestream: %w", err)
}

// We know 'id' exists in the config and can be retrieved because
// 'runAsFilestream' has already validated it, hence it is safe to
// ignore the error.
// runAsFilestream validated that id exists when redirect is active.
id, _ := cfg.String("id", -1)
m.logger.Infow("Log input (deprecated) running as Filestream input", "id", id)
return m.next.Create(newCfg)
return "filestream", newCfg, nil
}
54 changes: 54 additions & 0 deletions filebeat/input/logv2/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package logv2
import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -89,3 +91,55 @@ func TestRunAsFilestream(t *testing.T) {
})
}
}

func TestManagerRedirect(t *testing.T) {
setUnderAgent := func(t *testing.T, v bool) {
prev := management.UnderAgent()
t.Cleanup(func() { management.SetUnderAgent(prev) })
management.SetUnderAgent(v)
}

t.Run("redirects_to_filestream", func(t *testing.T) {
setUnderAgent(t, true)
m := manager{logger: logp.NewNopLogger()}
cfg := config.MustNewConfigFrom(map[string]any{
"type": "log",
"id": "test-id",
"paths": []string{"/var/log.log"},
"run_as_filestream": true,
})

target, translated, err := m.Redirect(cfg)
require.NoError(t, err)
require.Equal(t, "filestream", target)
require.NotNil(t, translated)

typ, err := translated.String("type", -1)
require.NoError(t, err)
require.Equal(t, "filestream", typ)
})

t.Run("no_redirect_when_flag_is_absent", func(t *testing.T) {
setUnderAgent(t, true)
m := manager{logger: logp.NewNopLogger()}
cfg := config.MustNewConfigFrom(map[string]any{
"type": "log",
"paths": []string{"/var/log.log"},
})

target, translated, err := m.Redirect(cfg)
require.NoError(t, err)
require.Empty(t, target)
require.Nil(t, translated)
})

t.Run("error_on_invalid_config", func(t *testing.T) {
m := manager{logger: logp.NewNopLogger()}
cfg := config.NewConfig()

target, translated, err := m.Redirect(cfg)
require.Error(t, err)
require.Empty(t, target)
require.Nil(t, translated)
})
}
51 changes: 47 additions & 4 deletions filebeat/input/v2/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ func (l *Loader) Configure(cfg *conf.C) (Input, error) {
return nil, fmt.Errorf("running a FIPS-capable distribution but input [%s] is not FIPS capable", name)
}

return p.Manager.Create(cfg)
targetPlugin, targetCfg, err := l.resolveRedirect(name, p, cfg)
if err != nil {
return nil, err
}
return targetPlugin.Manager.Create(targetCfg)
}

func (l *Loader) loadFromCfg(cfg *conf.C) (string, Plugin, error) {
Expand All @@ -123,20 +127,59 @@ func (l *Loader) loadFromCfg(cfg *conf.C) (string, Plugin, error) {
return name, p, nil
}

// Delete removes any resources associated with an input configuration.
// If the plugin's InputManager implements Redirector, Delete follows
// the redirect and calls the target's Delete with the translated config.
func (l *Loader) Delete(cfg *conf.C) error {
_, p, err := l.loadFromCfg(cfg)
name, p, err := l.loadFromCfg(cfg)
if err != nil {
return err
}

targetPlugin, targetCfg, err := l.resolveRedirect(name, p, cfg)
if err != nil {
return err
}

pp, ok := p.Manager.(interface{ Delete(cfg *conf.C) error })
pp, ok := targetPlugin.Manager.(interface{ Delete(cfg *conf.C) error })
if ok {
return pp.Delete(cfg)
return pp.Delete(targetCfg)
}

return nil
}

// resolveRedirect checks whether the plugin's InputManager implements
// Redirector and, if so, resolves the redirect target from the registry.
// Only one redirect hop is allowed; the target's Redirector is not consulted.
// If no redirect is needed, the original plugin and config are returned.
func (l *Loader) resolveRedirect(name string, p Plugin, cfg *conf.C) (Plugin, *conf.C, error) {
r, ok := p.Manager.(Redirector)
if !ok {
return p, cfg, nil
}

targetType, translatedCfg, err := r.Redirect(cfg)
if err != nil {
return Plugin{}, nil, fmt.Errorf("input %q redirect failed: %w", name, err)
}
if targetType == "" {
return p, cfg, nil
}

target, exists := l.registry[targetType]
if !exists {
return Plugin{}, nil, &LoadError{
Name: targetType,
Reason: ErrUnknownInput,
Message: fmt.Sprintf("redirect target %q from %q not found", targetType, name),
}
}

l.log.Infof("Input %q redirecting to %q", name, targetType)
return target, translatedCfg, nil
}

// validatePlugins checks if there are multiple plugins with the same name in
// the registry.
func validatePlugins(plugins []Plugin) []error {
Expand Down
Loading
Loading