Skip to content

Commit 9f01f00

Browse files
committed
Merge branch 'main' into update-ocr3-metadata
2 parents e845ca6 + 9839ff5 commit 9f01f00

File tree

69 files changed

+1632
-1843
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+1632
-1843
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ require (
3535
github.com/santhosh-tekuri/jsonschema/v5 v5.2.0
3636
github.com/scylladb/go-reflectx v1.0.1
3737
github.com/shopspring/decimal v1.4.0
38-
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250604191313-8f161cf29ad5
38+
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250612182447-1c32d2efe48f
3939
github.com/smartcontractkit/freeport v0.1.1
4040
github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7
4141
github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,8 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp
296296
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
297297
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
298298
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
299-
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250604191313-8f161cf29ad5 h1:Qrz1cG/myK+EUEz4aLfz1pS4cODit7v61BuXq3MDifk=
300-
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250604191313-8f161cf29ad5/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA=
299+
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250612182447-1c32d2efe48f h1:OYOifWMKVL54uM+sxrY7oPVTZptTXK45tA8dXN5nZSE=
300+
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250612182447-1c32d2efe48f/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA=
301301
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250430163438-97d324ef9061 h1:5BKk6j2QWmb5TFoWYVLuL8U2XUIzTUUo5HBkCHTX2kM=
302302
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250430163438-97d324ef9061/go.mod h1:HIpGvF6nKCdtZ30xhdkKWGM9+4Z4CVqJH8ZBL1FTEiY=
303303
github.com/smartcontractkit/freeport v0.1.1 h1:B5fhEtmgomdIhw03uPVbVTP6oPv27fBhZsoZZMSIS8I=
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package beholdertest
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"reflect"
7+
"sync"
8+
"testing"
9+
10+
"github.com/stretchr/testify/require"
11+
otellognoop "go.opentelemetry.io/otel/log/noop"
12+
otelmetricnoop "go.opentelemetry.io/otel/metric/noop"
13+
oteltracenoop "go.opentelemetry.io/otel/trace/noop"
14+
"google.golang.org/protobuf/proto"
15+
16+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
17+
"github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
18+
)
19+
20+
const (
21+
packageNameBeholder = "test_beholder"
22+
)
23+
24+
// Observer is a test helper that provides assertion methods on received messages within the beholder client.
25+
type Observer struct {
26+
emitter *assertMessageEmitter
27+
}
28+
29+
// Len returns the total number of messages received that match the provided attribute key/value pairs.
30+
func (o Observer) Len(t *testing.T, attrKVs ...any) int {
31+
t.Helper()
32+
33+
found := o.msgsForKVs(t, attrKVs...)
34+
35+
return len(found)
36+
}
37+
38+
// Messages returns messages matching the provided keys and values.
39+
func (o Observer) Messages(t *testing.T, attrKVs ...any) []beholder.Message {
40+
t.Helper()
41+
42+
if attrKVs == nil {
43+
return o.emitter.msgs
44+
}
45+
46+
return o.msgsForKVs(t, attrKVs...)
47+
}
48+
func (o Observer) msgsForKVs(t *testing.T, attrKVs ...any) []beholder.Message {
49+
t.Helper()
50+
51+
o.emitter.mu.RLock()
52+
defer o.emitter.mu.RUnlock()
53+
54+
found := []beholder.Message{}
55+
56+
for _, eMsg := range o.emitter.msgs {
57+
var i, j int
58+
59+
for i < len(attrKVs)-1 {
60+
j = i + 1
61+
62+
key, ok := attrKVs[i].(string)
63+
require.True(t, ok)
64+
65+
value := attrKVs[j]
66+
val, ok := eMsg.Attrs[key]
67+
68+
if ok && reflect.DeepEqual(value, val) {
69+
found = append(found, eMsg)
70+
}
71+
72+
i = i + 2
73+
}
74+
}
75+
76+
return found
77+
}
78+
79+
func (o Observer) BaseMessagesForLabels(t *testing.T, labels map[string]string) ([]*pb.BaseMessage, error) {
80+
t.Helper()
81+
82+
o.emitter.mu.RLock()
83+
defer o.emitter.mu.RUnlock()
84+
85+
var found []*pb.BaseMessage
86+
87+
messageLoop:
88+
for _, eMsg := range o.emitter.msgs {
89+
dataSchema, ok := eMsg.Attrs["beholder_entity"].(string)
90+
if !ok {
91+
continue
92+
}
93+
94+
if dataSchema != "BaseMessage" {
95+
continue
96+
}
97+
98+
payload := pb.BaseMessage{}
99+
err := proto.Unmarshal(eMsg.Body, &payload)
100+
if err != nil {
101+
return nil, fmt.Errorf("error unmarshalling base message: %v", err)
102+
}
103+
104+
for k, v := range labels {
105+
if payload.Labels[k] != v {
106+
continue messageLoop
107+
}
108+
}
109+
110+
found = append(found, &payload)
111+
}
112+
113+
return found, nil
114+
}
115+
116+
// NewObserver sets the global beholder client as a message collector and returns an Observer that provides helper assertion
117+
// functions on received messages.
118+
//
119+
// NewObserver affects the whole process, it cannot be used in parallel tests or tests with parallel ancestors.
120+
func NewObserver(t *testing.T) Observer {
121+
t.Helper()
122+
123+
cfg := beholder.DefaultConfig()
124+
125+
// Logger
126+
loggerProvider := otellognoop.NewLoggerProvider()
127+
logger := loggerProvider.Logger(packageNameBeholder)
128+
129+
// Tracer
130+
tracerProvider := oteltracenoop.NewTracerProvider()
131+
tracer := tracerProvider.Tracer(packageNameBeholder)
132+
133+
// Meter
134+
meterProvider := otelmetricnoop.NewMeterProvider()
135+
meter := meterProvider.Meter(packageNameBeholder)
136+
137+
// MessageEmitter
138+
messageEmitter := &assertMessageEmitter{t: t}
139+
140+
client := &beholder.Client{
141+
Config: cfg,
142+
Logger: logger,
143+
Tracer: tracer,
144+
Meter: meter,
145+
Emitter: messageEmitter,
146+
LoggerProvider: loggerProvider,
147+
TracerProvider: tracerProvider,
148+
MeterProvider: meterProvider,
149+
MessageLoggerProvider: loggerProvider,
150+
OnClose: func() error { return nil },
151+
}
152+
153+
//reset NewObserver state after the test
154+
prevClient := beholder.GetClient()
155+
t.Cleanup(func() {
156+
beholder.SetClient(prevClient)
157+
t.Setenv(packageNameBeholder, packageNameBeholder)
158+
})
159+
beholder.SetClient(client)
160+
161+
return Observer{emitter: messageEmitter}
162+
}
163+
164+
// assertMessageEmitter is implemented with the same interface as the noopMessageEmitter in pkg/beholder/noop.go
165+
// it is unknown at this time whether EmitMessage is needed, but it exists in the case that it is needed
166+
type assertMessageEmitter struct {
167+
t *testing.T
168+
mu sync.RWMutex
169+
msgs []beholder.Message
170+
}
171+
172+
func (e *assertMessageEmitter) Emit(_ context.Context, msg []byte, attrKVs ...any) error {
173+
e.t.Helper()
174+
175+
e.mu.Lock()
176+
defer e.mu.Unlock()
177+
178+
e.msgs = append(e.msgs, beholder.NewMessage(msg, attrKVs...))
179+
180+
return nil
181+
}
182+
183+
func (e *assertMessageEmitter) EmitMessage(_ context.Context, msg beholder.Message) error {
184+
e.t.Helper()
185+
186+
e.mu.Lock()
187+
defer e.mu.Unlock()
188+
189+
e.msgs = append(e.msgs, msg)
190+
191+
return nil
192+
}

