Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .buildkite/x-pack/pipeline.xpack.agentbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ env:
AWS_ARM_INSTANCE_TYPE: "m6g.xlarge"
AWS_IMAGE_UBUNTU_ARM_64: "platform-ingest-beats-ubuntu-2204-aarch64"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might also need the pin

GCP_HI_PERF_MACHINE_TYPE: "c2d-highcpu-16"
IMAGE_UBUNTU_X86_64: "family/platform-ingest-beats-ubuntu-2204"
IMAGE_UBUNTU_X86_64: "platform-ingest-beats-ubuntu-2204-1772337686"

IMAGE_BEATS_WITH_HOOKS_LATEST: "docker.elastic.co/ci-agent-images/platform-ingest/buildkite-agent-beats-ci-with-hooks:latest"

Expand Down
8 changes: 4 additions & 4 deletions auditbeat/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ var (
// OpenBucket returns a new Bucket that stores data in {path.data}/beat.db.
// The returned Bucket must be closed when finished to ensure all resources
// are released.
func OpenBucket(name string) (Bucket, error) {
func OpenBucket(name string, p *paths.Path) (Bucket, error) {
initDatastoreOnce.Do(func() {
ds = &boltDatastore{
path: paths.Resolve(paths.Data, "beat.db"),
path: p.Resolve(paths.Data, "beat.db"),
mode: 0o600,
}
})
Expand All @@ -49,10 +49,10 @@ func OpenBucket(name string) (Bucket, error) {
// Update executes a function within the context of a read-write managed transaction.
// If no error is returned from the function then the transaction is committed. If an
// error is returned then the entire transaction is rolled back.
func Update(fn func(tx *bolt.Tx) error) error {
func Update(fn func(tx *bolt.Tx) error, p *paths.Path) error {
initDatastoreOnce.Do(func() {
ds = &boltDatastore{
path: paths.Resolve(paths.Data, "beat.db"),
path: p.Resolve(paths.Data, "beat.db"),
mode: 0o600,
}
})
Expand Down
2 changes: 1 addition & 1 deletion auditbeat/module/file_integrity/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,14 @@
}

func (ms *MetricSet) init(reporter mb.PushReporterV2) bool {
bucket, err := datastore.OpenBucket(bucketName)
bucket, err := datastore.OpenBucket(bucketName, ms.GetPath())
if err != nil {
err = fmt.Errorf("failed to open persistent datastore: %w", err)
reporter.Error(err)
ms.log.Errorw("Failed to initialize", "error", err)
return false
}
ms.bucket = bucket.(datastore.BoltBucket)

Check failure on line 189 in auditbeat/module/file_integrity/metricset.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Error return value is not checked (errcheck)

ms.eventChan, err = ms.reader.Start(reporter.Done())
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions auditbeat/module/file_integrity/metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
abtest "github.com/elastic/beats/v7/auditbeat/testing"
"github.com/elastic/beats/v7/metricbeat/mb"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
"github.com/elastic/elastic-agent-libs/paths"
)

func TestData(t *testing.T) {
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestActions(t *testing.T) {

defer abtest.SetupDataDir(t)()

bucket, err := datastore.OpenBucket(bucketName)
bucket, err := datastore.OpenBucket(bucketName, paths.New())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -173,7 +174,7 @@ func TestExcludedFiles(t *testing.T) {

defer abtest.SetupDataDir(t)()

bucket, err := datastore.OpenBucket(bucketName)
bucket, err := datastore.OpenBucket(bucketName, paths.New())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -225,7 +226,7 @@ func TestIncludedExcludedFiles(t *testing.T) {

defer abtest.SetupDataDir(t)()

bucket, err := datastore.OpenBucket(bucketName)
bucket, err := datastore.OpenBucket(bucketName, paths.New())
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
[]module.Option{module.WithMaxStartDelay(bt.config.MaxStartDelay)},
bt.moduleOptions...)

factory := module.NewFactory(b.Info, b.Monitoring, bt.registry, moduleOptions...)
factory := module.NewFactory(b.Info, b.Monitoring, bt.registry, bt.paths, moduleOptions...)

if bt.otelStatusFactoryWrapper != nil {
factory = bt.otelStatusFactoryWrapper(factory)
Expand Down Expand Up @@ -270,7 +270,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
}

// Centrally managed modules
factory = module.NewFactory(b.Info, b.Monitoring, bt.registry, bt.moduleOptions...)
factory = module.NewFactory(b.Info, b.Monitoring, bt.registry, bt.paths, bt.moduleOptions...)
modules := cfgfile.NewRunnerList(management.DebugK, factory, b.Publisher, bt.logger)
b.Registry.MustRegisterInput(modules)
wg.Add(1)
Expand Down Expand Up @@ -335,5 +335,5 @@ func (bt *Metricbeat) Stop() {

// Modules return a list of all configured modules.
func (bt *Metricbeat) Modules() ([]*module.Wrapper, error) {
return module.ConfiguredModules(bt.registry, bt.config.Modules, bt.config.ConfigModules, bt.moduleOptions, bt.logger)
return module.ConfiguredModules(bt.registry, bt.config.Modules, bt.config.ConfigModules, bt.moduleOptions, bt.paths, bt.logger)
}
3 changes: 2 additions & 1 deletion metricbeat/helper/elastic/elastic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/beats/v7/metricbeat/mb"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp/logptest"
"github.com/elastic/elastic-agent-libs/paths"
)

func TestMakeXPackMonitoringIndexName(t *testing.T) {
Expand Down Expand Up @@ -211,7 +212,7 @@ func TestConfigureModule(t *testing.T) {
for name, test := range tests {
t.Run(name, func(t *testing.T) {
cfg := conf.MustNewConfigFrom(test.initConfig)
m, _, err := mb.NewModule(cfg, mockRegistry, logptest.NewTestingLogger(t, ""))
m, _, err := mb.NewModule(cfg, mockRegistry, paths.New(), logptest.NewTestingLogger(t, ""))
require.NoError(t, err)

bm, ok := m.(*mb.BaseModule)
Expand Down
12 changes: 7 additions & 5 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/paths"
)

var (
Expand All @@ -46,7 +47,7 @@ var (
// will be unpacked into ModuleConfig structs). r is the Register where the
// ModuleFactory's and MetricSetFactory's will be obtained from. This method
// returns a Module and its configured MetricSets or an error.
func NewModule(config *conf.C, r *Register, logger *logp.Logger) (Module, []MetricSet, error) {
func NewModule(config *conf.C, r *Register, p *paths.Path, logger *logp.Logger) (Module, []MetricSet, error) {
if !config.Enabled() {
return nil, nil, ErrModuleDisabled
}
Expand All @@ -61,7 +62,7 @@ func NewModule(config *conf.C, r *Register, logger *logp.Logger) (Module, []Metr
return nil, nil, err
}

metricsets, err := initMetricSets(r, module, logger)
metricsets, err := initMetricSets(r, module, p, logger)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -106,12 +107,12 @@ func createModule(r *Register, bm BaseModule) (Module, error) {
return f(bm)
}

func initMetricSets(r *Register, m Module, logger *logp.Logger) ([]MetricSet, error) {
func initMetricSets(r *Register, m Module, p *paths.Path, logger *logp.Logger) ([]MetricSet, error) {
var (
errs []error
)

bms, err := newBaseMetricSets(r, m, logger)
bms, err := newBaseMetricSets(r, m, p, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -157,7 +158,7 @@ func initMetricSets(r *Register, m Module, logger *logp.Logger) ([]MetricSet, er
// newBaseMetricSets creates a new BaseMetricSet for all MetricSets defined
// in the module's config. An error is returned if no MetricSets are specified
// in the module's config and no default MetricSet is defined.
func newBaseMetricSets(r *Register, m Module, logger *logp.Logger) ([]BaseMetricSet, error) {
func newBaseMetricSets(r *Register, m Module, p *paths.Path, logger *logp.Logger) ([]BaseMetricSet, error) {
hosts := []string{""}
if l := m.Config().Hosts; len(l) > 0 {
hosts = l
Expand Down Expand Up @@ -209,6 +210,7 @@ func newBaseMetricSets(r *Register, m Module, logger *logp.Logger) ([]BaseMetric
host: host,
metrics: metrics,
logger: logger,
paths: p,
})
}
}
Expand Down
3 changes: 1 addition & 2 deletions metricbeat/mb/lightmodules.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -261,9 +260,9 @@
s.log.Debugf("Light modules directory '%s' doesn't exist", dir)
continue
}
files, err := ioutil.ReadDir(dir)
files, err := os.ReadDir(dir)
if err != nil {
return nil, fmt.Errorf("listing modules on path '%s': %v", dir, err)

Check failure on line 265 in metricbeat/mb/lightmodules.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)
}
for _, f := range files {
if !f.IsDir() {
Expand Down
11 changes: 6 additions & 5 deletions metricbeat/mb/lightmodules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/logp/logptest"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/paths"
)

// TestLightModulesAsModuleSource checks that registry correctly lists
Expand Down Expand Up @@ -258,7 +259,7 @@ func TestNewModuleFromConfig(t *testing.T) {
config, err := conf.NewConfigFrom(c.config)
require.NoError(t, err)

module, metricSets, err := NewModule(config, r, logptest.NewTestingLogger(t, ""))
module, metricSets, err := NewModule(config, r, paths.New(), logptest.NewTestingLogger(t, ""))
if c.err {
assert.Error(t, err)
return
Expand Down Expand Up @@ -315,7 +316,7 @@ func TestLightMetricSet_VerifyHostDataURI(t *testing.T) {
})
require.NoError(t, err)

_, metricSets, err := NewModule(config, r, logptest.NewTestingLogger(t, ""))
_, metricSets, err := NewModule(config, r, paths.New(), logptest.NewTestingLogger(t, ""))
require.NoError(t, err)
require.Len(t, metricSets, 1)

Expand All @@ -338,7 +339,7 @@ func TestLightMetricSet_WithoutHostParser(t *testing.T) {
})
require.NoError(t, err)

_, metricSets, err := NewModule(config, r, logptest.NewTestingLogger(t, ""))
_, metricSets, err := NewModule(config, r, paths.New(), logptest.NewTestingLogger(t, ""))
require.NoError(t, err)
require.Len(t, metricSets, 1)

Expand Down Expand Up @@ -371,7 +372,7 @@ func TestLightMetricSet_VerifyHostDataURI_NonParsableHost(t *testing.T) {
})
require.NoError(t, err)

_, metricSets, err := NewModule(config, r, logptest.NewTestingLogger(t, ""))
_, metricSets, err := NewModule(config, r, paths.New(), logptest.NewTestingLogger(t, ""))
require.NoError(t, err)
require.Len(t, metricSets, 1)

Expand All @@ -395,7 +396,7 @@ func TestNewModulesCallModuleFactory(t *testing.T) {
config, err := conf.NewConfigFrom(mapstr.M{"module": "service"})
require.NoError(t, err)

_, _, err = NewModule(config, r, logptest.NewTestingLogger(t, ""))
_, _, err = NewModule(config, r, paths.New(), logptest.NewTestingLogger(t, ""))
assert.NoError(t, err)

assert.True(t, called, "module factory must be called if registered")
Expand Down
11 changes: 11 additions & 0 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/paths"
)

const (
Expand Down Expand Up @@ -265,6 +266,7 @@ type BaseMetricSet struct {
registration MetricSetRegistration
metrics *monitoring.Registry
logger *logp.Logger
paths *paths.Path
}

func (b *BaseMetricSet) String() string {
Expand Down Expand Up @@ -327,6 +329,15 @@ func (b *BaseMetricSet) Registration() MetricSetRegistration {
return b.registration
}

// GetPath returns the paths associated with this MetricSet or
// the global if none was set.
func (b *BaseMetricSet) GetPath() *paths.Path {
if b.paths != nil {
return b.paths
}
return paths.Paths
}

// Configuration types

// ModuleConfig is the base configuration data for all Modules.
Expand Down
9 changes: 5 additions & 4 deletions metricbeat/mb/mb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp/logptest"
"github.com/elastic/elastic-agent-libs/paths"
)

// Reporting V2 MetricSet
Expand Down Expand Up @@ -157,7 +158,7 @@ func TestNewModulesDuplicateHosts(t *testing.T) {
"hosts": []string{"a", "b", "a"},
})

_, _, err := NewModule(c, r, logptest.NewTestingLogger(t, ""))
_, _, err := NewModule(c, r, paths.New(), logptest.NewTestingLogger(t, ""))
assert.Error(t, err)
}

Expand All @@ -170,7 +171,7 @@ func TestNewModulesWithDefaultMetricSet(t *testing.T) {
"module": moduleName,
})

_, metricSets, err := NewModule(c, r, logptest.NewTestingLogger(t, ""))
_, metricSets, err := NewModule(c, r, paths.New(), logptest.NewTestingLogger(t, ""))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -263,7 +264,7 @@ func newTestRegistry(t testing.TB, metricSetOptions ...MetricSetOption) *Registe
}

func newTestMetricSet(t testing.TB, r *Register, config map[string]interface{}) MetricSet {
_, metricsets, err := NewModule(newConfig(t, config), r, logptest.NewTestingLogger(t, ""))
_, metricsets, err := NewModule(newConfig(t, config), r, paths.New(), logptest.NewTestingLogger(t, ""))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -350,7 +351,7 @@ func TestBaseModuleWithConfig(t *testing.T) {
MetricSets: []string{"foo", "bar"},
}

m, _, err := NewModule(conf.MustNewConfigFrom(initConfig), mockRegistry, logptest.NewTestingLogger(t, ""))
m, _, err := NewModule(conf.MustNewConfigFrom(initConfig), mockRegistry, paths.New(), logptest.NewTestingLogger(t, ""))
require.NoError(t, err)

bm, ok := m.(*BaseModule)
Expand Down
7 changes: 4 additions & 3 deletions metricbeat/mb/module/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,19 @@ import (
"github.com/elastic/beats/v7/metricbeat/mb"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

// ConfiguredModules returns a list of all configured modules, including anyone present under dynamic config settings.
func ConfiguredModules(registry *mb.Register, modulesData []*conf.C, configModulesData *conf.C, moduleOptions []Option, logger *logp.Logger) ([]*Wrapper, error) {
func ConfiguredModules(registry *mb.Register, modulesData []*conf.C, configModulesData *conf.C, moduleOptions []Option, p *paths.Path, logger *logp.Logger) ([]*Wrapper, error) {
var modules []*Wrapper //nolint:prealloc //can't be preallocated

// Pass in placeholder monitoring for the module wrappers since this
// isn't a real beat startup.
mon := beat.NewMonitoring()

for _, moduleCfg := range modulesData {
module, err := NewWrapper(moduleCfg, registry, logger, mon, moduleOptions...)
module, err := NewWrapper(moduleCfg, registry, logger, mon, p, moduleOptions...)
if err != nil {
return nil, err
}
Expand All @@ -61,7 +62,7 @@ func ConfiguredModules(registry *mb.Register, modulesData []*conf.C, configModul
return nil, fmt.Errorf("error loading config files: %w", err)
}
for _, conf := range confs {
m, err := NewWrapper(conf, registry, logger, mon, moduleOptions...)
m, err := NewWrapper(conf, registry, logger, mon, p, moduleOptions...)
if err != nil {
return nil, fmt.Errorf("module initialization error: %w", err)
}
Expand Down
5 changes: 3 additions & 2 deletions metricbeat/mb/module/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/elastic/beats/v7/metricbeat/mb/module"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

// ExampleWrapper demonstrates how to create a single Wrapper
Expand All @@ -53,7 +54,7 @@ func ExampleWrapper() {
return
}
// Create a new Wrapper based on the configuration.
m, err := module.NewWrapper(config, mb.Registry, logger, beat.NewMonitoring(), module.WithMetricSetInfo())
m, err := module.NewWrapper(config, mb.Registry, logger, beat.NewMonitoring(), paths.New(), module.WithMetricSetInfo())
if err != nil {
fmt.Println("Error:", err)
return
Expand Down Expand Up @@ -137,7 +138,7 @@ func ExampleRunner() {
}

// Create a new Wrapper based on the configuration.
m, err := module.NewWrapper(config, mb.Registry, logp.NewNopLogger(), beat.NewMonitoring(), module.WithMetricSetInfo())
m, err := module.NewWrapper(config, mb.Registry, logp.NewNopLogger(), beat.NewMonitoring(), paths.New(), module.WithMetricSetInfo())
if err != nil {
return
}
Expand Down
Loading
Loading