Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,40 +140,64 @@ func (agg *WorkflowMetadataAggregator) Collect(obs *gateway_common.WorkflowMetad
return nil
}

// AggregatedWorkflow pairs a workflow's metadata with the set of node addresses that reported it.
type AggregatedWorkflow struct {
// Metadata is the workflow metadata that reached the aggregation threshold.
Metadata gateway_common.WorkflowMetadata
// Reporters is the set of node addresses that reported this exact metadata.
// NOTE(review): presumably a copy owned by the caller — confirm against the producer.
Reporters StringSet
}

// Aggregate returns the aggregated workflow metadata for workflows that have reached the threshold.
// Results are sorted chronologically by sequence number (newest first, oldest last).
func (agg *WorkflowMetadataAggregator) Aggregate() ([]gateway_common.WorkflowMetadata, error) {
	withReporters, err := agg.AggregateWithReporters()
	if err != nil {
		return nil, err
	}
	// Strip the reporter sets; callers of Aggregate only care about the metadata.
	metadata := make([]gateway_common.WorkflowMetadata, 0, len(withReporters))
	for _, entry := range withReporters {
		metadata = append(metadata, entry.Metadata)
	}
	return metadata, nil
}

// AggregateWithReporters returns aggregated workflow metadata together with the set of
// node addresses that reported each workflow. This allows callers to determine which
// shard owns a workflow based on the reporting nodes.
func (agg *WorkflowMetadataAggregator) AggregateWithReporters() ([]AggregatedWorkflow, error) {
agg.mu.RLock()
defer agg.mu.RUnlock()

type aggregatedObs struct {
metadata gateway_common.WorkflowMetadata
type sortable struct {
result AggregatedWorkflow
sequence uint64
}

var toSort []aggregatedObs
var toSort []sortable
for _, nodeObs := range agg.observations {
if len(nodeObs.nodes) >= agg.threshold {
toSort = append(toSort, aggregatedObs{
metadata: *nodeObs.observation,
reportersCopy := make(StringSet, len(nodeObs.nodes))
for addr := range nodeObs.nodes {
reportersCopy.Add(addr)
}
toSort = append(toSort, sortable{
result: AggregatedWorkflow{
Metadata: *nodeObs.observation,
Reporters: reportersCopy,
},
sequence: nodeObs.sequence,
})
}
}

// Sort chronologically (newest first) so that workflows that were registered most recently
// takes precedence
sort.Slice(toSort, func(i, j int) bool {
return toSort[i].sequence > toSort[j].sequence
})

// Extract just the metadata
aggregated := make([]gateway_common.WorkflowMetadata, len(toSort))
for i, obs := range toSort {
aggregated[i] = obs.metadata
results := make([]AggregatedWorkflow, len(toSort))
for i, s := range toSort {
results[i] = s.result
}

return aggregated, nil
return results, nil
}

type NodeObservations struct {
Expand Down
2 changes: 1 addition & 1 deletion core/services/gateway/handler_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (hf *handlerFactory) NewHandler(
case WebAPICapabilitiesType:
return capabilities.NewHandler(handlerConfig, donConfig, don, hf.httpClient, hf.lggr)
case HTTPCapabilityType:
return v2.NewGatewayHandler(handlerConfig, donConfig, don, hf.httpClient, hf.lggr, hf.lf)
return v2.NewGatewayHandler(handlerConfig, shardedDONs, shardsConnMgrs, hf.httpClient, hf.lggr, hf.lf)
case VaultHandlerType:
requestAuthorizer := vaultcap.NewRequestAuthorizer(hf.lggr, hf.workflowRegistrySyncer)
return vault.NewHandler(handlerConfig, donConfig, don, hf.capabilitiesRegistry, requestAuthorizer, hf.lggr, clockwork.NewRealClock(), hf.lf)
Expand Down
23 changes: 14 additions & 9 deletions core/services/gateway/handlers/capabilities/v2/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,14 @@ type RetryConfig struct {
Multiplier float64 `json:"multiplier"`
}

func NewGatewayHandler(handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON, httpClient network.HTTPClient, lggr logger.Logger, lf limits.Factory) (*gatewayHandler, error) {
func NewGatewayHandler(handlerConfig json.RawMessage, shardedDONs []config.ShardedDONConfig, shardsConnMgrs [][]handlers.DON, httpClient network.HTTPClient, lggr logger.Logger, lf limits.Factory) (*gatewayHandler, error) {
shardRouter, err := handlers.NewShardRouter(shardedDONs, shardsConnMgrs)
if err != nil {
return nil, fmt.Errorf("failed to create shard router: %w", err)
}

var cfg ServiceConfig
err := json.Unmarshal(handlerConfig, &cfg)
err = json.Unmarshal(handlerConfig, &cfg)
if err != nil {
return nil, err
}
Expand All @@ -126,24 +131,24 @@ func NewGatewayHandler(handlerConfig json.RawMessage, donConfig *config.DONConfi
return nil, fmt.Errorf("failed to create user rate limiter: %w", err)
}

metrics, err := metrics.NewMetrics(donConfig)
m, err := metrics.NewMetrics(shardRouter.AllMembers())
if err != nil {
return nil, fmt.Errorf("failed to initialize metrics: %w", err)
}

metadataHandler := NewWorkflowMetadataHandler(lggr, cfg, don, donConfig, metrics)
triggerHandler := NewHTTPTriggerHandler(lggr, cfg, donConfig, don, metadataHandler, userRateLimiter, metrics)
metadataHandler := NewWorkflowMetadataHandler(lggr, cfg, shardRouter, m)
triggerHandler := NewHTTPTriggerHandler(lggr, cfg, shardRouter, metadataHandler, userRateLimiter, m)
return &gatewayHandler{
config: cfg,
don: don,
lggr: logger.With(logger.Named(lggr, handlerName), "donId", donConfig.DonId),
don: shardRouter,
lggr: logger.With(logger.Named(lggr, handlerName), "donId", shardRouter.DonID()),
httpClient: httpClient,
nodeRateLimiter: nodeRateLimiter,
stopCh: make(services.StopChan),
responseCache: newResponseCache(lggr, cfg.OutboundRequestCacheTTLMs, metrics),
responseCache: newResponseCache(lggr, cfg.OutboundRequestCacheTTLMs, m),
triggerHandler: triggerHandler,
metadataHandler: metadataHandler,
metrics: metrics,
metrics: m,
}, nil
}

Expand Down
Loading
Loading