pkg/beholder/client.go

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"net"
78

89
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
910
"go.opentelemetry.io/otel/attribute"
@@ -132,6 +133,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
132133
} else {
133134
loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter)
134135
}
136+
135137
loggerAttributes := []attribute.KeyValue{
136138
attribute.String(AttrKeyDataType, "zap_log_message"),
137139
}
@@ -146,6 +148,12 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
146148
sdklog.WithResource(loggerResource),
147149
sdklog.WithProcessor(loggerProcessor),
148150
)
151+
152+
// If log streaming is disabled, use a noop logger provider
153+
if !cfg.LogStreamingEnabled {
154+
loggerProvider = BeholderNoopLoggerProvider()
155+
}
156+
149157
logger := loggerProvider.Logger(defaultPackageName)
150158

151159
// Tracer
@@ -211,21 +219,29 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
211219
// eventually we will remove the dual source emitter and just use chip ingress
212220

213221
if cfg.ChipIngressEmitterEnabled {
214-
215-
// Create a header provider that implements the chipingress.HeaderProvider interface
216-
217-
chipIngressOpts := []chipingress.Opt{
218-
chipingress.WithTransportCredentials(creds),
222+
chipIngressOpts := make([]chipingress.Opt, 0, 2)
223+
chipIngressEndpoint := cfg.ChipIngressEmitterGRPCEndpoint
224+
chipIngressHost, err := getHost(chipIngressEndpoint)
225+
if err != nil {
226+
return nil, fmt.Errorf("failed to extract host from address '%s': %w", chipIngressEndpoint, err)
227+
}
228+
// Set Authority
229+
chipIngressOpts = append(chipIngressOpts, chipingress.WithAuthority(chipIngressHost))
230+
231+
if cfg.ChipIngressInsecureConnection {
232+
// Use insecure credentials when TLS is not required
233+
chipIngressOpts = append(chipIngressOpts, chipingress.WithInsecureConnection())
234+
} else {
235+
chipIngressOpts = append(chipIngressOpts, chipingress.WithTLSAndHTTP2(chipIngressHost))
219236
}
220-
221237
// Only add headers if they exist
222238
if len(cfg.AuthHeaders) > 0 {
223239
headerProvider := NewStaticAuthHeaderProvider(cfg.AuthHeaders)
224240
chipIngressOpts = append(chipIngressOpts, chipingress.WithHeaderProvider(headerProvider))
225241
}
226242

227243
chipIngressClient, err := chipingress.NewChipIngressClient(
228-
cfg.ChipIngressEmitterGRPCEndpoint,
244+
chipIngressEndpoint,
229245
chipIngressOpts...,
230246
)
231247

@@ -409,3 +425,11 @@ func newMeterProvider(config Config, resource *sdkresource.Resource, creds crede
409425
)
410426
return mp, nil
411427
}
428+
429+
func getHost(address string) (string, error) {
430+
host, _, err := net.SplitHostPort(address)
431+
if err != nil {
432+
return "", err
433+
}
434+
return host, nil
435+
}

0 commit comments

Comments
 (0)