Skip to content

Commit b52f9d5

Browse files
authored
Rearrange collector startup to finish extension init sooner (aws-observability#399)
Signed-off-by: Anthony J Mirabella <[email protected]> Signed-off-by: Anthony J Mirabella <[email protected]>
1 parent 98b7824 commit b52f9d5

File tree

1 file changed

+25
-11
lines changed

1 file changed

+25
-11
lines changed

collector/main.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"os"
2121
"os/signal"
2222
"path/filepath"
23+
"sync"
2324
"syscall"
2425

2526
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/extensionapi"
@@ -40,14 +41,15 @@ func main() {
4041
ctx, lm := newLifecycleManager(context.Background(), logger)
4142

4243
// Will block until shutdown event is received or cancelled via the context.
43-
lm.processEvents(ctx)
44+
logger.Info("done", zap.Error(lm.run(ctx)))
4445
}
4546

4647
type lifecycleManager struct {
4748
logger *zap.Logger
4849
collector *Collector
4950
extensionClient *extensionapi.Client
5051
listener *telemetryapi.Listener
52+
wg sync.WaitGroup
5153
}
5254

5355
func newLifecycleManager(ctx context.Context, logger *zap.Logger) (context.Context, *lifecycleManager) {
@@ -79,23 +81,35 @@ func newLifecycleManager(ctx context.Context, logger *zap.Logger) (context.Conte
7981
logger.Fatal("Cannot register Telemetry API client", zap.Error(err))
8082
}
8183

82-
factories, _ := lambdacomponents.Components()
83-
collector := NewCollector(logger, factories)
84-
85-
if err = collector.Start(ctx); err != nil {
86-
logger.Fatal("Failed to start the extension", zap.Error(err))
87-
extensionClient.InitError(ctx, fmt.Sprintf("failed to start the collector: %v", err))
88-
}
89-
90-
return ctx, &lifecycleManager{
84+
lm := &lifecycleManager{
9185
logger: logger.Named("lifecycleManager"),
92-
collector: collector,
9386
extensionClient: extensionClient,
9487
listener: listener,
9588
}
89+
90+
go lm.processEvents(ctx)
91+
92+
factories, _ := lambdacomponents.Components()
93+
lm.collector = NewCollector(logger, factories)
94+
95+
return ctx, lm
96+
}
97+
98+
func (lm *lifecycleManager) run(ctx context.Context) error {
99+
if err := lm.collector.Start(ctx); err != nil {
100+
lm.logger.Warn("Failed to start the extension", zap.Error(err))
101+
lm.extensionClient.InitError(ctx, fmt.Sprintf("failed to start the collector: %v", err))
102+
return err
103+
}
104+
105+
lm.wg.Wait()
106+
return nil
96107
}
97108

98109
func (lm *lifecycleManager) processEvents(ctx context.Context) {
110+
lm.wg.Add(1)
111+
defer lm.wg.Done()
112+
99113
for {
100114
select {
101115
case <-ctx.Done():

0 commit comments

Comments
 (0)