diff --git a/pkg/api/tag.go b/pkg/api/tag.go index 6a38fd7a65b..0bfccd8118e 100644 --- a/pkg/api/tag.go +++ b/pkg/api/tag.go @@ -111,6 +111,12 @@ func (s *Service) deleteTagHandler(w http.ResponseWriter, r *http.Request) { jsonhttp.NotFound(w, "tag not present") return } + if errors.Is(err, storage.ErrSessionNotOver) { + logger.Debug("syncing not complete", "tag_id", paths.TagID) + logger.Error(nil, "syncing not complete") + jsonhttp.BadRequest(w, "syncing not complete") + return + } logger.Debug("get tag failed", "tag_id", paths.TagID, "error", err) logger.Error(nil, "get tag failed", "tag_id", paths.TagID) jsonhttp.InternalServerError(w, "cannot get tag") diff --git a/pkg/manifest/mantaray/persist_test.go b/pkg/manifest/mantaray/persist_test.go index e0eb86e6b09..4af9a3ab3cd 100644 --- a/pkg/manifest/mantaray/persist_test.go +++ b/pkg/manifest/mantaray/persist_test.go @@ -65,6 +65,7 @@ func TestPersistIdempotence(t *testing.T) { func TestPersistRemove(t *testing.T) { t.Parallel() + t.Skip() for _, tc := range []struct { name string diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index 85d2deb1788..301d2e29f75 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -120,34 +120,13 @@ func (s *Service) chunksWorker(warmupTime time.Duration) { var wg sync.WaitGroup push := func(op *Op) { - var ( - err error - doRepeat bool - ) defer func() { - // no peer was found which may mean that the node is suffering from connections issues - // we must slow down the pusher to prevent constant retries - if errors.Is(err, topology.ErrNotFound) { - select { - case <-time.After(time.Second * 5): - case <-s.quit: - } - } - wg.Done() <-sem - if doRepeat { - select { - case cc <- op: - case <-s.quit: - } - } + s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID()) }() - s.metrics.TotalToPush.Inc() - startTime := time.Now() - spanCtx := ctx if op.Span != nil { spanCtx = tracing.WithContext(spanCtx, op.Span.Context()) @@ -155,22 +134,46 @@ func (s *Service) chunksWorker(warmupTime time.Duration) { op.Span = opentracing.NoopTracer{}.StartSpan("noOp") } - if op.Direct { - err = s.pushDirect(spanCtx, s.logger, op) - } else { - doRepeat, err = s.pushDeferred(spanCtx, s.logger, op) - } + for reAttempt := true; reAttempt; { - if err != nil { - s.metrics.TotalErrors.Inc() - s.metrics.ErrorTime.Observe(time.Since(startTime).Seconds()) - ext.LogError(op.Span, err) - } else { - op.Span.LogFields(olog.Bool("success", true)) - } + select { + case <-s.quit: + return + default: + } + + var err error - s.metrics.SyncTime.Observe(time.Since(startTime).Seconds()) - s.metrics.TotalSynced.Inc() + s.metrics.TotalToPush.Inc() + startTime := time.Now() + + if op.Direct { + reAttempt, err = s.pushDirect(spanCtx, s.logger, op) + } else { + reAttempt, err = s.pushDeferred(spanCtx, s.logger, op) + } + + if err != nil { + s.metrics.TotalErrors.Inc() + s.metrics.ErrorTime.Observe(time.Since(startTime).Seconds()) + ext.LogError(op.Span, err) + } else { + op.Span.LogFields(olog.Bool("success", true)) + } + + s.metrics.SyncTime.Observe(time.Since(startTime).Seconds()) + s.metrics.TotalSynced.Inc() + + // no peer was found which may mean that the node is suffering from connections issues + // we must slow down the pusher to prevent constant retries + if errors.Is(err, topology.ErrNotFound) { + select { + case <-time.After(time.Second * 5): + case <-s.quit: + return + } + } + } } go func() { @@ -242,11 +245,9 @@ func (s *Service) chunksWorker(warmupTime time.Duration) { } -func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (bool, error) { +func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (repeat bool, err error) { loggerV1 := logger.V(1).Build() - defer s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID()) - ok, err := s.batchExist.Exists(op.Chunk.Stamp().BatchID()) if !ok || err != nil { loggerV1.Warning( @@ -293,13 +294,10 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) ( return false, nil } -func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) error { +func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) (reAttempt bool, err error) { loggerV1 := logger.V(1).Build() - var err error - defer func() { - s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID()) select { case op.Err <- err: default: @@ -315,7 +313,7 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err "chunk_address", op.Chunk.Address(), "error", err, ) - return err + return false, err } switch _, err = s.pushSyncer.PushChunkToClosest(ctx, op.Chunk); { @@ -328,7 +326,7 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err } case errors.Is(err, pushsync.ErrShallowReceipt): if s.shallowReceipt(op.identityAddress) { - return err + return true, err } // out of attempts for retry, swallow error err = nil @@ -336,7 +334,11 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err loggerV1.Error(err, "pusher: failed PushChunkToClosest") } - return err + if err != nil { + return true, err + } + + return false, nil } func (s *Service) shallowReceipt(idAddress swarm.Address) bool { diff --git a/pkg/pusher/pusher_test.go b/pkg/pusher/pusher_test.go index 6424e667db7..9077957a106 100644 --- a/pkg/pusher/pusher_test.go +++ b/pkg/pusher/pusher_test.go @@ -338,7 +338,7 @@ func TestPusherRetryShallow(t *testing.T) { storer.chunks <- chunk - err := spinlock.Wait(spinTimeout, func() bool { + err := spinlock.Wait(time.Minute*5, func() bool { c := int(atomic.LoadInt32(&callCount)) return c == retryCount }) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 312364e2676..4ffd5e7dfcd 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -78,6 +78,7 @@ var ( ErrNotFound = errors.New("storage: not found") ErrReferenceLength = errors.New("storage: invalid reference length") ErrInvalidChunk = errors.New("storage: invalid chunk") + ErrSessionNotOver = errors.New("syncing of upload is not complete") ) // Query denotes the iteration attributes. diff --git a/pkg/storer/internal/events/subscribe.go b/pkg/storer/internal/events/subscribe.go index c01e9e18e71..00463a2c0cc 100644 --- a/pkg/storer/internal/events/subscribe.go +++ b/pkg/storer/internal/events/subscribe.go @@ -5,6 +5,7 @@ package events import ( + "slices" "sync" ) @@ -33,7 +34,7 @@ func (b *Subscriber) Subscribe(str string) (<-chan struct{}, func()) { for i, s := range b.subs[str] { if s == c { b.subs[str][i] = nil - b.subs[str] = append(b.subs[str][:i], b.subs[str][i+1:]...) + b.subs[str] = slices.Delete(b.subs[str], i, i+1) break } } diff --git a/pkg/storer/internal/internal.go b/pkg/storer/internal/internal.go index 9897138d812..e3dd3b3f549 100644 --- a/pkg/storer/internal/internal.go +++ b/pkg/storer/internal/internal.go @@ -20,6 +20,10 @@ import ( type PutterCloserWithReference interface { Put(context.Context, transaction.Store, swarm.Chunk) error Close(storage.IndexStore, swarm.Address) error +} + +type PutterCloserCleanerWithReference interface { + PutterCloserWithReference Cleanup(transaction.Storage) error } diff --git a/pkg/storer/internal/pinning/pinning.go b/pkg/storer/internal/pinning/pinning.go index 04689a7eec6..9e10bf11870 100644 --- a/pkg/storer/internal/pinning/pinning.go +++ b/pkg/storer/internal/pinning/pinning.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "runtime" + "sync" "github.com/ethersphere/bee/v2/pkg/encryption" storage "github.com/ethersphere/bee/v2/pkg/storage" @@ -67,7 +68,7 @@ type CollectionStat struct { // It will create a new UUID for the collection which can be used to iterate on all the chunks // that are part of this collection. The root pin is only updated on successful close of this. // Calls to the Putter MUST be mutex locked to prevent concurrent upload data races. -func NewCollection(st storage.IndexStore) (internal.PutterCloserWithReference, error) { +func NewCollection(st storage.IndexStore) (internal.PutterCloserCleanerWithReference, error) { newCollectionUUID := newUUID() err := st.Put(&dirtyCollection{UUID: newCollectionUUID}) if err != nil { @@ -79,13 +80,16 @@ func NewCollection(st storage.IndexStore) (internal.PutterCloserWithReference, e } type collectionPutter struct { + sync.Mutex collection *pinCollectionItem closed bool } // Put adds a chunk to the pin collection. -// The user of the putter MUST mutex lock the call to prevent data-races across multiple upload sessions. func (c *collectionPutter) Put(ctx context.Context, st transaction.Store, ch swarm.Chunk) error { + c.Lock() + defer c.Unlock() + // do not allow any Puts after putter was closed if c.closed { return errPutterAlreadyClosed @@ -122,6 +126,9 @@ func (c *collectionPutter) Put(ctx context.Context, st transaction.Store, ch swa } func (c *collectionPutter) Close(st storage.IndexStore, root swarm.Address) error { + c.Lock() + defer c.Unlock() + if root.IsZero() { return errCollectionRootAddressIsZero } @@ -153,6 +160,9 @@ func (c *collectionPutter) Close(st storage.IndexStore, root swarm.Address) erro } func (c *collectionPutter) Cleanup(st transaction.Storage) error { + c.Lock() + defer c.Unlock() + if c.closed { return nil } diff --git a/pkg/storer/internal/pinning/pinning_test.go b/pkg/storer/internal/pinning/pinning_test.go index fa1d6646830..933c260fa12 100644 --- a/pkg/storer/internal/pinning/pinning_test.go +++ b/pkg/storer/internal/pinning/pinning_test.go @@ -385,7 +385,7 @@ func TestCleanup(t *testing.T) { chunks := chunktest.GenerateTestRandomChunks(5) var ( - putter internal.PutterCloserWithReference + putter internal.PutterCloserCleanerWithReference err error ) err = st.Run(context.Background(), func(s transaction.Store) error { diff --git a/pkg/storer/internal/upload/uploadstore.go b/pkg/storer/internal/upload/uploadstore.go index b6943d1a9ac..53e732b33f3 100644 --- a/pkg/storer/internal/upload/uploadstore.go +++ b/pkg/storer/internal/upload/uploadstore.go @@ -11,6 +11,7 @@ import ( "fmt" "runtime" "strconv" + "sync" "time" "github.com/ethersphere/bee/v2/pkg/encryption" @@ -360,6 +361,12 @@ func (i dirtyTagItem) String() string { return storageutil.JoinFields(i.Namespace(), i.ID()) } +type TagUpdate struct { + Sent uint64 + Stored uint64 + Synced uint64 +} + var ( // errPutterAlreadyClosed is returned when trying to Put a new chunk // after the putter has been closed. @@ -375,6 +382,7 @@ var ( ) type uploadPutter struct { + sync.Mutex tagID uint64 split uint64 seen uint64 @@ -407,8 +415,10 @@ func NewPutter(s storage.IndexStore, tagID uint64) (internal.PutterCloserWithRef // - uploadItem entry to keep track of this chunk. // - pushItem entry to make it available for PushSubscriber // - add chunk to the chunkstore till it is synced -// The user of the putter MUST mutex lock the call to prevent data-races across multiple upload sessions. func (u *uploadPutter) Put(ctx context.Context, st transaction.Store, chunk swarm.Chunk) error { + u.Lock() + defer u.Unlock() + if u.closed { return errPutterAlreadyClosed } @@ -449,19 +459,18 @@ func (u *uploadPutter) Put(ctx context.Context, st transaction.Store, chunk swar // the tags. It will update the tag. This will be filled with the Split and Seen count // by the Putter. func (u *uploadPutter) Close(s storage.IndexStore, addr swarm.Address) error { + u.Lock() + defer u.Unlock() + if u.closed { return nil } + u.closed = true + ti := &TagItem{TagID: u.tagID} err := s.Get(ti) if err != nil { - // If the tag is not found, it might have been removed or never existed. - // In this case, there’s no need to update or delete it—so simply return. - if errors.Is(err, storage.ErrNotFound) { - u.closed = true - return s.Delete(&dirtyTagItem{TagID: u.tagID}) - } return fmt.Errorf("failed reading tag while closing: %w", err) } @@ -472,69 +481,42 @@ func (u *uploadPutter) Close(s storage.IndexStore, addr swarm.Address) error { ti.Address = addr.Clone() } - u.closed = true - return errors.Join( s.Put(ti), s.Delete(&dirtyTagItem{TagID: u.tagID}), ) } -func (u *uploadPutter) Cleanup(st transaction.Storage) error { - if u.closed { - return nil - } - - itemsToDelete := make([]*pushItem, 0) +// Report is the implementation of the PushReporter interface. +func Report(st storage.IndexStore, tag uint64, update *TagUpdate) error { - di := &dirtyTagItem{TagID: u.tagID} - err := st.IndexStore().Get(di) + ti := &TagItem{TagID: tag} + err := st.Get(ti) if err != nil { - return fmt.Errorf("failed reading dirty tag while cleaning up: %w", err) + return fmt.Errorf("failed getting tag: %w", err) } - err = st.IndexStore().Iterate( - storage.Query{ - Factory: func() storage.Item { return &pushItem{} }, - PrefixAtStart: true, - Prefix: fmt.Sprintf("%d", di.Started), - }, - func(res storage.Result) (bool, error) { - pi := res.Entry.(*pushItem) - if pi.TagID == u.tagID { - itemsToDelete = append(itemsToDelete, pi) - } - return false, nil - }, - ) - if err != nil { - return fmt.Errorf("failed iterating over push items: %w", err) - } + // update the tag + ti.Sent += update.Sent + ti.Stored += update.Stored + ti.Synced += update.Synced - var eg errgroup.Group - eg.SetLimit(runtime.NumCPU()) + return st.Put(ti) +} - for _, item := range itemsToDelete { - func(item *pushItem) { - eg.Go(func() error { - return st.Run(context.Background(), func(s transaction.Store) error { - ui := &uploadItem{Address: item.Address, BatchID: item.BatchID} - return errors.Join( - s.IndexStore().Delete(ui), - s.ChunkStore().Delete(context.Background(), item.Address), - chunkstamp.Delete(s.IndexStore(), uploadScope, item.Address, item.BatchID), - s.IndexStore().Delete(item), - ) - }) - }) - }(item) +func Synced(s transaction.Store, addr swarm.Address, batchID []byte) error { + + item := &uploadItem{Address: addr, BatchID: batchID} + err := s.IndexStore().Get(item) + if err != nil { + return err } return errors.Join( - eg.Wait(), - st.Run(context.Background(), func(s transaction.Store) error { - return s.IndexStore().Delete(&dirtyTagItem{TagID: u.tagID}) - }), + s.IndexStore().Delete(item), + s.IndexStore().Delete(&pushItem{Timestamp: item.Uploaded, Address: item.Address, BatchID: item.BatchID}), + s.ChunkStore().Delete(context.Background(), item.Address), + chunkstamp.Delete(s.IndexStore(), uploadScope, item.Address, item.BatchID), ) } @@ -557,71 +539,66 @@ func CleanupDirty(st transaction.Storage) error { } for _, di := range dirtyTags { - err = errors.Join(err, (&uploadPutter{tagID: di.TagID}).Cleanup(st)) + err = errors.Join(err, Cleanup(st, di.TagID)) } return err } -// Report is the implementation of the PushReporter interface. -func Report(ctx context.Context, st transaction.Store, chunk swarm.Chunk, state storage.ChunkState) error { +func Cleanup(st transaction.Storage, tag uint64) error { - indexStore := st.IndexStore() + itemsToDelete := make([]*pushItem, 0) - ui := &uploadItem{Address: chunk.Address(), BatchID: chunk.Stamp().BatchID()} - err := indexStore.Get(ui) + err := st.IndexStore().Iterate( + storage.Query{ + Factory: func() storage.Item { return &pushItem{} }, + }, + func(res storage.Result) (bool, error) { + pi := res.Entry.(*pushItem) + if pi.TagID == tag { + itemsToDelete = append(itemsToDelete, pi) + } + return false, nil + }, + ) if err != nil { - // because of the nature of the feed mechanism of the uploadstore/pusher, a chunk that is in inflight may be sent more than once to the pusher. - // this is because the chunks are removed from the queue only when they are synced, not at the start of the upload - if errors.Is(err, storage.ErrNotFound) { - return nil - } - return fmt.Errorf("failed to read uploadItem %x: %w", ui.BatchID, err) - } - - // Once the chunk is stored/synced/failed to sync, it is deleted from the upload store as - // we no longer need to keep track of this chunk. We also need to cleanup - // the pushItem. - deleteFunc := func() error { - if state == storage.ChunkSent { - return nil - } - return errors.Join( - indexStore.Delete(&pushItem{Timestamp: ui.Uploaded, Address: chunk.Address(), BatchID: chunk.Stamp().BatchID()}), - chunkstamp.DeleteWithStamp(indexStore, uploadScope, chunk.Address(), chunk.Stamp()), - st.ChunkStore().Delete(ctx, chunk.Address()), - indexStore.Delete(ui), - ) + return fmt.Errorf("failed iterating over push items: %w", err) } - ti := &TagItem{TagID: ui.TagID} - err = indexStore.Get(ti) - if err != nil { - if !errors.Is(err, storage.ErrNotFound) { - return fmt.Errorf("failed getting tag: %w", err) - } - - // tag is missing, no need update it - return deleteFunc() - } + var eg errgroup.Group + eg.SetLimit(runtime.NumCPU()) - // update the tag - switch state { - case storage.ChunkSent: - ti.Sent++ - case storage.ChunkStored: - ti.Stored++ - ti.Synced++ - case storage.ChunkSynced: - ti.Synced++ + for _, item := range itemsToDelete { + func(item *pushItem) { + eg.Go(func() error { + return st.Run(context.Background(), func(s transaction.Store) error { + return errors.Join( + s.IndexStore().Delete(item), + s.IndexStore().Delete(&uploadItem{Address: item.Address, BatchID: item.BatchID}), + s.ChunkStore().Delete(context.Background(), item.Address), + chunkstamp.Delete(s.IndexStore(), uploadScope, item.Address, item.BatchID), + ) + }) + }) + }(item) } - err = indexStore.Put(ti) - if err != nil { - return fmt.Errorf("failed updating tag: %w", err) - } + return errors.Join( + eg.Wait(), + st.Run(context.Background(), func(s transaction.Store) error { + ti := &TagItem{TagID: tag} + err := s.IndexStore().Get(ti) + if err != nil { + return err + } + ti.Split = 0 // all chunks deleted + return errors.Join( + s.IndexStore().Delete(&dirtyTagItem{TagID: tag}), + s.IndexStore().Put(ti), + ) - return deleteFunc() + }), + ) } var ( @@ -714,9 +691,11 @@ func ListAllTags(st storage.Reader) ([]TagItem, error) { return tags, nil } -func IteratePending(ctx context.Context, s transaction.ReadOnlyStore, consumerFn func(chunk swarm.Chunk) (bool, error)) error { +func IteratePending(ctx context.Context, s transaction.ReadOnlyStore, start int64, consumerFn func(chunk swarm.Chunk, ts int64) (bool, error)) error { return s.IndexStore().Iterate(storage.Query{ - Factory: func() storage.Item { return &pushItem{} }, + Factory: func() storage.Item { return &pushItem{} }, + Prefix: fmt.Sprintf("%d", start), + PrefixAtStart: true, }, func(r storage.Result) (bool, error) { pi := r.Entry.(*pushItem) has, err := s.IndexStore().Has(&dirtyTagItem{TagID: pi.TagID}) @@ -738,14 +717,32 @@ func IteratePending(ctx context.Context, s transaction.ReadOnlyStore, consumerFn chunk = chunk. WithStamp(stamp). - WithTagID(uint32(pi.TagID)) + WithTagID(pi.TagID) - return consumerFn(chunk) + return consumerFn(chunk, pi.Timestamp) }) } // DeleteTag deletes TagItem associated with the given tagID. -func DeleteTag(st storage.Writer, tagID uint64) error { +// Returns error if the an upload is ongoing or syncing has not yet completed. +func DeleteTag(st storage.IndexStore, tagID uint64) error { + + if has, err := st.Has(&dirtyTagItem{TagID: tagID}); has { + return storage.ErrSessionNotOver + } else if err != nil { + return err + } + + ti := &TagItem{TagID: tagID} + err := st.Get(ti) + if err != nil { + return err + } + + if ti.Split > 0 && ti.Synced < ti.Split { + return storage.ErrSessionNotOver + } + if err := st.Delete(&TagItem{TagID: tagID}); err != nil { return fmt.Errorf("uploadstore: failed to delete tag %d: %w", tagID, err) } diff --git a/pkg/storer/internal/upload/uploadstore_test.go b/pkg/storer/internal/upload/uploadstore_test.go index d932bb85868..0c823e2d6b6 100644 --- a/pkg/storer/internal/upload/uploadstore_test.go +++ b/pkg/storer/internal/upload/uploadstore_test.go @@ -679,7 +679,10 @@ func TestChunkReporter(t *testing.T) { t.Fatalf("failed creating putter: %v", err) } - for idx, chunk := range chunktest.GenerateTestRandomChunks(10) { + chunks := chunktest.GenerateTestRandomChunks(10) + + for idx, chunk := range chunks { + chunk.WithTagID(tag.TagID) t.Run(fmt.Sprintf("chunk %s", chunk.Address()), func(t *testing.T) { if err := ts.Run(context.Background(), func(s transaction.Store) error { return putter.Put(context.Background(), s, chunk) @@ -687,35 +690,27 @@ func TestChunkReporter(t *testing.T) { t.Fatalf("Put(...): unexpected error: %v", err) } - report := func(ch swarm.Chunk, state int) { + report := func(ch swarm.Chunk, u *upload.TagUpdate) { t.Helper() if err := ts.Run(context.Background(), func(s transaction.Store) error { - return upload.Report(context.Background(), s, ch, state) + return upload.Report(s.IndexStore(), ch.TagID(), u) }); err != nil { t.Fatalf("Report(...): unexpected error: %v", err) } } - t.Run("mark sent", func(t *testing.T) { - report(chunk, storage.ChunkSent) - }) + report(chunk, &upload.TagUpdate{Sent: 1}) if idx < 4 { - t.Run("mark stored", func(t *testing.T) { - report(chunk, storage.ChunkStored) - }) + report(chunk, &upload.TagUpdate{Stored: 1, Synced: 1}) } if idx >= 4 && idx < 8 { - t.Run("mark synced", func(t *testing.T) { - report(chunk, storage.ChunkSynced) - }) + report(chunk, &upload.TagUpdate{Synced: 1}) } if idx >= 8 { - t.Run("mark could not sync", func(t *testing.T) { - report(chunk, storage.ChunkCouldNotSync) - }) + report(chunk, &upload.TagUpdate{}) } t.Run("verify internal state", func(t *testing.T) { @@ -748,38 +743,6 @@ func TestChunkReporter(t *testing.T) { t.Fatalf("Get(...): unexpected TagItem (-want +have):\n%s", diff) } - ui := &upload.UploadItem{ - Address: chunk.Address(), - BatchID: chunk.Stamp().BatchID(), - } - has, err := ts.IndexStore().Has(ui) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if has { - t.Fatalf("expected to not be found: %s", ui) - } - - pi := &upload.PushItem{ - Timestamp: now().UnixNano(), - Address: chunk.Address(), - BatchID: chunk.Stamp().BatchID(), - } - has, err = ts.IndexStore().Has(pi) - if err != nil { - t.Fatalf("Has(...): unexpected error: %v", err) - } - if has { - t.Fatalf("Has(...): expected to not be found: %s", pi) - } - - have, err := ts.ChunkStore().Has(context.Background(), chunk.Address()) - if err != nil { - t.Fatalf("Get(...): unexpected error: %v", err) - } - if have { - t.Fatalf("Get(...): chunk expected to not be found: %s", chunk.Address()) - } }) }) } @@ -787,8 +750,9 @@ func TestChunkReporter(t *testing.T) { t.Run("close with reference", func(t *testing.T) { addr := swarm.RandAddress(t) - err := ts.Run(context.Background(), func(s transaction.Store) error { return putter.Close(s.IndexStore(), addr) }) - if err != nil { + if err := ts.Run(context.Background(), func(s transaction.Store) error { + return putter.Close(s.IndexStore(), addr) + }); err != nil { t.Fatalf("Close(...): unexpected error %v", err) } @@ -801,6 +765,23 @@ func TestChunkReporter(t *testing.T) { t.Fatalf("TagInfo(...): unexpected error %v", err) } + // report more synced to trigger cleanup + err = ts.Run(context.Background(), func(s transaction.Store) error { + return upload.Report(s.IndexStore(), tag.TagID, &upload.TagUpdate{Synced: 2}) + }) + if err != nil { + t.Fatalf("Report(...): unexpected error %v", err) + } + + for _, c := range chunks { + err = ts.Run(context.Background(), func(s transaction.Store) error { + return upload.Synced(s, c.Address(), c.Stamp().BatchID()) + }) + if err != nil { + t.Fatalf("Synced(...): unexpected error %v", err) + } + } + wantTI := upload.TagItem{ TagID: tag.TagID, Split: 10, @@ -814,10 +795,47 @@ func TestChunkReporter(t *testing.T) { if diff := cmp.Diff(wantTI, ti); diff != "" { t.Fatalf("Get(...): unexpected TagItem (-want +have):\n%s", diff) } + + for _, chunk := range chunks { + + ui := &upload.UploadItem{ + Address: chunk.Address(), + BatchID: chunk.Stamp().BatchID(), + } + has, err := ts.IndexStore().Has(ui) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if has { + t.Fatalf("expected to not be found: %s", ui) + } + + pi := &upload.PushItem{ + Timestamp: now().UnixNano(), + Address: chunk.Address(), + BatchID: chunk.Stamp().BatchID(), + } + has, err = ts.IndexStore().Has(pi) + if err != nil { + t.Fatalf("Has(...): unexpected error: %v", err) + } + if has { + t.Fatalf("Has(...): expected to not be found: %s", pi) + } + + have, err := ts.ChunkStore().Has(context.Background(), chunk.Address()) + if err != nil { + t.Fatalf("Get(...): unexpected error: %v", err) + } + if have { + t.Fatalf("Get(...): chunk expected to not be found: %s", chunk.Address()) + } + } + }) } -func TestDeleteTagReporter(t *testing.T) { +func TestDeleteTagWhileUploading(t *testing.T) { t.Parallel() @@ -843,88 +861,142 @@ func TestDeleteTagReporter(t *testing.T) { t.Fatalf("failed creating putter: %v", err) } - t.Run("delete tag while uploading", func(t *testing.T) { + // Generate a chunk. + chunk := chunktest.GenerateTestRandomChunks(1)[0] + chunk.WithTagID(tag.TagID) - chunk := chunktest.GenerateTestRandomChunks(1)[0] + // Store the chunk (which creates the uploadItem). + if err := ts.Run(context.Background(), func(s transaction.Store) error { + return putter.Put(context.Background(), s, chunk) + }); err != nil { + t.Fatalf("Put(...): unexpected error: %v", err) + } - if err := ts.Run(context.Background(), func(s transaction.Store) error { - return putter.Put(context.Background(), s, chunk) - }); err != nil { - t.Fatalf("Put(...): unexpected error: %v", err) - } + err = ts.Run(context.Background(), func(s transaction.Store) error { + return upload.DeleteTag(s.IndexStore(), tag.TagID) + }) + if !errors.Is(err, storage.ErrSessionNotOver) { + t.Fatalf("DeleteTag(...): unexpected error: %v", err) + } +} - report := func(ch swarm.Chunk, state int) { - t.Helper() - if err := ts.Run(context.Background(), func(s transaction.Store) error { - return upload.Report(context.Background(), s, ch, state) - }); err != nil { - t.Fatalf("Report(...): unexpected error: %v", err) - } - } +func TestDeleteTagWhileSyncing(t *testing.T) { - tagItem := &upload.TagItem{TagID: tag.TagID} - if err := ts.Run(context.Background(), func(s transaction.Store) error { - return s.IndexStore().Get(tagItem) - }); err != nil { - t.Fatalf("Get(...): unexpected error: %v", err) - } + t.Parallel() - if err := ts.Run(context.Background(), func(s transaction.Store) error { - return s.IndexStore().Delete(tagItem) - }); err != nil { - t.Fatalf("Put(...): unexpected error: %v", err) - } + ts := newTestStorage(t) - t.Run("mark sent", func(t *testing.T) { - report(chunk, storage.ChunkSent) - }) + var ( + tag upload.TagItem + putter internal.PutterCloserWithReference + err error + ) + + if err := ts.Run(context.Background(), func(s transaction.Store) error { + tag, err = upload.NextTag(s.IndexStore()) + return err + }); err != nil { + t.Fatalf("failed creating tag: %v", err) + } + + if err := ts.Run(context.Background(), func(s transaction.Store) error { + putter, err = upload.NewPutter(s.IndexStore(), tag.TagID) + return err + }); err != nil { + t.Fatalf("failed creating putter: %v", err) + } + + // Generate a chunk. + chunk := chunktest.GenerateTestRandomChunks(1)[0] + chunk.WithTagID(tag.TagID) + + // Store the chunk (which creates the uploadItem). + if err := ts.Run(context.Background(), func(s transaction.Store) error { + return putter.Put(context.Background(), s, chunk) + }); err != nil { + t.Fatalf("Put(...): unexpected error: %v", err) + } + + if err := ts.Run(context.Background(), func(s transaction.Store) error { + return putter.Close(s.IndexStore(), chunk.Address()) + }); err != nil { + t.Fatalf("Close(...): unexpected error: %v", err) + } + + err = ts.Run(context.Background(), func(s transaction.Store) error { + return upload.DeleteTag(s.IndexStore(), tag.TagID) }) + if !errors.Is(err, storage.ErrSessionNotOver) { + t.Fatalf("DeleteTag(...): unexpected error: %v", err) + } +} + +func TestDeleteTag(t *testing.T) { + t.Parallel() + + ts := newTestStorage(t) + + var ( + tag upload.TagItem + putter internal.PutterCloserWithReference + err error + ) - t.Run("delete tag while uploading and not sent", func(t *testing.T) { - // Generate a chunk. - chunk := chunktest.GenerateTestRandomChunks(1)[0] + if err := ts.Run(context.Background(), func(s transaction.Store) error { + tag, err = upload.NextTag(s.IndexStore()) + return err + }); err != nil { + t.Fatalf("failed creating tag: %v", err) + } + + if err := ts.Run(context.Background(), func(s transaction.Store) error { + putter, err = upload.NewPutter(s.IndexStore(), tag.TagID) + return err + }); err != nil { + t.Fatalf("failed creating putter: %v", err) + } + // Generate a chunk. + chunks := chunktest.GenerateTestRandomChunks(10) + for _, chunk := range chunks { + chunk.WithTagID(tag.TagID) // Store the chunk (which creates the uploadItem). if err := ts.Run(context.Background(), func(s transaction.Store) error { return putter.Put(context.Background(), s, chunk) }); err != nil { t.Fatalf("Put(...): unexpected error: %v", err) } + } - // Confirm the upload item exists. - ui := &upload.UploadItem{ - Address: chunk.Address(), - BatchID: chunk.Stamp().BatchID(), - } - if err := ts.Run(context.Background(), func(s transaction.Store) error { - return s.IndexStore().Get(ui) - }); err != nil { - t.Fatalf("Get(...): unexpected error: %v", err) - } + if err := ts.Run(context.Background(), func(s transaction.Store) error { + return putter.Close(s.IndexStore(), chunks[0].Address()) + }); err != nil { + t.Fatalf("Close(...): unexpected error: %v", err) + } - // Delete the tag item to simulate a user deleting it. - tagItem := &upload.TagItem{TagID: tag.TagID} - if err := ts.Run(context.Background(), func(s transaction.Store) error { - return s.IndexStore().Delete(tagItem) - }); err != nil { - t.Fatalf("Delete(...): unexpected error: %v", err) - } + err = upload.Cleanup(ts, tag.TagID) + if err != nil { + t.Fatalf("Cleanup(...): unexpected error: %v", err) + } - // Now report with a state other than ChunkSent, e.g. ChunkStored. - if err := ts.Run(context.Background(), func(s transaction.Store) error { - return upload.Report(context.Background(), s, chunk, storage.ChunkStored) - }); err != nil { - t.Fatalf("Report(...): unexpected error: %v", err) - } + err = ts.Run(context.Background(), func(s transaction.Store) error { + return upload.Report(s.IndexStore(), tag.TagID, &upload.TagUpdate{Synced: uint64(len(chunks))}) + }) + if err != nil { + t.Fatalf("DeleteTag(...): unexpected error: %v", err) + } - // Verify that the upload item was deleted (cleanup via deleteFunc). - if err := ts.Run(context.Background(), func(s transaction.Store) error { - return s.IndexStore().Get(ui) - }); !errors.Is(err, storage.ErrNotFound) { - t.Fatalf("expected uploadItem to be deleted, got error: %v", err) - } + err = ts.Run(context.Background(), func(s transaction.Store) error { + return upload.DeleteTag(s.IndexStore(), tag.TagID) }) + if err != nil { + t.Fatalf("DeleteTag(...): unexpected error: %v", err) + } + _, err = upload.TagInfo(ts.IndexStore(), tag.TagID) + if !errors.Is(err, storage.ErrNotFound) { + t.Fatalf("want: %v; have: %v", storage.ErrNotFound, err) + } } func TestNextTagID(t *testing.T) { @@ -995,7 +1067,7 @@ func TestIterate(t *testing.T) { ts := newTestStorage(t) t.Run("on empty storage does not call the callback fn", func(t *testing.T) { - err := upload.IteratePending(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) { + err := upload.IteratePending(context.Background(), ts, 0, func(chunk swarm.Chunk, ts int64) (bool, error) { t.Fatal("unexpected call") return false, nil }) @@ -1036,7 +1108,7 @@ func TestIterate(t *testing.T) { var count int - err = upload.IteratePending(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) { + err = upload.IteratePending(context.Background(), ts, 0, func(chunk swarm.Chunk, ts int64) (bool, error) { count++ if !chunk.Equal(chunk1) && !chunk.Equal(chunk2) { return true, fmt.Errorf("unknown chunk %s", chunk.Address()) @@ -1051,16 +1123,14 @@ func TestIterate(t *testing.T) { t.Fatalf("expected to iterate 0 chunks, got: %v", count) } - err = ts.Run(context.Background(), func(s transaction.Store) error { return putter.Close(s.IndexStore(), swarm.ZeroAddress) }) - if err != nil { + if err := ts.Run(context.Background(), func(s transaction.Store) error { + return putter.Close(s.IndexStore(), swarm.ZeroAddress) + }); err != nil { t.Fatalf("Close(...) error: %v", err) } - err = upload.IteratePending(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) { + err = upload.IteratePending(context.Background(), ts, 0, func(chunk swarm.Chunk, ts int64) (bool, error) { count++ - if !chunk.Equal(chunk1) && !chunk.Equal(chunk2) { - return true, fmt.Errorf("unknown chunk %s", chunk.Address()) - } return false, nil }) if err != nil { @@ -1068,39 +1138,11 @@ func TestIterate(t *testing.T) { } if count != 2 { - t.Fatalf("expected to iterate two chunks, got: %v", count) + t.Fatalf("expected to iterate 0 chunks, got: %v", count) } }) } -func TestDeleteTag(t *testing.T) { - t.Parallel() - - ts := newTestStorage(t) - - var tag upload.TagItem - var err error - err = ts.Run(context.Background(), func(s transaction.Store) error { - tag, err = upload.NextTag(s.IndexStore()) - return err - }) - if err != nil { - t.Fatalf("failed creating tag: %v", err) - } - - err = ts.Run(context.Background(), func(s transaction.Store) error { - return upload.DeleteTag(s.IndexStore(), tag.TagID) - }) - if err != nil { - t.Fatalf("upload.DeleteTag(): unexpected error: %v", err) - } - - _, err = upload.TagInfo(ts.IndexStore(), tag.TagID) - if !errors.Is(err, storage.ErrNotFound) { - t.Fatalf("want: %v; have: %v", storage.ErrNotFound, err) - } -} - func TestBatchIDForChunk(t *testing.T) { t.Parallel() @@ -1173,13 +1215,13 @@ func TestCleanup(t *testing.T) { t.Fatal("session.Put(...): unexpected error", err) } - err = putter.Cleanup(ts) + err = upload.Cleanup(ts, tag.TagID) if err != nil { t.Fatal("upload.Cleanup(...): unexpected error", err) } count := 0 - _ = upload.IteratePending(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) { + _ = upload.IteratePending(context.Background(), ts, 0, func(chunk swarm.Chunk, ts int64) (bool, error) { count++ return false, nil }) @@ -1228,7 +1270,7 @@ func TestCleanup(t *testing.T) { } count := 0 - _ = upload.IteratePending(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) { + _ = upload.IteratePending(context.Background(), ts, 0, func(chunk swarm.Chunk, ts int64) (bool, error) { count++ return false, nil }) diff --git a/pkg/storer/migration/step_05_test.go b/pkg/storer/migration/step_05_test.go index aeacd310f3e..e1555c2f577 100644 --- a/pkg/storer/migration/step_05_test.go +++ b/pkg/storer/migration/step_05_test.go @@ -90,6 +90,8 @@ func Test_Step_05(t *testing.T) { t.Fatalf("put chunk: %v", err) } + wantCount(t, store.IndexStore(), 10) + err = store.Run(ctx, func(s transaction.Store) error { return putter.Close(s.IndexStore(), swarm.RandAddress(t)) }) @@ -97,8 +99,6 @@ func Test_Step_05(t *testing.T) { t.Fatalf("close putter: %v", err) } - wantCount(t, store.IndexStore(), 10) - err = localmigration.Step_05(store, log.Noop)() if err != nil { t.Fatalf("step 05: %v", err) diff --git a/pkg/storer/netstore.go b/pkg/storer/netstore.go index 1a192114920..c96b7358563 100644 --- a/pkg/storer/netstore.go +++ b/pkg/storer/netstore.go @@ -20,17 +20,14 @@ import ( // DirectUpload is the implementation of the NetStore.DirectUpload method. func (db *DB) DirectUpload() PutterSession { - // egCtx will allow early exit of Put operations if we have - // already encountered error. - eg, egCtx := errgroup.WithContext(context.Background()) + + eg := errgroup.Group{} + eg.SetLimit(pusher.ConcurrentPushes) return &putterSession{ Putter: putterWithMetrics{ storage.PutterFunc(func(ctx context.Context, ch swarm.Chunk) error { - db.directUploadLimiter <- struct{}{} eg.Go(func() (err error) { - defer func() { <-db.directUploadLimiter }() - span, logger, ctx := db.tracer.FollowSpanFromContext(ctx, "put-direct-upload", db.logger) defer func() { if err != nil { @@ -39,34 +36,29 @@ func (db *DB) DirectUpload() PutterSession { span.Finish() }() - for { - op := &pusher.Op{Chunk: ch, Err: make(chan error, 1), Direct: true, Span: span} + op := &pusher.Op{Chunk: ch, Err: make(chan error, 1), Direct: true, Span: span} + select { + case <-ctx.Done(): + return ctx.Err() + case <-db.quit: + return ErrDBQuit + case db.pusherFeed <- op: select { case <-ctx.Done(): return ctx.Err() - case <-egCtx.Done(): - return egCtx.Err() case <-db.quit: return ErrDBQuit - case db.pusherFeed <- op: - select { - case <-ctx.Done(): - return ctx.Err() - case <-egCtx.Done(): - return egCtx.Err() - case <-db.quit: - return ErrDBQuit - case err := <-op.Err: - if errors.Is(err, pushsync.ErrShallowReceipt) { - logger.Debug("direct upload: shallow receipt received, retrying", "chunk", ch.Address()) - } else if errors.Is(err, topology.ErrNotFound) { - logger.Debug("direct upload: no peers available, retrying", "chunk", ch.Address()) - } else { - return err - } + case err := <-op.Err: + if errors.Is(err, pushsync.ErrShallowReceipt) { + logger.Debug("direct upload: shallow receipt received, retrying", "chunk", ch.Address()) + } else if errors.Is(err, topology.ErrNotFound) { + logger.Debug("direct upload: no peers available, retrying", "chunk", ch.Address()) + } else { + return err } } } + return nil }) return nil }), diff --git a/pkg/storer/netstore_test.go b/pkg/storer/netstore_test.go index c32f3f9252b..e4441a34d77 100644 --- a/pkg/storer/netstore_test.go +++ b/pkg/storer/netstore_test.go @@ -198,7 +198,7 @@ func testNetStore(t *testing.T, newStorer func(r retrieval.Interface) (*storer.D t.Fatal(err) } - count := 3 + count := 1 go func() { for op := range lstore.PusherFeed() { if !op.Chunk.Equal(chunk) { diff --git a/pkg/storer/pinstore.go b/pkg/storer/pinstore.go index c57580b71e6..c27639d0683 100644 --- a/pkg/storer/pinstore.go +++ b/pkg/storer/pinstore.go @@ -16,10 +16,12 @@ import ( "github.com/ethersphere/bee/v2/pkg/swarm" ) +const pinningDoneLock = "pinning-done" + // NewCollection is the implementation of the PinStore.NewCollection method. func (db *DB) NewCollection(ctx context.Context) (PutterSession, error) { var ( - pinningPutter internal.PutterCloserWithReference + pinningPutter internal.PutterCloserCleanerWithReference err error ) err = db.storage.Run(ctx, func(store transaction.Store) error { @@ -37,8 +39,9 @@ func (db *DB) NewCollection(ctx context.Context) (PutterSession, error) { Putter: putterWithMetrics{ storage.PutterFunc( func(ctx context.Context, chunk swarm.Chunk) error { - unlock := db.Lock(uploadsLock) + unlock := db.Lock(addrKey(chunk.Address())) // protect against multiple uploads of same chunk defer unlock() + return db.storage.Run(ctx, func(s transaction.Store) error { return pinningPutter.Put(ctx, s, chunk) }) @@ -48,15 +51,17 @@ func (db *DB) NewCollection(ctx context.Context) (PutterSession, error) { "pinstore", }, done: func(address swarm.Address) error { - unlock := db.Lock(uploadsLock) + unlock := db.Lock(pinningDoneLock) defer unlock() + return db.storage.Run(ctx, func(s transaction.Store) error { return pinningPutter.Close(s.IndexStore(), address) }) }, cleanup: func() error { - unlock := db.Lock(uploadsLock) + unlock := db.Lock(pinningDoneLock) defer unlock() + return pinningPutter.Cleanup(db.storage) }, }, nil @@ -74,7 +79,7 @@ func (db *DB) DeletePin(ctx context.Context, root swarm.Address) (err error) { } }() - unlock := db.Lock(uploadsLock) + unlock := db.Lock(pinningDoneLock) defer unlock() return pinstore.DeletePin(ctx, db.storage, root) diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index 32223e401de..05d90b89e39 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -364,8 +364,6 @@ func initDiskRepository( return transaction.NewStorage(sharky, store), pinIntegrity, closer(store, sharky, recoveryCloser), nil } -const lockKeyNewSession string = "new_session" - // Options provides a container to configure different things in the storer. type Options struct { // These are options related to levelDB. Currently, the underlying storage used is levelDB. @@ -417,23 +415,34 @@ type cacheLimiter struct { cancel context.CancelFunc } +type chunkBatch struct { + address swarm.Address + batchID []byte +} + +type tagCache struct { + sync.Mutex + updates map[uint64]*upload.TagUpdate + synced []chunkBatch + wakeup chan struct{} +} + // DB implements all the component stores described above. type DB struct { logger log.Logger tracer *tracing.Tracer - metrics metrics - storage transaction.Storage - multex *multex.Multex - cacheObj *cache.Cache - retrieval retrieval.Interface - pusherFeed chan *pusher.Op - quit chan struct{} - cacheLimiter cacheLimiter - dbCloser io.Closer - subscriptionsWG sync.WaitGroup - events *events.Subscriber - directUploadLimiter chan struct{} + metrics metrics + storage transaction.Storage + multex *multex.Multex + cacheObj *cache.Cache + retrieval retrieval.Interface + pusherFeed chan *pusher.Op + quit chan struct{} + cacheLimiter cacheLimiter + dbCloser io.Closer + subscriptionsWG sync.WaitGroup + events *events.Subscriber reserve *reserve.Reserve inFlight sync.WaitGroup @@ -446,6 +455,8 @@ type DB struct { reserveOptions reserveOpts pinIntegrity *PinIntegrity + + tagCache *tagCache } type reserveOpts struct { @@ -549,8 +560,11 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { minimumRadius: uint8(opts.MinimumStorageRadius), capacityDoubling: opts.ReserveCapacityDoubling, }, - directUploadLimiter: make(chan struct{}, pusher.ConcurrentPushes), - pinIntegrity: pinIntegrity, + pinIntegrity: pinIntegrity, + tagCache: &tagCache{ + updates: make(map[uint64]*upload.TagUpdate), + wakeup: make(chan struct{}, 1), + }, } if db.validStamp == nil { @@ -588,6 +602,9 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { db.inFlight.Add(1) go db.cacheWorker(ctx) + db.inFlight.Add(1) + go db.reportWorker(ctx) + return db, nil } @@ -608,9 +625,9 @@ func (db *DB) Metrics() []prometheus.Collector { func (db *DB) Close() error { close(db.quit) - bgReserveWorkersClosed := make(chan struct{}) + bgWorkersClosed := make(chan struct{}) go func() { - defer close(bgReserveWorkersClosed) + defer close(bgWorkersClosed) if !syncutil.WaitWithTimeout(&db.inFlight, 5*time.Second) { db.logger.Warning("db shutting down with running goroutines") } @@ -637,7 +654,7 @@ func (db *DB) Close() error { defer close(done) <-closerDone <-bgCacheWorkersClosed - <-bgReserveWorkersClosed + <-bgWorkersClosed }() select { diff --git a/pkg/storer/subscribe_push.go b/pkg/storer/subscribe_push.go index bd34956b387..b6125795d0a 100644 --- a/pkg/storer/subscribe_push.go +++ b/pkg/storer/subscribe_push.go @@ -33,11 +33,14 @@ func (db *DB) SubscribePush(ctx context.Context) (<-chan swarm.Chunk, func()) { // close the returned chunkInfo channel at the end to // signal that the subscription is done defer close(chunks) + + var last int64 = 0 for { - err := upload.IteratePending(ctx, db.storage, func(chunk swarm.Chunk) (bool, error) { + err := upload.IteratePending(ctx, db.storage, last, func(chunk swarm.Chunk, ts int64) (bool, error) { select { case chunks <- chunk: + last = ts return false, nil case <-stopChan: // gracefully stop the iteration diff --git a/pkg/storer/uploadstore.go b/pkg/storer/uploadstore.go index f4f21fc59ba..fa66f206671 100644 --- a/pkg/storer/uploadstore.go +++ b/pkg/storer/uploadstore.go @@ -8,7 +8,9 @@ import ( "context" "errors" "fmt" + "runtime" "sort" + "time" storage "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer/internal" @@ -16,27 +18,116 @@ import ( "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "github.com/ethersphere/bee/v2/pkg/storer/internal/upload" "github.com/ethersphere/bee/v2/pkg/swarm" + "golang.org/x/sync/errgroup" ) -const uploadsLock = "pin-upload-store" +func addrKey(ch swarm.Address) string { return fmt.Sprintf("upload-chunk-%s", ch) } +func sessionKey(tag uint64) string { return fmt.Sprintf("upload-session-%d", tag) } + +const lockKeyNewSession string = "new_session" +const reportedEvent = "reportEnd" // Report implements the storage.PushReporter by wrapping the internal reporter // with a transaction. func (db *DB) Report(ctx context.Context, chunk swarm.Chunk, state storage.ChunkState) error { - unlock := db.Lock(uploadsLock) - defer unlock() + db.tagCache.Lock() + defer db.tagCache.Unlock() - err := db.storage.Run(ctx, func(s transaction.Store) error { - return upload.Report(ctx, s, chunk, state) - }) - if err != nil { - return fmt.Errorf("reporter.Report: %w", err) + update, ok := db.tagCache.updates[chunk.TagID()] + if !ok { + update = &upload.TagUpdate{} + db.tagCache.updates[chunk.TagID()] = update + } + + switch state { + case storage.ChunkSent: + update.Sent++ + case storage.ChunkStored: + update.Stored++ + fallthrough + case storage.ChunkSynced: + update.Synced++ + db.tagCache.synced = append(db.tagCache.synced, chunkBatch{chunk.Address(), chunk.Stamp().BatchID()}) + } + + select { + case db.tagCache.wakeup <- struct{}{}: + default: } return nil } +func (db *DB) reportWorker(ctx context.Context) { + + defer db.inFlight.Done() + + for { + select { + case <-ctx.Done(): + return + case <-db.quit: + return + case <-db.tagCache.wakeup: + + db.tagCache.Lock() + for tag, t := range db.tagCache.updates { + + unlock := db.Lock(sessionKey(tag)) + + if err := db.storage.Run(ctx, func(s transaction.Store) error { + return upload.Report(s.IndexStore(), tag, t) + }); err != nil { + db.logger.Debug("report failed", "error", err) + } + + unlock() + } + + //reset + db.tagCache.updates = make(map[uint64]*upload.TagUpdate) + + synced := db.tagCache.synced[:] + + db.tagCache.Unlock() + + eg := errgroup.Group{} + eg.SetLimit(runtime.NumCPU()) + + for _, s := range synced { + eg.Go(func() error { + unlock := db.Lock(addrKey(s.address)) + defer unlock() + + return db.storage.Run(ctx, func(st transaction.Store) error { + return upload.Synced(st, s.address, s.batchID) + }) + }) + } + + db.tagCache.Lock() + db.tagCache.synced = db.tagCache.synced[len(synced):] + db.tagCache.Unlock() + + if err := eg.Wait(); err != nil { + db.logger.Debug("sync cleanup failed", "error", err) + } + + db.events.Trigger(reportedEvent) + + // slowdown + select { + case <-ctx.Done(): + return + case <-db.quit: + return + case <-time.After(time.Second * 5): + } + } + } +} + // Upload is the implementation of UploadStore.Upload method. func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession, error) { if tagID == 0 { @@ -45,7 +136,7 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession var ( uploadPutter internal.PutterCloserWithReference - pinningPutter internal.PutterCloserWithReference + pinningPutter internal.PutterCloserCleanerWithReference err error ) @@ -71,61 +162,65 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession return &putterSession{ Putter: putterWithMetrics{ storage.PutterFunc(func(ctx context.Context, chunk swarm.Chunk) error { - unlock := db.Lock(uploadsLock) + unlock := db.Lock(addrKey(chunk.Address())) // protect against multiple uploads of same chunk defer unlock() - return errors.Join( - db.storage.Run(ctx, func(s transaction.Store) error { - return uploadPutter.Put(ctx, s, chunk) - }), - func() error { - if pinningPutter != nil { - return db.storage.Run(ctx, func(s transaction.Store) error { - return pinningPutter.Put(ctx, s, chunk) - }) - } - return nil - }(), - ) + + if err := db.storage.Run(ctx, func(s transaction.Store) error { + return uploadPutter.Put(ctx, s, chunk) + }); err != nil { + return err + } + + if pinningPutter != nil { + return db.storage.Run(ctx, func(s transaction.Store) error { + return pinningPutter.Put(ctx, s, chunk) + }) + } + return nil }), db.metrics, "uploadstore", }, done: func(address swarm.Address) error { defer db.events.Trigger(subscribePushEventKey) - unlock := db.Lock(uploadsLock) + unlock := db.Lock(sessionKey(tagID)) defer unlock() - return errors.Join( - db.storage.Run(ctx, func(s transaction.Store) error { - return uploadPutter.Close(s.IndexStore(), address) - }), - func() error { - if pinningPutter != nil { - return db.storage.Run(ctx, func(s transaction.Store) error { - pinErr := pinningPutter.Close(s.IndexStore(), address) - if errors.Is(pinErr, pinstore.ErrDuplicatePinCollection) { - pinErr = pinningPutter.Cleanup(db.storage) - } - return pinErr - }) - } - return nil - }(), - ) + if err := db.storage.Run(ctx, func(s transaction.Store) error { + return uploadPutter.Close(s.IndexStore(), address) + }); err != nil { + return err + } + if pinningPutter != nil { + + unlock := db.Lock(pinningDoneLock) + defer unlock() + + err := db.storage.Run(ctx, func(s transaction.Store) error { + return pinningPutter.Close(s.IndexStore(), address) + }) + if errors.Is(err, pinstore.ErrDuplicatePinCollection) { + return pinningPutter.Cleanup(db.storage) + } + return err + } + return nil }, cleanup: func() error { defer db.events.Trigger(subscribePushEventKey) - unlock := db.Lock(uploadsLock) + unlock := db.Lock(sessionKey(tagID)) defer unlock() - return errors.Join( - uploadPutter.Cleanup(db.storage), - func() error { - if pinningPutter != nil { - return pinningPutter.Cleanup(db.storage) - } - return nil - }(), - ) + + if err := upload.Cleanup(db.storage, tagID); err != nil { + return err + } + if pinningPutter != nil { + unlock := db.Lock(pinningDoneLock) + defer unlock() + + return pinningPutter.Cleanup(db.storage) + } + return nil }, }, nil } @@ -152,6 +247,8 @@ func (db *DB) Session(tagID uint64) (SessionInfo, error) { // DeleteSession is the implementation of the UploadStore.DeleteSession method. func (db *DB) DeleteSession(tagID uint64) error { + unlock := db.Lock(sessionKey(tagID)) + defer unlock() return db.storage.Run(context.Background(), func(s transaction.Store) error { return upload.DeleteTag(s.IndexStore(), tagID) }) diff --git a/pkg/storer/uploadstore_test.go b/pkg/storer/uploadstore_test.go index 095904ade7d..f38ea207ee7 100644 --- a/pkg/storer/uploadstore_test.go +++ b/pkg/storer/uploadstore_test.go @@ -115,6 +115,7 @@ func testUploadStore(t *testing.T, newStorer func() (*storer.DB, error)) { } for _, ch := range tc.chunks { + ch.WithTagID(tag.TagID) err := session.Put(context.TODO(), ch) if err != nil { t.Fatalf("session.Put(...): unexpected error: %v", err) @@ -344,14 +345,12 @@ func TestUploadStore(t *testing.T) { t.Run("inmem", func(t *testing.T) { t.Parallel() - testUploadStore(t, func() (*storer.DB, error) { return storer.New(context.Background(), "", dbTestOps(swarm.RandAddress(t), 0, nil, nil, time.Second)) }) }) t.Run("disk", func(t *testing.T) { t.Parallel() - testUploadStore(t, diskStorer(t, dbTestOps(swarm.RandAddress(t), 0, nil, nil, time.Second))) }) } @@ -377,6 +376,7 @@ func testReporter(t *testing.T, newStorer func() (*storer.DB, error)) { } for _, ch := range chunks { + ch.WithTagID(session.TagID) err = putter.Put(context.Background(), ch) if err != nil { t.Fatal(err) @@ -390,42 +390,43 @@ func testReporter(t *testing.T, newStorer func() (*storer.DB, error)) { t.Fatal(err) } - t.Run("report", func(t *testing.T) { - t.Run("commit", func(t *testing.T) { - err := lstore.Report(context.Background(), chunks[0], storage.ChunkSynced) - if err != nil { - t.Fatalf("Report(...): unexpected error %v", err) - } + s, unsub := lstore.Events().Subscribe("reportEnd") + t.Cleanup(unsub) - wantTI := storer.SessionInfo{ - TagID: session.TagID, - Split: 3, - Seen: 0, - Sent: 0, - Synced: 1, - Stored: 0, - StartedAt: session.StartedAt, - Address: root.Address(), - } + err = lstore.Report(context.Background(), chunks[0], storage.ChunkSynced) + if err != nil { + t.Fatalf("Report(...): unexpected error %v", err) + } - gotTI, err := lstore.Session(session.TagID) - if err != nil { - t.Fatalf("Session(...): unexpected error: %v", err) - } + <-s + + wantTI := storer.SessionInfo{ + TagID: session.TagID, + Split: 3, + Seen: 0, + Sent: 0, + Synced: 1, + Stored: 0, + StartedAt: session.StartedAt, + Address: root.Address(), + } - if diff := cmp.Diff(wantTI, gotTI); diff != "" { - t.Fatalf("unexpected tag item (-want +have):\n%s", diff) - } + gotTI, err := lstore.Session(session.TagID) + if err != nil { + t.Fatalf("Session(...): unexpected error: %v", err) + } - has, err := lstore.Storage().ChunkStore().Has(context.Background(), chunks[0].Address()) - if err != nil { - t.Fatalf("ChunkStore.Has(...): unexpected error: %v", err) - } - if !has { - t.Fatalf("expected chunk %s to not be found", chunks[0].Address()) - } - }) - }) + if diff := cmp.Diff(wantTI, gotTI); diff != "" { + t.Fatalf("unexpected tag item (-want +have):\n%s", diff) + } + + has, err := lstore.Storage().ChunkStore().Has(context.Background(), chunks[0].Address()) + if err != nil { + t.Fatalf("ChunkStore.Has(...): unexpected error: %v", err) + } + if !has { + t.Fatalf("expected chunk %s to not be found", chunks[0].Address()) + } } func TestReporter(t *testing.T) { diff --git a/pkg/swarm/swarm.go b/pkg/swarm/swarm.go index 1b2e83b2ed7..96ae147beb5 100644 --- a/pkg/swarm/swarm.go +++ b/pkg/swarm/swarm.go @@ -170,9 +170,9 @@ type Chunk interface { // Data returns the chunk data. Data() []byte // TagID returns the tag ID for this chunk. - TagID() uint32 + TagID() uint64 // WithTagID attaches the tag ID to the chunk. - WithTagID(t uint32) Chunk + WithTagID(t uint64) Chunk // Stamp returns the postage stamp associated with this chunk. Stamp() Stamp // WithStamp attaches a postage stamp to the chunk. @@ -226,7 +226,7 @@ type Stamp interface { type chunk struct { addr Address sdata []byte - tagID uint32 + tagID uint64 stamp Stamp depth uint8 bucketDepth uint8 @@ -240,7 +240,7 @@ func NewChunk(addr Address, data []byte) Chunk { } } -func (c *chunk) WithTagID(t uint32) Chunk { +func (c *chunk) WithTagID(t uint64) Chunk { c.tagID = t return c } @@ -265,7 +265,7 @@ func (c *chunk) Data() []byte { return c.sdata } -func (c *chunk) TagID() uint32 { +func (c *chunk) TagID() uint64 { return c.tagID }