forked from cometbft/cometbft
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathunsync_local_client.go
More file actions
138 lines (108 loc) · 4.48 KB
/
unsync_local_client.go
File metadata and controls
138 lines (108 loc) · 4.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package abcicli
import (
"context"
"sync"
"github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/service"
)
type unsyncLocalClient struct {
service.BaseService
types.Application
mtx sync.Mutex
Callback
}
var _ Client = (*unsyncLocalClient)(nil)
// NewUnsyncLocalClient creates a local client, which wraps the application
// interface that Comet as the client will call to the application as the
// server.
//
// This differs from [NewLocalClient] in that it returns a client that only
// maintains a mutex over the callback used by CheckTxAsync and not over the
// application, leaving it up to the proxy to handle all concurrency. If the
// proxy does not impose any concurrency restrictions, it is then left up to
// the application to implement its own concurrency for the relevant group of
// calls.
func NewUnsyncLocalClient(app types.Application) Client {
cli := &unsyncLocalClient{
Application: app,
}
cli.BaseService = *service.NewBaseService(nil, "unsyncLocalClient", cli)
return cli
}
func (app *unsyncLocalClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
app.Callback = cb
app.mtx.Unlock()
}
func (app *unsyncLocalClient) CheckTxAsync(ctx context.Context, req *types.CheckTxRequest) (*ReqRes, error) {
reqres := NewReqRes(types.ToCheckTxRequest(req))
go func() {
res, err := app.Application.CheckTx(ctx, req)
if err != nil {
reqres.Response = types.ToExceptionResponse("") // optimistic recheck failed
} else {
reqres.Response = types.ToCheckTxResponse(res)
}
if app.Callback != nil {
app.Callback(reqres.Request, reqres.Response)
}
reqres.InvokeCallback()
}()
return reqres, nil
}
// -------------------------------------------------------
func (*unsyncLocalClient) Error() error {
return nil
}
func (*unsyncLocalClient) Flush(context.Context) error {
return nil
}
func (*unsyncLocalClient) Echo(_ context.Context, msg string) (*types.EchoResponse, error) {
return &types.EchoResponse{Message: msg}, nil
}
func (app *unsyncLocalClient) Info(ctx context.Context, req *types.InfoRequest) (*types.InfoResponse, error) {
return app.Application.Info(ctx, req)
}
func (app *unsyncLocalClient) CheckTx(ctx context.Context, req *types.CheckTxRequest) (*types.CheckTxResponse, error) {
return app.Application.CheckTx(ctx, req)
}
func (app *unsyncLocalClient) Query(ctx context.Context, req *types.QueryRequest) (*types.QueryResponse, error) {
return app.Application.Query(ctx, req)
}
func (app *unsyncLocalClient) Commit(ctx context.Context, req *types.CommitRequest) (*types.CommitResponse, error) {
return app.Application.Commit(ctx, req)
}
func (app *unsyncLocalClient) InitChain(ctx context.Context, req *types.InitChainRequest) (*types.InitChainResponse, error) {
return app.Application.InitChain(ctx, req)
}
func (app *unsyncLocalClient) ListSnapshots(ctx context.Context, req *types.ListSnapshotsRequest) (*types.ListSnapshotsResponse, error) {
return app.Application.ListSnapshots(ctx, req)
}
func (app *unsyncLocalClient) OfferSnapshot(ctx context.Context, req *types.OfferSnapshotRequest) (*types.OfferSnapshotResponse, error) {
return app.Application.OfferSnapshot(ctx, req)
}
func (app *unsyncLocalClient) LoadSnapshotChunk(ctx context.Context,
req *types.LoadSnapshotChunkRequest,
) (*types.LoadSnapshotChunkResponse, error) {
return app.Application.LoadSnapshotChunk(ctx, req)
}
func (app *unsyncLocalClient) ApplySnapshotChunk(ctx context.Context,
req *types.ApplySnapshotChunkRequest,
) (*types.ApplySnapshotChunkResponse, error) {
return app.Application.ApplySnapshotChunk(ctx, req)
}
func (app *unsyncLocalClient) PrepareProposal(ctx context.Context, req *types.PrepareProposalRequest) (*types.PrepareProposalResponse, error) {
return app.Application.PrepareProposal(ctx, req)
}
func (app *unsyncLocalClient) ProcessProposal(ctx context.Context, req *types.ProcessProposalRequest) (*types.ProcessProposalResponse, error) {
return app.Application.ProcessProposal(ctx, req)
}
func (app *unsyncLocalClient) ExtendVote(ctx context.Context, req *types.ExtendVoteRequest) (*types.ExtendVoteResponse, error) {
return app.Application.ExtendVote(ctx, req)
}
func (app *unsyncLocalClient) VerifyVoteExtension(ctx context.Context, req *types.VerifyVoteExtensionRequest) (*types.VerifyVoteExtensionResponse, error) {
return app.Application.VerifyVoteExtension(ctx, req)
}
func (app *unsyncLocalClient) FinalizeBlock(ctx context.Context, req *types.FinalizeBlockRequest) (*types.FinalizeBlockResponse, error) {
return app.Application.FinalizeBlock(ctx, req)
}