diff --git a/internal/agentcfg/elasticsearch_test.go b/internal/agentcfg/elasticsearch_test.go index 5826de915a6..423c6d17de4 100644 --- a/internal/agentcfg/elasticsearch_test.go +++ b/internal/agentcfg/elasticsearch_test.go @@ -52,7 +52,10 @@ func newMockElasticsearchClient(t testing.TB, handler func(http.ResponseWriter, config := elasticsearch.DefaultConfig() config.Backoff.Init = time.Nanosecond config.Hosts = []string{srv.URL} - client, err := elasticsearch.NewClient(config, logptest.NewTestingLogger(t, "")) + client, err := elasticsearch.NewClient(elasticsearch.ClientParams{ + Config: config, + Logger: logptest.NewTestingLogger(t, ""), + }) require.NoError(t, err) return client } diff --git a/internal/beater/auth/authenticator.go b/internal/beater/auth/authenticator.go index c72b67141f1..9a2cbd13bd6 100644 --- a/internal/beater/auth/authenticator.go +++ b/internal/beater/auth/authenticator.go @@ -152,7 +152,7 @@ func NewAuthenticator(cfg config.AgentAuth, tp trace.TracerProvider, logger *log cfg.APIKey.ESConfig.Username = "" cfg.APIKey.ESConfig.Password = "" cfg.APIKey.ESConfig.APIKey = "" - client, err := elasticsearch.NewClientParams(elasticsearch.ClientParams{ + client, err := elasticsearch.NewClient(elasticsearch.ClientParams{ Config: cfg.APIKey.ESConfig, Logger: logger, TracerProvider: tp, diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 484e0c5d044..39177cd53dc 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -316,25 +316,22 @@ func (s *Runner) Run(ctx context.Context) error { close(publishReady) return nil }) - newESClient := func(tp trace.TracerProvider) func(cfg *elasticsearch.Config, logger *logp.Logger) (*elasticsearch.Client, error) { - return func(cfg *elasticsearch.Config, logger *logp.Logger) (*elasticsearch.Client, error) { - httpTransport, err := elasticsearch.NewHTTPTransport(cfg, logger) - if err != nil { - return nil, err - } - transport := &waitReadyRoundTripper{Transport: httpTransport, ready: publishReady, drain: drain} - return elasticsearch.NewClientParams(elasticsearch.ClientParams{ - Config: cfg, - Transport: transport, - RetryOnError: func(_ *http.Request, err error) bool { - return !errors.Is(err, errServerShuttingDown) - }, - Logger: logger, - TracerProvider: tp, - }) + newElasticsearchClient := func(args elasticsearch.ClientParams) (*elasticsearch.Client, error) { + httpTransport, err := elasticsearch.NewHTTPTransport(args.Config, args.Logger) + if err != nil { + return nil, err } + transport := &waitReadyRoundTripper{Transport: httpTransport, ready: publishReady, drain: drain} + return elasticsearch.NewClient(elasticsearch.ClientParams{ + Config: args.Config, + Transport: transport, + RetryOnError: func(_ *http.Request, err error) bool { + return !errors.Is(err, errServerShuttingDown) + }, + Logger: args.Logger, + TracerProvider: args.TracerProvider, + }) } - newElasticsearchClient := newESClient(s.tracerProvider) var sourcemapFetcher sourcemap.Fetcher if s.config.RumConfig.Enabled && s.config.RumConfig.SourceMapping.Enabled { @@ -456,7 +453,7 @@ func (s *Runner) Run(ctx context.Context) error { SourcemapFetcher: sourcemapFetcher, PublishReady: publishReady, KibanaClient: kibanaClient, - NewElasticsearchClient: newESClient(tracenoop.NewTracerProvider()), + NewElasticsearchClient: newElasticsearchClient, GRPCServer: grpcServer, Semaphore: semaphore.NewWeighted(int64(s.config.MaxConcurrentDecoders)), BeatMonitoring: s.beatMonitoring, @@ -509,7 +506,7 @@ func (s *Runner) Run(ctx context.Context) error { if tracerServerListener != nil { // use a batch processor without tracing to prevent the tracing processor from sending traces to itself finalTracerBatchProcessor, closeTracerFinalBatchProcessor, err := s.newFinalBatchProcessor( - tracer, newESClient(tracenoop.NewTracerProvider()), memLimitGB, s.logger, tracenoop.NewTracerProvider(), metricnoop.NewMeterProvider(), + tracer, newElasticsearchClient, memLimitGB, s.logger, tracenoop.NewTracerProvider(), metricnoop.NewMeterProvider(), ) if err != nil { return err @@ -649,7 +646,7 @@ func (s *Runner) waitReady( if err != nil { return err } - esOutputClient, err = elasticsearch.NewClientParams(elasticsearch.ClientParams{ + esOutputClient, err = elasticsearch.NewClient(elasticsearch.ClientParams{ Config: esConfig, Logger: s.logger, TracerProvider: s.tracerProvider, @@ -712,7 +709,7 @@ func (s *Runner) waitReady( // "elasticsearch", then we use docappender; otherwise we use the libbeat publisher. func (s *Runner) newFinalBatchProcessor( tracer *apm.Tracer, - newElasticsearchClient func(*elasticsearch.Config, *logp.Logger) (*elasticsearch.Client, error), + newElasticsearchClient func(elasticsearch.ClientParams) (*elasticsearch.Client, error), memLimit float64, logger *logp.Logger, tp trace.TracerProvider, @@ -734,7 +731,11 @@ func (s *Runner) newFinalBatchProcessor( if err != nil { return nil, nil, err } - client, err := newElasticsearchClient(esCfg, logger) + client, err := newElasticsearchClient(elasticsearch.ClientParams{ + Config: esCfg, + Logger: logger, + TracerProvider: tp, + }) if err != nil { return nil, nil, err } @@ -918,11 +919,15 @@ const sourcemapIndex = ".apm-source-map" func newSourcemapFetcher( cfg config.SourceMapping, kibanaClient *kibana.Client, - newElasticsearchClient func(*elasticsearch.Config, *logp.Logger) (*elasticsearch.Client, error), + newElasticsearchClient func(elasticsearch.ClientParams) (*elasticsearch.Client, error), tp trace.TracerProvider, logger *logp.Logger, ) (sourcemap.Fetcher, context.CancelFunc, error) { - esClient, err := newElasticsearchClient(cfg.ESConfig, logger) + esClient, err := newElasticsearchClient(elasticsearch.ClientParams{ + Config: cfg.ESConfig, + Logger: logger, + TracerProvider: tp, + }) if err != nil { return nil, nil, err } diff --git a/internal/beater/beater_test.go b/internal/beater/beater_test.go index ae7e8166873..1bb1771d9be 100644 --- a/internal/beater/beater_test.go +++ b/internal/beater/beater_test.go @@ -160,7 +160,10 @@ func newMockClusterUUIDClient(t testing.TB, clusterUUID string) *elasticsearch.C config := elasticsearch.DefaultConfig() config.Hosts = []string{srv.URL} - client, err := elasticsearch.NewClient(config, logptest.NewTestingLogger(t, "")) + client, err := elasticsearch.NewClient(elasticsearch.ClientParams{ + Config: config, + Logger: logptest.NewTestingLogger(t, ""), + }) require.NoError(t, err) return client } diff --git a/internal/beater/server.go b/internal/beater/server.go index f277a9a50c6..3bbfc29701a 100644 --- a/internal/beater/server.go +++ b/internal/beater/server.go @@ -124,7 +124,7 @@ type ServerParams struct { // for indexing. Under some configuration, the server will wrap the // client's transport such that requests will be blocked until data // streams have been initialised. - NewElasticsearchClient func(*elasticsearch.Config, *logp.Logger) (*elasticsearch.Client, error) + NewElasticsearchClient func(elasticsearch.ClientParams) (*elasticsearch.Client, error) // GRPCServer holds a *grpc.Server to which services will be registered // for receiving data, configuration requests, etc. @@ -240,7 +240,7 @@ func newAgentConfigFetcher( ctx context.Context, cfg *config.Config, kibanaClient *kibana.Client, - newElasticsearchClient func(*elasticsearch.Config, *logp.Logger) (*elasticsearch.Client, error), + newElasticsearchClient func(elasticsearch.ClientParams) (*elasticsearch.Client, error), tp trace.TracerProvider, mp metric.MeterProvider, logger *logp.Logger, @@ -264,7 +264,11 @@ func newAgentConfigFetcher( // It is possible that none of the above applies. } - esClient, err := newElasticsearchClient(cfg.AgentConfig.ESConfig, logger) + esClient, err := newElasticsearchClient(elasticsearch.ClientParams{ + Config: cfg.AgentConfig.ESConfig, + Logger: logger, + TracerProvider: tp, + }) if err != nil { return nil, nil, err } diff --git a/internal/elasticsearch/client.go b/internal/elasticsearch/client.go index af51bf85976..6dbdc09c8c7 100644 --- a/internal/elasticsearch/client.go +++ b/internal/elasticsearch/client.go @@ -41,7 +41,7 @@ var userAgent = fmt.Sprintf("Elastic-APM-Server/%s go-elasticsearch/%s", version type Client = elastictransport.Client -// ClientParams holds parameters for NewClientParams. +// ClientParams holds parameters for NewClient. type ClientParams struct { // Config holds the user-defined configuration: Elasticsearch hosts, // max retries, etc. @@ -66,13 +66,7 @@ type ClientParams struct { } // NewClient returns a stack version-aware Elasticsearch client, -// equivalent to NewClientParams(ClientParams{Config: config}). -func NewClient(config *Config, logger *logp.Logger) (*Client, error) { - return NewClientParams(ClientParams{Config: config, Logger: logger}) -} - -// NewClientParams returns a stack version-aware Elasticsearch client. -func NewClientParams(args ClientParams) (*Client, error) { +func NewClient(args ClientParams) (*Client, error) { if args.Config == nil { return nil, errConfigMissing } diff --git a/internal/elasticsearch/client_test.go b/internal/elasticsearch/client_test.go index 96754e31fd1..7bf5eba4e87 100644 --- a/internal/elasticsearch/client_test.go +++ b/internal/elasticsearch/client_test.go @@ -34,14 +34,17 @@ import ( func TestClient(t *testing.T) { t.Run("no config", func(t *testing.T) { - goESClient, err := NewClient(nil, logptest.NewTestingLogger(t, "")) + goESClient, err := NewClient(ClientParams{}) assert.Error(t, err) assert.Nil(t, goESClient) }) t.Run("valid config", func(t *testing.T) { cfg := Config{Hosts: Hosts{"localhost:9200", "localhost:9201"}} - goESClient, err := NewClient(&cfg, logptest.NewTestingLogger(t, "")) + goESClient, err := NewClient(ClientParams{ + Config: &cfg, + Logger: logptest.NewTestingLogger(t, ""), + }) require.NoError(t, err) assert.NotNil(t, goESClient) }) @@ -60,7 +63,10 @@ func TestClientCustomHeaders(t *testing.T) { Hosts: Hosts{srv.URL}, Headers: map[string]string{"custom": "header"}, } - client, err := NewClient(&cfg, logptest.NewTestingLogger(t, "")) + client, err := NewClient(ClientParams{ + Config: &cfg, + Logger: logptest.NewTestingLogger(t, ""), + }) require.NoError(t, err) req, err := http.NewRequest(http.MethodPost, "/_bulk", bytes.NewReader([]byte("{}"))) @@ -73,7 +79,6 @@ func TestClientCustomHeaders(t *testing.T) { case <-time.After(1 * time.Second): t.Fatal("timed out while waiting for request") } - } func TestClientCustomUserAgent(t *testing.T) { @@ -88,7 +93,10 @@ func TestClientCustomUserAgent(t *testing.T) { cfg := Config{ Hosts: Hosts{srv.URL}, } - client, err := NewClient(&cfg, logptest.NewTestingLogger(t, "")) + client, err := NewClient(ClientParams{ + Config: &cfg, + Logger: logptest.NewTestingLogger(t, ""), + }) require.NoError(t, err) req, err := http.NewRequest(http.MethodPost, "/_bulk", bytes.NewReader([]byte("{}"))) @@ -179,7 +187,10 @@ func TestClientRetryableStatuses(t *testing.T) { MaxRetries: maxRetries, Hosts: []string{srv.URL}, } - client, err := NewClient(&c, logptest.NewTestingLogger(t, "")) + client, err := NewClient(ClientParams{ + Config: &c, + Logger: logptest.NewTestingLogger(t, ""), + }) require.NoError(t, err) var buf bytes.Buffer diff --git a/internal/elasticsearch/security_api_test.go b/internal/elasticsearch/security_api_test.go index aa8b47bd39a..443e91c46fd 100644 --- a/internal/elasticsearch/security_api_test.go +++ b/internal/elasticsearch/security_api_test.go @@ -38,7 +38,10 @@ func TestHasPrivilegesError(t *testing.T) { })) defer server.Close() - client, err := NewClient(&Config{Hosts: Hosts{server.Listener.Addr().String()}}, logptest.NewTestingLogger(t, "")) + client, err := NewClient(ClientParams{ + Config: &Config{Hosts: Hosts{server.Listener.Addr().String()}}, + Logger: logptest.NewTestingLogger(t, ""), + }) require.NoError(t, err) resp, err := HasPrivileges(context.Background(), client, HasPrivilegesRequest{}, "foo") diff --git a/internal/sourcemap/elasticsearch_test.go b/internal/sourcemap/elasticsearch_test.go index d0dd3e795fd..631ebc9b8ec 100644 --- a/internal/sourcemap/elasticsearch_test.go +++ b/internal/sourcemap/elasticsearch_test.go @@ -142,7 +142,7 @@ func newUnavailableElasticsearchClient(t testing.TB) *elasticsearch.Client { cfg.MaxRetries = 1 cfg.Backoff.Init = time.Nanosecond cfg.Backoff.Max = time.Nanosecond - client, err := elasticsearch.NewClientParams(elasticsearch.ClientParams{Config: cfg, Transport: transport, Logger: logptest.NewTestingLogger(t, "")}) + client, err := elasticsearch.NewClient(elasticsearch.ClientParams{Config: cfg, Transport: transport, Logger: logptest.NewTestingLogger(t, "")}) require.NoError(t, err) return client } @@ -162,7 +162,10 @@ func newMockElasticsearchClient(t testing.TB, statusCode int, responseBody io.Re config := elasticsearch.DefaultConfig() config.Backoff.Init = time.Nanosecond config.Hosts = []string{srv.URL} - client, err := elasticsearch.NewClient(config, logptest.NewTestingLogger(t, "")) + client, err := elasticsearch.NewClient(elasticsearch.ClientParams{ + Config: config, + Logger: logptest.NewTestingLogger(t, ""), + }) require.NoError(t, err) return client } diff --git a/internal/sourcemap/metadata_fetcher_test.go b/internal/sourcemap/metadata_fetcher_test.go index 114f4bc529d..5aadc200e41 100644 --- a/internal/sourcemap/metadata_fetcher_test.go +++ b/internal/sourcemap/metadata_fetcher_test.go @@ -120,7 +120,10 @@ func TestMetadataFetcher(t *testing.T) { esConfig := elasticsearch.DefaultConfig() esConfig.Hosts = []string{ts.URL} - esClient, err := elasticsearch.NewClient(esConfig, logptest.NewTestingLogger(t, "")) + esClient, err := elasticsearch.NewClient(elasticsearch.ClientParams{ + Config: esConfig, + Logger: logptest.NewTestingLogger(t, ""), + }) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) @@ -268,7 +271,10 @@ func TestInvalidation(t *testing.T) { esConfig := elasticsearch.DefaultConfig() esConfig.Hosts = []string{ts.URL} - esClient, err := elasticsearch.NewClient(esConfig, logptest.NewTestingLogger(t, "")) + esClient, err := elasticsearch.NewClient(elasticsearch.ClientParams{ + Config: esConfig, + Logger: logptest.NewTestingLogger(t, ""), + }) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) diff --git a/internal/sourcemap/processor_test.go b/internal/sourcemap/processor_test.go index 8908fcd2374..dafb51337d7 100644 --- a/internal/sourcemap/processor_test.go +++ b/internal/sourcemap/processor_test.go @@ -286,7 +286,7 @@ func TestBatchProcessorTimeout(t *testing.T) { cfg := elasticsearch.DefaultConfig() cfg.Hosts = []string{""} - client, err := elasticsearch.NewClientParams(elasticsearch.ClientParams{ + client, err := elasticsearch.NewClient(elasticsearch.ClientParams{ Config: cfg, Transport: transport, Logger: logptest.NewTestingLogger(t, ""), diff --git a/x-pack/apm-server/main.go b/x-pack/apm-server/main.go index 93b9ebc59dd..6fdf5444c4b 100644 --- a/x-pack/apm-server/main.go +++ b/x-pack/apm-server/main.go @@ -13,6 +13,7 @@ import ( "github.com/gofrs/uuid/v5" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace/noop" "golang.org/x/sync/errgroup" "github.com/elastic/beats/v7/libbeat/common/reload" @@ -27,6 +28,7 @@ import ( "github.com/elastic/apm-data/model/modelprocessor" "github.com/elastic/apm-server/internal/beatcmd" "github.com/elastic/apm-server/internal/beater" + "github.com/elastic/apm-server/internal/elasticsearch" "github.com/elastic/apm-server/x-pack/apm-server/sampling" "github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage" ) @@ -102,7 +104,11 @@ func newProcessors(args beater.ServerParams) ([]namedProcessor, error) { func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, error) { tailSamplingConfig := args.Config.Sampling.Tail - es, err := args.NewElasticsearchClient(tailSamplingConfig.ESConfig, args.Logger) + es, err := args.NewElasticsearchClient(elasticsearch.ClientParams{ + Config: tailSamplingConfig.ESConfig, + Logger: args.Logger, + TracerProvider: noop.NewTracerProvider(), + }) if err != nil { return nil, fmt.Errorf("failed to create Elasticsearch client for tail-sampling: %w", err) }