Skip to content

Commit ef4be12

Browse files
committed
Merge branch 'INFOPLAT-2938-logs-streaming-loopp' into INFOPLAT-2938-logs-streaming-loopp-heartbeat
2 parents 7bb83e1 + e9d2e86 commit ef4be12

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+1550
-214
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ require (
3737
github.com/scylladb/go-reflectx v1.0.1
3838
github.com/shopspring/decimal v1.4.0
3939
github.com/smartcontractkit/chain-selectors v1.0.67
40-
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.1
40+
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4
4141
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976
4242
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2
4343
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,8 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB
326326
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
327327
github.com/smartcontractkit/chain-selectors v1.0.67 h1:gxTqP/JC40KDe3DE1SIsIKSTKTZEPyEU1YufO1admnw=
328328
github.com/smartcontractkit/chain-selectors v1.0.67/go.mod h1:xsKM0aN3YGcQKTPRPDDtPx2l4mlTN1Djmg0VVXV40b8=
329-
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.1 h1:ca2z5OXgnbBPQRxpwXwBLJsUA1+cAp5ncfW4Ssvd6eY=
330-
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.1/go.mod h1:NZv/qKYGFRnkjOYBouajnDfFoZ+WDa6H2KNmSf1dnKc=
329+
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 h1:hvqATtrZ0iMRTI80cpBot/3JFbjz2j+2tvpfooVhRHw=
330+
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc=
331331
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976 h1:mF3FiDUoV0QbJcks9R2y7ydqntNL1Z0VCPBJgx/Ms+0=
332332
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA=
333333
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 h1:1/KdO5AbUr3CmpLjMPuJXPo2wHMbfB8mldKLsg7D4M8=

pkg/beholder/chip_client.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package beholder
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
7+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
8+
)
9+
10+
type ChipIngressClient interface {
11+
RegisterSchema(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error)
12+
}
13+
14+
type chipIngressClient struct {
15+
client chipingress.Client
16+
}
17+
18+
func NewChipIngressClient(client chipingress.Client) (ChipIngressClient, error) {
19+
if client == nil {
20+
return nil, fmt.Errorf("chip ingress client is nil")
21+
}
22+
23+
return &chipIngressClient{
24+
client: client,
25+
}, nil
26+
}
27+
28+
// RegisterSchema registers one or more schemas with the Chip Ingress service. Returns a map of subject to version for each registered schema.
29+
func (sr *chipIngressClient) RegisterSchema(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error) {
30+
request := &pb.RegisterSchemaRequest{
31+
Schemas: schemas,
32+
}
33+
34+
resp, err := sr.client.RegisterSchema(ctx, request)
35+
if err != nil {
36+
return nil, fmt.Errorf("failed to register schema: %w", err)
37+
}
38+
39+
registeredMap := make(map[string]int)
40+
for _, schema := range resp.Registered {
41+
registeredMap[schema.Subject] = int(schema.Version)
42+
}
43+
44+
return registeredMap, err
45+
}

pkg/beholder/chip_client_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package beholder_test
2+
3+
import (
4+
"fmt"
5+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
6+
7+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks"
8+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/mock"
11+
"github.com/stretchr/testify/require"
12+
"testing"
13+
)
14+
15+
func TestNewChipClient(t *testing.T) {
16+
t.Run("returns error when client is nil", func(t *testing.T) {
17+
registry, err := beholder.NewChipIngressClient(nil)
18+
assert.Nil(t, registry)
19+
assert.EqualError(t, err, "chip ingress client is nil")
20+
})
21+
22+
t.Run("returns schema registry when client is valid", func(t *testing.T) {
23+
mockClient := mocks.NewClient(t)
24+
registry, err := beholder.NewChipIngressClient(mockClient)
25+
require.NoError(t, err)
26+
assert.NotNil(t, registry)
27+
})
28+
}
29+
30+
func TestRegisterSchema(t *testing.T) {
31+
t.Run("successfully registers schemas", func(t *testing.T) {
32+
mockClient := mocks.NewClient(t)
33+
mockClient.
34+
On("RegisterSchema", mock.Anything, mock.Anything).
35+
Return(&pb.RegisterSchemaResponse{
36+
Registered: []*pb.RegisteredSchema{
37+
{Subject: "schema1", Version: 1},
38+
{Subject: "schema2", Version: 2},
39+
},
40+
}, nil)
41+
42+
registry, err := beholder.NewChipIngressClient(mockClient)
43+
require.NoError(t, err)
44+
45+
schemas := []*pb.Schema{
46+
{Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1},
47+
{Subject: "schema2", Schema: `{"type":"record","name":"Test2","fields":[{"name":"field2"}]}`, Format: 2},
48+
}
49+
50+
result, err := registry.RegisterSchema(t.Context(), schemas...)
51+
require.NoError(t, err)
52+
assert.Equal(t, map[string]int{"schema1": 1, "schema2": 2}, result)
53+
})
54+
55+
t.Run("returns error when registration fails", func(t *testing.T) {
56+
mockClient := mocks.NewClient(t)
57+
mockClient.
58+
On("RegisterSchema", mock.Anything, mock.Anything).
59+
Return(nil, fmt.Errorf("registration failed"))
60+
61+
registry, err := beholder.NewChipIngressClient(mockClient)
62+
require.NoError(t, err)
63+
64+
schemas := []*pb.Schema{
65+
{Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1},
66+
}
67+
68+
result, err := registry.RegisterSchema(t.Context(), schemas...)
69+
assert.Nil(t, result)
70+
assert.EqualError(t, err, "failed to register schema: registration failed")
71+
})
72+
}

