Skip to content

Commit bd7889e

Browse files
authored
Merge branch 'main' into cache-settings-store-flag
2 parents 9a1cf40 + 598ab18 commit bd7889e

File tree

14 files changed

+282
-18
lines changed

14 files changed

+282
-18
lines changed

.github/workflows/api-diff.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
name: Analyze API Changes
2-
description: Runs apidiff-go action to check for breaking changes to modules' public APIs
32

43
on:
54
pull_request:
@@ -23,7 +22,7 @@ jobs:
2322
go-version-file: "go.mod"
2423
cache: false
2524

26-
- uses: smartcontractkit/.github/actions/apidiff-go@apidiff-go/0.1.0
25+
- uses: smartcontractkit/.github/actions/apidiff-go@apidiff-go/0.2.0
2726
env:
2827
GITHUB_TOKEN: ${{ github.token }}
2928
with:

pkg/logger/logger.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,17 @@ func NewWith(cfgFn func(*zap.Config)) (Logger, error) {
9696
return &logger{core.Sugar()}, nil
9797
}
9898

99+
// NewCore returns a new Logger core from a modified [zap.Config].
100+
func NewCore(cfgFn func(*zap.Config)) (zapcore.Core, error) {
101+
cfg := zap.NewProductionConfig()
102+
cfgFn(&cfg)
103+
logger, err := cfg.Build()
104+
if err != nil {
105+
return nil, err
106+
}
107+
return logger.Core(), nil
108+
}
109+
99110
// NewWithSync returns a new Logger with a given SyncWriter.
100111
func NewWithSync(w io.Writer) Logger {
101112
core := zapcore.NewCore(zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), zapcore.AddSync(w), zapcore.InfoLevel)

pkg/logger/logger_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,3 +324,25 @@ type differentLogger interface {
324324

325325
Sync() error
326326
}
327+
328+
func TestNewCore(t *testing.T) {
329+
// First core at Info (would drop Debug), second core at Debug
330+
obsCore, obsLogs := observer.New(zap.DebugLevel)
331+
332+
primaryCore, err := NewCore(func(cfg *zap.Config) {
333+
cfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
334+
})
335+
if err != nil {
336+
t.Fatalf("NewCore error: %v", err)
337+
}
338+
339+
lggr := NewWithCores(primaryCore, obsCore)
340+
341+
lggr.Debug("debug message should reach observer core")
342+
if got := obsLogs.Len(); got != 1 {
343+
t.Fatalf("expected 1 log in observer core, got %d", got)
344+
}
345+
if msg := obsLogs.All()[0].Message; msg != "debug message should reach observer core" {
346+
t.Fatalf("unexpected message: %s", msg)
347+
}
348+
}

pkg/loop/logger.go

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import (
1313
"go.uber.org/zap/zapcore"
1414
"golang.org/x/exp/slices"
1515

16+
otellog "go.opentelemetry.io/otel/log"
17+
1618
"github.com/smartcontractkit/chainlink-common/pkg/logger"
19+
"github.com/smartcontractkit/chainlink-common/pkg/logger/otelzap"
1720
)
1821

