Skip to content

Commit ef61108

Browse files
committed
refactor(event): parse events after querying block results
fixes issue with server side of websocket closing subscription due to client not reading events fast. this happens when block has substantial amount of txs thus events count can be > 1000. this implementation now just waits for event with block header then queries block info and parses events. Signed-off-by: Artur Troian <[email protected]>
1 parent 32f2443 commit ef61108

File tree

2 files changed

+38
-38
lines changed

2 files changed

+38
-38
lines changed

events/publish.go

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -22,46 +22,34 @@ import (
2222

2323
// Publish events using tm buses to clients. Waits on context
2424
// shutdown signals to exit.
25-
func Publish(ctx context.Context, tmbus tmclient.EventsClient, name string, bus pubsub.Bus) (err error) {
26-
25+
func Publish(ctx context.Context, client tmclient.Client, name string, bus pubsub.Bus) (err error) {
2726
const (
28-
queuesz = 100
27+
queuesz = 1000
2928
)
3029
var (
31-
txname = name + "-tx"
32-
blkname = name + "-blk"
30+
blkHeaderName = name + "-blk-hdr"
3331
)
3432

35-
txch, err := tmbus.Subscribe(ctx, txname, txQuery().String(), queuesz)
36-
if err != nil {
37-
return err
38-
}
39-
defer func() {
40-
err = tmbus.UnsubscribeAll(ctx, txname)
41-
}()
33+
tmbus := client.(tmclient.EventsClient)
4234

43-
blkch, err := tmbus.Subscribe(ctx, blkname, blkQuery().String(), queuesz)
35+
blkch, err := tmbus.Subscribe(ctx, blkHeaderName, blkHeaderQuery().String(), queuesz)
4436
if err != nil {
4537
return err
4638
}
4739
defer func() {
48-
err = tmbus.UnsubscribeAll(ctx, blkname)
40+
err = tmbus.UnsubscribeAll(ctx, blkHeaderName)
4941
}()
5042

5143
g, ctx := errgroup.WithContext(ctx)
5244

5345
g.Go(func() error {
54-
return publishEvents(ctx, txch, bus)
55-
})
56-
57-
g.Go(func() error {
58-
return publishEvents(ctx, blkch, bus)
46+
return publishEvents(ctx, client, blkch, bus)
5947
})
6048

6149
return g.Wait()
6250
}
6351

64-
func publishEvents(ctx context.Context, ch <-chan ctypes.ResultEvent, bus pubsub.Bus) error {
52+
func publishEvents(ctx context.Context, client tmclient.Client, ch <-chan ctypes.ResultEvent, bus pubsub.Bus) error {
6553
var err error
6654

6755
loop:
@@ -70,30 +58,37 @@ loop:
7058
case <-ctx.Done():
7159
break loop
7260
case ed := <-ch:
61+
// nolint: gocritic
7362
switch evt := ed.Data.(type) {
74-
case tmtmtypes.EventDataTx:
75-
if !evt.Result.IsOK() {
76-
continue
77-
}
78-
processEvents(bus, evt.Result.GetEvents())
7963
case tmtmtypes.EventDataNewBlockHeader:
80-
processEvents(bus, evt.ResultEndBlock.GetEvents())
64+
processBlock(ctx, bus, client, evt.Header.Height)
8165
}
8266
}
8367
}
8468

8569
return err
8670
}
8771

88-
func processEvents(bus pubsub.Bus, events []abci.Event) {
89-
for _, ev := range events {
90-
if mev, ok := processEvent(ev); ok {
91-
if err := bus.Publish(mev); err != nil {
92-
bus.Close()
93-
return
94-
}
72+
func processBlock(ctx context.Context, bus pubsub.Bus, client tmclient.Client, height int64) {
73+
blkResults, err := client.BlockResults(ctx, &height)
74+
if err != nil {
75+
return
76+
}
77+
78+
for _, tx := range blkResults.TxsResults {
79+
if tx == nil {
9580
continue
9681
}
82+
83+
for _, ev := range tx.Events {
84+
if mev, ok := processEvent(ev); ok {
85+
if err := bus.Publish(mev); err != nil {
86+
bus.Close()
87+
return
88+
}
89+
continue
90+
}
91+
}
9792
}
9893
}
9994

events/query.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,17 @@ import (
88
tmtypes "github.com/tendermint/tendermint/types"
99
)
1010

11-
func txQuery() pubsub.Query {
12-
return tmquery.MustParse(
13-
fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx))
14-
}
11+
// func txQuery() pubsub.Query {
12+
// return tmquery.MustParse(
13+
// fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx))
14+
// }
15+
//
16+
// func blkQuery() pubsub.Query {
17+
// return tmquery.MustParse(
18+
// fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlock))
19+
// }
1520

16-
func blkQuery() pubsub.Query {
21+
func blkHeaderQuery() pubsub.Query {
1722
return tmquery.MustParse(
1823
fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlockHeader))
1924
}

0 commit comments

Comments
 (0)