Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
413d14c
Moved all chain-agnostic write target code to chainlink-framework
silaslenihan Apr 30, 2025
408283b
Added GetTransactionStatus to target strategy abstrasction
silaslenihan Apr 30, 2025
9838fac
Merge branch 'main' into refactor-write-target
silaslenihan May 7, 2025
3b2ed07
bumped go.mod
silaslenihan May 7, 2025
147237b
Merge branch 'refactor-write-target' of github.com:smartcontractkit/c…
silaslenihan May 7, 2025
4cb2d73
Created Generalized WriteTarget tests
silaslenihan May 9, 2025
40e1bd0
Ported over synchronous report waiting from chainlink-aptos and expan…
silaslenihan May 12, 2025
75d27dd
Merge branch 'main' into refactor-write-target
silaslenihan May 12, 2025
f9bc6a7
Added accepted status config
silaslenihan May 13, 2025
18b3914
Merge branch 'refactor-write-target' of github.com:smartcontractkit/c…
silaslenihan May 13, 2025
d591c90
Fixed broken test and lints
silaslenihan May 14, 2025
6f02d0e
Convert block timestamp to ms
silaslenihan May 15, 2025
9366d78
Merge branch 'main' into refactor-write-target
silaslenihan May 19, 2025
98eadb8
bumped go.mod
silaslenihan May 19, 2025
90219ea
Merge branch 'main' into refactor-write-target
silaslenihan May 23, 2025
ddf5352
Addressed feedback and bump go.mod
silaslenihan May 23, 2025
d5c0402
Moved common beholder interfaces to chainlink-common
silaslenihan May 27, 2025
0ddbe8e
Refactored ExecutionContext proto message
silaslenihan May 28, 2025
ad6abb9
fix mocks
silaslenihan May 28, 2025
863889d
removed balance monitoring code
silaslenihan May 28, 2025
a12bea7
bumped go.mod
silaslenihan May 30, 2025
1c7ee05
Removed EVMService and ContractReader
silaslenihan May 30, 2025
e139f71
bump go.mod
silaslenihan Jun 3, 2025
7179dbf
addressed feedback
silaslenihan Jun 5, 2025
1aa3b17
Updated config times
silaslenihan Jun 6, 2025
4931a97
bump go.mod
silaslenihan Jun 9, 2025
09ffa82
moved datafeeds protos to chainlink-framework
silaslenihan Jun 9, 2025
749478e
removed observed_by_transmitter_str
silaslenihan Jun 11, 2025
b0fcbb3
Refactored processor logic
silaslenihan Jun 11, 2025
7962879
Updated protos to reuse TransactionData and BlockData
silaslenihan Jun 11, 2025
696a50b
addressed feedback
silaslenihan Jun 11, 2025
5a2dec0
Refactored metrics to use ToSchemaFullName rather than hardcoded name
silaslenihan Jun 11, 2025
c9af4b9
Merge branch 'main' into refactor-write-target
silaslenihan Jun 17, 2025
411638d
addressed feedback
silaslenihan Jun 17, 2025
3d116df
addressed feedback
silaslenihan Jun 17, 2025
4c5ccb2
fixed lint
silaslenihan Jun 17, 2025
ccfa325
Merge branch 'main' into refactor-write-target
silaslenihan Jun 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

