Skip to content

Commit 7614706

Browse files
authored
Merge pull request #4642 from oasisprotocol/kostko/stable/22.1.x/backport-4640
[BACKPORT/22.1.x] go/worker/compute: Improve proposed transaction handling
2 parents 4102894 + 0e9ad65 commit 7614706

File tree

17 files changed

+253
-119
lines changed

17 files changed

+253
-119
lines changed

.changelog/4640.bugfix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
go/worker/compute: Improve proposed transaction handling

.changelog/4640.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
runtime: Make persistent transaction check state configurable

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/runtime/txpool/txpool.go

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,17 @@ type TransactionPool interface {
7878
// SubmitTxNoWait adds the transaction into the transaction pool and returns immediately.
7979
SubmitTxNoWait(ctx context.Context, tx []byte, meta *TransactionMeta) error
8080

81+
// SubmitProposedBatch adds the given (possibly new) transaction batch into the current
82+
// proposal queue.
83+
SubmitProposedBatch(batch [][]byte)
84+
85+
// PromoteProposedBatch promotes the specified transactions that are already in the transaction
86+
// pool into the current proposal queue.
87+
PromoteProposedBatch(batch []hash.Hash)
88+
89+
// ClearProposedBatch clears the proposal queue.
90+
ClearProposedBatch()
91+
8192
// RemoveTxBatch removes a transaction batch from the transaction pool.
8293
RemoveTxBatch(txs []hash.Hash)
8394

@@ -183,11 +194,13 @@ type txPool struct {
183194
schedulerTicker *time.Ticker
184195
schedulerNotifier *pubsub.Broker
185196

197+
proposedTxsLock sync.Mutex
198+
proposedTxs map[hash.Hash]*transaction.CheckedTransaction
199+
186200
blockInfoLock sync.Mutex
187201
blockInfo *BlockInfo
188202
lastRecheckRound uint64
189203

190-
epoCh *channels.RingChannel
191204
republishCh *channels.RingChannel
192205

193206
// roundWeightLimits is guarded by schedulerLock.
@@ -237,7 +250,7 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe
237250
// Skip recently seen transactions.
238251
txHash := hash.NewFromBytes(rawTx)
239252
if _, seen := t.seenCache.Peek(txHash); seen && !meta.Recheck {
240-
t.logger.Debug("ignoring already seen transaction", "tx", rawTx)
253+
t.logger.Debug("ignoring already seen transaction", "tx_hash", txHash)
241254
return fmt.Errorf("duplicate transaction")
242255
}
243256

@@ -251,11 +264,13 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe
251264
// Queue transaction for checks.
252265
t.logger.Debug("queuing transaction for check",
253266
"tx", rawTx,
267+
"tx_hash", txHash,
254268
"recheck", meta.Recheck,
255269
)
256270
if err := t.checkTxQueue.Add(tx); err != nil {
257271
t.logger.Warn("unable to queue transaction",
258272
"tx", rawTx,
273+
"tx_hash", txHash,
259274
"err", err,
260275
)
261276
return err
@@ -269,6 +284,47 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe
269284
return nil
270285
}
271286

287+
func (t *txPool) SubmitProposedBatch(batch [][]byte) {
288+
// Also ingest into the regular pool (may fail).
289+
for _, rawTx := range batch {
290+
_ = t.SubmitTxNoWait(context.Background(), rawTx, &TransactionMeta{Local: false})
291+
}
292+
293+
t.proposedTxsLock.Lock()
294+
defer t.proposedTxsLock.Unlock()
295+
296+
for _, rawTx := range batch {
297+
tx := transaction.RawCheckedTransaction(rawTx)
298+
t.proposedTxs[tx.Hash()] = tx
299+
}
300+
}
301+
302+
func (t *txPool) PromoteProposedBatch(batch []hash.Hash) {
303+
txs, missingTxs := t.GetKnownBatch(batch)
304+
if len(missingTxs) > 0 {
305+
t.logger.Debug("promoted proposed batch contains missing transactions",
306+
"missing_tx_count", len(missingTxs),
307+
)
308+
}
309+
310+
t.proposedTxsLock.Lock()
311+
defer t.proposedTxsLock.Unlock()
312+
313+
for _, tx := range txs {
314+
if tx == nil {
315+
continue
316+
}
317+
t.proposedTxs[tx.Hash()] = tx
318+
}
319+
}
320+
321+
func (t *txPool) ClearProposedBatch() {
322+
t.proposedTxsLock.Lock()
323+
defer t.proposedTxsLock.Unlock()
324+
325+
t.proposedTxs = make(map[hash.Hash]*transaction.CheckedTransaction)
326+
}
327+
272328
func (t *txPool) RemoveTxBatch(txs []hash.Hash) {
273329
t.schedulerLock.Lock()
274330
defer t.schedulerLock.Unlock()
@@ -303,7 +359,6 @@ func (t *txPool) GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*transac
303359

304360
func (t *txPool) GetKnownBatch(batch []hash.Hash) ([]*transaction.CheckedTransaction, map[hash.Hash]int) {
305361
t.schedulerLock.Lock()
306-
defer t.schedulerLock.Unlock()
307362

308363
if t.schedulerQueue == nil {
309364
result := make([]*transaction.CheckedTransaction, 0, len(batch))
@@ -312,9 +367,28 @@ func (t *txPool) GetKnownBatch(batch []hash.Hash) ([]*transaction.CheckedTransac
312367
result = append(result, nil)
313368
missing[txHash] = index
314369
}
370+
t.schedulerLock.Unlock()
315371
return result, missing
316372
}
317-
return t.schedulerQueue.GetKnownBatch(batch)
373+
374+
txs, missingTxs := t.schedulerQueue.GetKnownBatch(batch)
375+
t.schedulerLock.Unlock()
376+
377+
// Also check the proposed transactions set.
378+
t.proposedTxsLock.Lock()
379+
defer t.proposedTxsLock.Unlock()
380+
381+
for txHash, index := range missingTxs {
382+
tx, exists := t.proposedTxs[txHash]
383+
if !exists {
384+
continue
385+
}
386+
387+
delete(missingTxs, txHash)
388+
txs[index] = tx
389+
}
390+
391+
return txs, missingTxs
318392
}
319393

320394
func (t *txPool) ProcessBlock(bi *BlockInfo) error {
@@ -327,7 +401,6 @@ func (t *txPool) ProcessBlock(bi *BlockInfo) error {
327401
return fmt.Errorf("failed to update scheduler: %w", err)
328402
}
329403

330-
t.epoCh.In() <- struct{}{}
331404
// Force recheck on epoch transitions.
332405
t.recheckTxCh.In() <- struct{}{}
333406
}
@@ -409,12 +482,13 @@ func (t *txPool) WakeupScheduler() {
409482

410483
func (t *txPool) Clear() {
411484
t.schedulerLock.Lock()
412-
defer t.schedulerLock.Unlock()
413-
414485
if t.schedulerQueue != nil {
415486
t.schedulerQueue.Clear()
416487
}
488+
t.schedulerLock.Unlock()
489+
417490
t.seenCache.Clear()
491+
t.ClearProposedBatch()
418492

419493
pendingScheduleSize.With(t.getMetricLabels()).Set(0)
420494
}
@@ -527,6 +601,7 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
527601
if !res.IsSuccess() {
528602
t.logger.Debug("check tx failed",
529603
"tx", batch[i].Tx,
604+
"tx_hash", batch[i].TxHash,
530605
"result", res,
531606
"recheck", batch[i].Meta.Recheck,
532607
)
@@ -829,6 +904,10 @@ func New(
829904
return nil, fmt.Errorf("error creating seen cache: %w", err)
830905
}
831906

907+
// The transaction check queue should be 10% larger than the transaction pool to allow for some
908+
// buffer in case the scheduler queue is full and is being rechecked.
909+
maxCheckTxQueueSize := (110 * cfg.MaxPoolSize) / 100
910+
832911
return &txPool{
833912
logger: logging.GetLogger("runtime/txpool"),
834913
stopCh: make(chan struct{}),
@@ -839,13 +918,13 @@ func New(
839918
host: host,
840919
txPublisher: txPublisher,
841920
seenCache: seenCache,
842-
checkTxQueue: newCheckTxQueue(cfg.MaxPoolSize, cfg.MaxCheckTxBatchSize),
921+
checkTxQueue: newCheckTxQueue(maxCheckTxQueueSize, cfg.MaxCheckTxBatchSize),
843922
checkTxCh: channels.NewRingChannel(1),
844923
checkTxNotifier: pubsub.NewBroker(false),
845924
recheckTxCh: channels.NewRingChannel(1),
846925
schedulerTicker: time.NewTicker(1 * time.Hour),
847926
schedulerNotifier: pubsub.NewBroker(false),
848-
epoCh: channels.NewRingChannel(1),
927+
proposedTxs: make(map[hash.Hash]*transaction.CheckedTransaction),
849928
republishCh: channels.NewRingChannel(1),
850929
roundWeightLimits: make(map[transaction.Weight]uint64),
851930
}, nil

go/worker/common/p2p/rpc/client.go

Lines changed: 73 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/cenkalti/backoff/v4"
10+
"github.com/eapache/channels"
1011
core "github.com/libp2p/go-libp2p-core"
1112
"github.com/libp2p/go-libp2p-core/network"
1213
"github.com/libp2p/go-libp2p-core/protocol"
@@ -146,6 +147,27 @@ func WithValidationFn(fn ValidationFunc) CallOption {
146147
}
147148
}
148149

150+
// AggregateFunc returns a result aggregation function.
151+
//
152+
// The function is passed the response and PeerFeedback instance. If the function returns true, the
153+
// client will continue to call other peers. If it returns false, processing will stop.
154+
type AggregateFunc func(rsp interface{}, pf PeerFeedback) bool
155+
156+
// CallMultiOptions are per-multicall options.
157+
type CallMultiOptions struct {
158+
aggregateFn AggregateFunc
159+
}
160+
161+
// CallMultiOption is a per-multicall option setter.
162+
type CallMultiOption func(opts *CallMultiOptions)
163+
164+
// WithAggregateFn configures the response aggregation function to use.
165+
func WithAggregateFn(fn AggregateFunc) CallMultiOption {
166+
return func(opts *CallMultiOptions) {
167+
opts.aggregateFn = fn
168+
}
169+
}
170+
149171
// Client is an RPC client for a given protocol.
150172
type Client interface {
151173
PeerManager
@@ -174,6 +196,7 @@ type Client interface {
174196
body, rspTyp interface{},
175197
maxPeerResponseTime time.Duration,
176198
maxParallelRequests uint,
199+
opts ...CallMultiOption,
177200
) ([]interface{}, []PeerFeedback, error)
178201
}
179202

@@ -276,9 +299,15 @@ func (c *client) CallMulti(
276299
body, rspTyp interface{},
277300
maxPeerResponseTime time.Duration,
278301
maxParallelRequests uint,
302+
opts ...CallMultiOption,
279303
) ([]interface{}, []PeerFeedback, error) {
280304
c.logger.Debug("call multiple", "method", method)
281305

306+
var co CallMultiOptions
307+
for _, opt := range opts {
308+
opt(&co)
309+
}
310+
282311
// Prepare the request.
283312
request := Request{
284313
Method: method,
@@ -290,48 +319,75 @@ func (c *client) CallMulti(
290319
pool.Resize(maxParallelRequests)
291320
defer pool.Stop()
292321

322+
// Create a subcontext so we abort further requests if we are done early.
323+
peerCtx, cancel := context.WithCancel(ctx)
324+
defer cancel()
325+
293326
// Request results from peers.
294327
type result struct {
295328
rsp interface{}
296329
pf PeerFeedback
297330
err error
298331
}
299-
var resultCh []chan *result
332+
var resultChs []channels.SimpleOutChannel
300333
for _, peer := range c.GetBestPeers() {
301334
if !c.isPeerAcceptable(peer) {
302335
continue
303336
}
304337

305-
ch := make(chan *result, 1)
306-
resultCh = append(resultCh, ch)
338+
ch := channels.NewNativeChannel(channels.BufferCap(1))
339+
resultChs = append(resultChs, ch)
307340

308341
pool.Submit(func() {
342+
defer close(ch)
343+
344+
// Abort early in case we are done.
345+
select {
346+
case <-peerCtx.Done():
347+
return
348+
default:
349+
}
350+
309351
rsp := reflect.New(reflect.TypeOf(rspTyp)).Interface()
310-
pf, err := c.call(ctx, peer, &request, rsp, maxPeerResponseTime)
311-
ch <- &result{rsp, pf, err}
312-
close(ch)
352+
pf, err := c.call(peerCtx, peer, &request, rsp, maxPeerResponseTime)
353+
ch.In() <- &result{rsp, pf, err}
313354
})
314355
}
315356

357+
if len(resultChs) == 0 {
358+
return nil, nil, nil
359+
}
360+
resultCh := channels.NewNativeChannel(channels.None)
361+
channels.Multiplex(resultCh, resultChs...)
362+
316363
// Gather results.
317364
var (
318365
rsps []interface{}
319366
pfs []PeerFeedback
320367
)
321-
for _, ch := range resultCh {
322-
select {
323-
case <-ctx.Done():
324-
return nil, nil, ctx.Err()
325-
case result := <-ch:
326-
// Ignore failed results.
327-
if result.err != nil {
328-
continue
329-
}
368+
for r := range resultCh.Out() {
369+
result := r.(*result)
370+
371+
// Ignore failed results.
372+
if result.err != nil {
373+
continue
374+
}
330375

331-
rsps = append(rsps, result.rsp)
332-
pfs = append(pfs, result.pf)
376+
rsps = append(rsps, result.rsp)
377+
pfs = append(pfs, result.pf)
378+
379+
if co.aggregateFn != nil {
380+
if !co.aggregateFn(result.rsp, result.pf) {
381+
break
382+
}
333383
}
334384
}
385+
386+
c.logger.Debug("received responses from peers",
387+
"method", method,
388+
"num_peers", len(rsps),
389+
)
390+
335391
return rsps, pfs, nil
336392
}
337393

0 commit comments

Comments
 (0)