Skip to content

Commit 4af3e74

Browse files
committed
go/worker/compute: Improve proposed transaction handling
1 parent a5b4ae5 commit 4af3e74

File tree

8 files changed

+167
-89
lines changed

8 files changed

+167
-89
lines changed

.changelog/4640.bugfix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
go/worker/compute: Improve proposed transaction handling

go/runtime/txpool/txpool.go

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,17 @@ type TransactionPool interface {
7878
// SubmitTxNoWait adds the transaction into the transaction pool and returns immediately.
7979
SubmitTxNoWait(ctx context.Context, tx []byte, meta *TransactionMeta) error
8080

81+
// SubmitProposedBatch adds the given (possibly new) transaction batch into the current
82+
// proposal queue.
83+
SubmitProposedBatch(batch [][]byte)
84+
85+
// PromoteProposedBatch promotes the specified transactions that are already in the transaction
86+
// pool into the current proposal queue.
87+
PromoteProposedBatch(batch []hash.Hash)
88+
89+
// ClearProposedBatch clears the proposal queue.
90+
ClearProposedBatch()
91+
8192
// RemoveTxBatch removes a transaction batch from the transaction pool.
8293
RemoveTxBatch(txs []hash.Hash)
8394

@@ -183,11 +194,13 @@ type txPool struct {
183194
schedulerTicker *time.Ticker
184195
schedulerNotifier *pubsub.Broker
185196

197+
proposedTxsLock sync.Mutex
198+
proposedTxs map[hash.Hash]*transaction.CheckedTransaction
199+
186200
blockInfoLock sync.Mutex
187201
blockInfo *BlockInfo
188202
lastRecheckRound uint64
189203

190-
epoCh *channels.RingChannel
191204
republishCh *channels.RingChannel
192205

193206
// roundWeightLimits is guarded by schedulerLock.
@@ -237,7 +250,7 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe
237250
// Skip recently seen transactions.
238251
txHash := hash.NewFromBytes(rawTx)
239252
if _, seen := t.seenCache.Peek(txHash); seen && !meta.Recheck {
240-
t.logger.Debug("ignoring already seen transaction", "tx", rawTx)
253+
t.logger.Debug("ignoring already seen transaction", "tx_hash", txHash)
241254
return fmt.Errorf("duplicate transaction")
242255
}
243256

@@ -251,11 +264,13 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe
251264
// Queue transaction for checks.
252265
t.logger.Debug("queuing transaction for check",
253266
"tx", rawTx,
267+
"tx_hash", txHash,
254268
"recheck", meta.Recheck,
255269
)
256270
if err := t.checkTxQueue.Add(tx); err != nil {
257271
t.logger.Warn("unable to queue transaction",
258272
"tx", rawTx,
273+
"tx_hash", txHash,
259274
"err", err,
260275
)
261276
return err
@@ -269,6 +284,47 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe
269284
return nil
270285
}
271286

287+
func (t *txPool) SubmitProposedBatch(batch [][]byte) {
288+
// Also ingest into the regular pool on a best-effort basis; submission errors are deliberately ignored.
289+
for _, rawTx := range batch {
290+
_ = t.SubmitTxNoWait(context.Background(), rawTx, &TransactionMeta{Local: false})
291+
}
292+
293+
t.proposedTxsLock.Lock()
294+
defer t.proposedTxsLock.Unlock()
295+
296+
for _, rawTx := range batch {
297+
tx := transaction.RawCheckedTransaction(rawTx)
298+
t.proposedTxs[tx.Hash()] = tx
299+
}
300+
}
301+
302+
func (t *txPool) PromoteProposedBatch(batch []hash.Hash) {
303+
txs, missingTxs := t.GetKnownBatch(batch)
304+
if len(missingTxs) > 0 {
305+
t.logger.Debug("promoted proposed batch contains missing transactions",
306+
"missing_tx_count", len(missingTxs),
307+
)
308+
}
309+
310+
t.proposedTxsLock.Lock()
311+
defer t.proposedTxsLock.Unlock()
312+
313+
for _, tx := range txs {
314+
if tx == nil {
315+
continue
316+
}
317+
t.proposedTxs[tx.Hash()] = tx
318+
}
319+
}
320+
321+
func (t *txPool) ClearProposedBatch() {
322+
t.proposedTxsLock.Lock()
323+
defer t.proposedTxsLock.Unlock()
324+
325+
t.proposedTxs = make(map[hash.Hash]*transaction.CheckedTransaction)
326+
}
327+
272328
func (t *txPool) RemoveTxBatch(txs []hash.Hash) {
273329
t.schedulerLock.Lock()
274330
defer t.schedulerLock.Unlock()
@@ -303,7 +359,6 @@ func (t *txPool) GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*transac
303359

304360
func (t *txPool) GetKnownBatch(batch []hash.Hash) ([]*transaction.CheckedTransaction, map[hash.Hash]int) {
305361
t.schedulerLock.Lock()
306-
defer t.schedulerLock.Unlock()
307362

308363
if t.schedulerQueue == nil {
309364
result := make([]*transaction.CheckedTransaction, 0, len(batch))
@@ -312,9 +367,28 @@ func (t *txPool) GetKnownBatch(batch []hash.Hash) ([]*transaction.CheckedTransac
312367
result = append(result, nil)
313368
missing[txHash] = index
314369
}
370+
t.schedulerLock.Unlock()
315371
return result, missing
316372
}
317-
return t.schedulerQueue.GetKnownBatch(batch)
373+
374+
txs, missingTxs := t.schedulerQueue.GetKnownBatch(batch)
375+
t.schedulerLock.Unlock()
376+
377+
// Also check the proposed transactions set.
378+
t.proposedTxsLock.Lock()
379+
defer t.proposedTxsLock.Unlock()
380+
381+
for txHash, index := range missingTxs {
382+
tx, exists := t.proposedTxs[txHash]
383+
if !exists {
384+
continue
385+
}
386+
387+
delete(missingTxs, txHash)
388+
txs[index] = tx
389+
}
390+
391+
return txs, missingTxs
318392
}
319393

320394
func (t *txPool) ProcessBlock(bi *BlockInfo) error {
@@ -327,7 +401,6 @@ func (t *txPool) ProcessBlock(bi *BlockInfo) error {
327401
return fmt.Errorf("failed to update scheduler: %w", err)
328402
}
329403

330-
t.epoCh.In() <- struct{}{}
331404
// Force recheck on epoch transitions.
332405
t.recheckTxCh.In() <- struct{}{}
333406
}
@@ -409,12 +482,13 @@ func (t *txPool) WakeupScheduler() {
409482

410483
func (t *txPool) Clear() {
411484
t.schedulerLock.Lock()
412-
defer t.schedulerLock.Unlock()
413-
414485
if t.schedulerQueue != nil {
415486
t.schedulerQueue.Clear()
416487
}
488+
t.schedulerLock.Unlock()
489+
417490
t.seenCache.Clear()
491+
t.ClearProposedBatch()
418492

419493
pendingScheduleSize.With(t.getMetricLabels()).Set(0)
420494
}
@@ -527,6 +601,7 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
527601
if !res.IsSuccess() {
528602
t.logger.Debug("check tx failed",
529603
"tx", batch[i].Tx,
604+
"tx_hash", batch[i].TxHash,
530605
"result", res,
531606
"recheck", batch[i].Meta.Recheck,
532607
)
@@ -829,6 +904,10 @@ func New(
829904
return nil, fmt.Errorf("error creating seen cache: %w", err)
830905
}
831906

907+
// The transaction check queue should be 10% larger than the transaction pool to allow for some
908+
// buffer in case the schedule queue is full and is being rechecked.
909+
maxCheckTxQueueSize := (110 * cfg.MaxPoolSize) / 100
910+
832911
return &txPool{
833912
logger: logging.GetLogger("runtime/txpool"),
834913
stopCh: make(chan struct{}),
@@ -839,13 +918,13 @@ func New(
839918
host: host,
840919
txPublisher: txPublisher,
841920
seenCache: seenCache,
842-
checkTxQueue: newCheckTxQueue(cfg.MaxPoolSize, cfg.MaxCheckTxBatchSize),
921+
checkTxQueue: newCheckTxQueue(maxCheckTxQueueSize, cfg.MaxCheckTxBatchSize),
843922
checkTxCh: channels.NewRingChannel(1),
844923
checkTxNotifier: pubsub.NewBroker(false),
845924
recheckTxCh: channels.NewRingChannel(1),
846925
schedulerTicker: time.NewTicker(1 * time.Hour),
847926
schedulerNotifier: pubsub.NewBroker(false),
848-
epoCh: channels.NewRingChannel(1),
927+
proposedTxs: make(map[hash.Hash]*transaction.CheckedTransaction),
849928
republishCh: channels.NewRingChannel(1),
850929
roundWeightLimits: make(map[transaction.Weight]uint64),
851930
}, nil

go/worker/common/p2p/rpc/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,12 @@ func (c *client) CallMulti(
382382
}
383383
}
384384
}
385+
386+
c.logger.Debug("received responses from peers",
387+
"method", method,
388+
"num_peers", len(rsps),
389+
)
390+
385391
return rsps, pfs, nil
386392
}
387393

go/worker/common/p2p/txsync/client.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package txsync
22

33
import (
44
"context"
5-
"fmt"
65

76
"github.com/oasisprotocol/oasis-core/go/common"
87
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
@@ -12,14 +11,14 @@ import (
1211
// Client is a transaction sync protocol client.
1312
type Client interface {
1413
// GetTxs queries peers for transaction data.
15-
GetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsResponse, rpc.PeerFeedback, error)
14+
GetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsResponse, error)
1615
}
1716

1817
type client struct {
1918
rc rpc.Client
2019
}
2120

22-
func (c *client) GetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsResponse, rpc.PeerFeedback, error) {
21+
func (c *client) GetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsResponse, error) {
2322
// Make sure we don't request too many transactions.
2423
if len(request.Txs) > MaxGetTxsCount {
2524
request.Txs = request.Txs[:MaxGetTxsCount]
@@ -28,31 +27,46 @@ func (c *client) GetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsRes
2827
for _, txHash := range request.Txs {
2928
txHashMap[txHash] = struct{}{}
3029
}
30+
resultTxMap := make(map[hash.Hash][]byte)
3131

3232
var rsp GetTxsResponse
33-
pf, err := c.rc.Call(ctx, MethodGetTxs, request, &rsp, MaxGetTxsResponseTime,
34-
rpc.WithValidationFn(func(pf rpc.PeerFeedback) error {
33+
_, _, err := c.rc.CallMulti(ctx, MethodGetTxs, request, rsp, MaxGetTxsResponseTime, MaxGetTxsParallelRequests,
34+
rpc.WithAggregateFn(func(rawRsp interface{}, pf rpc.PeerFeedback) bool {
35+
rsp := rawRsp.(*GetTxsResponse)
36+
3537
// If we received more transactions than we requested, this is an error.
3638
if len(rsp.Txs) > len(request.Txs) {
3739
pf.RecordFailure()
38-
return fmt.Errorf("more transactions than requested (expected: %d got: %d)", len(request.Txs), len(rsp.Txs))
40+
return true
3941
}
4042

4143
// If we received transactions that we didn't request, this is an error.
4244
for _, tx := range rsp.Txs {
4345
txHash := hash.NewFromBytes(tx)
4446
if _, valid := txHashMap[txHash]; !valid {
4547
pf.RecordFailure()
46-
return fmt.Errorf("unsolicited transaction: %s", txHash)
48+
return true
4749
}
50+
51+
resultTxMap[txHash] = tx
52+
}
53+
54+
if len(rsp.Txs) > 0 {
55+
pf.RecordSuccess()
4856
}
49-
return nil
50-
}),
51-
)
57+
58+
// Keep querying further peers only while some requested transactions are still missing.
59+
return len(resultTxMap) != len(txHashMap)
60+
}))
5261
if err != nil {
53-
return nil, nil, err
62+
return nil, err
63+
}
64+
65+
rsp.Txs = make([][]byte, 0, len(resultTxMap))
66+
for _, tx := range resultTxMap {
67+
rsp.Txs = append(rsp.Txs, tx)
5468
}
55-
return &rsp, pf, nil
69+
return &rsp, nil
5670
}
5771

5872
// NewClient creates a new transaction sync protocol client.

go/worker/common/p2p/txsync/protocol.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ var TxSyncProtocolVersion = version.Version{Major: 1, Minor: 0, Patch: 0}
1515

1616
// Constants related to the GetTxs method.
1717
const (
18-
MethodGetTxs = "GetTxs"
19-
MaxGetTxsResponseTime = 5 * time.Second
20-
MaxGetTxsCount = 128
18+
MethodGetTxs = "GetTxs"
19+
MaxGetTxsResponseTime = 5 * time.Second
20+
MaxGetTxsCount = 128
21+
MaxGetTxsParallelRequests = 5
2122
)
2223

2324
// GetTxsRequest is a GetTxs request.

go/worker/compute/executor/committee/batch.go

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@ import (
1313
type unresolvedBatch struct {
1414
proposal *commitment.Proposal
1515

16-
batch transaction.RawBatch
17-
missingTxs map[hash.Hash]int
18-
resolvedTxs map[hash.Hash][]byte
16+
batch transaction.RawBatch
17+
missingTxs map[hash.Hash]int
1918

2019
maxBatchSizeBytes uint64
2120
}
@@ -36,23 +35,6 @@ func (ub *unresolvedBatch) hash() hash.Hash {
3635
return ub.proposal.Header.BatchHash
3736
}
3837

39-
func (ub *unresolvedBatch) addResolvedTx(tx []byte) {
40-
if ub.missingTxs == nil {
41-
return
42-
}
43-
44-
txHash := hash.NewFromBytes(tx)
45-
if _, exists := ub.missingTxs[txHash]; !exists {
46-
return
47-
}
48-
49-
if ub.resolvedTxs == nil {
50-
ub.resolvedTxs = make(map[hash.Hash][]byte)
51-
}
52-
ub.resolvedTxs[txHash] = tx
53-
delete(ub.missingTxs, txHash)
54-
}
55-
5638
func (ub *unresolvedBatch) resolve(txPool txpool.TransactionPool) (transaction.RawBatch, error) {
5739
if ub.batch != nil {
5840
return ub.batch, nil
@@ -65,17 +47,6 @@ func (ub *unresolvedBatch) resolve(txPool txpool.TransactionPool) (transaction.R
6547
}
6648

6749
resolvedBatch, missingTxs := txPool.GetKnownBatch(ub.proposal.Batch)
68-
if ub.resolvedTxs != nil {
69-
for txHash, txIdx := range missingTxs {
70-
rawTx, exists := ub.resolvedTxs[txHash]
71-
if !exists {
72-
continue
73-
}
74-
75-
delete(missingTxs, txHash)
76-
resolvedBatch[txIdx] = transaction.RawCheckedTransaction(rawTx)
77-
}
78-
}
7950
if len(missingTxs) > 0 {
8051
ub.missingTxs = missingTxs
8152
return nil, nil

0 commit comments

Comments
 (0)