Skip to content

Commit e9f08d2

Browse files
authored
refactor(events): make events handler as service (#1980)
Signed-off-by: Artur Troian <[email protected]> Co-authored-by: Artur Troian <[email protected]>
1 parent 5c5f8f6 commit e9f08d2

File tree

3 files changed

+102
-31
lines changed

3 files changed

+102
-31
lines changed

events/cmd/root.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,20 @@ func getEvents(ctx context.Context, cmd *cobra.Command, _ []string) error {
5050
group, ctx := errgroup.WithContext(ctx)
5151

5252
subscriber, err := bus.Subscribe()
53+
if err != nil {
54+
return err
55+
}
5356

57+
evtSvc, err := events.NewEvents(ctx, cctx.Client, "akash-cli", bus)
5458
if err != nil {
5559
return err
5660
}
5761

5862
group.Go(func() error {
59-
return events.Publish(ctx, cctx.Client, "akash-cli", bus)
63+
<-ctx.Done()
64+
evtSvc.Shutdown()
65+
66+
return nil
6067
})
6168

6269
group.Go(func() error {

events/publish.go

Lines changed: 93 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ package events
33
import (
44
"context"
55

6-
"golang.org/x/sync/errgroup"
7-
6+
"github.com/boz/go-lifecycle"
87
sdk "github.com/cosmos/cosmos-sdk/types"
98
abci "github.com/tendermint/tendermint/abci/types"
109
tmclient "github.com/tendermint/tendermint/rpc/client"
1110
ctypes "github.com/tendermint/tendermint/rpc/core/types"
1211
tmtmtypes "github.com/tendermint/tendermint/types"
12+
"golang.org/x/sync/errgroup"
1313

1414
atypes "github.com/akash-network/akash-api/go/node/audit/v1beta3"
1515
dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3"
@@ -20,57 +20,124 @@ import (
2020
"github.com/akash-network/node/pubsub"
2121
)
2222

23-
// Publish events using tm buses to clients. Waits on context
24-
// shutdown signals to exit.
25-
func Publish(ctx context.Context, client tmclient.Client, name string, bus pubsub.Bus) (err error) {
23+
type events struct {
24+
ctx context.Context
25+
group *errgroup.Group
26+
client tmclient.Client
27+
bus pubsub.Bus
28+
lc lifecycle.Lifecycle
29+
}
30+
31+
// Service represents an event monitoring service that subscribes to and processes blockchain events.
32+
// It monitors block headers and various transaction events, publishing them to a message bus.
33+
type Service interface {
34+
// Shutdown gracefully stops the event monitoring service and cleans up resources.
35+
// Once called, the service will unsubscribe from events and complete any pending operations.
36+
Shutdown()
37+
}
38+
39+
// NewEvents creates and initializes a new blockchain event monitoring service.
40+
//
41+
// Parameters:
42+
// - pctx: Parent context for controlling the service lifecycle
43+
// - client: Tendermint RPC client for interacting with the blockchain
44+
// - name: Service name used as a prefix for subscription identifiers
45+
// - bus: Message bus for publishing processed events
46+
//
47+
// Returns:
48+
// - Service: A running event monitoring service interface
49+
// - error: Any error encountered during service initialization
50+
//
51+
// The service subscribes to block header events and processes them to extract and publish
52+
// various transaction events (deployment, market, provider, audit) to the provided message bus.
53+
// The service starts monitoring events immediately and will continue until either the context
54+
// is canceled or Shutdown() is called.
55+
func NewEvents(pctx context.Context, client tmclient.Client, name string, bus pubsub.Bus) (Service, error) {
56+
group, ctx := errgroup.WithContext(pctx)
57+
58+
ev := &events{
59+
ctx: ctx,
60+
group: group,
61+
client: client,
62+
lc: lifecycle.New(),
63+
bus: bus,
64+
}
65+
2666
const (
2767
queuesz = 1000
2868
)
29-
var (
30-
blkHeaderName = name + "-blk-hdr"
31-
)
69+
70+
var blkHeaderName = name + "-blk-hdr"
3271

3372
tmbus := client.(tmclient.EventsClient)
3473

3574
blkch, err := tmbus.Subscribe(ctx, blkHeaderName, blkHeaderQuery().String(), queuesz)
3675
if err != nil {
37-
return err
76+
return nil, err
3877
}
39-
defer func() {
40-
err = tmbus.UnsubscribeAll(ctx, blkHeaderName)
41-
}()
4278

43-
g, ctx := errgroup.WithContext(ctx)
79+
startch := make(chan struct{}, 1)
80+
81+
group.Go(func() error {
82+
ev.lc.WatchContext(ctx)
4483

45-
g.Go(func() error {
46-
return publishEvents(ctx, client, blkch, bus)
84+
return ev.lc.Error()
4785
})
4886

49-
return g.Wait()
87+
group.Go(func() error {
88+
return ev.run(blkHeaderName, blkch, startch)
89+
})
90+
91+
select {
92+
case <-pctx.Done():
93+
return nil, pctx.Err()
94+
case <-startch:
95+
return ev, nil
96+
}
97+
}
98+
99+
func (e *events) Shutdown() {
100+
_, stopped := <-e.lc.Done()
101+
if stopped {
102+
return
103+
}
104+
105+
e.lc.Shutdown(nil)
106+
107+
_ = e.group.Wait()
50108
}
51109

52-
func publishEvents(ctx context.Context, client tmclient.Client, ch <-chan ctypes.ResultEvent, bus pubsub.Bus) error {
53-
var err error
110+
func (e *events) run(subs string, ch <-chan ctypes.ResultEvent, startch chan<- struct{}) error {
111+
tmbus := e.client.(tmclient.EventsClient)
112+
113+
defer func() {
114+
_ = tmbus.UnsubscribeAll(e.ctx, subs)
115+
116+
e.lc.ShutdownCompleted()
117+
}()
118+
119+
startch <- struct{}{}
54120

55121
loop:
56122
for {
57123
select {
58-
case <-ctx.Done():
124+
case err := <-e.lc.ShutdownRequest():
125+
e.lc.ShutdownInitiated(err)
59126
break loop
60-
case ed := <-ch:
127+
case ev := <-ch:
61128
// nolint: gocritic
62-
switch evt := ed.Data.(type) {
129+
switch evt := ev.Data.(type) {
63130
case tmtmtypes.EventDataNewBlockHeader:
64-
processBlock(ctx, bus, client, evt.Header.Height)
131+
e.processBlock(evt.Header.Height)
65132
}
66133
}
67134
}
68135

69-
return err
136+
return e.ctx.Err()
70137
}
71138

72-
func processBlock(ctx context.Context, bus pubsub.Bus, client tmclient.Client, height int64) {
73-
blkResults, err := client.BlockResults(ctx, &height)
139+
func (e *events) processBlock(height int64) {
140+
blkResults, err := e.client.BlockResults(e.ctx, &height)
74141
if err != nil {
75142
return
76143
}
@@ -82,11 +149,9 @@ func processBlock(ctx context.Context, bus pubsub.Bus, client tmclient.Client, h
82149

83150
for _, ev := range tx.Events {
84151
if mev, ok := processEvent(ev); ok {
85-
if err := bus.Publish(mev); err != nil {
86-
bus.Close()
152+
if err := e.bus.Publish(mev); err != nil {
87153
return
88154
}
89-
continue
90155
}
91156
}
92157
}

events/query.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,5 @@ import (
1919
// }
2020

2121
func blkHeaderQuery() pubsub.Query {
22-
return tmquery.MustParse(
23-
fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlockHeader))
22+
return tmquery.MustParse(fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlockHeader))
2423
}

0 commit comments

Comments
 (0)