Skip to content

Commit 5a8b3e6

Browse files
author
Alex Boten
authored
[collector] add tests for lifecycle manager (aws-observability#410)
* [collector] add tests for lifecycle manager Improved the error handling in processEvents along the way. Signed-off-by: Alex Boten <[email protected]> * move collectorWrapper interface Signed-off-by: Alex Boten <[email protected]> Signed-off-by: Alex Boten <[email protected]>
1 parent 1942730 commit 5a8b3e6

File tree

5 files changed

+169
-10
lines changed

5 files changed

+169
-10
lines changed

collector/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259
1515
github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/s3provider v0.68.0
1616
github.com/open-telemetry/opentelemetry-lambda/collector/lambdacomponents v0.0.0
17+
github.com/pkg/errors v0.9.1
1718
github.com/stretchr/testify v1.8.1
1819
go.opentelemetry.io/collector v0.68.0
1920
go.opentelemetry.io/collector/component v0.68.0
@@ -41,6 +42,7 @@ require (
4142
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.11 // indirect
4243
github.com/aws/aws-sdk-go-v2/service/sts v1.17.7 // indirect
4344
github.com/aws/smithy-go v1.13.5 // indirect
45+
github.com/benbjohnson/clock v1.3.0 // indirect
4446
github.com/beorn7/perks v1.0.1 // indirect
4547
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
4648
github.com/cespare/xxhash/v2 v2.1.2 // indirect

collector/go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAm
409409
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
410410
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
411411
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
412+
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
412413
github.com/benbjohnson/immutable v0.2.1/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI=
413414
github.com/benbjohnson/tmpl v1.0.0/go.mod h1:igT620JFIi44B6awvU9IsDhR77IXWtFigTLil/RPdps=
414415
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=

collector/internal/collector/collector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ import (
3333
"go.uber.org/zap/zapcore"
3434
)
3535

36-
// Collector implements the OtelcolRunner interfaces running a single otelcol as a go routine within the
37-
// same process as the test executor.
36+
// Collector runs a single otelcol as a go routine within the
37+
// same process as the executor.
3838
type Collector struct {
3939
factories component.Factories
4040
configProvider otelcol.ConfigProvider

collector/internal/lifecycle/manager.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,22 @@ import (
2727
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/extensionapi"
2828
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi"
2929
"github.com/open-telemetry/opentelemetry-lambda/collector/lambdacomponents"
30+
"github.com/pkg/errors"
3031
"go.uber.org/zap"
3132
)
3233

3334
var (
3435
extensionName = filepath.Base(os.Args[0]) // extension name has to match the filename
3536
)
3637

38+
type collectorWrapper interface {
39+
Start(ctx context.Context) error
40+
Stop() error
41+
}
42+
3743
type manager struct {
3844
logger *zap.Logger
39-
collector *collector.Collector
45+
collector collectorWrapper
4046
extensionClient *extensionapi.Client
4147
listener *telemetryapi.Listener
4248
wg sync.WaitGroup
@@ -77,7 +83,11 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex
7783
listener: listener,
7884
}
7985

80-
go lm.processEvents(ctx)
86+
go func() {
87+
if err := lm.processEvents(ctx); err != nil {
88+
lm.logger.Warn("Failed to process events", zap.Error(err))
89+
}
90+
}()
8191

8292
factories, _ := lambdacomponents.Components()
8393
lm.collector = collector.NewCollector(logger, factories, version)
@@ -96,21 +106,23 @@ func (lm *manager) Run(ctx context.Context) error {
96106
return nil
97107
}
98108

99-
func (lm *manager) processEvents(ctx context.Context) {
109+
func (lm *manager) processEvents(ctx context.Context) error {
100110
lm.wg.Add(1)
101111
defer lm.wg.Done()
102112

103113
for {
104114
select {
105115
case <-ctx.Done():
106-
return
116+
return nil
107117
default:
108118
lm.logger.Debug("Waiting for event...")
109119
res, err := lm.extensionClient.NextEvent(ctx)
110120
if err != nil {
111121
lm.logger.Warn("error waiting for extension event", zap.Error(err))
112-
lm.extensionClient.ExitError(ctx, fmt.Sprintf("error waiting for extension event: %v", err))
113-
return
122+
if _, exitErr := lm.extensionClient.ExitError(ctx, fmt.Sprintf("error waiting for extension event: %v", err)); exitErr != nil {
123+
err = errors.Wrap(err, exitErr.Error())
124+
}
125+
return err
114126
}
115127

116128
lm.logger.Debug("Received ", zap.Any("event :", res))
@@ -120,9 +132,11 @@ func (lm *manager) processEvents(ctx context.Context) {
120132
lm.listener.Shutdown()
121133
err = lm.collector.Stop()
122134
if err != nil {
123-
lm.extensionClient.ExitError(ctx, fmt.Sprintf("error stopping collector: %v", err))
135+
if _, exitErr := lm.extensionClient.ExitError(ctx, fmt.Sprintf("error stopping collector: %v", err)); exitErr != nil {
136+
err = errors.Wrap(err, exitErr.Error())
137+
}
124138
}
125-
return
139+
return err
126140
}
127141

128142
err = lm.listener.Wait(ctx, res.RequestID)
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package lifecycle
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"io"
21+
"net/http"
22+
"net/http/httptest"
23+
"net/url"
24+
"testing"
25+
26+
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/extensionapi"
27+
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi"
28+
"github.com/stretchr/testify/require"
29+
"go.uber.org/zap/zaptest"
30+
)
31+
32+
type MockCollector struct {
33+
err error
34+
}
35+
36+
func (c *MockCollector) Start(ctx context.Context) error {
37+
return c.err
38+
}
39+
func (c *MockCollector) Stop() error {
40+
return c.err
41+
}
42+
43+
func TestRun(t *testing.T) {
44+
logger := zaptest.NewLogger(t)
45+
ctx := context.Background()
46+
// test with an error
47+
lm := manager{
48+
collector: &MockCollector{err: fmt.Errorf("test start error")},
49+
logger: logger,
50+
extensionClient: extensionapi.NewClient(logger, ""),
51+
}
52+
require.Error(t, lm.Run(ctx))
53+
// test with no waitgroup
54+
lm = manager{
55+
collector: &MockCollector{},
56+
logger: logger,
57+
extensionClient: extensionapi.NewClient(logger, ""),
58+
}
59+
require.NoError(t, lm.Run(ctx))
60+
// test with waitgroup counter incremented
61+
lm = manager{
62+
collector: &MockCollector{},
63+
logger: logger,
64+
extensionClient: extensionapi.NewClient(logger, ""),
65+
}
66+
lm.wg.Add(1)
67+
go func() {
68+
require.NoError(t, lm.Run(ctx))
69+
}()
70+
lm.wg.Done()
71+
72+
}
73+
74+
func TestProcessEvents(t *testing.T) {
75+
type test struct {
76+
name string
77+
cancel bool
78+
err error
79+
serverResponse string
80+
collectorError error
81+
}
82+
testCases := []test{
83+
{
84+
name: "processEvents with context cancelled",
85+
cancel: true,
86+
},
87+
{
88+
name: "processEvents with error from extension API",
89+
err: fmt.Errorf("unexpected end of JSON input"),
90+
},
91+
{
92+
name: "processEvents with shutdown event received",
93+
serverResponse: `{"time":"2006-01-02T15:04:05.000Z", "eventType":"SHUTDOWN", "record":{}}`,
94+
},
95+
{
96+
name: "processEvents with shutdown event received and collector error",
97+
serverResponse: `{"time":"2006-01-02T15:04:05.000Z", "eventType":"SHUTDOWN", "record":{}}`,
98+
collectorError: fmt.Errorf("test shutdown error"),
99+
err: fmt.Errorf("test shutdown error"),
100+
},
101+
}
102+
for _, tc := range testCases {
103+
t.Run(tc.name, func(t *testing.T) {
104+
logger := zaptest.NewLogger(t)
105+
var ctx context.Context
106+
107+
if tc.cancel {
108+
var cancel context.CancelFunc
109+
ctx, cancel = context.WithCancel(context.Background())
110+
cancel()
111+
} else {
112+
ctx = context.Background()
113+
}
114+
115+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
116+
w.WriteHeader(200)
117+
w.Write([]byte(tc.serverResponse))
118+
_, err := io.ReadAll(r.Body)
119+
require.NoError(t, err, "failed to read request body: %v", err)
120+
}))
121+
defer server.Close()
122+
u, err := url.Parse(server.URL)
123+
require.NoError(t, err)
124+
125+
lm := manager{
126+
collector: &MockCollector{err: tc.collectorError},
127+
logger: logger,
128+
listener: telemetryapi.NewListener(logger),
129+
extensionClient: extensionapi.NewClient(logger, fmt.Sprintf("%s", u.Host)),
130+
}
131+
if tc.err != nil {
132+
err = lm.processEvents(ctx)
133+
require.Error(t, err)
134+
require.ErrorContains(t, err, tc.err.Error())
135+
} else {
136+
require.NoError(t, lm.processEvents(ctx))
137+
}
138+
139+
})
140+
}
141+
142+
}

0 commit comments

Comments
 (0)