Skip to content

Commit e0d1248

Browse files
authored
refactor(redis): rename tx to pipeliner (#499)
"transaction" is exactly what we shouldn't call the pipeliner. Redis offers both, the latter differs precisely in that it is not atomic.
1 parent cec4116 commit e0d1248

File tree

3 files changed

+52
-52
lines changed

3 files changed

+52
-52
lines changed

datastore/redis.go

Lines changed: 46 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -206,13 +206,13 @@ func (r *RedisCache) SetObj(key string, value any, expiration time.Duration) (er
206206
}
207207

208208
// SetObjPipelined saves an object in the given Redis key on a Redis pipeline (JSON encoded)
209-
func (r *RedisCache) SetObjPipelined(ctx context.Context, tx redis.Pipeliner, key string, value any, expiration time.Duration) (err error) {
209+
func (r *RedisCache) SetObjPipelined(ctx context.Context, pipeliner redis.Pipeliner, key string, value any, expiration time.Duration) (err error) {
210210
marshalledValue, err := json.Marshal(value)
211211
if err != nil {
212212
return err
213213
}
214214

215-
return tx.Set(ctx, key, marshalledValue, expiration).Err()
215+
return pipeliner.Set(ctx, key, marshalledValue, expiration).Err()
216216
}
217217

218218
func (r *RedisCache) HSetObj(key, field string, value any, expiration time.Duration) (err error) {
@@ -291,9 +291,9 @@ func (r *RedisCache) CheckAndSetLastSlotAndHashDelivered(slot uint64, hash strin
291291
return r.client.Watch(context.Background(), txf, r.keyLastSlotDelivered, r.keyLastHashDelivered)
292292
}
293293

294-
func (r *RedisCache) GetLastSlotDelivered(ctx context.Context, tx redis.Pipeliner) (slot uint64, err error) {
295-
c := tx.Get(ctx, r.keyLastSlotDelivered)
296-
_, err = tx.Exec(ctx)
294+
func (r *RedisCache) GetLastSlotDelivered(ctx context.Context, pipeliner redis.Pipeliner) (slot uint64, err error) {
295+
c := pipeliner.Get(ctx, r.keyLastSlotDelivered)
296+
_, err = pipeliner.Exec(ctx)
297297
if err != nil {
298298
return 0, err
299299
}
@@ -358,13 +358,13 @@ func (r *RedisCache) GetBestBid(slot uint64, parentHash, proposerPubkey string)
358358
return resp, err
359359
}
360360

361-
func (r *RedisCache) SaveExecutionPayloadCapella(ctx context.Context, tx redis.Pipeliner, slot uint64, proposerPubkey, blockHash string, execPayload *capella.ExecutionPayload) (err error) {
361+
func (r *RedisCache) SaveExecutionPayloadCapella(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, proposerPubkey, blockHash string, execPayload *capella.ExecutionPayload) (err error) {
362362
key := r.keyExecPayloadCapella(slot, proposerPubkey, blockHash)
363363
b, err := execPayload.MarshalSSZ()
364364
if err != nil {
365365
return err
366366
}
367-
return tx.Set(ctx, key, b, expiryBidCache).Err()
367+
return pipeliner.Set(ctx, key, b, expiryBidCache).Err()
368368
}
369369

370370
func (r *RedisCache) GetExecutionPayloadCapella(slot uint64, proposerPubkey, blockHash string) (*common.VersionedExecutionPayload, error) {
@@ -388,9 +388,9 @@ func (r *RedisCache) GetExecutionPayloadCapella(slot uint64, proposerPubkey, blo
388388
return resp, nil
389389
}
390390

391-
func (r *RedisCache) SaveBidTrace(ctx context.Context, tx redis.Pipeliner, trace *common.BidTraceV2) (err error) {
391+
func (r *RedisCache) SaveBidTrace(ctx context.Context, pipeliner redis.Pipeliner, trace *common.BidTraceV2) (err error) {
392392
key := r.keyCacheBidTrace(trace.Slot, trace.ProposerPubkey.String(), trace.BlockHash.String())
393-
return r.SetObjPipelined(ctx, tx, key, trace, expiryBidCache)
393+
return r.SetObjPipelined(ctx, pipeliner, key, trace, expiryBidCache)
394394
}
395395

396396
// GetBidTrace returns (trace, nil), or (nil, redis.Nil) if the trace does not exist
@@ -401,10 +401,10 @@ func (r *RedisCache) GetBidTrace(slot uint64, proposerPubkey, blockHash string)
401401
return resp, err
402402
}
403403

404-
func (r *RedisCache) GetBuilderLatestPayloadReceivedAt(ctx context.Context, tx redis.Pipeliner, slot uint64, builderPubkey, parentHash, proposerPubkey string) (int64, error) {
404+
func (r *RedisCache) GetBuilderLatestPayloadReceivedAt(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, builderPubkey, parentHash, proposerPubkey string) (int64, error) {
405405
keyLatestBidsTime := r.keyBlockBuilderLatestBidsTime(slot, parentHash, proposerPubkey)
406-
c := tx.HGet(context.Background(), keyLatestBidsTime, builderPubkey)
407-
_, err := tx.Exec(ctx)
406+
c := pipeliner.HGet(context.Background(), keyLatestBidsTime, builderPubkey)
407+
_, err := pipeliner.Exec(ctx)
408408
if errors.Is(err, redis.Nil) {
409409
return 0, nil
410410
} else if err != nil {
@@ -414,32 +414,32 @@ func (r *RedisCache) GetBuilderLatestPayloadReceivedAt(ctx context.Context, tx r
414414
}
415415

416416
// SaveBuilderBid saves the latest bid by a specific builder. TODO: use transaction to make these writes atomic
417-
func (r *RedisCache) SaveBuilderBid(ctx context.Context, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey, builderPubkey string, receivedAt time.Time, headerResp *common.GetHeaderResponse) (err error) {
417+
func (r *RedisCache) SaveBuilderBid(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey, builderPubkey string, receivedAt time.Time, headerResp *common.GetHeaderResponse) (err error) {
418418
// save the actual bid
419419
keyLatestBid := r.keyLatestBidByBuilder(slot, parentHash, proposerPubkey, builderPubkey)
420-
err = r.SetObjPipelined(ctx, tx, keyLatestBid, headerResp, expiryBidCache)
420+
err = r.SetObjPipelined(ctx, pipeliner, keyLatestBid, headerResp, expiryBidCache)
421421
if err != nil {
422422
return err
423423
}
424424

425425
// set the time of the request
426426
keyLatestBidsTime := r.keyBlockBuilderLatestBidsTime(slot, parentHash, proposerPubkey)
427-
err = tx.HSet(ctx, keyLatestBidsTime, builderPubkey, receivedAt.UnixMilli()).Err()
427+
err = pipeliner.HSet(ctx, keyLatestBidsTime, builderPubkey, receivedAt.UnixMilli()).Err()
428428
if err != nil {
429429
return err
430430
}
431-
err = tx.Expire(ctx, keyLatestBidsTime, expiryBidCache).Err()
431+
err = pipeliner.Expire(ctx, keyLatestBidsTime, expiryBidCache).Err()
432432
if err != nil {
433433
return err
434434
}
435435

436436
// set the value last, because that's iterated over when updating the best bid, and the payload has to be available
437437
keyLatestBidsValue := r.keyBlockBuilderLatestBidsValue(slot, parentHash, proposerPubkey)
438-
err = tx.HSet(ctx, keyLatestBidsValue, builderPubkey, headerResp.Value().String()).Err()
438+
err = pipeliner.HSet(ctx, keyLatestBidsValue, builderPubkey, headerResp.Value().String()).Err()
439439
if err != nil {
440440
return err
441441
}
442-
return tx.Expire(ctx, keyLatestBidsValue, expiryBidCache).Err()
442+
return pipeliner.Expire(ctx, keyLatestBidsValue, expiryBidCache).Err()
443443
}
444444

445445
type SaveBidAndUpdateTopBidResponse struct {
@@ -458,19 +458,19 @@ type SaveBidAndUpdateTopBidResponse struct {
458458
TimeUpdateFloor time.Duration
459459
}
460460

461-
func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeliner, trace *common.BidTraceV2, payload *common.BuilderSubmitBlockRequest, getPayloadResponse *common.GetPayloadResponse, getHeaderResponse *common.GetHeaderResponse, reqReceivedAt time.Time, isCancellationEnabled bool, floorValue *big.Int) (state SaveBidAndUpdateTopBidResponse, err error) {
461+
func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis.Pipeliner, trace *common.BidTraceV2, payload *common.BuilderSubmitBlockRequest, getPayloadResponse *common.GetPayloadResponse, getHeaderResponse *common.GetHeaderResponse, reqReceivedAt time.Time, isCancellationEnabled bool, floorValue *big.Int) (state SaveBidAndUpdateTopBidResponse, err error) {
462462
var prevTime, nextTime time.Time
463463
prevTime = time.Now()
464464

465465
// Load latest bids for a given slot+parent+proposer
466-
builderBids, err := NewBuilderBidsFromRedis(ctx, r, tx, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey())
466+
builderBids, err := NewBuilderBidsFromRedis(ctx, r, pipeliner, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey())
467467
if err != nil {
468468
return state, err
469469
}
470470

471471
// Load floor value (if not passed in already)
472472
if floorValue == nil {
473-
floorValue, err = r.GetFloorBidValue(ctx, tx, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey())
473+
floorValue, err = r.GetFloorBidValue(ctx, pipeliner, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey())
474474
if err != nil {
475475
return state, err
476476
}
@@ -498,7 +498,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
498498
// Time to save things in Redis
499499
//
500500
// 1. Save the execution payload
501-
err = r.SaveExecutionPayloadCapella(ctx, tx, payload.Slot(), payload.ProposerPubkey(), payload.BlockHash(), getPayloadResponse.Capella.Capella)
501+
err = r.SaveExecutionPayloadCapella(ctx, pipeliner, payload.Slot(), payload.ProposerPubkey(), payload.BlockHash(), getPayloadResponse.Capella.Capella)
502502
if err != nil {
503503
return state, err
504504
}
@@ -509,7 +509,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
509509
prevTime = nextTime
510510

511511
// 2. Save latest bid for this builder
512-
err = r.SaveBuilderBid(ctx, tx, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), payload.BuilderPubkey().String(), reqReceivedAt, getHeaderResponse)
512+
err = r.SaveBuilderBid(ctx, pipeliner, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), payload.BuilderPubkey().String(), reqReceivedAt, getHeaderResponse)
513513
if err != nil {
514514
return state, err
515515
}
@@ -522,7 +522,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
522522
prevTime = nextTime
523523

524524
// 3. Save the bid trace
525-
err = r.SaveBidTrace(ctx, tx, trace)
525+
err = r.SaveBidTrace(ctx, pipeliner, trace)
526526
if err != nil {
527527
return state, err
528528
}
@@ -538,7 +538,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
538538
return state, nil
539539
}
540540

541-
state, err = r._updateTopBid(ctx, tx, state, builderBids, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), floorValue)
541+
state, err = r._updateTopBid(ctx, pipeliner, state, builderBids, payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), floorValue)
542542
if err != nil {
543543
return state, err
544544
}
@@ -556,8 +556,8 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
556556
// Non-cancellable bid above floor should set new floor
557557
keyBidSource := r.keyLatestBidByBuilder(payload.Slot(), payload.ParentHash(), payload.ProposerPubkey(), payload.BuilderPubkey().String())
558558
keyFloorBid := r.keyFloorBid(payload.Slot(), payload.ParentHash(), payload.ProposerPubkey())
559-
c := tx.Copy(ctx, keyBidSource, keyFloorBid, 0, true)
560-
_, err = tx.Exec(ctx)
559+
c := pipeliner.Copy(ctx, keyBidSource, keyFloorBid, 0, true)
560+
_, err = pipeliner.Exec(ctx)
561561
if err != nil {
562562
return state, err
563563
}
@@ -568,19 +568,19 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
568568
} else if wasCopied == 0 {
569569
return state, fmt.Errorf("could not copy floor bid from %s to %s", keyBidSource, keyFloorBid) //nolint:goerr113
570570
}
571-
err = tx.Expire(ctx, keyFloorBid, expiryBidCache).Err()
571+
err = pipeliner.Expire(ctx, keyFloorBid, expiryBidCache).Err()
572572
if err != nil {
573573
return state, err
574574
}
575575

576576
keyFloorBidValue := r.keyFloorBidValue(payload.Slot(), payload.ParentHash(), payload.ProposerPubkey())
577-
err = tx.Set(ctx, keyFloorBidValue, payload.Value().String(), expiryBidCache).Err()
577+
err = pipeliner.Set(ctx, keyFloorBidValue, payload.Value().String(), expiryBidCache).Err()
578578
if err != nil {
579579
return state, err
580580
}
581581

582582
// Execute setting the floor bid
583-
_, err = tx.Exec(ctx)
583+
_, err = pipeliner.Exec(ctx)
584584

585585
// Record time needed to update floor
586586
nextTime = time.Now().UTC()
@@ -589,9 +589,9 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli
589589
return state, err
590590
}
591591

592-
func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, state SaveBidAndUpdateTopBidResponse, builderBids *BuilderBids, slot uint64, parentHash, proposerPubkey string, floorValue *big.Int) (resp SaveBidAndUpdateTopBidResponse, err error) {
592+
func (r *RedisCache) _updateTopBid(ctx context.Context, pipeliner redis.Pipeliner, state SaveBidAndUpdateTopBidResponse, builderBids *BuilderBids, slot uint64, parentHash, proposerPubkey string, floorValue *big.Int) (resp SaveBidAndUpdateTopBidResponse, err error) {
593593
if builderBids == nil {
594-
builderBids, err = NewBuilderBidsFromRedis(ctx, r, tx, slot, parentHash, proposerPubkey)
594+
builderBids, err = NewBuilderBidsFromRedis(ctx, r, pipeliner, slot, parentHash, proposerPubkey)
595595
if err != nil {
596596
return state, err
597597
}
@@ -603,7 +603,7 @@ func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, stat
603603

604604
// Load floor value (if not passed in already)
605605
if floorValue == nil {
606-
floorValue, err = r.GetFloorBidValue(ctx, tx, slot, parentHash, proposerPubkey)
606+
floorValue, err = r.GetFloorBidValue(ctx, pipeliner, slot, parentHash, proposerPubkey)
607607
if err != nil {
608608
return state, err
609609
}
@@ -621,8 +621,8 @@ func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, stat
621621

622622
// Copy winning bid to top bid cache
623623
keyTopBid := r.keyCacheGetHeaderResponse(slot, parentHash, proposerPubkey)
624-
c := tx.Copy(context.Background(), keyBidSource, keyTopBid, 0, true)
625-
_, err = tx.Exec(ctx)
624+
c := pipeliner.Copy(context.Background(), keyBidSource, keyTopBid, 0, true)
625+
_, err = pipeliner.Exec(ctx)
626626
if err != nil {
627627
return state, err
628628
}
@@ -632,7 +632,7 @@ func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, stat
632632
} else if wasCopied == 0 {
633633
return state, fmt.Errorf("could not copy top bid from %s to %s", keyBidSource, keyTopBid) //nolint:goerr113
634634
}
635-
err = tx.Expire(context.Background(), keyTopBid, expiryBidCache).Err()
635+
err = pipeliner.Expire(context.Background(), keyTopBid, expiryBidCache).Err()
636636
if err != nil {
637637
return state, err
638638
}
@@ -641,20 +641,20 @@ func (r *RedisCache) _updateTopBid(ctx context.Context, tx redis.Pipeliner, stat
641641

642642
// 6. Finally, update the global top bid value
643643
keyTopBidValue := r.keyTopBidValue(slot, parentHash, proposerPubkey)
644-
err = tx.Set(context.Background(), keyTopBidValue, state.TopBidValue.String(), expiryBidCache).Err()
644+
err = pipeliner.Set(context.Background(), keyTopBidValue, state.TopBidValue.String(), expiryBidCache).Err()
645645
if err != nil {
646646
return state, err
647647
}
648648

649-
_, err = tx.Exec(ctx)
649+
_, err = pipeliner.Exec(ctx)
650650
return state, err
651651
}
652652

653653
// GetTopBidValue gets the top bid value for a given slot+parent+proposer combination
654-
func (r *RedisCache) GetTopBidValue(ctx context.Context, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (topBidValue *big.Int, err error) {
654+
func (r *RedisCache) GetTopBidValue(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (topBidValue *big.Int, err error) {
655655
keyTopBidValue := r.keyTopBidValue(slot, parentHash, proposerPubkey)
656-
c := tx.Get(ctx, keyTopBidValue)
657-
_, err = tx.Exec(ctx)
656+
c := pipeliner.Get(ctx, keyTopBidValue)
657+
_, err = pipeliner.Exec(ctx)
658658
if errors.Is(err, redis.Nil) {
659659
return big.NewInt(0), nil
660660
} else if err != nil {
@@ -685,7 +685,7 @@ func (r *RedisCache) GetBuilderLatestValue(slot uint64, parentHash, proposerPubk
685685
}
686686

687687
// DelBuilderBid removes a builders most recent bid
688-
func (r *RedisCache) DelBuilderBid(ctx context.Context, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey, builderPubkey string) (err error) {
688+
func (r *RedisCache) DelBuilderBid(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey, builderPubkey string) (err error) {
689689
// delete the value
690690
keyLatestValue := r.keyBlockBuilderLatestBidsValue(slot, parentHash, proposerPubkey)
691691
err = r.client.HDel(ctx, keyLatestValue, builderPubkey).Err()
@@ -702,16 +702,16 @@ func (r *RedisCache) DelBuilderBid(ctx context.Context, tx redis.Pipeliner, slot
702702

703703
// update bids now to compute current top bid
704704
state := SaveBidAndUpdateTopBidResponse{} //nolint:exhaustruct
705-
_, err = r._updateTopBid(ctx, tx, state, nil, slot, parentHash, proposerPubkey, nil)
705+
_, err = r._updateTopBid(ctx, pipeliner, state, nil, slot, parentHash, proposerPubkey, nil)
706706
return err
707707
}
708708

709709
// GetFloorBidValue returns the value of the highest non-cancellable bid
710-
func (r *RedisCache) GetFloorBidValue(ctx context.Context, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (floorValue *big.Int, err error) {
710+
func (r *RedisCache) GetFloorBidValue(ctx context.Context, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (floorValue *big.Int, err error) {
711711
keyFloorBidValue := r.keyFloorBidValue(slot, parentHash, proposerPubkey)
712-
c := tx.Get(ctx, keyFloorBidValue)
712+
c := pipeliner.Get(ctx, keyFloorBidValue)
713713

714-
_, err = tx.Exec(ctx)
714+
_, err = pipeliner.Exec(ctx)
715715
if errors.Is(err, redis.Nil) {
716716
return big.NewInt(0), nil
717717
} else if err != nil {

datastore/redis_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -491,8 +491,8 @@ func TestGetBuilderLatestValue(t *testing.T) {
491491
},
492492
}
493493

494-
_, err = cache.client.TxPipelined(context.Background(), func(tx redis.Pipeliner) error {
495-
return cache.SaveBuilderBid(context.Background(), tx, slot, parentHash, proposerPubkey, builderPubkey, time.Now().UTC(), getHeaderResp)
494+
_, err = cache.client.TxPipelined(context.Background(), func(pipeliner redis.Pipeliner) error {
495+
return cache.SaveBuilderBid(context.Background(), pipeliner, slot, parentHash, proposerPubkey, builderPubkey, time.Now().UTC(), getHeaderResp)
496496
})
497497
require.NoError(t, err)
498498

@@ -518,7 +518,7 @@ func TestPipelineNilCheck(t *testing.T) {
518518
// err := cache.client.Set(context.Background(), key1, val, 0).Err()
519519
// require.NoError(t, err)
520520

521-
// _, err = cache.client.TxPipelined(context.Background(), func(tx redis.Pipeliner) error {
521+
// _, err = cache.client.TxPipelined(context.Background(), func(pipeliner redis.Pipeliner) error {
522522
// c := tx.Get(context.Background(), key1)
523523
// _, err := tx.Exec(context.Background())
524524
// require.NoError(t, err)

datastore/utils.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ type BuilderBids struct {
1313
bidValues map[string]*big.Int
1414
}
1515

16-
func NewBuilderBidsFromRedis(ctx context.Context, r *RedisCache, tx redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (*BuilderBids, error) {
16+
func NewBuilderBidsFromRedis(ctx context.Context, r *RedisCache, pipeliner redis.Pipeliner, slot uint64, parentHash, proposerPubkey string) (*BuilderBids, error) {
1717
keyBidValues := r.keyBlockBuilderLatestBidsValue(slot, parentHash, proposerPubkey)
18-
c := tx.HGetAll(ctx, keyBidValues)
19-
_, err := tx.Exec(ctx)
18+
c := pipeliner.HGetAll(ctx, keyBidValues)
19+
_, err := pipeliner.Exec(ctx)
2020
if err != nil && !errors.Is(err, redis.Nil) {
2121
return nil, err
2222
}

0 commit comments

Comments
 (0)