Skip to content

Commit 81b73e7

Browse files
colmaziaRogerKSI
andauthored
[Grogu] fix by spec (#178)
* signaller can only add pending signal id * separate build tx from broadcast tx * add sync max block height * only parallel on calculate gas * add debug log in pending signal id set and remove * fix cmd grogu * fix atomic bug, calculate gas bug * update buildSignedTx to log more error * fix bug * fix again * fix format * implement updater with chain subscribe * signaller use the same now time * fix lint, reduce updater log * fix * add start bothan script for local testing * separate clients from context * delete unnecessary new line * cleaning for filtering and prepare submit price process * cleanup * fix format * remove dup keyring from the submitter * fix merge * clean up broadcast function * continue on some clients create error, extract as a function * updater implement multi-clients * fix from comments * move function buildSignedTx to submitter * update reference source query to support non-tx situation * fix error message * fix from comments --------- Co-authored-by: Kitipong Sirirueangsakul <[email protected]>
1 parent bf4c269 commit 81b73e7

File tree

17 files changed

+610
-276
lines changed

17 files changed

+610
-276
lines changed

cmd/grogu/cmd/keys.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"bufio"
55
"fmt"
66
"strings"
7+
"sync/atomic"
78

8-
"github.com/cometbft/cometbft/rpc/client/http"
99
"github.com/cosmos/cosmos-sdk/client"
1010
"github.com/cosmos/cosmos-sdk/client/flags"
1111
"github.com/cosmos/cosmos-sdk/client/input"
@@ -168,7 +168,7 @@ func keysListCmd(ctx *grogu.Context) *cobra.Command {
168168
Use: "list",
169169
Aliases: []string{"l"},
170170
Short: "List all the keys in the keychain",
171-
Args: cobra.ExactArgs(0),
171+
Args: cobra.NoArgs,
172172
RunE: createKeysListRunE(ctx),
173173
}
174174

@@ -184,26 +184,26 @@ func createKeysListRunE(ctx *grogu.Context) func(cmd *cobra.Command, args []stri
184184
return func(cmd *cobra.Command, args []string) error {
185185
bandConfig := band.MakeEncodingConfig()
186186

187-
nodesURIs := strings.Split(viper.GetString(flagNodes), ",")
188-
clients := make([]client.Context, 0, len(nodesURIs))
189-
for _, URI := range nodesURIs {
190-
httpClient, err := http.New(URI, "/websocket")
191-
if err != nil {
192-
continue
193-
}
194-
cl := client.Context{
195-
Client: httpClient,
196-
ChainID: ctx.Config.ChainID,
197-
Codec: bandConfig.Marshaler,
198-
InterfaceRegistry: bandConfig.InterfaceRegistry,
199-
Keyring: ctx.Keyring,
200-
TxConfig: bandConfig.TxConfig,
201-
BroadcastMode: flags.BroadcastSync,
202-
}
203-
clients = append(clients, cl)
187+
clientCtx := client.Context{
188+
ChainID: ctx.Config.ChainID,
189+
Codec: bandConfig.Marshaler,
190+
InterfaceRegistry: bandConfig.InterfaceRegistry,
191+
Keyring: ctx.Keyring,
192+
TxConfig: bandConfig.TxConfig,
193+
BroadcastMode: flags.BroadcastSync,
194+
}
195+
196+
nodeURIs := strings.Split(viper.GetString(flagNodes), ",")
197+
clients, stopClients, err := createClients(nodeURIs)
198+
if err != nil {
199+
return err
204200
}
201+
defer stopClients()
205202

206-
feedQuerier := querier.NewFeedQuerier(clients)
203+
maxBlockHeight := new(atomic.Int64)
204+
maxBlockHeight.Store(0)
205+
206+
feedQuerier := querier.NewFeedQuerier(clientCtx, clients, maxBlockHeight)
207207

208208
keys, err := ctx.Keyring.List()
209209
if err != nil {
@@ -237,6 +237,7 @@ func createKeysListRunE(ctx *grogu.Context) func(cmd *cobra.Command, args []stri
237237
s = ":x:"
238238
}
239239
}
240+
240241
_, err = emoji.Printf("%s%s => %s\n", s, key.Name, address.String())
241242
if err != nil {
242243
return err

cmd/grogu/cmd/run.go

Lines changed: 49 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ import (
55
"os/signal"
66
"strings"
77
"sync"
8+
"sync/atomic"
89
"syscall"
910
"time"
1011

1112
bothan "github.com/bandprotocol/bothan/bothan-api/client/go-client"
1213
"github.com/cometbft/cometbft/libs/log"
13-
"github.com/cometbft/cometbft/rpc/client/http"
1414
"github.com/cosmos/cosmos-sdk/client"
1515
"github.com/cosmos/cosmos-sdk/client/flags"
1616
sdk "github.com/cosmos/cosmos-sdk/types"
@@ -22,6 +22,7 @@ import (
2222
"github.com/bandprotocol/chain/v2/grogu/querier"
2323
"github.com/bandprotocol/chain/v2/grogu/signaller"
2424
"github.com/bandprotocol/chain/v2/grogu/submitter"
25+
"github.com/bandprotocol/chain/v2/grogu/updater"
2526
"github.com/bandprotocol/chain/v2/pkg/logger"
2627
"github.com/bandprotocol/chain/v2/x/feeds/types"
2728
)
@@ -83,39 +84,38 @@ func createRunE(ctx *context.Context) func(cmd *cobra.Command, args []string) er
8384
bandConfig := band.MakeEncodingConfig()
8485
chainID := viper.GetString(flags.FlagChainID)
8586

86-
// Split Node URIs and create RPC clients
87-
nodesURIs := strings.Split(viper.GetString(flagNodes), ",")
88-
clients := make([]client.Context, 0, len(nodesURIs))
89-
90-
for _, URI := range nodesURIs {
91-
httpClient, err := http.New(URI, "/websocket")
92-
if err != nil {
93-
return err
94-
}
95-
cl := client.Context{
96-
Client: httpClient,
97-
ChainID: chainID,
98-
Codec: bandConfig.Marshaler,
99-
InterfaceRegistry: bandConfig.InterfaceRegistry,
100-
Keyring: ctx.Keyring,
101-
TxConfig: bandConfig.TxConfig,
102-
BroadcastMode: flags.BroadcastSync,
103-
}
104-
clients = append(clients, cl)
105-
}
106-
107-
// set up Queriers
108-
authQuerier := querier.NewAuthQuerier(clients)
109-
feedQuerier := querier.NewFeedQuerier(clients)
110-
txQuerier := querier.NewTxQuerier(clients)
111-
11287
// Initialize logger
11388
allowLevel, err := log.AllowLevel(ctx.Config.LogLevel)
11489
if err != nil {
11590
return err
11691
}
11792
l := logger.New(allowLevel)
11893

94+
// Split Node URIs and create RPC clients
95+
clientCtx := client.Context{
96+
ChainID: chainID,
97+
Codec: bandConfig.Marshaler,
98+
InterfaceRegistry: bandConfig.InterfaceRegistry,
99+
Keyring: ctx.Keyring,
100+
TxConfig: bandConfig.TxConfig,
101+
BroadcastMode: flags.BroadcastSync,
102+
}
103+
104+
nodeURIs := strings.Split(viper.GetString(flagNodes), ",")
105+
clients, stopClients, err := createClients(nodeURIs)
106+
if err != nil {
107+
return err
108+
}
109+
defer stopClients()
110+
111+
// Set up Queriers
112+
maxBlockHeight := new(atomic.Int64)
113+
maxBlockHeight.Store(0)
114+
115+
authQuerier := querier.NewAuthQuerier(clientCtx, clients, maxBlockHeight)
116+
feedQuerier := querier.NewFeedQuerier(clientCtx, clients, maxBlockHeight)
117+
txQuerier := querier.NewTxQuerier(clientCtx, clients)
118+
119119
// Setup Bothan service
120120
timeout, err := time.ParseDuration(ctx.Config.BothanTimeout)
121121
if err != nil {
@@ -165,9 +165,9 @@ func createRunE(ctx *context.Context) func(cmd *cobra.Command, args []string) er
165165

166166
// Setup Submitter
167167
submitterService, err := submitter.New(
168+
clientCtx,
168169
clients,
169170
l,
170-
ctx.Keyring,
171171
submitSignalPriceCh,
172172
authQuerier,
173173
txQuerier,
@@ -182,19 +182,36 @@ func createRunE(ctx *context.Context) func(cmd *cobra.Command, args []string) er
182182
return err
183183
}
184184

185-
// Start all
186-
go signallerService.Start()
187-
go submitterService.Start()
185+
// Setup Updater
186+
maxCurrentFeedEventHeight := new(atomic.Int64)
187+
maxCurrentFeedEventHeight.Store(0)
188188

189-
l.Info("Grogu has started")
189+
maxUpdateRefSourceEventHeight := new(atomic.Int64)
190+
maxUpdateRefSourceEventHeight.Store(0)
191+
192+
updaterService := updater.New(
193+
feedQuerier,
194+
clients,
195+
l,
196+
maxCurrentFeedEventHeight,
197+
maxUpdateRefSourceEventHeight,
198+
)
190199

191200
// Listen for termination signals for graceful shutdown
192201
sigChan := make(chan os.Signal, 1)
193202
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
194203

204+
// Start all services
205+
go updaterService.Start(sigChan)
206+
go signallerService.Start()
207+
go submitterService.Start()
208+
209+
l.Info("Grogu has started")
210+
195211
<-sigChan
196212
l.Info("Received stop signal, shutting down")
197213
l.Info("Grogu has stopped")
214+
198215
return nil
199216
}
200217
}

cmd/grogu/cmd/utils.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package cmd
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/cometbft/cometbft/rpc/client/http"
7+
)
8+
9+
func createClients(nodeURIs []string) ([]*http.HTTP, func(), error) {
10+
clients := make([]*http.HTTP, 0, len(nodeURIs))
11+
for _, uri := range nodeURIs {
12+
httpClient, err := http.New(uri, "/websocket")
13+
if err != nil {
14+
fmt.Printf("[Grogu] failed to create HTTP client with error: %v\n", err)
15+
continue
16+
}
17+
18+
if err = httpClient.Start(); err != nil {
19+
fmt.Printf("[Grogu] failed to start HTTP client with error: %v\n", err)
20+
continue
21+
}
22+
23+
clients = append(clients, httpClient)
24+
}
25+
26+
if len(clients) == 0 {
27+
return nil, nil, fmt.Errorf("no clients are available")
28+
}
29+
30+
// Function to stop all clients created so far
31+
stopClients := func() {
32+
for _, client := range clients {
33+
_ = client.Stop()
34+
}
35+
}
36+
37+
return clients, stopClients, nil
38+
}

cmd/grogu/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package grogu
1+
package main
22

33
import (
44
"fmt"

grogu/querier/auth.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,32 @@
11
package querier
22

33
import (
4+
"sync/atomic"
5+
6+
"github.com/cometbft/cometbft/rpc/client/http"
47
"github.com/cosmos/cosmos-sdk/client"
58
sdk "github.com/cosmos/cosmos-sdk/types"
69
auth "github.com/cosmos/cosmos-sdk/x/auth/types"
710
)
811

912
type AuthQuerier struct {
10-
queryClients []auth.QueryClient
13+
queryClients []auth.QueryClient
14+
maxBlockHeight *atomic.Int64
1115
}
1216

13-
func NewAuthQuerier(clients []client.Context) *AuthQuerier {
17+
func NewAuthQuerier(
18+
clientContext client.Context,
19+
clients []*http.HTTP,
20+
maxBlockHeight *atomic.Int64,
21+
) *AuthQuerier {
1422
queryClients := make([]auth.QueryClient, 0, len(clients))
1523
for _, cl := range clients {
16-
queryClients = append(queryClients, auth.NewQueryClient(cl))
24+
queryClients = append(queryClients, auth.NewQueryClient(clientContext.WithClient(cl)))
1725
}
1826

1927
return &AuthQuerier{
2028
queryClients,
29+
maxBlockHeight,
2130
}
2231
}
2332

@@ -28,5 +37,5 @@ func (q *AuthQuerier) QueryAccount(address sdk.Address) (*auth.QueryAccountRespo
2837
}
2938

3039
in := auth.QueryAccountRequest{Address: address.String()}
31-
return getMaxBlockHeightResponse(fs, &in)
40+
return getMaxBlockHeightResponse(fs, &in, q.maxBlockHeight)
3241
}

grogu/querier/feed.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,27 @@
11
package querier
22

33
import (
4+
"sync/atomic"
5+
6+
"github.com/cometbft/cometbft/rpc/client/http"
47
"github.com/cosmos/cosmos-sdk/client"
58
sdk "github.com/cosmos/cosmos-sdk/types"
69

710
feeds "github.com/bandprotocol/chain/v2/x/feeds/types"
811
)
912

1013
type FeedQuerier struct {
11-
queryClients []feeds.QueryClient
14+
queryClients []feeds.QueryClient
15+
maxBlockHeight *atomic.Int64
1216
}
1317

14-
func NewFeedQuerier(clients []client.Context) *FeedQuerier {
18+
func NewFeedQuerier(clientContext client.Context, clients []*http.HTTP, maxBlockHeight *atomic.Int64) *FeedQuerier {
1519
queryClients := make([]feeds.QueryClient, 0, len(clients))
1620
for _, cl := range clients {
17-
queryClients = append(queryClients, feeds.NewQueryClient(cl))
21+
queryClients = append(queryClients, feeds.NewQueryClient(clientContext.WithClient(cl)))
1822
}
1923

20-
return &FeedQuerier{queryClients}
24+
return &FeedQuerier{queryClients, maxBlockHeight}
2125
}
2226

2327
func (q *FeedQuerier) QueryValidValidator(valAddress sdk.ValAddress) (*feeds.QueryValidValidatorResponse, error) {
@@ -34,7 +38,7 @@ func (q *FeedQuerier) QueryValidValidator(valAddress sdk.ValAddress) (*feeds.Que
3438
Validator: valAddress.String(),
3539
}
3640

37-
return getMaxBlockHeightResponse(fs, &in)
41+
return getMaxBlockHeightResponse(fs, &in, q.maxBlockHeight)
3842
}
3943

4044
func (q *FeedQuerier) QueryIsFeeder(
@@ -50,7 +54,7 @@ func (q *FeedQuerier) QueryIsFeeder(
5054
FeederAddress: feeder.String(),
5155
ValidatorAddress: validator.String(),
5256
}
53-
return getMaxBlockHeightResponse(fs, &in)
57+
return getMaxBlockHeightResponse(fs, &in, q.maxBlockHeight)
5458
}
5559

5660
func (q *FeedQuerier) QueryValidatorPrices(valAddress sdk.ValAddress) (*feeds.QueryValidatorPricesResponse, error) {
@@ -66,7 +70,7 @@ func (q *FeedQuerier) QueryValidatorPrices(valAddress sdk.ValAddress) (*feeds.Qu
6670
in := feeds.QueryValidatorPricesRequest{
6771
Validator: valAddress.String(),
6872
}
69-
return getMaxBlockHeightResponse(fs, &in)
73+
return getMaxBlockHeightResponse(fs, &in, q.maxBlockHeight)
7074
}
7175

7276
func (q *FeedQuerier) QueryParams() (*feeds.QueryParamsResponse, error) {
@@ -76,7 +80,7 @@ func (q *FeedQuerier) QueryParams() (*feeds.QueryParamsResponse, error) {
7680
}
7781

7882
in := feeds.QueryParamsRequest{}
79-
return getMaxBlockHeightResponse(fs, &in)
83+
return getMaxBlockHeightResponse(fs, &in, q.maxBlockHeight)
8084
}
8185

8286
func (q *FeedQuerier) QueryCurrentFeeds() (*feeds.QueryCurrentFeedsResponse, error) {
@@ -90,5 +94,19 @@ func (q *FeedQuerier) QueryCurrentFeeds() (*feeds.QueryCurrentFeedsResponse, err
9094
}
9195

9296
in := feeds.QueryCurrentFeedsRequest{}
93-
return getMaxBlockHeightResponse(fs, &in)
97+
return getMaxBlockHeightResponse(fs, &in, q.maxBlockHeight)
98+
}
99+
100+
func (q *FeedQuerier) QueryReferenceSourceConfig() (*feeds.QueryReferenceSourceConfigResponse, error) {
101+
fs := make(
102+
[]QueryFunction[feeds.QueryReferenceSourceConfigRequest, feeds.QueryReferenceSourceConfigResponse],
103+
0,
104+
len(q.queryClients),
105+
)
106+
for _, queryClient := range q.queryClients {
107+
fs = append(fs, queryClient.ReferenceSourceConfig)
108+
}
109+
110+
in := feeds.QueryReferenceSourceConfigRequest{}
111+
return getMaxBlockHeightResponse(fs, &in, q.maxBlockHeight)
94112
}

0 commit comments

Comments
 (0)