Skip to content

Commit 8dc6681

Browse files
authored
Custom nonce monitor for blobber txns (#802)
* Use a custom nonce monitor * More fail conditions * Add logging and fix recurring balance refresh * Add expiry on nonce reservation * Add more logging * Capture setting of nonce failure * Readd error * Txn nonce log upon set * Update txn verify to use nonce * Update other txn verify to use nonce * Fix error logging
1 parent bb0d7bd commit 8dc6681

File tree

13 files changed

+262
-49
lines changed

13 files changed

+262
-49
lines changed

code/go/0chain.net/blobbercore/allocation/workers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ func sendFinalizeAllocation(a *Allocation) {
257257
var request finalizeRequest
258258
request.AllocationID = a.ID
259259

260+
// TODO should this be verified?
260261
err = tx.ExecuteSmartContract(
261262
transaction.STORAGE_CONTRACT_ADDRESS,
262263
transaction.FINALIZE_ALLOCATION,

code/go/0chain.net/blobbercore/challenge/protocol.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (cr *ChallengeEntity) SubmitChallengeToBC(ctx context.Context) (*transactio
6464
t *transaction.Transaction
6565
)
6666
for i := 0; i < 3; i++ {
67-
t, err = transaction.VerifyTransaction(txn.Hash, chain.GetServerChain())
67+
t, err = transaction.VerifyTransactionWithNonce(txn.Hash, txn.GetTransaction().GetTransactionNonce())
6868
if err == nil {
6969
break
7070
}

code/go/0chain.net/blobbercore/handler/health.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,17 @@ func getBlobberHealthCheckError() error {
2929
return err
3030
}
3131

32-
func BlobberHealthCheck() (string, error) {
32+
func BlobberHealthCheck() (*transaction.Transaction, error) {
3333
if config.Configuration.Capacity == 0 {
3434

3535
setBlobberHealthCheckError(ErrBlobberHasRemoved)
36-
return "", ErrBlobberHasRemoved
36+
return nil, ErrBlobberHasRemoved
3737
}
3838

3939
txn, err := transaction.NewTransactionEntity()
4040
if err != nil {
4141
setBlobberHealthCheckError(err)
42-
return "", err
42+
return nil, err
4343
}
4444

4545
err = txn.ExecuteSmartContract(transaction.STORAGE_CONTRACT_ADDRESS,
@@ -49,9 +49,9 @@ func BlobberHealthCheck() (string, error) {
4949
zap.String("err:", err.Error()))
5050
setBlobberHealthCheckError(err)
5151

52-
return "", err
52+
return nil, err
5353
}
5454

5555
setBlobberHealthCheckError(err)
56-
return txn.Hash, nil
56+
return txn, nil
5757
}

code/go/0chain.net/blobbercore/handler/protocol.go

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
1010
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
1111
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
12-
"github.com/0chain/blobber/code/go/0chain.net/core/chain"
1312
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
1413

1514
"github.com/0chain/blobber/code/go/0chain.net/core/node"
@@ -87,15 +86,15 @@ func RegisterBlobber(ctx context.Context) error {
8786

8887
b, err := config.ReloadFromChain(ctx, datastore.GetStore().GetDB())
8988
if err != nil || b.BaseURL != node.Self.GetURLBase() { // blobber is not registered yet, baseURL is changed
90-
txnHash, err := sendSmartContractBlobberAdd(ctx)
89+
txn, err := sendSmartContractBlobberAdd(ctx)
9190
if err != nil {
9291
logging.Logger.Error("Error when sending add request to blockchain", zap.Any("err", err))
9392
return err
9493
}
9594

96-
t, err := TransactionVerify(txnHash)
95+
t, err := TransactionVerify(txn)
9796
if err != nil {
98-
logging.Logger.Error("Failed to verify blobber register transaction", zap.Any("err", err), zap.String("txn.Hash", txnHash))
97+
logging.Logger.Error("Failed to verify blobber register transaction", zap.Any("err", err), zap.String("txn.Hash", txn.Hash))
9998
return err
10099
}
101100

@@ -115,14 +114,14 @@ func RegisterBlobber(ctx context.Context) error {
115114
// UpdateBlobber update blobber
116115
func UpdateBlobber(ctx context.Context) error {
117116

118-
txnHash, err := sendSmartContractBlobberAdd(ctx)
117+
txn, err := sendSmartContractBlobberAdd(ctx)
119118
if err != nil {
120119
return err
121120
}
122121

123-
t, err := TransactionVerify(txnHash)
122+
t, err := TransactionVerify(txn)
124123
if err != nil {
125-
logging.Logger.Error("Failed to verify blobber update transaction", zap.Any("err", err), zap.String("txn.Hash", txnHash))
124+
logging.Logger.Error("Failed to verify blobber update transaction", zap.Any("err", err), zap.String("txn.Hash", txn.Hash))
126125
return err
127126
}
128127

@@ -132,13 +131,13 @@ func UpdateBlobber(ctx context.Context) error {
132131
}
133132

134133
func RefreshPriceOnChain(ctx context.Context) error {
135-
txnHash, err := sendSmartContractBlobberAdd(ctx)
134+
txn, err := sendSmartContractBlobberAdd(ctx)
136135
if err != nil {
137136
return err
138137
}
139138

140-
if t, err := TransactionVerify(txnHash); err != nil {
141-
logging.Logger.Error("Failed to verify price refresh transaction", zap.Any("err", err), zap.String("txn.Hash", txnHash))
139+
if t, err := TransactionVerify(txn); err != nil {
140+
logging.Logger.Error("Failed to verify price refresh transaction", zap.Any("err", err), zap.String("txn.Hash", txn.Hash))
142141
} else {
143142
logging.Logger.Info("Verified price refresh transaction", zap.String("txn_hash", t.Hash), zap.Any("txn_output", t.TransactionOutput))
144143
}
@@ -147,39 +146,39 @@ func RefreshPriceOnChain(ctx context.Context) error {
147146
}
148147

149148
// sendSmartContractBlobberAdd Add or update blobber on blockchain
150-
func sendSmartContractBlobberAdd(ctx context.Context) (string, error) {
149+
func sendSmartContractBlobberAdd(ctx context.Context) (*transaction.Transaction, error) {
151150
// initialize storage node (ie blobber)
152151
txn, err := transaction.NewTransactionEntity()
153152
if err != nil {
154-
return "", err
153+
return nil, err
155154
}
156155

157156
sn, err := getStorageNode()
158157
if err != nil {
159-
return "", err
158+
return nil, err
160159
}
161160

162161
err = txn.ExecuteSmartContract(transaction.STORAGE_CONTRACT_ADDRESS,
163162
transaction.ADD_BLOBBER_SC_NAME, sn, 0)
164163
if err != nil {
165164
logging.Logger.Error("Failed to set blobber on the blockchain",
166165
zap.String("err:", err.Error()))
167-
return "", err
166+
return nil, err
168167
}
169168

170-
return txn.Hash, nil
169+
return txn, nil
171170
}
172171

173172
// UpdateBlobberOnChain updates latest changes in blobber's settings, capacity,etc.
174173
func UpdateBlobberOnChain(ctx context.Context) error {
175174

176-
txnHash, err := sendSmartContractBlobberUpdate(ctx)
175+
txn, err := sendSmartContractBlobberUpdate(ctx)
177176
if err != nil {
178177
return err
179178
}
180179

181-
if t, err := TransactionVerify(txnHash); err != nil {
182-
logging.Logger.Error("Failed to verify blobber update transaction", zap.Any("err", err), zap.String("txn.Hash", txnHash))
180+
if t, err := TransactionVerify(txn); err != nil {
181+
logging.Logger.Error("Failed to verify blobber update transaction", zap.Any("err", err), zap.String("txn.Hash", txn.Hash))
183182
} else {
184183
logging.Logger.Info("Verified blobber update transaction", zap.String("txn_hash", t.Hash), zap.Any("txn_output", t.TransactionOutput))
185184
}
@@ -188,27 +187,27 @@ func UpdateBlobberOnChain(ctx context.Context) error {
188187
}
189188

190189
// sendSmartContractBlobberUpdate update blobber on blockchain
191-
func sendSmartContractBlobberUpdate(ctx context.Context) (string, error) {
190+
func sendSmartContractBlobberUpdate(ctx context.Context) (*transaction.Transaction, error) {
192191
// initialize storage node (ie blobber)
193192
txn, err := transaction.NewTransactionEntity()
194193
if err != nil {
195-
return "", err
194+
return nil, err
196195
}
197196

198197
sn, err := getStorageNode()
199198
if err != nil {
200-
return "", err
199+
return nil, err
201200
}
202201

203202
err = txn.ExecuteSmartContract(transaction.STORAGE_CONTRACT_ADDRESS,
204203
transaction.UPDATE_BLOBBER_SC_NAME, sn, 0)
205204
if err != nil {
206205
logging.Logger.Error("Failed to set blobber on the blockchain",
207206
zap.String("err:", err.Error()))
208-
return "", err
207+
return txn, err
209208
}
210209

211-
return txn.Hash, nil
210+
return txn, nil
212211
}
213212

214213
// ErrBlobberHasRemoved represents service health check error, where the
@@ -220,15 +219,15 @@ var ErrBlobberHasRemoved = errors.New("blobber has been removed")
220219
// ErrBlobberNotFound it is not registered on chain
221220
var ErrBlobberNotFound = errors.New("blobber is not found")
222221

223-
func TransactionVerify(txnHash string) (t *transaction.Transaction, err error) {
222+
func TransactionVerify(txn *transaction.Transaction) (t *transaction.Transaction, err error) {
224223
for i := 0; i < util.MAX_RETRIES; i++ {
225224
time.Sleep(transaction.SLEEP_FOR_TXN_CONFIRMATION * time.Second)
226-
if t, err = transaction.VerifyTransaction(txnHash, chain.GetServerChain()); err == nil {
225+
if t, err = transaction.VerifyTransactionWithNonce(txn.Hash, txn.GetTransaction().GetTransactionNonce()); err == nil {
227226
return t, nil
228227
}
229228
}
230229

231-
return nil, errors.New("[txn]max retries exceeded with " + txnHash)
230+
return nil, errors.New("[txn]max retries exceeded with " + txn.Hash)
232231
}
233232

234233
func WalletRegister() error {
@@ -244,11 +243,12 @@ func WalletRegister() error {
244243

245244
// SendHealthCheck send heartbeat to blockchain
246245
func SendHealthCheck() (string, error) {
247-
txnHash, err := BlobberHealthCheck()
246+
txn, err := BlobberHealthCheck()
248247
if err != nil {
249-
return txnHash, err
248+
return "", err
250249
}
251-
_, err = TransactionVerify(txnHash)
252250

253-
return txnHash, err
251+
_, err = TransactionVerify(txn)
252+
253+
return txn.Hash, err
254254
}

code/go/0chain.net/blobbercore/readmarker/protocol.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"time"
66

7-
"github.com/0chain/blobber/code/go/0chain.net/core/chain"
87
"github.com/0chain/blobber/code/go/0chain.net/core/common"
98
zLogger "github.com/0chain/blobber/code/go/0chain.net/core/logging"
109
"github.com/0chain/blobber/code/go/0chain.net/core/transaction"
@@ -58,7 +57,7 @@ func (rme *ReadMarkerEntity) RedeemReadMarker(ctx context.Context) (err error) {
5857
time.Sleep(transaction.SLEEP_FOR_TXN_CONFIRMATION * time.Second)
5958

6059
var logHash = tx.Hash // keep transaction hash for error logs
61-
tx, err = transaction.VerifyTransaction(tx.Hash, chain.GetServerChain())
60+
tx, err = transaction.VerifyTransactionWithNonce(tx.Hash, tx.GetTransaction().GetTransactionNonce())
6261
if err != nil {
6362
zLogger.Logger.Error("Error verifying the read redeem transaction", zap.Error(err), zap.String("txn", logHash))
6463
return common.NewErrorf("redeem_read_marker", "verifying transaction: %v", err)

code/go/0chain.net/blobbercore/writemarker/protocol.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,14 @@ func (wme *WriteMarkerEntity) RedeemMarker(ctx context.Context) error {
110110
}
111111

112112
time.Sleep(transaction.SLEEP_FOR_TXN_CONFIRMATION * time.Second)
113-
t, err := transaction.VerifyTransaction(txn.Hash, chain.GetServerChain())
113+
t, err := transaction.VerifyTransactionWithNonce(txn.Hash, txn.GetTransaction().GetTransactionNonce())
114114
if err != nil {
115115
Logger.Error("Error verifying the close connection transaction", zap.String("err:", err.Error()), zap.String("txn", txn.Hash))
116116
wme.Status = Failed
117117
wme.StatusMessage = "Error verifying the close connection transaction." + err.Error()
118118
wme.ReedeemRetries++
119119
wme.CloseTxnID = txn.Hash
120+
// TODO Is this single try?
120121
if err := wme.UpdateStatus(ctx, Failed, "Error verifying the close connection transaction."+err.Error(), txn.Hash); err != nil {
121122
Logger.Error("WriteMarkerEntity_UpdateStatus", zap.Error(err))
122123
}

code/go/0chain.net/blobbercore/writemarker/worker.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func redeemWriteMarker(allocationObj *allocation.Allocation, wm *WriteMarkerEnti
6666
if shouldRollback {
6767
if rollbackErr := db.Rollback().Error; rollbackErr != nil {
6868
logging.Logger.Error("Error rollback on redeeming the write marker.",
69+
zap.Any("allocation", allocationObj.ID),
6970
zap.Any("wm", wm.WM.AllocationID), zap.Error(rollbackErr))
7071
}
7172
}
@@ -74,6 +75,7 @@ func redeemWriteMarker(allocationObj *allocation.Allocation, wm *WriteMarkerEnti
7475
err := wm.RedeemMarker(ctx)
7576
if err != nil {
7677
logging.Logger.Error("Error redeeming the write marker.",
78+
zap.Any("allocation", allocationObj.ID),
7779
zap.Any("wm", wm.WM.AllocationID), zap.Any("error", err))
7880

7981
shouldRollback = true
@@ -85,6 +87,7 @@ func redeemWriteMarker(allocationObj *allocation.Allocation, wm *WriteMarkerEnti
8587
wm.WM.AllocationRoot, allocationObj.ID).Error
8688
if err != nil {
8789
logging.Logger.Error("Error redeeming the write marker. Allocation latest wm redeemed update failed",
90+
zap.Any("allocation", allocationObj.ID),
8891
zap.Any("wm", wm.WM.AllocationRoot), zap.Any("error", err))
8992
shouldRollback = true
9093
return err
@@ -94,13 +97,15 @@ func redeemWriteMarker(allocationObj *allocation.Allocation, wm *WriteMarkerEnti
9497
if err != nil {
9598
logging.Logger.Error("Error committing the writemarker redeem",
9699
zap.Any("allocation", allocationObj.ID),
97-
zap.Error(err))
100+
zap.Any("wm", wm.WM.AllocationRoot), zap.Error(err))
98101
shouldRollback = true
99102
return err
100103
}
101104

102105
allocationObj.LatestRedeemedWM = wm.WM.AllocationRoot
103-
logging.Logger.Info("Success Redeeming the write marker", zap.Any("wm", wm.WM.AllocationRoot), zap.Any("txn", wm.CloseTxnID))
106+
logging.Logger.Info("Success Redeeming the write marker",
107+
zap.Any("allocation", allocationObj.ID),
108+
zap.Any("wm", wm.WM.AllocationRoot), zap.Any("txn", wm.CloseTxnID))
104109

105110
return nil
106111
}

code/go/0chain.net/blobbercore/writemarker/writemarker.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
55
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
66
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
7+
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
78
"github.com/remeh/sizedwaitgroup"
9+
"go.uber.org/zap"
810
)
911

1012
func redeemWriteMarkers() {
@@ -14,6 +16,10 @@ func redeemWriteMarkers() {
1416
db.Where(&allocation.Allocation{IsRedeemRequired: true}).
1517
Find(&allocations)
1618
if len(allocations) > 0 {
19+
20+
logging.Logger.Info("Redeem writemarkers for allocations",
21+
zap.Any("numOfAllocations", len(allocations)))
22+
1723
swg := sizedwaitgroup.New(config.Configuration.WMRedeemNumWorkers)
1824
for _, allocationObj := range allocations {
1925
swg.Add()

0 commit comments

Comments
 (0)