Skip to content

Commit 350e400

Browse files
authored
feat: remove references to es output and unused callbacks (#19554)
APM Server no longer uses the beats elasticsearch output to send data to ES. Remove unused callbacks and references
1 parent 78f1885 commit 350e400

File tree

3 files changed

+0
-100
lines changed

3 files changed

+0
-100
lines changed

internal/beatcmd/beat.go

Lines changed: 0 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,9 @@ import (
4646
"github.com/elastic/beats/v7/libbeat/api"
4747
"github.com/elastic/beats/v7/libbeat/beat"
4848
"github.com/elastic/beats/v7/libbeat/common/reload"
49-
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
50-
"github.com/elastic/beats/v7/libbeat/licenser"
5149
"github.com/elastic/beats/v7/libbeat/management"
5250
"github.com/elastic/beats/v7/libbeat/monitoring/report"
5351
"github.com/elastic/beats/v7/libbeat/monitoring/report/log"
54-
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
5552
"github.com/elastic/beats/v7/libbeat/pprof"
5653
"github.com/elastic/elastic-agent-libs/config"
5754
"github.com/elastic/elastic-agent-libs/file"
@@ -61,7 +58,6 @@ import (
6158
"github.com/elastic/elastic-agent-libs/monitoring/report/buffer"
6259
"github.com/elastic/elastic-agent-libs/paths"
6360
"github.com/elastic/elastic-agent-libs/service"
64-
libversion "github.com/elastic/elastic-agent-libs/version"
6561
"github.com/elastic/elastic-agent-system-metrics/metric/system/host"
6662
metricreport "github.com/elastic/elastic-agent-system-metrics/report"
6763
sysinfo "github.com/elastic/go-sysinfo"
@@ -379,18 +375,6 @@ func (b *Beat) Run(ctx context.Context) error {
379375

380376
logSystemInfo(b.Info)
381377

382-
cleanup, err := b.registerElasticsearchVersionCheck()
383-
if err != nil {
384-
return err
385-
}
386-
defer cleanup()
387-
388-
cleanup, err = b.registerClusterUUIDFetching()
389-
if err != nil {
390-
return err
391-
}
392-
defer cleanup()
393-
394378
statsRegistry := b.Monitoring.StatsRegistry()
395379

396380
if err := metricreport.SetupMetricsOptions(metricreport.MetricOptions{
@@ -739,75 +723,6 @@ func addDocappenderOutputElasticsearchMetrics(ctx context.Context, v monitoring.
739723
v.OnRegistryFinished()
740724
}
741725

742-
// registerElasticsearchVerfication registers a global callback to make sure
743-
// the Elasticsearch instance we are connecting to has a valid license, and is
744-
// at least on the same version as APM Server.
745-
//
746-
// registerElasticsearchVerification returns a cleanup function which must be
747-
// called on shutdown.
748-
func (b *Beat) registerElasticsearchVersionCheck() (func(), error) {
749-
uuid, err := elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection, logger *logp.Logger) error {
750-
if err := licenser.FetchAndVerify(conn, logger); err != nil {
751-
return err
752-
}
753-
esVersion := conn.GetVersion()
754-
beatVersion, err := libversion.New(b.Info.Version)
755-
if err != nil {
756-
return err
757-
}
758-
if esVersion.LessThanMajorMinor(beatVersion) {
759-
return fmt.Errorf(
760-
"%w Elasticsearch: %s, APM Server: %s",
761-
elasticsearch.ErrTooOld, esVersion.String(), b.Info.Version,
762-
)
763-
}
764-
return nil
765-
})
766-
if err != nil {
767-
return nil, err
768-
}
769-
return func() { elasticsearch.DeregisterGlobalCallback(uuid) }, nil
770-
}
771-
772-
func (b *Beat) registerClusterUUIDFetching() (func(), error) {
773-
callback := b.clusterUUIDFetchingCallback()
774-
uuid, err := elasticsearch.RegisterConnectCallback(callback)
775-
if err != nil {
776-
return nil, err
777-
}
778-
return func() { elasticsearch.DeregisterConnectCallback(uuid) }, nil
779-
}
780-
781-
// Build and return a callback to fetch the Elasticsearch cluster_uuid for monitoring
782-
func (b *Beat) clusterUUIDFetchingCallback() elasticsearch.ConnectCallback {
783-
stateRegistry := b.Monitoring.StateRegistry()
784-
elasticsearchRegistry := stateRegistry.GetOrCreateRegistry("outputs.elasticsearch")
785-
clusterUUIDRegVar := monitoring.NewString(elasticsearchRegistry, "cluster_uuid")
786-
787-
callback := func(esClient *eslegclient.Connection, _ *logp.Logger) error {
788-
var response struct {
789-
ClusterUUID string `json:"cluster_uuid"`
790-
}
791-
792-
status, body, err := esClient.Request("GET", "/", "", nil, nil)
793-
if err != nil {
794-
return fmt.Errorf("error querying /: %w", err)
795-
}
796-
if status > 299 {
797-
return fmt.Errorf("error querying /. Status: %d. Response body: %s", status, body)
798-
}
799-
err = json.Unmarshal(body, &response)
800-
if err != nil {
801-
return fmt.Errorf("error unmarshaling json when querying /. Body: %s", body)
802-
}
803-
804-
clusterUUIDRegVar.Set(response.ClusterUUID)
805-
return nil
806-
}
807-
808-
return callback
809-
}
810-
811726
func (b *Beat) setupMonitoring() (report.Reporter, error) {
812727
monitoringCfg := b.Config.Monitoring
813728

internal/beater/beater.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,9 @@ import (
4646
_ "google.golang.org/grpc/encoding/gzip"
4747

4848
"github.com/elastic/beats/v7/libbeat/beat"
49-
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
5049
"github.com/elastic/beats/v7/libbeat/instrumentation"
5150
"github.com/elastic/beats/v7/libbeat/licenser"
5251
"github.com/elastic/beats/v7/libbeat/outputs"
53-
esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
5452
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
5553
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
5654
agentconfig "github.com/elastic/elastic-agent-libs/config"
@@ -318,18 +316,6 @@ func (s *Runner) Run(ctx context.Context) error {
318316
close(publishReady)
319317
return nil
320318
})
321-
callbackUUID, err := esoutput.RegisterConnectCallback(func(*eslegclient.Connection, *logp.Logger) error {
322-
select {
323-
case <-publishReady:
324-
return nil
325-
default:
326-
}
327-
return errors.New("not ready for publishing events")
328-
})
329-
if err != nil {
330-
return err
331-
}
332-
defer esoutput.DeregisterConnectCallback(callbackUUID)
333319
newESClient := func(tp trace.TracerProvider) func(cfg *elasticsearch.Config, logger *logp.Logger) (*elasticsearch.Client, error) {
334320
return func(cfg *elasticsearch.Config, logger *logp.Logger) (*elasticsearch.Client, error) {
335321
httpTransport, err := elasticsearch.NewHTTPTransport(cfg, logger)

internal/publish/pub_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ import (
3636
"github.com/elastic/beats/v7/libbeat/beat"
3737
"github.com/elastic/beats/v7/libbeat/idxmgmt"
3838
"github.com/elastic/beats/v7/libbeat/outputs"
39-
_ "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
4039
"github.com/elastic/beats/v7/libbeat/publisher"
4140
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
4241
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"

0 commit comments

Comments
 (0)