-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathconsumer.go
More file actions
48 lines (43 loc) · 1.38 KB
/
consumer.go
File metadata and controls
48 lines (43 loc) · 1.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//nolint:dupl
package main
import (
"fmt"
"github.com/meshplus/pier-client-ethereum/relay"
)
func (c *Client) StartConsumer() error {
loop := func(interchainCh chan *relay.BrokerThrowInterchainEvent, receiptCh chan *relay.BrokerThrowReceiptEvent) {
for {
select {
case interchainEv := <-interchainCh:
ibtp, err := c.Convert2IBTP(interchainEv, int64(c.config.Ether.TimeoutHeight))
if err != nil {
logger.Warn("convert to IBTP", "src", interchainEv.SrcFullID, "dst", interchainEv.DstFullID, "idx", interchainEv.Index, "err", err.Error())
continue
}
c.eventC <- ibtp
case receiptEv := <-receiptCh:
ibtp, err := c.Convert2Receipt(receiptEv)
if err != nil {
logger.Warn("convert to IBTP", "src", receiptEv.SrcFullID, "dst", receiptEv.DstFullID, "idx", receiptEv.Index, "err", err.Error())
continue
}
c.eventC <- ibtp
case <-c.ctx.Done():
return
}
}
}
interchainCh := make(chan *relay.BrokerThrowInterchainEvent, 1024)
receiptCh := make(chan *relay.BrokerThrowReceiptEvent, 1024)
_, err := c.session.Contract.WatchThrowInterchainEvent(nil, interchainCh)
if err != nil {
return fmt.Errorf("watch event: %s", err)
}
_, err = c.session.Contract.WatchThrowReceiptEvent(nil, receiptCh)
if err != nil {
return fmt.Errorf("watch event: %s", err)
}
go loop(interchainCh, receiptCh)
logger.Info("Consumer started")
return nil
}