Skip to content

Commit c24790f

Browse files
committed
Gateway multi handler conversions
1 parent 70b1e2b commit c24790f

File tree

5 files changed

+310
-43
lines changed

5 files changed

+310
-43
lines changed

core/services/gateway/handler_factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (hf *handlerFactory) NewHandler(
8383
case WebAPICapabilitiesType:
8484
return capabilities.NewHandler(handlerConfig, donConfig, don, hf.httpClient, hf.lggr)
8585
case HTTPCapabilityType:
86-
return v2.NewGatewayHandler(handlerConfig, donConfig, don, hf.httpClient, hf.lggr, hf.lf)
86+
return v2.NewGatewayHandler(handlerConfig, shardedDONs, shardsConnMgrs, hf.httpClient, hf.lggr, hf.lf)
8787
case VaultHandlerType:
8888
requestAuthorizer := vaultcap.NewRequestAuthorizer(hf.lggr, hf.workflowRegistrySyncer)
8989
return vault.NewHandler(handlerConfig, donConfig, don, hf.capabilitiesRegistry, requestAuthorizer, hf.lggr, clockwork.NewRealClock(), hf.lf)

core/services/gateway/handlers/capabilities/v2/http_handler.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,15 @@ type RetryConfig struct {
110110
Multiplier float64 `json:"multiplier"`
111111
}
112112

113-
func NewGatewayHandler(handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON, httpClient network.HTTPClient, lggr logger.Logger, lf limits.Factory) (*gatewayHandler, error) {
113+
func NewGatewayHandler(handlerConfig json.RawMessage, shardedDONs []config.ShardedDONConfig, shardsConnMgrs [][]handlers.DON, httpClient network.HTTPClient, lggr logger.Logger, lf limits.Factory) (*gatewayHandler, error) {
114+
multiDON, err := handlers.NewMultiDON(shardedDONs, shardsConnMgrs)
115+
if err != nil {
116+
return nil, fmt.Errorf("failed to create multi-DON: %w", err)
117+
}
118+
donConfig := handlers.FlattenedDONConfig(shardedDONs)
119+
114120
var cfg ServiceConfig
115-
err := json.Unmarshal(handlerConfig, &cfg)
121+
err = json.Unmarshal(handlerConfig, &cfg)
116122
if err != nil {
117123
return nil, err
118124
}
@@ -126,24 +132,24 @@ func NewGatewayHandler(handlerConfig json.RawMessage, donConfig *config.DONConfi
126132
return nil, fmt.Errorf("failed to create user rate limiter: %w", err)
127133
}
128134

129-
metrics, err := metrics.NewMetrics(donConfig)
135+
m, err := metrics.NewMetrics(donConfig)
130136
if err != nil {
131137
return nil, fmt.Errorf("failed to initialize metrics: %w", err)
132138
}
133139

134-
metadataHandler := NewWorkflowMetadataHandler(lggr, cfg, don, donConfig, metrics)
135-
triggerHandler := NewHTTPTriggerHandler(lggr, cfg, donConfig, don, metadataHandler, userRateLimiter, metrics)
140+
metadataHandler := NewWorkflowMetadataHandler(lggr, cfg, multiDON, donConfig, m)
141+
triggerHandler := NewHTTPTriggerHandler(lggr, cfg, donConfig, multiDON, metadataHandler, userRateLimiter, m)
136142
return &gatewayHandler{
137143
config: cfg,
138-
don: don,
144+
don: multiDON,
139145
lggr: logger.With(logger.Named(lggr, handlerName), "donId", donConfig.DonId),
140146
httpClient: httpClient,
141147
nodeRateLimiter: nodeRateLimiter,
142148
stopCh: make(services.StopChan),
143-
responseCache: newResponseCache(lggr, cfg.OutboundRequestCacheTTLMs, metrics),
149+
responseCache: newResponseCache(lggr, cfg.OutboundRequestCacheTTLMs, m),
144150
triggerHandler: triggerHandler,
145151
metadataHandler: metadataHandler,
146-
metrics: metrics,
152+
metrics: m,
147153
}, nil
148154
}
149155

core/services/gateway/handlers/capabilities/v2/http_handler_test.go

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,24 @@ import (
2020
gateway_common "github.com/smartcontractkit/chainlink-common/pkg/types/gateway"
2121
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
2222
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/config"
23+
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers"
2324
triggermocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities/v2/mocks"
2425
handlermocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/mocks"
2526
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/network"
2627
httpmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/network/mocks"
2728
)
2829

30+
// toShardedArgs converts legacy DONConfig + DON into the sharded format expected by NewGatewayHandler.
31+
func toShardedArgs(donCfg *config.DONConfig, don handlers.DON) ([]config.ShardedDONConfig, [][]handlers.DON) {
32+
shardedDONs := []config.ShardedDONConfig{{
33+
DonName: donCfg.DonId,
34+
F: donCfg.F,
35+
Shards: []config.Shard{{Nodes: donCfg.Members}},
36+
}}
37+
connMgrs := [][]handlers.DON{{don}}
38+
return shardedDONs, connMgrs
39+
}
40+
2941
func TestNewGatewayHandler(t *testing.T) {
3042
t.Run("successful creation", func(t *testing.T) {
3143
cfg := serviceCfg()
@@ -39,7 +51,8 @@ func TestNewGatewayHandler(t *testing.T) {
3951
mockHTTPClient := httpmocks.NewHTTPClient(t)
4052
lggr := logger.Test(t)
4153

42-
handler, err := NewGatewayHandler(configBytes, donConfig, mockDon, mockHTTPClient, lggr, limits.Factory{Logger: lggr})
54+
shardedDONs, connMgrs := toShardedArgs(donConfig, mockDon)
55+
handler, err := NewGatewayHandler(configBytes, shardedDONs, connMgrs, mockHTTPClient, lggr, limits.Factory{Logger: lggr})
4356
require.NoError(t, err)
4457
require.NotNil(t, handler)
4558
require.NotNil(t, handler.responseCache)
@@ -54,7 +67,8 @@ func TestNewGatewayHandler(t *testing.T) {
5467
mockHTTPClient := httpmocks.NewHTTPClient(t)
5568
lggr := logger.Test(t)
5669

57-
handler, err := NewGatewayHandler(invalidConfig, donConfig, mockDon, mockHTTPClient, lggr, limits.Factory{Logger: lggr})
70+
shardedDONs, connMgrs := toShardedArgs(donConfig, mockDon)
71+
handler, err := NewGatewayHandler(invalidConfig, shardedDONs, connMgrs, mockHTTPClient, lggr, limits.Factory{Logger: lggr})
5872
require.Error(t, err)
5973
require.Nil(t, handler)
6074
})
@@ -74,7 +88,8 @@ func TestNewGatewayHandler(t *testing.T) {
7488
mockHTTPClient := httpmocks.NewHTTPClient(t)
7589
lggr := logger.Test(t)
7690

77-
handler, err := NewGatewayHandler(configBytes, donConfig, mockDon, mockHTTPClient, lggr, limits.Factory{Logger: lggr})
91+
shardedDONs, connMgrs := toShardedArgs(donConfig, mockDon)
92+
handler, err := NewGatewayHandler(configBytes, shardedDONs, connMgrs, mockHTTPClient, lggr, limits.Factory{Logger: lggr})
7893
require.Error(t, err)
7994
require.Nil(t, handler)
8095
})
@@ -97,18 +112,18 @@ func TestNewGatewayHandler(t *testing.T) {
97112
mockHTTPClient := httpmocks.NewHTTPClient(t)
98113
lggr := logger.Test(t)
99114

100-
handler, err := NewGatewayHandler(configBytes, donConfig, mockDon, mockHTTPClient, lggr, limits.Factory{Logger: lggr})
115+
shardedDONs, connMgrs := toShardedArgs(donConfig, mockDon)
116+
handler, err := NewGatewayHandler(configBytes, shardedDONs, connMgrs, mockHTTPClient, lggr, limits.Factory{Logger: lggr})
101117
require.NoError(t, err)
102118
require.NotNil(t, handler)
103119
require.Equal(t, defaultCleanUpPeriodMs, handler.config.CleanUpPeriodMs) // Default value
104120
})
105121
}
106122

107123
func TestHandleNodeMessage(t *testing.T) {
108-
handler := createTestHandler(t)
124+
handler, mockDon := createTestHandler(t)
109125

110126
t.Run("successful node message handling", func(t *testing.T) {
111-
mockDon := handler.don.(*handlermocks.DON)
112127
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
113128

114129
// Prepare outbound request
@@ -149,7 +164,6 @@ func TestHandleNodeMessage(t *testing.T) {
149164
})
150165

151166
t.Run("successful node message handling with MultiHeaders", func(t *testing.T) {
152-
mockDon := handler.don.(*handlermocks.DON)
153167
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
154168

155169
// Prepare outbound request
@@ -247,7 +261,6 @@ func TestHandleNodeMessage(t *testing.T) {
247261
Result: &rawRequest,
248262
}
249263

250-
mockDon := handler.don.(*handlermocks.DON)
251264
// First call: should fetch from HTTP client and cache the response
252265
httpResp := &network.HTTPResponse{
253266
StatusCode: 200,
@@ -293,7 +306,6 @@ func TestHandleNodeMessage(t *testing.T) {
293306
Result: &rawRequest,
294307
}
295308

296-
mockDon := handler.don.(*handlermocks.DON)
297309
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
298310
httpResp := &network.HTTPResponse{
299311
StatusCode: 500,
@@ -345,7 +357,7 @@ func TestHandleNodeMessage(t *testing.T) {
345357
}
346358

347359
func TestServiceLifecycle(t *testing.T) {
348-
handler := createTestHandler(t)
360+
handler, _ := createTestHandler(t)
349361

350362
t.Run("start and stop", func(t *testing.T) {
351363
ctx := testutils.Context(t)
@@ -367,7 +379,7 @@ func TestHandleNodeMessage_RoutesToTriggerHandler(t *testing.T) {
367379
// This test covers the case where the response ID does not contain a "/"
368380
// and should be routed to the triggerHandler.HandleNodeTriggerResponse.
369381
mockTriggerHandler := triggermocks.NewHTTPTriggerHandler(t)
370-
handler := createTestHandler(t)
382+
handler, _ := createTestHandler(t)
371383
handler.triggerHandler = mockTriggerHandler
372384

373385
rawRes := json.RawMessage([]byte(`{}`))
@@ -388,7 +400,7 @@ func TestHandleNodeMessage_RoutesToTriggerHandler(t *testing.T) {
388400
}
389401

390402
func TestHandleNodeMessage_UnsupportedMethod(t *testing.T) {
391-
handler := createTestHandler(t)
403+
handler, _ := createTestHandler(t)
392404
rawRes := json.RawMessage([]byte(`{}`))
393405
resp := &jsonrpc.Response[json.RawMessage]{
394406
ID: "unsupportedMethod/123",
@@ -402,7 +414,7 @@ func TestHandleNodeMessage_UnsupportedMethod(t *testing.T) {
402414
}
403415

404416
func TestHandleNodeMessage_EmptyID(t *testing.T) {
405-
handler := createTestHandler(t)
417+
handler, _ := createTestHandler(t)
406418
rawRes := json.RawMessage([]byte(`{}`))
407419
resp := &jsonrpc.Response[json.RawMessage]{
408420
ID: "",
@@ -458,7 +470,8 @@ func TestGatewayHandler_Start_CallsDeleteExpired(t *testing.T) {
458470
mockHTTPClient := httpmocks.NewHTTPClient(t)
459471
lggr := logger.Test(t)
460472

461-
handler, err := NewGatewayHandler(configBytes, donConfig, mockDon, mockHTTPClient, lggr, limits.Factory{Logger: lggr})
473+
shardedDONs, connMgrs := toShardedArgs(donConfig, mockDon)
474+
handler, err := NewGatewayHandler(configBytes, shardedDONs, connMgrs, mockHTTPClient, lggr, limits.Factory{Logger: lggr})
462475
require.NoError(t, err)
463476
require.NotNil(t, handler)
464477
mockCache := newMockResponseCache()
@@ -491,7 +504,7 @@ func serviceCfg() ServiceConfig {
491504
return WithDefaults(cfg)
492505
}
493506

494-
func createTestHandler(t *testing.T) *gatewayHandler {
507+
func createTestHandler(t *testing.T) (*gatewayHandler, *handlermocks.DON) {
495508
cfg := serviceCfg()
496509
return createTestHandlerWithConfig(t, cfg)
497510
}
@@ -504,22 +517,27 @@ func verifyBackwardCompatibility(t *testing.T, headers map[string]string, multiH
504517
}
505518
}
506519

507-
func createTestHandlerWithConfig(t *testing.T, cfg ServiceConfig) *gatewayHandler {
520+
func createTestHandlerWithConfig(t *testing.T, cfg ServiceConfig) (*gatewayHandler, *handlermocks.DON) {
508521
configBytes, err := json.Marshal(cfg)
509522
require.NoError(t, err)
510523

511524
donConfig := &config.DONConfig{
512525
DonId: "test-don",
526+
Members: []config.NodeConfig{
527+
{Name: "node1", Address: "node1"},
528+
{Name: "node2", Address: "node2"},
529+
},
513530
}
514531
mockDon := handlermocks.NewDON(t)
515532
mockHTTPClient := httpmocks.NewHTTPClient(t)
516533
lggr := logger.Test(t)
517534

518-
handler, err := NewGatewayHandler(configBytes, donConfig, mockDon, mockHTTPClient, lggr, limits.Factory{Logger: lggr})
535+
shardedDONs, connMgrs := toShardedArgs(donConfig, mockDon)
536+
handler, err := NewGatewayHandler(configBytes, shardedDONs, connMgrs, mockHTTPClient, lggr, limits.Factory{Logger: lggr})
519537
require.NoError(t, err)
520538
require.NotNil(t, handler)
521539

522-
return handler
540+
return handler, mockDon
523541
}
524542

525543
func TestCreateHTTPRequestCallback(t *testing.T) {
@@ -542,7 +560,7 @@ func TestCreateHTTPRequestCallback(t *testing.T) {
542560
}
543561

544562
t.Run("successful HTTP request with latency measurement", func(t *testing.T) {
545-
handler := createTestHandler(t)
563+
handler, _ := createTestHandler(t)
546564
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
547565

548566
expectedResp := &network.HTTPResponse{
@@ -565,7 +583,7 @@ func TestCreateHTTPRequestCallback(t *testing.T) {
565583
})
566584

567585
t.Run("HTTP send error sets IsExternalEndpointError to true", func(t *testing.T) {
568-
handler := createTestHandler(t)
586+
handler, _ := createTestHandler(t)
569587
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
570588

571589
mockHTTPClient.EXPECT().Send(mock.Anything, mock.Anything).Return(nil, network.ErrHTTPSend)
@@ -584,7 +602,7 @@ func TestCreateHTTPRequestCallback(t *testing.T) {
584602
})
585603

586604
t.Run("response with MultiHeaders is passed through correctly", func(t *testing.T) {
587-
handler := createTestHandler(t)
605+
handler, _ := createTestHandler(t)
588606
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
589607

590608
expectedResp := &network.HTTPResponse{
@@ -634,7 +652,7 @@ func TestCreateHTTPRequestCallback(t *testing.T) {
634652
})
635653

636654
t.Run("response with empty MultiHeaders still sets Headers", func(t *testing.T) {
637-
handler := createTestHandler(t)
655+
handler, _ := createTestHandler(t)
638656
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
639657

640658
expectedResp := &network.HTTPResponse{
@@ -661,7 +679,7 @@ func TestCreateHTTPRequestCallback(t *testing.T) {
661679
})
662680

663681
t.Run("HTTP read error sets IsExternalEndpointError to true", func(t *testing.T) {
664-
handler := createTestHandler(t)
682+
handler, _ := createTestHandler(t)
665683
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
666684

667685
mockHTTPClient.EXPECT().Send(mock.Anything, mock.Anything).Return(nil, network.ErrHTTPRead)
@@ -680,7 +698,7 @@ func TestCreateHTTPRequestCallback(t *testing.T) {
680698
})
681699

682700
t.Run("other errors set IsExternalEndpointError to false", func(t *testing.T) {
683-
handler := createTestHandler(t)
701+
handler, _ := createTestHandler(t)
684702
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
685703

686704
genericError := errors.New("some other network error")
@@ -701,8 +719,7 @@ func TestCreateHTTPRequestCallback(t *testing.T) {
701719
}
702720

703721
func TestMakeOutgoingRequest_SendResponseUsesIndependentContext(t *testing.T) {
704-
handler := createTestHandler(t)
705-
mockDon := handler.don.(*handlermocks.DON)
722+
handler, mockDon := createTestHandler(t)
706723
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
707724

708725
outboundReq := gateway_common.OutboundHTTPRequest{
@@ -744,7 +761,7 @@ func TestMakeOutgoingRequest_SendResponseUsesIndependentContext(t *testing.T) {
744761
// TestMakeOutgoingRequestCachingBehavior tests the specific caching logic in makeOutgoingRequest
745762
func TestMakeOutgoingRequestCachingBehavior(t *testing.T) {
746763
t.Run("MaxAgeMs=0 and Store=true calls Set", func(t *testing.T) {
747-
handler := createTestHandler(t)
764+
handler, mockDon := createTestHandler(t)
748765
mockCache := newMockResponseCache()
749766
handler.responseCache = mockCache
750767

@@ -766,7 +783,6 @@ func TestMakeOutgoingRequestCachingBehavior(t *testing.T) {
766783
Result: &rawRequest,
767784
}
768785

769-
mockDon := handler.don.(*handlermocks.DON)
770786
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
771787
httpResp := &network.HTTPResponse{
772788
StatusCode: 200,
@@ -788,7 +804,7 @@ func TestMakeOutgoingRequestCachingBehavior(t *testing.T) {
788804
})
789805

790806
t.Run("MaxAgeMs=0 and Store=false does not call Set", func(t *testing.T) {
791-
handler := createTestHandler(t)
807+
handler, mockDon := createTestHandler(t)
792808
mockCache := newMockResponseCache()
793809
handler.responseCache = mockCache
794810

@@ -810,7 +826,6 @@ func TestMakeOutgoingRequestCachingBehavior(t *testing.T) {
810826
Result: &rawRequest,
811827
}
812828

813-
mockDon := handler.don.(*handlermocks.DON)
814829
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
815830
httpResp := &network.HTTPResponse{
816831
StatusCode: 200,
@@ -832,7 +847,7 @@ func TestMakeOutgoingRequestCachingBehavior(t *testing.T) {
832847
})
833848

834849
t.Run("MaxAgeMs>0 calls CachedFetch", func(t *testing.T) {
835-
handler := createTestHandler(t)
850+
handler, mockDon := createTestHandler(t)
836851
mockCache := newMockResponseCache()
837852
handler.responseCache = mockCache
838853

@@ -854,7 +869,6 @@ func TestMakeOutgoingRequestCachingBehavior(t *testing.T) {
854869
Result: &rawRequest,
855870
}
856871

857-
mockDon := handler.don.(*handlermocks.DON)
858872
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
859873
httpResp := &network.HTTPResponse{
860874
StatusCode: 200,
@@ -878,7 +892,7 @@ func TestMakeOutgoingRequestCachingBehavior(t *testing.T) {
878892

879893
// setupRateLimitingTest creates common test setup for rate limiting tests
880894
func setupRateLimitingTest(t *testing.T, cfg ServiceConfig) (*gatewayHandler, *jsonrpc.Response[json.RawMessage], *httpmocks.HTTPClient, *handlermocks.DON) {
881-
handler := createTestHandlerWithConfig(t, cfg)
895+
handler, mockDon := createTestHandlerWithConfig(t, cfg)
882896

883897
outboundReq := gateway_common.OutboundHTTPRequest{
884898
Method: "GET",
@@ -896,7 +910,6 @@ func setupRateLimitingTest(t *testing.T, cfg ServiceConfig) (*gatewayHandler, *j
896910
}
897911

898912
mockHTTPClient := handler.httpClient.(*httpmocks.HTTPClient)
899-
mockDon := handler.don.(*handlermocks.DON)
900913

901914
return handler, resp, mockHTTPClient, mockDon
902915
}

0 commit comments

Comments
 (0)