Skip to content

Commit 1942730

Browse files
Alex BotenAneurysm9
andauthored
[collector] move lifecyclemanager and collector in internal (aws-observability#409)
* [collector] move lifecyclemanager and collector in internal Moving lifecyclemanager and the collector components in their own internal packages. Will follow this up with some tests for each package. Signed-off-by: Alex Boten <[email protected]> * Apply suggestions from code review Co-authored-by: Anthony Mirabella <[email protected]> * [chore] update makefile to include test/tidy (aws-observability#402) * [chore] update makefile to include test/tidy This updates the build command to also run tests and tidy. Signed-off-by: Alex Boten <[email protected]> * separate test/tidy into their own gh jobs * Apply suggestions from code review Co-authored-by: Anthony Mirabella <[email protected]> Signed-off-by: Alex Boten <[email protected]> Co-authored-by: Anthony Mirabella <[email protected]> * cleanup additional references * rename file Signed-off-by: Alex Boten <[email protected]> Co-authored-by: Anthony Mirabella <[email protected]>
1 parent ea79db1 commit 1942730

File tree

3 files changed

+147
-121
lines changed

3 files changed

+147
-121
lines changed

collector/collector.go renamed to collector/internal/collector/collector.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package main
15+
package collector
1616

1717
import (
1818
"context"
@@ -33,14 +33,6 @@ import (
3333
"go.uber.org/zap/zapcore"
3434
)
3535

36-
var (
37-
// Version variable will be replaced at link time after `make` has been run.
38-
Version = "latest"
39-
40-
// GitHash variable will be replaced at link time after `make` has been run.
41-
GitHash = "<NOT PROPERLY GENERATED>"
42-
)
43-
4436
// Collector implements the OtelcolRunner interfaces running a single otelcol as a go routine within the
4537
// same process as the test executor.
4638
type Collector struct {
@@ -50,6 +42,7 @@ type Collector struct {
5042
appDone chan struct{}
5143
stopped bool
5244
logger *zap.Logger
45+
version string
5346
}
5447

5548
func getConfig(logger *zap.Logger) string {
@@ -61,7 +54,7 @@ func getConfig(logger *zap.Logger) string {
6154
return val
6255
}
6356

64-
func NewCollector(logger *zap.Logger, factories component.Factories) *Collector {
57+
func NewCollector(logger *zap.Logger, factories component.Factories, version string) *Collector {
6558
l := logger.Named("NewCollector")
6659
providers := []confmap.Provider{fileprovider.New(), envprovider.New(), yamlprovider.New(), httpprovider.New(), s3provider.New()}
6760
mapProvider := make(map[string]confmap.Provider, len(providers))
@@ -87,6 +80,7 @@ func NewCollector(logger *zap.Logger, factories component.Factories) *Collector
8780
factories: factories,
8881
configProvider: cfgProvider,
8982
logger: logger,
83+
version: version,
9084
}
9185
return col
9286
}
@@ -96,7 +90,7 @@ func (c *Collector) Start(ctx context.Context) error {
9690
BuildInfo: component.BuildInfo{
9791
Command: "otelcol-lambda",
9892
Description: "Lambda Collector",
99-
Version: Version,
93+
Version: c.version,
10094
},
10195
ConfigProvider: c.configProvider,
10296
Factories: c.factories,
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package lifecycle
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"os"
21+
"os/signal"
22+
"path/filepath"
23+
"sync"
24+
"syscall"
25+
26+
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/collector"
27+
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/extensionapi"
28+
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi"
29+
"github.com/open-telemetry/opentelemetry-lambda/collector/lambdacomponents"
30+
"go.uber.org/zap"
31+
)
32+
33+
var (
34+
extensionName = filepath.Base(os.Args[0]) // extension name has to match the filename
35+
)
36+
37+
type manager struct {
38+
logger *zap.Logger
39+
collector *collector.Collector
40+
extensionClient *extensionapi.Client
41+
listener *telemetryapi.Listener
42+
wg sync.WaitGroup
43+
}
44+
45+
func NewManager(ctx context.Context, logger *zap.Logger, version string) (context.Context, *manager) {
46+
ctx, cancel := context.WithCancel(ctx)
47+
48+
sigs := make(chan os.Signal, 1)
49+
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
50+
go func() {
51+
s := <-sigs
52+
cancel()
53+
logger.Info("received signal", zap.String("signal", s.String()))
54+
}()
55+
56+
extensionClient := extensionapi.NewClient(logger, os.Getenv("AWS_LAMBDA_RUNTIME_API"))
57+
res, err := extensionClient.Register(ctx, extensionName)
58+
if err != nil {
59+
logger.Fatal("Cannot register extension", zap.Error(err))
60+
}
61+
62+
listener := telemetryapi.NewListener(logger)
63+
addr, err := listener.Start()
64+
if err != nil {
65+
logger.Fatal("Cannot start Telemetry API Listener", zap.Error(err))
66+
}
67+
68+
telemetryClient := telemetryapi.NewClient(logger)
69+
_, err = telemetryClient.Subscribe(ctx, res.ExtensionID, addr)
70+
if err != nil {
71+
logger.Fatal("Cannot register Telemetry API client", zap.Error(err))
72+
}
73+
74+
lm := &manager{
75+
logger: logger.Named("lifecycle.manager"),
76+
extensionClient: extensionClient,
77+
listener: listener,
78+
}
79+
80+
go lm.processEvents(ctx)
81+
82+
factories, _ := lambdacomponents.Components()
83+
lm.collector = collector.NewCollector(logger, factories, version)
84+
85+
return ctx, lm
86+
}
87+
88+
func (lm *manager) Run(ctx context.Context) error {
89+
if err := lm.collector.Start(ctx); err != nil {
90+
lm.logger.Warn("Failed to start the extension", zap.Error(err))
91+
lm.extensionClient.InitError(ctx, fmt.Sprintf("failed to start the collector: %v", err))
92+
return err
93+
}
94+
95+
lm.wg.Wait()
96+
return nil
97+
}
98+
99+
func (lm *manager) processEvents(ctx context.Context) {
100+
lm.wg.Add(1)
101+
defer lm.wg.Done()
102+
103+
for {
104+
select {
105+
case <-ctx.Done():
106+
return
107+
default:
108+
lm.logger.Debug("Waiting for event...")
109+
res, err := lm.extensionClient.NextEvent(ctx)
110+
if err != nil {
111+
lm.logger.Warn("error waiting for extension event", zap.Error(err))
112+
lm.extensionClient.ExitError(ctx, fmt.Sprintf("error waiting for extension event: %v", err))
113+
return
114+
}
115+
116+
lm.logger.Debug("Received ", zap.Any("event :", res))
117+
// Exit if we receive a SHUTDOWN event
118+
if res.EventType == extensionapi.Shutdown {
119+
lm.logger.Info("Received SHUTDOWN event")
120+
lm.listener.Shutdown()
121+
err = lm.collector.Stop()
122+
if err != nil {
123+
lm.extensionClient.ExitError(ctx, fmt.Sprintf("error stopping collector: %v", err))
124+
}
125+
return
126+
}
127+
128+
err = lm.listener.Wait(ctx, res.RequestID)
129+
if err != nil {
130+
lm.logger.Error("problem waiting for platform.runtimeDone event", zap.Error(err), zap.String("requestID", res.RequestID))
131+
}
132+
}
133+
}
134+
}

collector/main.go

Lines changed: 8 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -16,131 +16,29 @@ package main
1616

1717
import (
1818
"context"
19-
"fmt"
2019
"os"
21-
"os/signal"
22-
"path/filepath"
23-
"sync"
24-
"syscall"
2520

26-
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/extensionapi"
27-
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi"
28-
"github.com/open-telemetry/opentelemetry-lambda/collector/lambdacomponents"
21+
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/lifecycle"
2922
"go.uber.org/zap"
3023
"go.uber.org/zap/zapcore"
3124
)
3225

3326
var (
34-
extensionName = filepath.Base(os.Args[0]) // extension name has to match the filename
27+
// Version variable will be replaced at link time after `make` has been run.
28+
Version = "latest"
29+
30+
// GitHash variable will be replaced at link time after `make` has been run.
31+
GitHash = "<NOT PROPERLY GENERATED>"
3532
)
3633

3734
func main() {
3835
logger := initLogger()
3936
logger.Info("Launching OpenTelemetry Lambda extension", zap.String("version", Version))
4037

41-
ctx, lm := newLifecycleManager(context.Background(), logger)
38+
ctx, lm := lifecycle.NewManager(context.Background(), logger, Version)
4239

4340
// Will block until shutdown event is received or cancelled via the context.
44-
logger.Info("done", zap.Error(lm.run(ctx)))
45-
}
46-
47-
type lifecycleManager struct {
48-
logger *zap.Logger
49-
collector *Collector
50-
extensionClient *extensionapi.Client
51-
listener *telemetryapi.Listener
52-
wg sync.WaitGroup
53-
}
54-
55-
func newLifecycleManager(ctx context.Context, logger *zap.Logger) (context.Context, *lifecycleManager) {
56-
ctx, cancel := context.WithCancel(ctx)
57-
58-
sigs := make(chan os.Signal, 1)
59-
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
60-
go func() {
61-
s := <-sigs
62-
cancel()
63-
logger.Info("received signal", zap.String("signal", s.String()))
64-
}()
65-
66-
extensionClient := extensionapi.NewClient(logger, os.Getenv("AWS_LAMBDA_RUNTIME_API"))
67-
res, err := extensionClient.Register(ctx, extensionName)
68-
if err != nil {
69-
logger.Fatal("Cannot register extension", zap.Error(err))
70-
}
71-
72-
listener := telemetryapi.NewListener(logger)
73-
addr, err := listener.Start()
74-
if err != nil {
75-
logger.Fatal("Cannot start Telemetry API Listener", zap.Error(err))
76-
}
77-
78-
telemetryClient := telemetryapi.NewClient(logger)
79-
_, err = telemetryClient.Subscribe(ctx, res.ExtensionID, addr)
80-
if err != nil {
81-
logger.Fatal("Cannot register Telemetry API client", zap.Error(err))
82-
}
83-
84-
lm := &lifecycleManager{
85-
logger: logger.Named("lifecycleManager"),
86-
extensionClient: extensionClient,
87-
listener: listener,
88-
}
89-
90-
go lm.processEvents(ctx)
91-
92-
factories, _ := lambdacomponents.Components()
93-
lm.collector = NewCollector(logger, factories)
94-
95-
return ctx, lm
96-
}
97-
98-
func (lm *lifecycleManager) run(ctx context.Context) error {
99-
if err := lm.collector.Start(ctx); err != nil {
100-
lm.logger.Warn("Failed to start the extension", zap.Error(err))
101-
lm.extensionClient.InitError(ctx, fmt.Sprintf("failed to start the collector: %v", err))
102-
return err
103-
}
104-
105-
lm.wg.Wait()
106-
return nil
107-
}
108-
109-
func (lm *lifecycleManager) processEvents(ctx context.Context) {
110-
lm.wg.Add(1)
111-
defer lm.wg.Done()
112-
113-
for {
114-
select {
115-
case <-ctx.Done():
116-
return
117-
default:
118-
lm.logger.Debug("Waiting for event...")
119-
res, err := lm.extensionClient.NextEvent(ctx)
120-
if err != nil {
121-
lm.logger.Warn("error waiting for extension event", zap.Error(err))
122-
lm.extensionClient.ExitError(ctx, fmt.Sprintf("error waiting for extension event: %v", err))
123-
return
124-
}
125-
126-
lm.logger.Debug("Received ", zap.Any("event :", res))
127-
// Exit if we receive a SHUTDOWN event
128-
if res.EventType == extensionapi.Shutdown {
129-
lm.logger.Info("Received SHUTDOWN event")
130-
lm.listener.Shutdown()
131-
err = lm.collector.Stop()
132-
if err != nil {
133-
lm.extensionClient.ExitError(ctx, fmt.Sprintf("error stopping collector: %v", err))
134-
}
135-
return
136-
}
137-
138-
err = lm.listener.Wait(ctx, res.RequestID)
139-
if err != nil {
140-
lm.logger.Error("problem waiting for platform.runtimeDone event", zap.Error(err), zap.String("requestID", res.RequestID))
141-
}
142-
}
143-
}
41+
logger.Info("done", zap.Error(lm.Run(ctx)))
14442
}
14543

14644
func initLogger() *zap.Logger {

0 commit comments

Comments
 (0)