Skip to content

Commit bcceb0d

Browse files
author
Gustavo Caso
authored
[ASCII-1281] Ensure host metadata information information is sent first (#23620)
* add new priority provider * filter and get non optional providers in the constructor * fix linters * apply feedback * add test for the priority provider logic * ensure test consistency
1 parent 4bbda0e commit bcceb0d

File tree

3 files changed

+129
-26
lines changed

3 files changed

+129
-26
lines changed

comp/metadata/host/hostimpl/host.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ type provides struct {
6666
fx.Out
6767

6868
Comp hostComp.Component
69-
MetadataProvider runnerimpl.Provider
69+
MetadataProvider runnerimpl.PriorityProvider
7070
FlareProvider flaretypes.Provider
7171
StatusHeaderProvider status.HeaderInformationProvider
7272
}
@@ -102,7 +102,7 @@ func newHostProvider(deps dependencies) provides {
102102
}
103103
return provides{
104104
Comp: &h,
105-
MetadataProvider: runnerimpl.NewProvider(h.collect),
105+
MetadataProvider: runnerimpl.NewPriorityProvider(h.collect),
106106
FlareProvider: flaretypes.NewProvider(h.fillFlare),
107107
StatusHeaderProvider: status.NewHeaderInformationProvider(StatusProvider{
108108
Config: h.config,

comp/metadata/runner/runnerimpl/runner.go

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,18 @@ func Module() fxutil.Module {
2727
// MetadataProvider is the provider for metadata
2828
type MetadataProvider func(context.Context) time.Duration
2929

30+
// PriorityMetadataProvider is the provider for metadata that needs to be execute at start time of the agent.
31+
// Right now the agent needs the host metadata provider to execute first to ensure host tags are being reported correctly.
32+
// This is a temporary workaorund until we figure a more permanent solution in the backend
33+
// If you need to use this provide please contact the agent-shared-components team
34+
type PriorityMetadataProvider func(context.Context) time.Duration
35+
3036
type runnerImpl struct {
3137
log log.Component
3238
config config.Component
3339

34-
// providers are the metada providers to run. They're Optional because some of them can be disabled through the
35-
// configuration
36-
providers []optional.Option[MetadataProvider]
40+
providers []MetadataProvider
41+
priorityProviders []PriorityMetadataProvider
3742

3843
wg sync.WaitGroup
3944
stopChan chan struct{}
@@ -45,7 +50,8 @@ type dependencies struct {
4550
Log log.Component
4651
Config config.Component
4752

48-
Providers []optional.Option[MetadataProvider] `group:"metadata_provider"`
53+
Providers []optional.Option[MetadataProvider] `group:"metadata_provider"`
54+
PriorityProviders []optional.Option[PriorityMetadataProvider] `group:"metadata_priority_provider"`
4955
}
5056

5157
// Provider represents the callback from a metada provider. This is returned by 'NewProvider' helper.
@@ -55,13 +61,27 @@ type Provider struct {
5561
Callback optional.Option[MetadataProvider] `group:"metadata_provider"`
5662
}
5763

64+
// PriorityProvider represents the callback from a priority metada provider. This is returned by 'NewPriorityProvider' helper.
65+
type PriorityProvider struct {
66+
fx.Out
67+
68+
Callback optional.Option[PriorityMetadataProvider] `group:"metadata_priority_provider"`
69+
}
70+
5871
// NewProvider registers a new metadata provider by adding a callback to the runner.
5972
func NewProvider(callback MetadataProvider) Provider {
6073
return Provider{
6174
Callback: optional.NewOption[MetadataProvider](callback),
6275
}
6376
}
6477

78+
// NewPriorityProvider registers a new metadata provider by adding a callback to the runner.
79+
func NewPriorityProvider(callback PriorityMetadataProvider) PriorityProvider {
80+
return PriorityProvider{
81+
Callback: optional.NewOption[PriorityMetadataProvider](callback),
82+
}
83+
}
84+
6585
// NewEmptyProvider returns a empty provider which is not going to register anything. This is useful for providers that
6686
// can be enabled/disabled through configuration.
6787
func NewEmptyProvider() Provider {
@@ -72,11 +92,30 @@ func NewEmptyProvider() Provider {
7292

7393
// createRunner instantiates a runner object
7494
func createRunner(deps dependencies) *runnerImpl {
95+
providers := []MetadataProvider{}
96+
priorityProviders := []PriorityMetadataProvider{}
97+
98+
nonNilProviders := fxutil.GetAndFilterGroup(deps.Providers)
99+
nonNilPriorityProviders := fxutil.GetAndFilterGroup(deps.PriorityProviders)
100+
101+
for _, optionaP := range nonNilProviders {
102+
if p, isSet := optionaP.Get(); isSet {
103+
providers = append(providers, p)
104+
}
105+
}
106+
107+
for _, optionaP := range nonNilPriorityProviders {
108+
if p, isSet := optionaP.Get(); isSet {
109+
priorityProviders = append(priorityProviders, p)
110+
}
111+
}
112+
75113
return &runnerImpl{
76-
log: deps.Log,
77-
config: deps.Config,
78-
providers: fxutil.GetAndFilterGroup(deps.Providers),
79-
stopChan: make(chan struct{}),
114+
log: deps.Log,
115+
config: deps.Config,
116+
providers: providers,
117+
priorityProviders: priorityProviders,
118+
stopChan: make(chan struct{}),
80119
}
81120
}
82121

@@ -100,7 +139,7 @@ func newRunner(lc fx.Lifecycle, deps dependencies) runner.Component {
100139
}
101140

102141
// handleProvider runs a provider at regular interval until the runner is stopped
103-
func (r *runnerImpl) handleProvider(p MetadataProvider) {
142+
func (r *runnerImpl) handleProvider(p func(context.Context) time.Duration) {
104143
r.log.Debugf("Starting runner for MetadataProvider %#v", p)
105144
r.wg.Add(1)
106145

@@ -137,12 +176,21 @@ func (r *runnerImpl) handleProvider(p MetadataProvider) {
137176
// start is called by FX when the application starts. Lifecycle hooks are blocking and called sequencially. We should
138177
// not block here.
139178
func (r *runnerImpl) start() error {
140-
r.log.Debugf("Starting metadata runner with %d providers", len(r.providers))
141-
for _, optionaP := range r.providers {
142-
if p, isSet := optionaP.Get(); isSet {
143-
go r.handleProvider(p)
179+
r.log.Debugf("Starting metadata runner with %d priority providers and %d regular providers", len(r.priorityProviders), len(r.providers))
180+
181+
go func() {
182+
for _, priorityProvider := range r.priorityProviders {
183+
// Execute synchronously the priority provider
184+
priorityProvider(context.Background())
185+
186+
go r.handleProvider(priorityProvider)
144187
}
145-
}
188+
189+
for _, provider := range r.providers {
190+
go r.handleProvider(provider)
191+
}
192+
}()
193+
146194
return nil
147195
}
148196

comp/metadata/runner/runnerimpl/runner_test.go

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package runnerimpl
88

99
import (
1010
"context"
11+
"sync"
1112
"testing"
1213
"time"
1314

@@ -21,12 +22,15 @@ import (
2122
)
2223

2324
func TestHandleProvider(t *testing.T) {
24-
called := make(chan struct{})
25+
wg := sync.WaitGroup{}
26+
2527
provider := func(context.Context) time.Duration {
26-
called <- struct{}{}
28+
wg.Done()
2729
return 1 * time.Minute // Long timeout to block
2830
}
2931

32+
wg.Add(1)
33+
3034
r := createRunner(
3135
fxutil.Test[dependencies](
3236
t,
@@ -36,18 +40,21 @@ func TestHandleProvider(t *testing.T) {
3640
))
3741

3842
r.start()
39-
// either called receive a value or the test will fail as a timeout
40-
<-called
43+
// either the provider call wg.Done() or the test will fail as a timeout
44+
wg.Wait()
4145
assert.NoError(t, r.stop())
4246
}
4347

4448
func TestRunnerCreation(t *testing.T) {
45-
called := make(chan struct{})
46-
callback := func(context.Context) time.Duration {
47-
called <- struct{}{}
49+
wg := sync.WaitGroup{}
50+
51+
provider := func(context.Context) time.Duration {
52+
wg.Done()
4853
return 1 * time.Minute // Long timeout to block
4954
}
5055

56+
wg.Add(1)
57+
5158
lc := fxtest.NewLifecycle(t)
5259
fxutil.Test[runner.Component](
5360
t,
@@ -56,14 +63,62 @@ func TestRunnerCreation(t *testing.T) {
5663
config.MockModule(),
5764
Module(),
5865
// Supplying our provider by using the helper function
59-
fx.Supply(NewProvider(callback)),
66+
fx.Supply(NewProvider(provider)),
6067
)
6168

6269
ctx := context.Background()
6370
lc.Start(ctx)
6471

65-
// either called receive a value or the test will fail as a timeout
66-
<-called
72+
// either the provider call wg.Done() or the test will fail as a timeout
73+
wg.Wait()
74+
75+
assert.NoError(t, lc.Stop(ctx))
76+
}
77+
78+
func TestPriorityProviderOrder(t *testing.T) {
79+
wg := sync.WaitGroup{}
80+
m := sync.Mutex{}
81+
eventSequence := []string{}
82+
83+
provider := func(context.Context) time.Duration {
84+
m.Lock()
85+
eventSequence = append(eventSequence, "provider")
86+
m.Unlock()
87+
wg.Done()
88+
return 1 * time.Minute // Long timeout to block
89+
}
90+
91+
priorityProvider := func(context.Context) time.Duration {
92+
m.Lock()
93+
eventSequence = append(eventSequence, "priorityProvider")
94+
m.Unlock()
95+
wg.Done()
96+
return 1 * time.Minute // Long timeout to block
97+
}
98+
99+
// We add 3 work unit because the priority provider are called twice at startup. One synchronousily and one asynchronosuly
100+
wg.Add(3)
101+
102+
lc := fxtest.NewLifecycle(t)
103+
fxutil.Test[runner.Component](
104+
t,
105+
fx.Supply(lc),
106+
logimpl.MockModule(),
107+
config.MockModule(),
108+
Module(),
109+
// Supplying our provider by using the helper function
110+
fx.Supply(NewProvider(provider)),
111+
fx.Supply(NewPriorityProvider(priorityProvider)),
112+
)
113+
114+
ctx := context.Background()
115+
lc.Start(ctx)
67116

117+
// either the provider call wg.Done() or the test will fail as a timeout
118+
wg.Wait()
119+
// it is expected to see three events. Priority providers are called twice at start up
120+
assert.Equal(t, 3, len(eventSequence))
121+
// ensure priority provider is the first provider to be executed
122+
assert.Equal(t, "priorityProvider", eventSequence[0])
68123
assert.NoError(t, lc.Stop(ctx))
69124
}

0 commit comments

Comments
 (0)