Skip to content

Commit 6e77ff5

Browse files
authored
Merge branch 'main' into ARCH-328-keystore-interfaces-and-admin-implementation
2 parents 2b9df50 + a8795d6 commit 6e77ff5

File tree

7 files changed

+430
-1
lines changed

7 files changed

+430
-1
lines changed

go.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ flowchart LR
99
chainlink-common --> chainlink-common/pkg/values
1010
chainlink-common --> chainlink-protos/billing/go
1111
chainlink-common --> chainlink-protos/cre/go
12+
chainlink-common --> chainlink-protos/linking-service/go
1213
chainlink-common --> chainlink-protos/storage-service
1314
chainlink-common --> freeport
1415
chainlink-common --> grpc-proxy
@@ -24,6 +25,8 @@ flowchart LR
2425
click chainlink-protos/billing/go href "https://github.com/smartcontractkit/chainlink-protos"
2526
chainlink-protos/cre/go
2627
click chainlink-protos/cre/go href "https://github.com/smartcontractkit/chainlink-protos"
28+
chainlink-protos/linking-service/go
29+
click chainlink-protos/linking-service/go href "https://github.com/smartcontractkit/chainlink-protos"
2730
chainlink-protos/storage-service
2831
click chainlink-protos/storage-service href "https://github.com/smartcontractkit/chainlink-protos"
2932
chainlink-protos/workflows/go
@@ -46,6 +49,7 @@ flowchart LR
4649
subgraph chainlink-protos-repo[chainlink-protos]
4750
chainlink-protos/billing/go
4851
chainlink-protos/cre/go
52+
chainlink-protos/linking-service/go
4953
chainlink-protos/storage-service
5054
chainlink-protos/workflows/go
5155
end

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ require (
4040
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4
4141
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976
4242
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2
43+
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b
4344
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0
45+
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250822025801-598d3d86f873
4446
github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e
4547
github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7
4648
github.com/smartcontractkit/libocr v0.0.0-20250707144819-babe0ec4e358
@@ -137,7 +139,6 @@ require (
137139
github.com/rogpeppe/go-internal v1.13.1 // indirect
138140
github.com/ryanuber/go-glob v1.0.0 // indirect
139141
github.com/sanity-io/litter v1.5.5 // indirect
140-
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250822025801-598d3d86f873 // indirect
141142
github.com/stretchr/objx v0.5.2 // indirect
142143
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
143144
github.com/x448/float16 v0.8.4 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,8 @@ github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-87
332332
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA=
333333
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 h1:1/KdO5AbUr3CmpLjMPuJXPo2wHMbfB8mldKLsg7D4M8=
334334
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q=
335+
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM=
336+
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY=
335337
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+CMJ26elVw/cAJqqhBQ3Xa/mBYWK0/rQ5MuI=
336338
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA=
337339
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250822025801-598d3d86f873 h1:8/qwOmcdSFa8A6ecnj3eH/mwNx7Ybw2tjQFydDymtOc=
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package orgresolver
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
"google.golang.org/grpc"
9+
"google.golang.org/grpc/credentials"
10+
"google.golang.org/grpc/credentials/insecure"
11+
12+
log "github.com/smartcontractkit/chainlink-common/pkg/logger"
13+
"github.com/smartcontractkit/chainlink-common/pkg/services"
14+
linkingclient "github.com/smartcontractkit/chainlink-protos/linking-service/go/v1"
15+
)
16+
17+
// OrgResolver interface defines methods for resolving organization IDs from workflow owners
18+
type OrgResolver interface {
19+
services.Service
20+
Get(ctx context.Context, owner string) (string, error)
21+
}
22+
23+
type Config struct {
24+
URL string
25+
TLSEnabled bool
26+
WorkflowRegistryAddress string
27+
WorkflowRegistryChainSelector uint64
28+
}
29+
30+
// orgResolver makes direct calls to the linking service to resolve organization IDs from workflow owners.
31+
// This simplified implementation makes a network call for each Get() request.
32+
type orgResolver struct {
33+
workflowRegistryAddress string
34+
workflowRegistryChainSelector uint64
35+
36+
client linkingclient.LinkingServiceClient
37+
conn *grpc.ClientConn // nil if client was injected
38+
logger log.SugaredLogger
39+
}
40+
41+
// NewOrgResolver creates a new org resolver with the specified configuration
42+
func NewOrgResolver(cfg Config, logger log.Logger) (*orgResolver, error) {
43+
return NewOrgResolverWithClient(cfg, nil, logger)
44+
}
45+
46+
// NewOrgResolverWithClient creates a new org resolver with an optional injected client (for testing)
47+
func NewOrgResolverWithClient(cfg Config, client linkingclient.LinkingServiceClient, logger log.Logger) (*orgResolver, error) {
48+
resolver := &orgResolver{
49+
workflowRegistryAddress: cfg.WorkflowRegistryAddress,
50+
workflowRegistryChainSelector: cfg.WorkflowRegistryChainSelector,
51+
logger: log.Sugared(logger).Named("OrgResolver"),
52+
}
53+
54+
if client != nil {
55+
resolver.client = client
56+
} else {
57+
if cfg.URL == "" {
58+
return nil, errors.New("URL is required when client is not provided")
59+
}
60+
61+
var opts []grpc.DialOption
62+
if cfg.TLSEnabled {
63+
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(nil)))
64+
} else {
65+
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
66+
}
67+
68+
conn, err := grpc.NewClient(cfg.URL, opts...)
69+
if err != nil {
70+
return nil, fmt.Errorf("failed to create linking service client at %s: %w", cfg.URL, err)
71+
}
72+
73+
resolver.conn = conn
74+
resolver.client = linkingclient.NewLinkingServiceClient(conn)
75+
}
76+
77+
return resolver, nil
78+
}
79+
80+
func (o *orgResolver) Get(ctx context.Context, owner string) (string, error) {
81+
req := &linkingclient.GetOrganizationFromWorkflowOwnerRequest{
82+
WorkflowOwner: owner,
83+
WorkflowRegistryAddress: o.workflowRegistryAddress,
84+
ChainSelector: o.workflowRegistryChainSelector,
85+
}
86+
87+
resp, err := o.client.GetOrganizationFromWorkflowOwner(ctx, req)
88+
if err != nil {
89+
return "", fmt.Errorf("failed to fetch organization from workflow owner: %w", err)
90+
}
91+
92+
return resp.OrganizationId, nil
93+
}
94+
95+
func (o *orgResolver) Start(_ context.Context) error {
96+
return nil
97+
}
98+
99+
func (o *orgResolver) HealthReport() map[string]error {
100+
return map[string]error{o.Name(): nil}
101+
}
102+
103+
func (o *orgResolver) Close() error {
104+
if o.conn != nil {
105+
return o.conn.Close()
106+
}
107+
return nil
108+
}
109+
110+
func (o *orgResolver) Name() string {
111+
return o.logger.Name()
112+
}
113+
114+
func (o *orgResolver) Ready() error {
115+
return nil
116+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package orgresolver
2+
3+
import (
4+
"context"
5+
"net"
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
"google.golang.org/grpc"
10+
"google.golang.org/grpc/credentials/insecure"
11+
"google.golang.org/grpc/test/bufconn"
12+
13+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
14+
linkingclient "github.com/smartcontractkit/chainlink-protos/linking-service/go/v1"
15+
)
16+
17+
// mockLinkingClient implements the LinkingServiceClient interface for testing
18+
type mockLinkingClient struct{}
19+
20+
func (m *mockLinkingClient) GetOrganizationFromWorkflowOwner(ctx context.Context, req *linkingclient.GetOrganizationFromWorkflowOwnerRequest, opts ...grpc.CallOption) (*linkingclient.GetOrganizationFromWorkflowOwnerResponse, error) {
21+
orgID := "org-" + req.WorkflowOwner
22+
return &linkingclient.GetOrganizationFromWorkflowOwnerResponse{
23+
OrganizationId: orgID,
24+
}, nil
25+
}
26+
27+
// mockLinkingServer implements the LinkingServiceServer interface for testing
28+
type mockLinkingServer struct {
29+
linkingclient.UnimplementedLinkingServiceServer
30+
}
31+
32+
func (s *mockLinkingServer) GetOrganizationFromWorkflowOwner(ctx context.Context, req *linkingclient.GetOrganizationFromWorkflowOwnerRequest) (*linkingclient.GetOrganizationFromWorkflowOwnerResponse, error) {
33+
orgID := "org-" + req.WorkflowOwner
34+
return &linkingclient.GetOrganizationFromWorkflowOwnerResponse{
35+
OrganizationId: orgID,
36+
}, nil
37+
}
38+
39+
func TestOrgResolver_Get(t *testing.T) {
40+
ctx := context.Background()
41+
client := &mockLinkingClient{}
42+
43+
cfg := Config{
44+
URL: "test-url",
45+
TLSEnabled: false,
46+
WorkflowRegistryAddress: "0x1234567890abcdef",
47+
WorkflowRegistryChainSelector: 1,
48+
}
49+
50+
resolver, err := NewOrgResolverWithClient(cfg, client, logger.Test(t))
51+
require.NoError(t, err)
52+
53+
workflowOwner := "0xabcdef1234567890"
54+
55+
orgID, err := resolver.Get(ctx, workflowOwner)
56+
require.NoError(t, err)
57+
require.Equal(t, "org-"+workflowOwner, orgID)
58+
}
59+
60+
func TestOrgResolver_NewOrgResolver_RequiresClientOrURL(t *testing.T) {
61+
cfg := Config{
62+
URL: "", // Empty URL should cause error
63+
TLSEnabled: false,
64+
WorkflowRegistryAddress: "0x1234567890abcdef",
65+
WorkflowRegistryChainSelector: 1,
66+
}
67+
68+
_, err := NewOrgResolverWithClient(cfg, nil, logger.Test(t))
69+
require.Error(t, err)
70+
require.Contains(t, err.Error(), "URL is required when client is not provided")
71+
}
72+
73+
func TestOrgResolver_NewOrgResolver_WithMockServer(t *testing.T) {
74+
// Use in-memory connection for faster testing
75+
lis := bufconn.Listen(1024 * 1024)
76+
server := grpc.NewServer()
77+
linkingclient.RegisterLinkingServiceServer(server, &mockLinkingServer{})
78+
79+
go func() {
80+
_ = server.Serve(lis)
81+
}()
82+
t.Cleanup(func() { server.Stop() })
83+
84+
// Create gRPC client connection using bufconn
85+
ctx := context.Background()
86+
conn, err := grpc.NewClient("passthrough:///bufnet",
87+
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
88+
return lis.Dial()
89+
}),
90+
grpc.WithTransportCredentials(insecure.NewCredentials()))
91+
require.NoError(t, err)
92+
defer conn.Close()
93+
94+
client := linkingclient.NewLinkingServiceClient(conn)
95+
96+
// Create OrgResolver using the client (simulating what NewOrgResolver would do)
97+
cfg := Config{
98+
URL: "bufnet", // Not used since client is injected
99+
TLSEnabled: false,
100+
WorkflowRegistryAddress: "0x1234567890abcdef",
101+
WorkflowRegistryChainSelector: 1,
102+
}
103+
104+
resolver, err := NewOrgResolverWithClient(cfg, client, logger.Test(t))
105+
require.NoError(t, err)
106+
107+
workflowOwner := "0xabcdef1234567890"
108+
109+
orgID, err := resolver.Get(ctx, workflowOwner)
110+
require.NoError(t, err)
111+
require.Equal(t, "org-"+workflowOwner, orgID)
112+
113+
err = resolver.Close()
114+
require.NoError(t, err)
115+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"time"
8+
9+
"google.golang.org/protobuf/proto"
10+
11+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
12+
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
13+
workflowsevents "github.com/smartcontractkit/chainlink-protos/workflows/go/v2"
14+
)
15+
16+
// Label keys for trigger events
17+
const (
18+
KeyTriggerID = "trigger_id"
19+
KeyWorkflowID = "workflow_id"
20+
KeyWorkflowOwner = "workflow_owner"
21+
KeyWorkflowName = "workflow_name"
22+
KeyWorkflowExecutionID = "workflow_execution_id"
23+
KeyDonID = "don_id"
24+
KeyDonVersion = "don_version"
25+
KeyOrganizationID = "organization_id"
26+
)
27+
28+
// EmitTriggerExecutionStarted emits a TriggerExecutionStarted event using the provided labeler
29+
func EmitTriggerExecutionStarted(ctx context.Context, labeler custmsg.MessageEmitter) error {
30+
labels := labeler.Labels()
31+
32+
// Extract required fields
33+
triggerID, ok := labels[KeyTriggerID]
34+
if !ok {
35+
return fmt.Errorf("missing required field: %s", KeyTriggerID)
36+
}
37+
38+
workflowID, ok := labels[KeyWorkflowID]
39+
if !ok {
40+
return fmt.Errorf("missing required field: %s", KeyWorkflowID)
41+
}
42+
43+
workflowExecutionID, ok := labels[KeyWorkflowExecutionID]
44+
if !ok {
45+
return fmt.Errorf("missing required field: %s", KeyWorkflowExecutionID)
46+
}
47+
48+
event := &workflowsevents.TriggerExecutionStarted{
49+
TriggerID: triggerID,
50+
WorkflowExecutionID: workflowExecutionID,
51+
Workflow: &workflowsevents.WorkflowKey{
52+
WorkflowID: workflowID,
53+
WorkflowOwner: labels[KeyWorkflowOwner],
54+
WorkflowName: labels[KeyWorkflowName],
55+
OrganizationID: labels[KeyOrganizationID],
56+
},
57+
Timestamp: time.Now().Format(time.RFC3339),
58+
}
59+
60+
// Optional; downstream consumers could infer from csa public key,
61+
// as of now Beholder/ChiP autohydrates csa public key
62+
if donIDStr, exists := labels[KeyDonID]; exists {
63+
if donID, err := strconv.ParseInt(donIDStr, 10, 32); err == nil {
64+
event.CreInfo = &workflowsevents.CreInfo{
65+
DonID: int32(donID),
66+
DonVersion: labels[KeyDonVersion],
67+
}
68+
}
69+
}
70+
71+
b, err := proto.Marshal(event)
72+
if err != nil {
73+
return fmt.Errorf("failed to marshal TriggerExecutionStarted event: %w", err)
74+
}
75+
76+
return beholder.GetEmitter().Emit(ctx, b,
77+
"beholder_data_schema", "workflows.v2.trigger_execution_started", // required
78+
"beholder_domain", "platform", // required
79+
"beholder_entity", "workflows.v2.TriggerExecutionStarted") // required
80+
}

0 commit comments

Comments
 (0)