1922
// HCLogLogger returns an [hclog.Logger] backed by the given [logger.Logger].
@@ -162,13 +165,38 @@ func (h *hclSinkAdapter) Accept(_ string, level hclog.Level, msg string, args ..
162165

163166
// NewLogger returns a new [logger.Logger] configured to encode [hclog] compatible JSON.
164167
func NewLogger() (logger.Logger, error) {
165-
return logger.NewWith(func(cfg *zap.Config) {
166-
cfg.Level.SetLevel(zap.DebugLevel)
167-
cfg.EncoderConfig.LevelKey = "@level"
168-
cfg.EncoderConfig.MessageKey = "@message"
169-
cfg.EncoderConfig.TimeKey = "@timestamp"
170-
cfg.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02T15:04:05.000000Z07:00")
171-
})
168+
return logger.NewWith(configureHCLogEncoder)
169+
}
170+
171+
// configureHCLogEncoder mutates cfg to use hclog-compatible field names and timestamp format.
172+
// NOTE: It also sets the log level to Debug to preserve prior behavior where each caller
173+
// manually set Debug before applying identical encoder tweaks. Centralizing avoids drift.
174+
// If a different level is desired, callers should override cfg.Level AFTER calling this helper.
175+
func configureHCLogEncoder(cfg *zap.Config) {
176+
cfg.Level.SetLevel(zap.DebugLevel)
177+
cfg.EncoderConfig.LevelKey = "@level"
178+
cfg.EncoderConfig.MessageKey = "@message"
179+
cfg.EncoderConfig.TimeKey = "@timestamp"
180+
cfg.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02T15:04:05.000000Z07:00")
181+
}
182+
183+
// NewOtelLogger returns a logger with two cores:
184+
// 1. The primary JSON core configured via cfgFn (encoder keys changed to @level, @message, @timestamp).
185+
// 2. The otel core (otelzap.NewCore) which receives the raw zap.Entry and fields.
186+
//
187+
// Important:
188+
// The cfgFn only mutates the encoder config used to build the first core.
189+
// otelzap.NewCore implements zapcore.Core and does NOT use that encoder; it derives attributes from the zap.Entry
190+
// (Message, Level, Time, etc.) and zap.Fields directly. Therefore changing encoder keys here does NOT affect how
191+
// the otel core extracts data, and only the first core's JSON output format is altered.
192+
// This preserves backward compatibility for OTEL export while allowing hclog-compatible key names in the primary output.
193+
func NewOtelLogger(otelLogger otellog.Logger) (logger.Logger, error) {
194+
primaryCore, err := logger.NewCore(configureHCLogEncoder)
195+
if err != nil {
196+
return nil, err
197+
}
198+
// set debug level from primaryCore to match otelzap.NewCore
199+
return logger.NewWithCores(primaryCore, otelzap.NewCore(otelLogger, otelzap.WithLevel(zapcore.DebugLevel))), nil
172200
}
173201

174202
// onceValue returns a function that invokes f only once and returns the value

pkg/loop/logger_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
package loop
22

33
import (
4+
"context"
5+
"sync"
46
"testing"
57

68
"github.com/stretchr/testify/assert"
9+
sdklog "go.opentelemetry.io/otel/sdk/log"
10+
11+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
12+
"github.com/smartcontractkit/chainlink-common/pkg/logger/otelzap"
713
)
814

915
func Test_removeArg(t *testing.T) {
@@ -38,3 +44,74 @@ func Test_removeArg(t *testing.T) {
3844
})
3945
}
4046
}
47+
48+
func TestNewOtelLogger(t *testing.T) {
49+
tests := []struct {
50+
name string
51+
logFn func(l logger.Logger)
52+
wantMsg string
53+
}{
54+
{
55+
name: "debug",
56+
logFn: func(l logger.Logger) {
57+
l.Debugw("hello world", "k", "v")
58+
},
59+
wantMsg: "hello world",
60+
},
61+
{
62+
name: "info",
63+
logFn: func(l logger.Logger) {
64+
l.Infow("info msg", "a", 1)
65+
},
66+
wantMsg: "info msg",
67+
},
68+
}
69+
for _, tt := range tests {
70+
t.Run(tt.name, func(t *testing.T) {
71+
exp := &recordingExporter{}
72+
lp := sdklog.NewLoggerProvider(
73+
sdklog.WithProcessor(sdklog.NewSimpleProcessor(exp)),
74+
)
75+
otelLggr := lp.Logger("test-" + tt.name)
76+
77+
lggr, err := NewOtelLogger(otelLggr)
78+
if err != nil {
79+
t.Fatalf("NewOtelLogger error: %v", err)
80+
}
81+
82+
tt.logFn(lggr)
83+
84+
// Force flush the logger provider to ensure records are exported
85+
if err := lp.ForceFlush(context.Background()); err != nil {
86+
t.Fatalf("ForceFlush error: %v", err)
87+
}
88+
89+
if len(exp.records) != 1 {
90+
t.Fatalf("expected 1 exported record, got %d", len(exp.records))
91+
}
92+
if got := exp.records[0].Body().AsString(); got != tt.wantMsg {
93+
t.Fatalf("unexpected body: got %q want %q", got, tt.wantMsg)
94+
}
95+
})
96+
}
97+
}
98+
99+
// recordingExporter captures exported log records (current sdk/log Export signature).
100+
type recordingExporter struct {
101+
mu sync.Mutex
102+
records []sdklog.Record
103+
}
104+
105+
func (r *recordingExporter) Export(_ context.Context, recs []sdklog.Record) error {
106+
r.mu.Lock()
107+
defer r.mu.Unlock()
108+
r.records = append(r.records, recs...)
109+
return nil
110+
}
111+
func (r *recordingExporter) ForceFlush(context.Context) error { return nil }
112+
func (r *recordingExporter) Shutdown(context.Context) error { return nil }
113+
114+
// Compile-time assertion that otelzap.NewCore still satisfies zapcore.Core usage pattern.
115+
// (Guards against accidental API break causing this test file to silently compile with stubs.)
116+
var _ = otelzap.NewCore
117+
var _ logger.Logger // silence unused import of logger in case future refactors remove usage

