Skip to content

Commit 2027890

Browse files
authored
Add Decouple and Batch Processors to Collector (aws-observability#959)
* Add Decouple and Batch Processors Add a new processor that decouples the receiver and exporter sides of the pipeline and is aware of lambda lifecycle events. Also add the Batch processor to the list of available processor to reduce the cost of lambda function invocation at the expense of data being delayed. * Fix missing go.sum entries * Add link to lambda lifecycle * Add additional comments to clarify lifecycle * Update README.md * Add Makefile to lambdalifecycle * Fix race detector error * Update dependencies
1 parent f27bde2 commit 2027890

File tree

22 files changed

+1356
-5
lines changed

22 files changed

+1356
-5
lines changed

collector/go.mod

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@ go 1.20
44

55
replace github.com/open-telemetry/opentelemetry-lambda/collector/lambdacomponents => ./lambdacomponents
66

7+
replace github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle => ./lambdalifecycle
8+
79
replace github.com/open-telemetry/opentelemetry-lambda/collector/processor/coldstartprocessor => ./processor/coldstartprocessor
810

11+
replace github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor => ./processor/decoupleprocessor
12+
913
replace github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver => ./receiver/telemetryapireceiver
1014

1115
// fixes ambiguous import error: found package cloud.google.com/go/compute/metadata in multiple modules:
@@ -18,6 +22,7 @@ require (
1822
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259
1923
github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/s3provider v0.88.0
2024
github.com/open-telemetry/opentelemetry-lambda/collector/lambdacomponents v0.88.0
25+
github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle v0.0.0-00010101000000-000000000000
2126
github.com/stretchr/testify v1.8.4
2227
go.opentelemetry.io/collector/component v0.88.0
2328
go.opentelemetry.io/collector/confmap v0.88.0
@@ -94,6 +99,7 @@ require (
9499
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.88.0 // indirect
95100
github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor v0.88.0 // indirect
96101
github.com/open-telemetry/opentelemetry-lambda/collector/processor/coldstartprocessor v0.88.0 // indirect
102+
github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor v0.0.0-00010101000000-000000000000 // indirect
97103
github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver v0.88.0 // indirect
98104
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
99105
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
@@ -138,6 +144,7 @@ require (
138144
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0017 // indirect
139145
go.opentelemetry.io/collector/pdata v1.0.0-rcv0017 // indirect
140146
go.opentelemetry.io/collector/processor v0.88.0 // indirect
147+
go.opentelemetry.io/collector/processor/batchprocessor v0.88.0 // indirect
141148
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.88.0 // indirect
142149
go.opentelemetry.io/collector/receiver v0.88.0 // indirect
143150
go.opentelemetry.io/collector/receiver/otlpreceiver v0.88.0 // indirect

collector/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,8 @@ go.opentelemetry.io/collector/pdata v1.0.0-rcv0017 h1:AgALhc2VenoA5l1DvTdg7mkzaB
643643
go.opentelemetry.io/collector/pdata v1.0.0-rcv0017/go.mod h1:Rv9fOclA5AtM/JGm0d4jBOIAo1+jBA13UT5Bx0ovXi4=
644644
go.opentelemetry.io/collector/processor v0.88.0 h1:5BUZaH+RhTpgTVqBZCrBnN/vl0M1CtwQsZ8ek4iH1lc=
645645
go.opentelemetry.io/collector/processor v0.88.0/go.mod h1:2T5KxgBQxXuuyMu9dh+PIBxQ/geCFYcdnjmlWZx8o3E=
646+
go.opentelemetry.io/collector/processor/batchprocessor v0.88.0 h1:KEifeRMC9JysHpVhQPEyD29C+gqhP0cHuFpJMJUbE/Y=
647+
go.opentelemetry.io/collector/processor/batchprocessor v0.88.0/go.mod h1:SQhHxRcZ92/DLufTYzb4xnxnR/uuW5makoqezBlJgJ4=
646648
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.88.0 h1:m37nR0I1F7ao3qAJtzHB4GwBr1qrtDaPKNfeBOYzDF0=
647649
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.88.0/go.mod h1:p5+E4COkqfUPip3q2HFGuieBZbryI0h8KPP7jNYZvsY=
648650
go.opentelemetry.io/collector/receiver v0.88.0 h1:MPvVAFOfjl0+Ylka7so8QoK8T2Za2471rv5t3sqbbSY=

collector/internal/lifecycle/manager.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package lifecycle
1717
import (
1818
"context"
1919
"fmt"
20+
"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"
2021
"os"
2122
"os/signal"
2223
"path/filepath"
@@ -42,11 +43,12 @@ type collectorWrapper interface {
4243
}
4344

4445
type manager struct {
45-
logger *zap.Logger
46-
collector collectorWrapper
47-
extensionClient *extensionapi.Client
48-
listener *telemetryapi.Listener
49-
wg sync.WaitGroup
46+
logger *zap.Logger
47+
collector collectorWrapper
48+
extensionClient *extensionapi.Client
49+
listener *telemetryapi.Listener
50+
wg sync.WaitGroup
51+
lifecycleListeners []lambdalifecycle.Listener
5052
}
5153

5254
func NewManager(ctx context.Context, logger *zap.Logger, version string) (context.Context, *manager) {
@@ -132,6 +134,7 @@ func (lm *manager) processEvents(ctx context.Context) error {
132134
// Exit if we receive a SHUTDOWN event
133135
if res.EventType == extensionapi.Shutdown {
134136
lm.logger.Info("Received SHUTDOWN event")
137+
lm.notifyEnvironmentShutdown()
135138
lm.listener.Shutdown()
136139
err = lm.collector.Stop()
137140
if err != nil {
@@ -142,10 +145,37 @@ func (lm *manager) processEvents(ctx context.Context) error {
142145
return err
143146
}
144147

148+
lm.notifyFunctionInvoked()
149+
145150
err = lm.listener.Wait(ctx, res.RequestID)
146151
if err != nil {
147152
lm.logger.Error("problem waiting for platform.runtimeDone event", zap.Error(err), zap.String("requestID", res.RequestID))
148153
}
154+
155+
// Check other components are ready before allowing the freezing of the environment.
156+
lm.notifyFunctionFinished()
149157
}
150158
}
151159
}
160+
161+
func (lm *manager) notifyFunctionInvoked() {
162+
for _, listener := range lm.lifecycleListeners {
163+
listener.FunctionInvoked()
164+
}
165+
}
166+
167+
func (lm *manager) notifyFunctionFinished() {
168+
for _, listener := range lm.lifecycleListeners {
169+
listener.FunctionFinished()
170+
}
171+
}
172+
173+
func (lm *manager) notifyEnvironmentShutdown() {
174+
for _, listener := range lm.lifecycleListeners {
175+
listener.EnvironmentShutdown()
176+
}
177+
}
178+
179+
func (lm *manager) AddListener(listener lambdalifecycle.Listener) {
180+
lm.lifecycleListeners = append(lm.lifecycleListeners, listener)
181+
}

collector/lambdacomponents/default.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ import (
2222
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor"
2323
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor"
2424
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor"
25+
"github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor"
2526
"go.opentelemetry.io/collector/exporter"
2627
"go.opentelemetry.io/collector/exporter/loggingexporter"
2728
"go.opentelemetry.io/collector/exporter/otlpexporter"
2829
"go.opentelemetry.io/collector/exporter/otlphttpexporter"
2930
"go.opentelemetry.io/collector/extension"
3031
"go.opentelemetry.io/collector/otelcol"
3132
"go.opentelemetry.io/collector/processor"
33+
"go.opentelemetry.io/collector/processor/batchprocessor"
3234
"go.opentelemetry.io/collector/processor/memorylimiterprocessor"
3335
"go.opentelemetry.io/collector/receiver"
3436
"go.opentelemetry.io/collector/receiver/otlpreceiver"
@@ -67,6 +69,8 @@ func Components(extensionID string) (otelcol.Factories, error) {
6769
resourceprocessor.NewFactory(),
6870
spanprocessor.NewFactory(),
6971
coldstartprocessor.NewFactory(),
72+
decoupleprocessor.NewFactory(),
73+
batchprocessor.NewFactory(),
7074
)
7175
if err != nil {
7276
errs = append(errs, err)

collector/lambdacomponents/go.mod

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.88.0
1212
github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor v0.88.0
1313
github.com/open-telemetry/opentelemetry-lambda/collector/processor/coldstartprocessor v0.88.0
14+
github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor v0.0.0-00010101000000-000000000000
1415
github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver v0.88.0
1516
go.opentelemetry.io/collector/exporter v0.88.0
1617
go.opentelemetry.io/collector/exporter/loggingexporter v0.88.0
@@ -19,6 +20,7 @@ require (
1920
go.opentelemetry.io/collector/extension v0.88.0
2021
go.opentelemetry.io/collector/otelcol v0.88.0
2122
go.opentelemetry.io/collector/processor v0.88.0
23+
go.opentelemetry.io/collector/processor/batchprocessor v0.88.0
2224
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.88.0
2325
go.opentelemetry.io/collector/receiver v0.88.0
2426
go.opentelemetry.io/collector/receiver/otlpreceiver v0.88.0
@@ -82,6 +84,7 @@ require (
8284
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.88.0 // indirect
8385
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.88.0 // indirect
8486
github.com/open-telemetry/opentelemetry-lambda/collector v0.88.0 // indirect
87+
github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle v0.0.0-00010101000000-000000000000 // indirect
8588
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
8689
github.com/prometheus/client_golang v1.17.0 // indirect
8790
github.com/prometheus/client_model v0.5.0 // indirect
@@ -163,6 +166,10 @@ replace cloud.google.com/go => cloud.google.com/go v0.107.0
163166

164167
replace github.com/open-telemetry/opentelemetry-lambda/collector => ../
165168

169+
replace github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle => ../lambdalifecycle
170+
166171
replace github.com/open-telemetry/opentelemetry-lambda/collector/processor/coldstartprocessor => ../processor/coldstartprocessor
167172

173+
replace github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor => ../processor/decoupleprocessor
174+
168175
replace github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver => ../receiver/telemetryapireceiver

collector/lambdacomponents/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,8 @@ go.opentelemetry.io/collector/pdata v1.0.0-rcv0017 h1:AgALhc2VenoA5l1DvTdg7mkzaB
626626
go.opentelemetry.io/collector/pdata v1.0.0-rcv0017/go.mod h1:Rv9fOclA5AtM/JGm0d4jBOIAo1+jBA13UT5Bx0ovXi4=
627627
go.opentelemetry.io/collector/processor v0.88.0 h1:5BUZaH+RhTpgTVqBZCrBnN/vl0M1CtwQsZ8ek4iH1lc=
628628
go.opentelemetry.io/collector/processor v0.88.0/go.mod h1:2T5KxgBQxXuuyMu9dh+PIBxQ/geCFYcdnjmlWZx8o3E=
629+
go.opentelemetry.io/collector/processor/batchprocessor v0.88.0 h1:KEifeRMC9JysHpVhQPEyD29C+gqhP0cHuFpJMJUbE/Y=
630+
go.opentelemetry.io/collector/processor/batchprocessor v0.88.0/go.mod h1:SQhHxRcZ92/DLufTYzb4xnxnR/uuW5makoqezBlJgJ4=
629631
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.88.0 h1:m37nR0I1F7ao3qAJtzHB4GwBr1qrtDaPKNfeBOYzDF0=
630632
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.88.0/go.mod h1:p5+E4COkqfUPip3q2HFGuieBZbryI0h8KPP7jNYZvsY=
631633
go.opentelemetry.io/collector/receiver v0.88.0 h1:MPvVAFOfjl0+Ylka7so8QoK8T2Za2471rv5t3sqbbSY=

collector/lambdalifecycle/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
include ../Makefile.Common

collector/lambdalifecycle/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle
2+
3+
go 1.20
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package lambdalifecycle
16+
17+
// Listener interface used to notify objects of Lambda lifecycle events.
18+
type Listener interface {
19+
// FunctionInvoked is called after the extension receives a "Next" notification.
20+
FunctionInvoked()
21+
// FunctionFinished is called after the extension is notified that the function has completed, but before the environment is frozen.
22+
// The environment is only frozen once all listeners have returned.
23+
FunctionFinished()
24+
// EnvironmentShutdown is called when the extension is notified that the environment is about to shut down.
25+
// Shutting down of the collector components only happens after all listeners have returned.
26+
EnvironmentShutdown()
27+
}
28+
29+
type Notifier interface {
30+
AddListener(listener Listener)
31+
}
32+
33+
var (
34+
notifier Notifier
35+
)
36+
37+
func SetNotifier(n Notifier) {
38+
notifier = n
39+
}
40+
41+
func GetNotifier() Notifier {
42+
return notifier
43+
}

collector/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"flag"
2020
"fmt"
21+
"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"
2122
"os"
2223

2324
"go.uber.org/zap"
@@ -47,6 +48,9 @@ func main() {
4748

4849
ctx, lm := lifecycle.NewManager(context.Background(), logger, Version)
4950

51+
// Set the new lifecycle manager as the lifecycle notifier for all other components.
52+
lambdalifecycle.SetNotifier(lm)
53+
5054
// Will block until shutdown event is received or cancelled via the context.
5155
logger.Info("done", zap.Error(lm.Run(ctx)))
5256
}

0 commit comments

Comments
 (0)