// Product-agnostic processors to be injected into WriteTarget Monitor
func NewPlatformProcessors(emitter beholder.ProtoEmitter) ([]beholder.ProtoProcessor, error) {
func NewPlatformProcessors(emitter beholder.ProtoEmitter) (map[string]beholder.ProtoProcessor, error) {
forwarderMetrics, err := forwarder.NewMetrics()
if err != nil {
return nil, fmt.Errorf("failed to create new forwarder metrics: %w", err)
Expand All @@ -22,17 +22,21 @@ func NewPlatformProcessors(emitter beholder.ProtoEmitter) ([]beholder.ProtoProce
if err != nil {
return nil, fmt.Errorf("failed to create new write target metrics: %w", err)
}
return []beholder.ProtoProcessor{
&keystoneProcessor{
return map[string]beholder.ProtoProcessor{
"keystone": &keystoneProcessor{
emitter: emitter,
metrics: forwarderMetrics,
},
&wtProcessor{
"writetarget": &wtProcessor{
metrics: wtMetrics,
},
}, nil
}

func GetDefaultPlatformProcessors() []string {
return []string{"writetarget", "keystone"}
}

// Write-Target specific processor decodes write messages to derive metrics
type wtProcessor struct {
metrics *wt.Metrics
Expand Down
34 changes: 20 additions & 14 deletions capabilities/writetarget/write_target_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ func NewMonitorEmitter(lggr logger.Logger) beholder.ProtoEmitter {
}

type MonitorOpts struct {
Lggr logger.Logger
ProductAgnosticProcessors []beholder.ProtoProcessor
ProductSpecificProcessors map[string]beholder.ProtoProcessor
Emitter beholder.ProtoEmitter
Lggr logger.Logger
Processors map[string]beholder.ProtoProcessor
EnabledProcessors []string
Emitter beholder.ProtoEmitter
}

// NewMonitor initializes a Beholder client for the Write Target
Expand All @@ -44,20 +44,20 @@ func NewMonitor(opts MonitorOpts) (*beholder.BeholderClient, error) {

// Proxy ProtoEmitter with additional processing
protoEmitterProxy := protoEmitter{
lggr: opts.Lggr,
emitter: opts.Emitter,
processors: opts.ProductAgnosticProcessors,
productSpecificProcessors: opts.ProductSpecificProcessors,
lggr: opts.Lggr,
emitter: opts.Emitter,
processors: opts.Processors,
enabledProcessors: opts.EnabledProcessors,
}
return &beholder.BeholderClient{Client: &client, ProtoEmitter: &protoEmitterProxy}, nil
}

// ProtoEmitter proxy specific to the WT
type protoEmitter struct {
lggr logger.Logger
emitter beholder.ProtoEmitter
processors []beholder.ProtoProcessor
productSpecificProcessors map[string]beholder.ProtoProcessor
lggr logger.Logger
emitter beholder.ProtoEmitter
processors map[string]beholder.ProtoProcessor
enabledProcessors []string
}

// Emit emits a proto.Message and runs additional processing
Expand Down Expand Up @@ -85,7 +85,13 @@ func (e *protoEmitter) EmitWithLog(ctx context.Context, m proto.Message, attrKVs
// Process aggregates further processing for emitted messages
func (e *protoEmitter) Process(ctx context.Context, m proto.Message, attrKVs ...any) error {
// Further processing for emitted messages
for _, p := range e.processors {
for _, processorName := range e.enabledProcessors {
p, ok := e.processors[processorName]
if !ok {
// no processor matching configured one, log error but continue
e.lggr.Errorf("no required processor with name %s", processorName)
continue
}
err := p.Process(ctx, m, attrKVs...)
if err != nil {
// Notice: we swallow and log processing errors
Expand All @@ -102,7 +108,7 @@ func (e *protoEmitter) Process(ctx context.Context, m proto.Message, attrKVs ...
return nil
}

if p, ok := e.productSpecificProcessors[msg.MetaCapabilityProcessor]; ok {
if p, ok := e.processors[msg.MetaCapabilityProcessor]; ok {
if err := p.Process(ctx, msg, attrKVs...); err != nil {
e.lggr.Errorw("failed to process emitted message", "err", err)
}
Expand Down
13 changes: 11 additions & 2 deletions capabilities/writetarget/write_target_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

m, err := writetarget.NewMonitor(writetarget.MonitorOpts{
lggr,
[]beholder.ProtoProcessor{},
map[string]beholder.ProtoProcessor{"test": processor},
[]string{},
writetarget.NewMonitorEmitter(lggr),
})
require.NoError(t, err)
Expand All @@ -43,7 +43,7 @@
})

t.Run("Logs when config name is not found", func(t *testing.T) {
m, err = writetarget.NewMonitor(writetarget.MonitorOpts{lggr, []beholder.ProtoProcessor{}, map[string]beholder.ProtoProcessor{"other": processor}, writetarget.NewMonitorEmitter(lggr)})
m, err = writetarget.NewMonitor(writetarget.MonitorOpts{lggr, map[string]beholder.ProtoProcessor{"other": processor}, []string{}, writetarget.NewMonitorEmitter(lggr)})

err = m.ProtoEmitter.EmitWithLog(t.Context(), msg)
require.NoError(t, err)
Expand All @@ -57,9 +57,18 @@
msg.MetaCapabilityProcessor = ""
processor.AssertNotCalled(t, "Process", mock.Anything, mock.Anything, mock.Anything)

err := m.ProtoEmitter.EmitWithLog(t.Context(), msg)

Check failure on line 60 in capabilities/writetarget/write_target_monitor_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint (capabilities)

shadow: declaration of "err" shadows declaration at line 23 (govet)
require.NoError(t, err)

tests.RequireLogMessage(t, observed, "No product specific processor specified; skipping.")
})

t.Run("Logs when required processor is not found", func(t *testing.T) {
m, err = writetarget.NewMonitor(writetarget.MonitorOpts{lggr, map[string]beholder.ProtoProcessor{}, []string{"other"}, writetarget.NewMonitorEmitter(lggr)})

err = m.ProtoEmitter.EmitWithLog(t.Context(), msg)
require.NoError(t, err)

tests.RequireLogMessage(t, observed, "no required processor with name other")
})
}
5 changes: 2 additions & 3 deletions capabilities/writetarget/write_target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ func setupWriteTarget(
platformProcessors, err := processor.NewPlatformProcessors(emitter)
require.NoError(t, err)

var psp map[string]beholder.ProtoProcessor
if productSpecificProcessor {
psp = map[string]beholder.ProtoProcessor{"test": newMockProductSpecificProcessor(t)}
platformProcessors["test"] = newMockProductSpecificProcessor(t)
}
monClient, err := writetarget.NewMonitor(writetarget.MonitorOpts{lggr, platformProcessors, psp, emitter})
monClient, err := writetarget.NewMonitor(writetarget.MonitorOpts{lggr, platformProcessors, processor.GetDefaultPlatformProcessors(), emitter})
require.NoError(t, err)

pollPeriod := 100 * time.Millisecond
Expand Down
Loading