pkg/beholder/client.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type Client struct {
3636
Meter otelmetric.Meter
3737
// Message Emitter
3838
Emitter Emitter
39+
// Chip
40+
Chip ChipIngressClient
3941

4042
// Providers
4143
LoggerProvider otellog.LoggerProvider
@@ -213,10 +215,11 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
213215
// This will eventually be removed in favor of chip-ingress emitter
214216
// and logs will be sent via OTLP using the regular Logger instead of calling Emit
215217
emitter := NewMessageEmitter(messageLogger)
218+
var chipIngressClient chipingress.Client
219+
216220
// if chip ingress is enabled, create dual source emitter that sends to both otel collector and chip ingress
217221
// eventually we will remove the dual source emitter and just use chip ingress
218-
219-
if cfg.ChipIngressEmitterEnabled {
222+
if cfg.ChipIngressEmitterEnabled || cfg.ChipIngressEmitterGRPCEndpoint != "" {
220223
chipIngressOpts := make([]chipingress.Opt, 0, 2)
221224

222225
if cfg.ChipIngressInsecureConnection {
@@ -231,7 +234,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
231234
chipIngressOpts = append(chipIngressOpts, chipingress.WithTokenAuth(headerProvider))
232235
}
233236

234-
chipIngressClient, err := chipingress.NewClient(
237+
chipIngressClient, err = chipingress.NewClient(
235238
cfg.ChipIngressEmitterGRPCEndpoint,
236239
chipIngressOpts...,
237240
)
@@ -251,13 +254,22 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
251254
}
252255
}
253256

257+
// Create interface/wrapper to chip-ingress for schema registry
258+
var chip ChipIngressClient
259+
if chipIngressClient != nil {
260+
chip, err = NewChipIngressClient(chipIngressClient)
261+
if err != nil {
262+
return nil, fmt.Errorf("failed to create interface to chip ingress: %w", err)
263+
}
264+
}
265+
254266
onClose := func() (err error) {
255267
for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} {
256268
err = errors.Join(err, provider.Shutdown(context.Background()))
257269
}
258270
return
259271
}
260-
return &Client{cfg, logger, tracer, meter, emitter, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil
272+
return &Client{cfg, logger, tracer, meter, emitter, chip, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil
261273
}
262274

263275
// Closes all providers, flushes all data and stops all background processes

pkg/beholder/client_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,42 @@ func TestNewGRPCClient_ChipIngressEmitter(t *testing.T) {
480480
})
481481
}
482482

483+
func TestNewClient_Chip(t *testing.T) {
484+
t.Run("chip interface available with chip-ingress endpoint provided", func(t *testing.T) {
485+
client, err := beholder.NewClient(beholder.Config{
486+
OtelExporterGRPCEndpoint: "grpc-endpoint",
487+
ChipIngressEmitterEnabled: true,
488+
ChipIngressEmitterGRPCEndpoint: "chip-ingress.example.com:9090",
489+
ChipIngressInsecureConnection: false,
490+
})
491+
require.NoError(t, err)
492+
assert.NotNil(t, client)
493+
assert.NotNil(t, client.Chip)
494+
})
495+
496+
t.Run("chip interface can be enabled when chip ingress dual emitter is not enabled ", func(t *testing.T) {
497+
client, err := beholder.NewClient(beholder.Config{
498+
OtelExporterGRPCEndpoint: "grpc-endpoint",
499+
ChipIngressEmitterEnabled: false,
500+
ChipIngressEmitterGRPCEndpoint: "chip-ingress.example.com:9090",
501+
ChipIngressInsecureConnection: false,
502+
})
503+
require.NoError(t, err)
504+
assert.NotNil(t, client)
505+
assert.NotNil(t, client.Chip)
506+
assert.NotNil(t, client.Emitter)
507+
})
508+
509+
t.Run("chip interface is nil when chip ingress config is missing", func(t *testing.T) {
510+
client, err := beholder.NewClient(beholder.Config{
511+
OtelExporterGRPCEndpoint: "grpc-endpoint",
512+
ChipIngressEmitterEnabled: true,
513+
})
514+
require.Error(t, err)
515+
assert.Nil(t, client)
516+
})
517+
}
518+
483519
// mockLogExporter is a no-op exporter for testing purposes.
484520
type mockLogExporter struct{}
485521

