Skip to content

Commit a10fb03

Browse files
committed
fix: tests
1 parent 452338a commit a10fb03

File tree

15 files changed

+20641
-185
lines changed

15 files changed

+20641
-185
lines changed

fail

Lines changed: 20409 additions & 0 deletions
Large diffs are not rendered by default.

pkg/pusher/pusher.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,9 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {
146146

147147
wg.Done()
148148
<-sem
149+
150+
s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())
151+
149152
if reAttempt {
150153
select {
151154
case cc <- op:
@@ -254,12 +257,6 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {
254257
func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (repeat bool, err error) {
255258
loggerV1 := logger.V(1).Build()
256259

257-
defer func() {
258-
if !repeat {
259-
s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())
260-
}
261-
}()
262-
263260
ok, err := s.batchExist.Exists(op.Chunk.Stamp().BatchID())
264261
if !ok || err != nil {
265262
loggerV1.Warning(
@@ -315,9 +312,6 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) (re
315312
default:
316313
loggerV1.Error(err, "pusher: failed to return error for direct upload")
317314
}
318-
if !reAttempt {
319-
s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())
320-
}
321315
}()
322316

323317
ok, err := s.batchExist.Exists(op.Chunk.Stamp().BatchID())

pkg/pusher/pusher_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ func TestPusherRetryShallow(t *testing.T) {
338338

339339
storer.chunks <- chunk
340340

341-
err := spinlock.Wait(spinTimeout, func() bool {
341+
err := spinlock.Wait(time.Minute*5, func() bool {
342342
c := int(atomic.LoadInt32(&callCount))
343343
return c == retryCount
344344
})

pkg/storer/internal/events/subscribe.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package events
66

77
import (
8+
"slices"
89
"sync"
910
)
1011

@@ -33,7 +34,7 @@ func (b *Subscriber) Subscribe(str string) (<-chan struct{}, func()) {
3334
for i, s := range b.subs[str] {
3435
if s == c {
3536
b.subs[str][i] = nil
36-
b.subs[str] = append(b.subs[str][:i], b.subs[str][i+1:]...)
37+
b.subs[str] = slices.Delete(b.subs[str], i, i+1)
3738
break
3839
}
3940
}

pkg/storer/internal/internal.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ import (
1919
// swarm reference associated with this session.
2020
type PutterCloserWithReference interface {
2121
Put(context.Context, transaction.Store, swarm.Chunk) error
22-
Close(transaction.Storage, swarm.Address) error
22+
Close(storage.IndexStore, swarm.Address) error
23+
}
24+
25+
type PutterCloserCleanerWithReference interface {
26+
PutterCloserWithReference
2327
Cleanup(transaction.Storage) error
2428
}
2529

pkg/storer/internal/pinning/pinning.go

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type CollectionStat struct {
6767
// It will create a new UUID for the collection which can be used to iterate on all the chunks
6868
// that are part of this collection. The root pin is only updated on successful close of this.
6969
// Calls to the Putter MUST be mutex locked to prevent concurrent upload data races.
70-
func NewCollection(st storage.IndexStore) (internal.PutterCloserWithReference, error) {
70+
func NewCollection(st storage.IndexStore) (internal.PutterCloserCleanerWithReference, error) {
7171
newCollectionUUID := newUUID()
7272
err := st.Put(&dirtyCollection{UUID: newCollectionUUID})
7373
if err != nil {
@@ -121,40 +121,35 @@ func (c *collectionPutter) Put(ctx context.Context, st transaction.Store, ch swa
121121
return nil
122122
}
123123

124-
func (c *collectionPutter) Close(st transaction.Storage, root swarm.Address) error {
124+
func (c *collectionPutter) Close(st storage.IndexStore, root swarm.Address) error {
125125
if root.IsZero() {
126126
return errCollectionRootAddressIsZero
127127
}
128128

129-
return st.Run(context.Background(), func(s transaction.Store) error {
130-
131-
collection := &pinCollectionItem{Addr: root}
132-
has, err := s.IndexStore().Has(collection)
133-
if err != nil {
134-
return fmt.Errorf("pin store: check previous root: %w", err)
135-
}
136-
137-
if has {
138-
return ErrDuplicatePinCollection
139-
}
140-
141-
// Save the root pin reference.
142-
c.collection.Addr = root
143-
err = s.IndexStore().Put(c.collection)
144-
if err != nil {
145-
return fmt.Errorf("pin store: failed updating collection: %w", err)
146-
}
129+
collection := &pinCollectionItem{Addr: root}
130+
has, err := st.Has(collection)
131+
if err != nil {
132+
return fmt.Errorf("pin store: check previous root: %w", err)
133+
}
147134

148-
err = s.IndexStore().Delete(&dirtyCollection{UUID: c.collection.UUID})
149-
if err != nil {
150-
return fmt.Errorf("pin store: failed deleting dirty collection: %w", err)
151-
}
135+
if has {
136+
return ErrDuplicatePinCollection
137+
}
152138

153-
c.closed = true
154-
return nil
139+
// Save the root pin reference.
140+
c.collection.Addr = root
141+
err = st.Put(c.collection)
142+
if err != nil {
143+
return fmt.Errorf("pin store: failed updating collection: %w", err)
144+
}
155145

156-
})
146+
err = st.Delete(&dirtyCollection{UUID: c.collection.UUID})
147+
if err != nil {
148+
return fmt.Errorf("pin store: failed deleting dirty collection: %w", err)
149+
}
157150

151+
c.closed = true
152+
return nil
158153
}
159154

160155
func (c *collectionPutter) Cleanup(st transaction.Storage) error {

pkg/storer/internal/pinning/pinning_test.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,9 @@ func TestPinStore(t *testing.T) {
9393
}
9494
}
9595

96-
if err := putter.Close(st, tc.root.Address()); err != nil {
96+
if err := st.Run(context.Background(), func(s transaction.Store) error {
97+
return putter.Close(s.IndexStore(), tc.root.Address())
98+
}); err != nil {
9799
t.Fatal(err)
98100
}
99101
})
@@ -276,7 +278,9 @@ func TestPinStore(t *testing.T) {
276278
t.Fatal(err)
277279
}
278280

279-
err = putter.Close(st, root.Address())
281+
err = st.Run(context.Background(), func(s transaction.Store) error {
282+
return putter.Close(s.IndexStore(), root.Address())
283+
})
280284
if err != nil {
281285
t.Fatal(err)
282286
}
@@ -311,12 +315,16 @@ func TestPinStore(t *testing.T) {
311315
t.Fatal(err)
312316
}
313317

314-
err = putter.Close(st, root.Address())
318+
err = st.Run(context.Background(), func(s transaction.Store) error {
319+
return putter.Close(s.IndexStore(), root.Address())
320+
})
315321
if err != nil {
316322
t.Fatal(err)
317323
}
318324

319-
err = putter.Close(st, root.Address())
325+
err = st.Run(context.Background(), func(s transaction.Store) error {
326+
return putter.Close(s.IndexStore(), root.Address())
327+
})
320328
if err == nil || !errors.Is(err, pinstore.ErrDuplicatePinCollection) {
321329
t.Fatalf("unexpected error during CLose, want: %v, got: %v", pinstore.ErrDuplicatePinCollection, err)
322330
}
@@ -344,7 +352,9 @@ func TestPinStore(t *testing.T) {
344352
t.Fatal(err)
345353
}
346354

347-
err = putter.Close(st, swarm.ZeroAddress)
355+
err = st.Run(context.Background(), func(s transaction.Store) error {
356+
return putter.Close(s.IndexStore(), swarm.ZeroAddress)
357+
})
348358
if !errors.Is(err, pinstore.ErrCollectionRootAddressIsZero) {
349359
t.Fatalf("unexpected error on close, want: %v, got: %v", pinstore.ErrCollectionRootAddressIsZero, err)
350360
}
@@ -375,7 +385,7 @@ func TestCleanup(t *testing.T) {
375385
chunks := chunktest.GenerateTestRandomChunks(5)
376386

377387
var (
378-
putter internal.PutterCloserWithReference
388+
putter internal.PutterCloserCleanerWithReference
379389
err error
380390
)
381391
err = st.Run(context.Background(), func(s transaction.Store) error {

pkg/storer/internal/upload/uploadstore.go

Lines changed: 34 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -454,62 +454,46 @@ func (u *uploadPutter) Put(ctx context.Context, st transaction.Store, chunk swar
454454
// with a swarm reference. This can be useful while keeping track of uploads through
455455
// the tags. It will update the tag. This will be filled with the Split and Seen count
456456
// by the Putter.
457-
func (u *uploadPutter) Close(st transaction.Storage, addr swarm.Address) error {
457+
func (u *uploadPutter) Close(s storage.IndexStore, addr swarm.Address) error {
458458
if u.closed {
459459
return nil
460460
}
461-
defer func() {
462-
u.closed = true
463-
}()
464461

465-
if err := u.Cleanup(st); err != nil {
466-
return fmt.Errorf("cleanup failed on close, root_ref %s: %w", addr, err)
467-
}
468-
469-
return st.Run(context.Background(), func(s transaction.Store) error {
462+
u.closed = true
470463

471-
ti := &TagItem{TagID: u.tagID}
472-
err := s.IndexStore().Get(ti)
473-
if err != nil {
474-
if errors.Is(err, storage.ErrNotFound) {
475-
return nil
476-
}
477-
return fmt.Errorf("failed reading tag while closing: %w", err)
464+
ti := &TagItem{TagID: u.tagID}
465+
err := s.Get(ti)
466+
if err != nil {
467+
if errors.Is(err, storage.ErrNotFound) {
468+
return s.Delete(&dirtyTagItem{TagID: u.tagID})
478469
}
470+
return fmt.Errorf("failed reading tag while closing: %w", err)
471+
}
479472

480-
ti.Split += u.split
481-
ti.Seen += u.seen
473+
ti.Split += u.split
474+
ti.Seen += u.seen
482475

483-
if !addr.IsZero() {
484-
ti.Address = addr.Clone()
485-
}
476+
if !addr.IsZero() {
477+
ti.Address = addr.Clone()
478+
}
486479

487-
return s.IndexStore().Put(ti)
488-
})
480+
return errors.Join(
481+
s.Put(ti),
482+
s.Delete(&dirtyTagItem{TagID: u.tagID}),
483+
)
489484
}
490485

491-
func (u *uploadPutter) Cleanup(st transaction.Storage) error {
492-
if u.closed {
493-
return errPutterAlreadyClosed
494-
}
486+
func Cleanup(st transaction.Storage, tag uint64) error {
495487

496488
itemsToDelete := make([]*pushItem, 0)
497489

498-
di := &dirtyTagItem{TagID: u.tagID}
499-
err := st.IndexStore().Get(di)
500-
if err != nil {
501-
return fmt.Errorf("failed reading dirty tag while cleaning up: %w", err)
502-
}
503-
504-
err = st.IndexStore().Iterate(
490+
err := st.IndexStore().Iterate(
505491
storage.Query{
506-
Factory: func() storage.Item { return &pushItem{} },
507-
PrefixAtStart: true,
508-
Prefix: fmt.Sprintf("%d", di.Started),
492+
Factory: func() storage.Item { return &pushItem{} },
509493
},
510494
func(res storage.Result) (bool, error) {
511495
pi := res.Entry.(*pushItem)
512-
if pi.TagID == u.tagID {
496+
if pi.TagID == tag {
513497
itemsToDelete = append(itemsToDelete, pi)
514498
}
515499
return false, nil
@@ -540,7 +524,7 @@ func (u *uploadPutter) Cleanup(st transaction.Storage) error {
540524
return errors.Join(
541525
eg.Wait(),
542526
st.Run(context.Background(), func(s transaction.Store) error {
543-
return s.IndexStore().Delete(di)
527+
return s.IndexStore().Delete(&dirtyTagItem{TagID: tag})
544528
}),
545529
)
546530
}
@@ -564,33 +548,33 @@ func CleanupDirty(st transaction.Storage) error {
564548
}
565549

566550
for _, di := range dirtyTags {
567-
err = errors.Join(err, (&uploadPutter{tagID: di.TagID}).Cleanup(st))
551+
err = errors.Join(err, Cleanup(st, di.TagID))
568552
}
569553

570554
return err
571555
}
572556

573557
// Report is the implementation of the PushReporter interface.
574558
// Must be mutex locked.
575-
func Report(st transaction.Store, tag uint64, update *TagUpdate) error {
559+
func Report(st storage.IndexStore, tag uint64, update *TagUpdate) (bool, error) {
576560

577561
ti := &TagItem{TagID: tag}
578-
err := st.IndexStore().Get(ti)
562+
err := st.Get(ti)
579563
if err != nil {
580564
if !errors.Is(err, storage.ErrNotFound) {
581-
return fmt.Errorf("failed getting tag: %w", err)
565+
return false, fmt.Errorf("failed getting tag: %w", err)
582566
}
583567

584568
// tag is missing, no need update it
585-
return nil
569+
return false, nil
586570
}
587571

588572
// update the tag
589573
ti.Sent += update.Sent
590574
ti.Stored += update.Stored
591575
ti.Synced += update.Synced
592576

593-
return st.IndexStore().Put(ti)
577+
return ti.Synced >= ti.Split, st.Put(ti)
594578
}
595579

596580
var (
@@ -683,9 +667,11 @@ func ListAllTags(st storage.Reader) ([]TagItem, error) {
683667
return tags, nil
684668
}
685669

686-
func IteratePending(ctx context.Context, s transaction.ReadOnlyStore, consumerFn func(chunk swarm.Chunk) (bool, error)) error {
670+
func IteratePending(ctx context.Context, s transaction.ReadOnlyStore, start int64, consumerFn func(chunk swarm.Chunk, ts int64) (bool, error)) error {
687671
return s.IndexStore().Iterate(storage.Query{
688-
Factory: func() storage.Item { return &pushItem{} },
672+
Factory: func() storage.Item { return &pushItem{} },
673+
PrefixAtStart: true,
674+
Prefix: fmt.Sprintf("%d", start),
689675
}, func(r storage.Result) (bool, error) {
690676
pi := r.Entry.(*pushItem)
691677
has, err := s.IndexStore().Has(&dirtyTagItem{TagID: pi.TagID})
@@ -709,7 +695,7 @@ func IteratePending(ctx context.Context, s transaction.ReadOnlyStore, consumerFn
709695
WithStamp(stamp).
710696
WithTagID(pi.TagID)
711697

712-
return consumerFn(chunk)
698+
return consumerFn(chunk, pi.Timestamp)
713699
})
714700
}
715701

0 commit comments

Comments
 (0)