Skip to content

Commit 3fb32e8

Browse files
committed
Gateway multi handler conversions
1 parent 70b1e2b commit 3fb32e8

File tree

12 files changed

+555
-130
lines changed

12 files changed

+555
-130
lines changed

core/services/gateway/common/aggregation/workflow_metadata_aggregator.go

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -140,40 +140,64 @@ func (agg *WorkflowMetadataAggregator) Collect(obs *gateway_common.WorkflowMetad
140140
return nil
141141
}
142142

143+
// AggregatedWorkflow pairs a workflow's metadata with the set of node addresses that reported it.
144+
type AggregatedWorkflow struct {
145+
Metadata gateway_common.WorkflowMetadata
146+
Reporters StringSet
147+
}
148+
143149
// Aggregate returns the aggregated workflow metadata for workflows that have reached the threshold.
144150
// Results are sorted chronologically by sequence number (newest first, oldest last).
145151
func (agg *WorkflowMetadataAggregator) Aggregate() ([]gateway_common.WorkflowMetadata, error) {
152+
results, err := agg.AggregateWithReporters()
153+
if err != nil {
154+
return nil, err
155+
}
156+
aggregated := make([]gateway_common.WorkflowMetadata, len(results))
157+
for i, r := range results {
158+
aggregated[i] = r.Metadata
159+
}
160+
return aggregated, nil
161+
}
162+
163+
// AggregateWithReporters returns aggregated workflow metadata together with the set of
164+
// node addresses that reported each workflow. This allows callers to determine which
165+
// shard owns a workflow based on the reporting nodes.
166+
func (agg *WorkflowMetadataAggregator) AggregateWithReporters() ([]AggregatedWorkflow, error) {
146167
agg.mu.RLock()
147168
defer agg.mu.RUnlock()
148169

149-
type aggregatedObs struct {
150-
metadata gateway_common.WorkflowMetadata
170+
type sortable struct {
171+
result AggregatedWorkflow
151172
sequence uint64
152173
}
153174

154-
var toSort []aggregatedObs
175+
var toSort []sortable
155176
for _, nodeObs := range agg.observations {
156177
if len(nodeObs.nodes) >= agg.threshold {
157-
toSort = append(toSort, aggregatedObs{
158-
metadata: *nodeObs.observation,
178+
reportersCopy := make(StringSet, len(nodeObs.nodes))
179+
for addr := range nodeObs.nodes {
180+
reportersCopy.Add(addr)
181+
}
182+
toSort = append(toSort, sortable{
183+
result: AggregatedWorkflow{
184+
Metadata: *nodeObs.observation,
185+
Reporters: reportersCopy,
186+
},
159187
sequence: nodeObs.sequence,
160188
})
161189
}
162190
}
163191

164-
// Sort chronologically (newest first) so that workflows that were registered most recently
165-
// takes precedence
166192
sort.Slice(toSort, func(i, j int) bool {
167193
return toSort[i].sequence > toSort[j].sequence
168194
})
169195

170-
// Extract just the metadata
171-
aggregated := make([]gateway_common.WorkflowMetadata, len(toSort))
172-
for i, obs := range toSort {
173-
aggregated[i] = obs.metadata
196+
results := make([]AggregatedWorkflow, len(toSort))
197+
for i, s := range toSort {
198+
results[i] = s.result
174199
}
175-
176-
return aggregated, nil
200+
return results, nil
177201
}
178202

179203
type NodeObservations struct {

core/services/gateway/handler_factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (hf *handlerFactory) NewHandler(
8383
case WebAPICapabilitiesType:
8484
return capabilities.NewHandler(handlerConfig, donConfig, don, hf.httpClient, hf.lggr)
8585
case HTTPCapabilityType:
86-
return v2.NewGatewayHandler(handlerConfig, donConfig, don, hf.httpClient, hf.lggr, hf.lf)
86+
return v2.NewGatewayHandler(handlerConfig, shardedDONs, shardsConnMgrs, hf.httpClient, hf.lggr, hf.lf)
8787
case VaultHandlerType:
8888
requestAuthorizer := vaultcap.NewRequestAuthorizer(hf.lggr, hf.workflowRegistrySyncer)
8989
return vault.NewHandler(handlerConfig, donConfig, don, hf.capabilitiesRegistry, requestAuthorizer, hf.lggr, clockwork.NewRealClock(), hf.lf)

core/services/gateway/handlers/capabilities/v2/http_handler.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,14 @@ type RetryConfig struct {
110110
Multiplier float64 `json:"multiplier"`
111111
}
112112

113-
func NewGatewayHandler(handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON, httpClient network.HTTPClient, lggr logger.Logger, lf limits.Factory) (*gatewayHandler, error) {
113+
func NewGatewayHandler(handlerConfig json.RawMessage, shardedDONs []config.ShardedDONConfig, shardsConnMgrs [][]handlers.DON, httpClient network.HTTPClient, lggr logger.Logger, lf limits.Factory) (*gatewayHandler, error) {
114+
shardRouter, err := handlers.NewShardRouter(shardedDONs, shardsConnMgrs)
115+
if err != nil {
116+
return nil, fmt.Errorf("failed to create shard router: %w", err)
117+
}
118+
114119
var cfg ServiceConfig
115-
err := json.Unmarshal(handlerConfig, &cfg)
120+
err = json.Unmarshal(handlerConfig, &cfg)
116121
if err != nil {
117122
return nil, err
118123
}
@@ -126,24 +131,24 @@ func NewGatewayHandler(handlerConfig json.RawMessage, donConfig *config.DONConfi
126131
return nil, fmt.Errorf("failed to create user rate limiter: %w", err)
127132
}
128133

129-
metrics, err := metrics.NewMetrics(donConfig)
134+
m, err := metrics.NewMetrics(shardRouter.AllMembers())
130135
if err != nil {
131136
return nil, fmt.Errorf("failed to initialize metrics: %w", err)
132137
}
133138

134-
metadataHandler := NewWorkflowMetadataHandler(lggr, cfg, don, donConfig, metrics)
135-
triggerHandler := NewHTTPTriggerHandler(lggr, cfg, donConfig, don, metadataHandler, userRateLimiter, metrics)
139+
metadataHandler := NewWorkflowMetadataHandler(lggr, cfg, shardRouter, m)
140+
triggerHandler := NewHTTPTriggerHandler(lggr, cfg, shardRouter, metadataHandler, userRateLimiter, m)
136141
return &gatewayHandler{
137142
config: cfg,
138-
don: don,
139-
lggr: logger.With(logger.Named(lggr, handlerName), "donId", donConfig.DonId),
143+
don: shardRouter,
144+
lggr: logger.With(logger.Named(lggr, handlerName), "donId", shardRouter.DonID()),
140145
httpClient: httpClient,
141146
nodeRateLimiter: nodeRateLimiter,
142147
stopCh: make(services.StopChan),
143-
responseCache: newResponseCache(lggr, cfg.OutboundRequestCacheTTLMs, metrics),
148+
responseCache: newResponseCache(lggr, cfg.OutboundRequestCacheTTLMs, m),
144149
triggerHandler: triggerHandler,
145150
metadataHandler: metadataHandler,
146-
metrics: metrics,
151+
metrics: m,
147152
}, nil
148153
}
149154

0 commit comments

Comments
 (0)