Skip to content

Commit 850b659

Browse files
committed
pkg/beholder: add Emitter.Close; track goroutines
1 parent 3b2bf34 commit 850b659

File tree

9 files changed

+57
-6
lines changed

9 files changed

+57
-6
lines changed

pkg/beholder/beholdertest/beholder.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ type assertMessageEmitter struct {
169169
msgs []beholder.Message
170170
}
171171

172+
func (e *assertMessageEmitter) Close() error { return nil }
173+
172174
func (e *assertMessageEmitter) Emit(_ context.Context, msg []byte, attrKVs ...any) error {
173175
e.t.Helper()
174176

pkg/beholder/chip_client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ package beholder
33
import (
44
"context"
55
"fmt"
6+
"io"
7+
68
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
79
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
810
)
911

1012
type ChipIngressClient interface {
1113
RegisterSchema(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error)
14+
io.Closer
1215
}
1316

1417
type chipIngressClient struct {
@@ -24,6 +27,7 @@ func NewChipIngressClient(client chipingress.Client) (ChipIngressClient, error)
2427
client: client,
2528
}, nil
2629
}
30+
func (sr *chipIngressClient) Close() error { return nil }
2731

2832
// RegisterSchema registers one or more schemas with the Chip Ingress service. Returns a map of subject to version for each registered schema.
2933
func (sr *chipIngressClient) RegisterSchema(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error) {

pkg/beholder/chip_ingress_emitter.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ func NewChipIngressEmitter(client chipingress.Client) (Emitter, error) {
2121
return &ChipIngressEmitter{client: client}, nil
2222
}
2323

24+
func (c *ChipIngressEmitter) Close() error {
25+
return c.client.Close()
26+
}
27+
2428
func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
2529

2630
sourceDomain, entityType, err := ExtractSourceAndType(attrKVs...)

pkg/beholder/client.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/hex"
66
"errors"
77
"fmt"
8+
"io"
89
"time"
910

1011
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
@@ -30,6 +31,7 @@ import (
3031
type Emitter interface {
3132
// Sends message with bytes and attributes to OTel Collector
3233
Emit(ctx context.Context, body []byte, attrKVs ...any) error
34+
io.Closer
3335
}
3436

3537
type Client struct {
@@ -250,8 +252,14 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
250252

251253
// Closes all providers, flushes all data and stops all background processes
252254
func (c Client) Close() (err error) {
255+
if c.Chip != nil {
256+
err = errors.Join(err, c.Chip.Close())
257+
}
258+
if c.Emitter != nil {
259+
err = errors.Join(err, c.Emitter.Close())
260+
}
253261
if c.OnClose != nil {
254-
return c.OnClose()
262+
err = errors.Join(err, c.OnClose())
255263
}
256264
return
257265
}

pkg/beholder/dual_source_emitter.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,25 @@ package beholder
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"sync/atomic"
68

79
"github.com/smartcontractkit/chainlink-common/pkg/logger"
10+
"github.com/smartcontractkit/chainlink-common/pkg/services"
811
)
912

10-
// dualSourceEmitter emits both to chip ingress and to the otel collector
13+
// DualSourceEmitter emits both to chip ingress and to the otel collector
1114
// this is to help transition from sending custom messages via OTLP to instead use chip-ingress
1215
// we want to send to both during the transition period, then cutover to using
1316
// chipIngressEmitter only
1417
type DualSourceEmitter struct {
1518
chipIngressEmitter Emitter
1619
otelCollectorEmitter Emitter
1720
log logger.Logger
21+
stopCh services.StopChan
22+
wg services.WaitGroup
23+
closed atomic.Bool
1824
}
1925

2026
func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitter) (Emitter, error) {
@@ -36,9 +42,19 @@ func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitt
3642
chipIngressEmitter: chipIngressEmitter,
3743
otelCollectorEmitter: otelCollectorEmitter,
3844
log: logger,
45+
stopCh: make(services.StopChan),
3946
}, nil
4047
}
4148

49+
func (d *DualSourceEmitter) Close() error {
50+
if wasClosed := d.closed.Swap(true); wasClosed {
51+
return errors.New("already closed")
52+
}
53+
close(d.stopCh)
54+
d.wg.Wait()
55+
return errors.Join(d.chipIngressEmitter.Close(), d.otelCollectorEmitter.Close())
56+
}
57+
4258
func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
4359

4460
// Emit via OTLP first
@@ -47,13 +63,21 @@ func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...an
4763
}
4864

4965
// Emit via chip ingress async
50-
go func() {
51-
if err := d.chipIngressEmitter.Emit(context.WithoutCancel(ctx), body, attrKVs...); err != nil {
66+
if err := d.wg.TryAdd(1); err != nil {
67+
return err
68+
}
69+
go func(ctx context.Context) {
70+
defer d.wg.Done()
71+
var cancel context.CancelFunc
72+
ctx, cancel = d.stopCh.Ctx(ctx)
73+
defer cancel()
74+
75+
if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil {
5276
// If the chip ingress emitter fails, we ONLY log the error
5377
// because we still want to send the data to the OTLP collector and not cause disruption
5478
d.log.Infof("failed to emit to chip ingress: %v", err)
5579
}
56-
}()
80+
}(context.WithoutCancel(ctx))
5781

5882
return nil
5983
}

pkg/beholder/dual_source_emitter_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ import (
55
"fmt"
66
"testing"
77

8-
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
98
"github.com/stretchr/testify/assert"
109
"github.com/stretchr/testify/require"
10+
11+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
1112
)
1213

1314
func TestNewDualSourceEmitter(t *testing.T) {
@@ -80,6 +81,8 @@ type mockEmitter struct {
8081
emitFunc func(ctx context.Context, body []byte, attrKVs ...any) error
8182
}
8283

84+
func (m *mockEmitter) Close() error { return nil }
85+
8386
func (m *mockEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
8487
if m.emitFunc != nil {
8588
return m.emitFunc(ctx, body, attrKVs...)

pkg/beholder/message_emitter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ func NewMessageEmitter(logger otellog.Logger) Emitter {
1717
}
1818
}
1919

20+
func (e messageEmitter) Close() error { return nil }
21+
2022
// Emits logs the message, but does not wait for the message to be processed.
2123
// Open question: what are pros/cons for using use map[]any vs use otellog.KeyValue
2224
func (e messageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {

pkg/beholder/noop.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ func NewWriterClient(w io.Writer) (*Client, error) {
9999

100100
type noopMessageEmitter struct{}
101101

102+
func (e noopMessageEmitter) Close() error { return nil }
103+
102104
func (noopMessageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
103105
return nil
104106
}

pkg/capabilities/events/events_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ type testEmitter struct {
1717
attrs []any
1818
}
1919

20+
func (t *testEmitter) Close() error { return nil }
21+
2022
func (t *testEmitter) Emit(ctx context.Context, payload []byte, attrKVs ...any) error {
2123
t.payload = payload
2224
t.attrs = attrKVs

0 commit comments

Comments
 (0)