Skip to content

Commit ddb0cb7

Browse files
committed
Refactor pusher engine: queue length metrics
Rename queue and add length metrics for it, updating creation sites.
1 parent 051e6d4 commit ddb0cb7

File tree

5 files changed

+28
-10
lines changed

5 files changed

+28
-10
lines changed

cmd/collection/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,7 @@ func main() {
480480
node.EngineRegistry,
481481
node.State,
482482
node.Metrics.Engine,
483+
node.Metrics.Mempool,
483484
colMetrics,
484485
node.Me,
485486
node.Storage.Collections,

engine/collection/pusher/engine.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type Engine struct {
3838
transactions storage.Transactions
3939

4040
notifier engine.Notifier
41-
inbound *fifoqueue.FifoQueue
41+
queue *fifoqueue.FifoQueue
4242

4343
component.Component
4444
cm *component.ComponentManager
@@ -48,11 +48,26 @@ type Engine struct {
4848
var _ network.Engine = (*Engine)(nil)
4949
var _ component.Component = (*Engine)(nil)
5050

51-
func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) {
52-
// TODO length observer metrics
53-
inbound, err := fifoqueue.NewFifoQueue(1000)
51+
// New creates a new pusher engine.
52+
func New(
53+
log zerolog.Logger,
54+
net network.EngineRegistry,
55+
state protocol.State,
56+
engMetrics module.EngineMetrics,
57+
mempoolMetrics module.MempoolMetrics,
58+
colMetrics module.CollectionMetrics,
59+
me module.Local,
60+
collections storage.Collections,
61+
transactions storage.Transactions,
62+
) (*Engine, error) {
63+
queue, err := fifoqueue.NewFifoQueue(
64+
1000,
65+
fifoqueue.WithLengthObserver(func(len int) {
66+
mempoolMetrics.MempoolEntries(metrics.ResourceSubmitCollectionGuaranteesQueue, uint(len))
67+
}),
68+
)
5469
if err != nil {
55-
return nil, fmt.Errorf("could not create inbound fifoqueue: %w", err)
70+
return nil, fmt.Errorf("could not create fifoqueue: %w", err)
5671
}
5772

5873
notifier := engine.NewNotifier()
@@ -67,7 +82,7 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e
6782
transactions: transactions,
6883

6984
notifier: notifier,
70-
inbound: inbound,
85+
queue: queue,
7186
}
7287

7388
conduit, err := net.Register(channels.PushGuarantees, e)
@@ -107,7 +122,7 @@ func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready co
107122
// Only returns when the queue is empty (or the engine is terminated).
108123
func (e *Engine) processOutboundMessages(ctx context.Context) error {
109124
for {
110-
nextMessage, ok := e.inbound.Pop()
125+
nextMessage, ok := e.queue.Pop()
111126
if !ok {
112127
return nil
113128
}
@@ -167,9 +182,9 @@ func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, mes
167182
// SubmitCollectionGuarantee adds a collection guarantee to the engine's queue
168183
// to later be published to consensus nodes.
169184
func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarantee) {
170-
ok := e.inbound.Push(msg)
185+
ok := e.queue.Push(msg)
171186
if !ok {
172-
e.log.Err(fmt.Errorf("failed to store collection guarantee in queue"))
187+
engine.LogError(e.log, fmt.Errorf("failed to store collection guarantee in queue"))
173188
return
174189
}
175190
e.notifier.Notify()

engine/collection/pusher/engine_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func (suite *Suite) SetupTest() {
7272
suite.state,
7373
metrics,
7474
metrics,
75+
metrics,
7576
suite.me,
7677
suite.collections,
7778
suite.transactions,

engine/testutil/nodes.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ func CollectionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ro
313313
retrieve)
314314
require.NoError(t, err)
315315

316-
pusherEngine, err := pusher.New(node.Log, node.Net, node.State, node.Metrics, node.Metrics, node.Me, collections, transactions)
316+
pusherEngine, err := pusher.New(node.Log, node.Net, node.State, node.Metrics, node.Metrics, node.Metrics, node.Me, collections, transactions)
317317
require.NoError(t, err)
318318

319319
clusterStateFactory, err := factories.NewClusterStateFactory(

module/metrics/labels.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ const (
113113
ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel
114114
ResourceClusterBlockProposalQueue = "cluster_compliance_proposal_queue" // collection node, compliance engine
115115
ResourceTransactionIngestQueue = "ingest_transaction_queue" // collection node, ingest engine
116+
ResourceSubmitCollectionGuaranteesQueue = "submit_col_guarantee_queue" // collection node, pusher engine
116117
ResourceBeaconKey = "beacon-key" // consensus node, DKG engine
117118
ResourceDKGMessage = "dkg_private_message" // consensus, DKG messaging engine
118119
ResourceApprovalQueue = "sealing_approval_queue" // consensus node, sealing engine

0 commit comments

Comments
 (0)