pkg/loop/server.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,14 @@ func (s *Server) start() error {
167167
}
168168
beholder.SetClient(beholderClient)
169169
beholder.SetGlobalOtelProviders()
170+
171+
if beholderCfg.LogStreamingEnabled {
172+
otelLogger, err := NewOtelLogger(beholderClient.Logger)
173+
if err != nil {
174+
return fmt.Errorf("failed to enable log streaming: %w", err)
175+
}
176+
s.Logger = logger.Sugared(logger.Named(otelLogger, s.Logger.Name()))
177+
}
170178
}
171179

172180
s.promServer = NewPromServer(s.EnvConfig.PrometheusPort, s.Logger)

pkg/settings/cresettings/defaults.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
"WorkflowLimit": "200",
33
"WorkflowRegistrationQueueLimit": "20",
44
"WorkflowExecutionConcurrencyLimit": "50",
5+
"WorkflowTriggerRateLimit": "200rps:200",
56
"GatewayUnauthenticatedRequestRateLimit": "100rps:-1",
67
"GatewayUnauthenticatedRequestRateLimitPerIP": "1rps:-1",
78
"GatewayIncomingPayloadSizeLimit": "10kb",
@@ -10,7 +11,8 @@
1011
"ZeroBalancePruningTimeout": "24h0m0s"
1112
},
1213
"PerOwner": {
13-
"WorkflowExecutionConcurrencyLimit": "50"
14+
"WorkflowExecutionConcurrencyLimit": "50",
15+
"WorkflowTriggerRateLimit": "200rps:200"
1416
},
1517
"PerWorkflow": {
1618
"TriggerLimit": "10",
@@ -40,7 +42,6 @@
4042
"RateLimit": "every30s:3"
4143
},
4244
"LogTrigger": {
43-
"RateLimit": "every10s:-1",
4445
"Limit": "5",
4546
"EventRateLimit": "every6s:10",
4647
"FilterAddressLimit": "5",

pkg/settings/cresettings/defaults.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
WorkflowLimit = '200'
22
WorkflowRegistrationQueueLimit = '20'
33
WorkflowExecutionConcurrencyLimit = '50'
4+
WorkflowTriggerRateLimit = '200rps:200'
45
GatewayUnauthenticatedRequestRateLimit = '100rps:-1'
56
GatewayUnauthenticatedRequestRateLimitPerIP = '1rps:-1'
67
GatewayIncomingPayloadSizeLimit = '10kb'
@@ -11,6 +12,7 @@ ZeroBalancePruningTimeout = '24h0m0s'
1112

1213
[PerOwner]
1314
WorkflowExecutionConcurrencyLimit = '50'
15+
WorkflowTriggerRateLimit = '200rps:200'
1416

1517
[PerWorkflow]
1618
TriggerLimit = '10'
@@ -41,7 +43,6 @@ RateLimit = 'every30s:1'
4143
RateLimit = 'every30s:3'
4244

4345
[PerWorkflow.LogTrigger]
44-
RateLimit = 'every10s:-1'
4546
Limit = '5'
4647
EventRateLimit = 'every6s:10'
4748
FilterAddressLimit = '5'

pkg/settings/cresettings/settings.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ var Default = Schema{
3434
WorkflowLimit: Int(200),
3535
WorkflowRegistrationQueueLimit: Int(20),
3636
WorkflowExecutionConcurrencyLimit: Int(50),
37+
WorkflowTriggerRateLimit: Rate(200, 200),
3738
GatewayUnauthenticatedRequestRateLimit: Rate(rate.Every(time.Second/100), -1),
3839
GatewayUnauthenticatedRequestRateLimitPerIP: Rate(rate.Every(time.Second), -1),
3940
GatewayIncomingPayloadSizeLimit: Size(10 * config.KByte),
@@ -44,6 +45,7 @@ var Default = Schema{
4445
},
4546
PerOwner: Owners{
4647
WorkflowExecutionConcurrencyLimit: Int(50),
48+
WorkflowTriggerRateLimit: Rate(200, 200),
4749
},
4850
PerWorkflow: Workflows{
4951
TriggerLimit: Int(10),
@@ -73,9 +75,8 @@ var Default = Schema{
7375
RateLimit: Rate(rate.Every(30*time.Second), 3),
7476
},
7577
LogTrigger: logTrigger{
76-
RateLimit: Rate(rate.Every(10*time.Second), -1), //TODO
7778
Limit: Int(5),
78-
EventRateLimit: Rate(rate.Every(time.Minute/10), 10), // TODO
79+
EventRateLimit: Rate(rate.Every(time.Minute/10), 10),
7980
FilterAddressLimit: Int(5),
8081
FilterTopicsPerSlotLimit: Int(10),
8182
},
@@ -91,7 +92,7 @@ var Default = Schema{
9192
TargetsLimit: Int(3),
9293
ReportSizeLimit: Size(config.KByte),
9394
EVM: evmChainWrite{
94-
TransactionGasLimit: Uint64(500_000), //TODO
95+
TransactionGasLimit: Uint64(500_000),
9596
},
9697
},
9798
ChainRead: chainRead{
@@ -106,6 +107,7 @@ type Schema struct {
106107
WorkflowLimit Setting[int] `unit:"{workflow}"`
107108
WorkflowRegistrationQueueLimit Setting[int] `unit:"{workflow}"`
108109
WorkflowExecutionConcurrencyLimit Setting[int] `unit:"{workflow}"`
110+
WorkflowTriggerRateLimit Setting[config.Rate]
109111
GatewayUnauthenticatedRequestRateLimit Setting[config.Rate]
110112
GatewayUnauthenticatedRequestRateLimitPerIP Setting[config.Rate]
111113
GatewayIncomingPayloadSizeLimit Setting[config.Size]
@@ -121,6 +123,7 @@ type Orgs struct {
121123

122124
type Owners struct {
123125
WorkflowExecutionConcurrencyLimit Setting[int] `unit:"{workflow}"`
126+
WorkflowTriggerRateLimit Setting[config.Rate]
124127
}
125128

126129
type Workflows struct {
@@ -166,7 +169,6 @@ type httpTrigger struct {
166169
RateLimit Setting[config.Rate]
167170
}
168171
type logTrigger struct {
169-
RateLimit Setting[config.Rate]
170172
Limit Setting[int] `unit:"{trigger}"`
171173
EventRateLimit Setting[config.Rate]
172174
FilterAddressLimit Setting[int] `unit:"{address}"`

pkg/workflows/wasm/host/execution.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ type execution[T any] struct {
2727
mode sdkpb.Mode
2828
donSeed int64
2929
nodeSeed int64
30+
donLogCount uint32
31+
nodeLogCount uint32
3032
}
3133

3234
// callCapAsync async calls a capability by placing execution results onto a
@@ -139,6 +141,35 @@ func (e *execution[T]) awaitSecrets(ctx context.Context, acr *sdkpb.AwaitSecrets
139141
}
140142

141143
func (e *execution[T]) log(caller *wasmtime.Caller, ptr int32, ptrlen int32) {
144+
switch e.mode {
145+
case sdkpb.Mode_MODE_DON:
146+
e.donLogCount++
147+
if e.donLogCount == e.module.cfg.MaxLogCountDONMode {
148+
e.module.cfg.Logger.Warnf("max log count for don mode reached: %d - all subsequent logs will be dropped", e.donLogCount)
149+
}
150+
if e.donLogCount > e.module.cfg.MaxLogCountDONMode {
151+
// silently drop to avoid spamming logs
152+
return
153+
}
154+
case sdkpb.Mode_MODE_NODE:
155+
e.nodeLogCount++
156+
if e.nodeLogCount == e.module.cfg.MaxLogCountNodeMode {
157+
e.module.cfg.Logger.Warnf("max log count for node mode reached: %d - all subsequent logs will be dropped", e.nodeLogCount)
158+
}
159+
if e.nodeLogCount > e.module.cfg.MaxLogCountNodeMode {
160+
// silently drop to avoid spamming logs
161+
return
162+
}
163+
default:
164+
// unexpected / malicious
165+
return
166+
}
167+
168+
if ptrlen > int32(e.module.cfg.MaxLogLenBytes) {
169+
e.module.cfg.Logger.Warnf("log message too long: %d - dropping", ptrlen)
170+
return
171+
}
172+
142173
b, innerErr := wasmRead(caller, ptr, ptrlen)
143174
if innerErr != nil {
144175
e.module.cfg.Logger.Errorf("error calling log: %s", innerErr)

0 commit comments

Comments
 (0)