pkg/beholder/httpclient.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
186186
}
187187
return
188188
}
189-
return &Client{cfg, logger, tracer, meter, emitter, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil
189+
return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil
190190
}
191191

192192
func newHTTPTracerProvider(config Config, resource *sdkresource.Resource, tlsConfig *tls.Config) (*sdktrace.TracerProvider, error) {

pkg/beholder/noop.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func NewNoopClient() *Client {
3535
// MessageEmitter
3636
messageEmitter := noopMessageEmitter{}
3737

38-
return &Client{cfg, logger, tracer, meter, messageEmitter, loggerProvider, tracerProvider, meterProvider, loggerProvider, noopOnClose}
38+
return &Client{cfg, logger, tracer, meter, messageEmitter, nil, loggerProvider, tracerProvider, meterProvider, loggerProvider, noopOnClose}
3939
}
4040

4141
// NewStdoutClient creates a new Client with exporters which send telemetry data to standard output

pkg/capabilities/capabilities.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"google.golang.org/protobuf/proto"
1212
"google.golang.org/protobuf/types/known/anypb"
1313

14+
"github.com/smartcontractkit/chainlink-common/pkg/contexts"
1415
"github.com/smartcontractkit/chainlink-protos/cre/go/values"
1516
)
1617

@@ -108,13 +109,27 @@ type RequestMetadata struct {
108109
WorkflowTag string
109110
}
110111

112+
func (m *RequestMetadata) ContextWithCRE(ctx context.Context) context.Context {
113+
return contexts.WithCRE(ctx, contexts.CRE{
114+
Owner: m.WorkflowOwner,
115+
Workflow: m.WorkflowID,
116+
})
117+
}
118+
111119
type RegistrationMetadata struct {
112120
WorkflowID string
113121
WorkflowOwner string
114122
// The step reference ID of the workflow
115123
ReferenceID string
116124
}
117125

126+
func (m *RegistrationMetadata) ContextWithCRE(ctx context.Context) context.Context {
127+
return contexts.WithCRE(ctx, contexts.CRE{
128+
Owner: m.WorkflowOwner,
129+
Workflow: m.WorkflowID,
130+
})
131+
}
132+
118133
// CapabilityRequest is a struct for the Execute request of a capability.
119134
type CapabilityRequest struct {
120135
Metadata RequestMetadata

pkg/capabilities/consensus/ocr3/aggregators/reduce_aggregator.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,24 +306,33 @@ func (a *reduceAggregator) extractValues(lggr logger.Logger, observations map[oc
306306
// values are then re-wrapped here to handle aggregating against Value types
307307
// which is used for mode aggregation
308308
switch val := val.(type) {
309-
case map[string]interface{}:
309+
case map[string]any:
310310
_, ok := val[aggregationKey]
311311
if !ok {
312312
continue
313313
}
314+
if val[aggregationKey] == nil {
315+
lggr.Warnf("node %d contributed with a nil value under key %s", nodeID, aggregationKey)
316+
continue
317+
}
314318

315319
rewrapped, err := values.Wrap(val[aggregationKey])
316320
if err != nil {
317321
lggr.Warnf("unable to wrap value %s", val[aggregationKey])
318322
continue
319323
}
320324
vals = append(vals, rewrapped)
321-
case []interface{}:
325+
case []any:
322326
i, err := strconv.Atoi(aggregationKey)
323327
if err != nil {
324328
lggr.Warnf("aggregation key %s could not be used to index a list type", aggregationKey)
325329
continue
326330
}
331+
if i >= len(val) {
332+
lggr.Warnf("node %d contributed with an array shorter than index %s", nodeID, aggregationKey)
333+
continue
334+
}
335+
327336
rewrapped, err := values.Wrap(val[i])
328337
if err != nil {
329338
lggr.Warnf("unable to wrap value %s", val[i])

0 commit comments

Comments
 (0)