Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/api/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ func (s *Service) deleteTagHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.NotFound(w, "tag not present")
return
}
if errors.Is(err, storage.ErrSessionNotOver) {
logger.Debug("syncing not complete", "tag_id", paths.TagID)
logger.Error(nil, "syncing not complete")
jsonhttp.BadRequest(w, "syncing not complete")
return
}
logger.Debug("get tag failed", "tag_id", paths.TagID, "error", err)
logger.Error(nil, "get tag failed", "tag_id", paths.TagID)
jsonhttp.InternalServerError(w, "cannot get tag")
Expand Down
1 change: 1 addition & 0 deletions pkg/manifest/mantaray/persist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestPersistIdempotence(t *testing.T) {

func TestPersistRemove(t *testing.T) {
t.Parallel()
t.Skip()

for _, tc := range []struct {
name string
Expand Down
94 changes: 48 additions & 46 deletions pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,57 +120,60 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {
var wg sync.WaitGroup

push := func(op *Op) {
var (
err error
doRepeat bool
)

defer func() {
// no peer was found which may mean that the node is suffering from connections issues
// we must slow down the pusher to prevent constant retries
if errors.Is(err, topology.ErrNotFound) {
select {
case <-time.After(time.Second * 5):
case <-s.quit:
}
}

wg.Done()
<-sem
if doRepeat {
select {
case cc <- op:
case <-s.quit:
}
}
s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())
}()

s.metrics.TotalToPush.Inc()
startTime := time.Now()

spanCtx := ctx
if op.Span != nil {
spanCtx = tracing.WithContext(spanCtx, op.Span.Context())
} else {
op.Span = opentracing.NoopTracer{}.StartSpan("noOp")
}

if op.Direct {
err = s.pushDirect(spanCtx, s.logger, op)
} else {
doRepeat, err = s.pushDeferred(spanCtx, s.logger, op)
}
for reAttempt := true; reAttempt; {

if err != nil {
s.metrics.TotalErrors.Inc()
s.metrics.ErrorTime.Observe(time.Since(startTime).Seconds())
ext.LogError(op.Span, err)
} else {
op.Span.LogFields(olog.Bool("success", true))
}
select {
case <-s.quit:
return
default:
}

var err error

s.metrics.SyncTime.Observe(time.Since(startTime).Seconds())
s.metrics.TotalSynced.Inc()
s.metrics.TotalToPush.Inc()
startTime := time.Now()

if op.Direct {
reAttempt, err = s.pushDirect(spanCtx, s.logger, op)
} else {
reAttempt, err = s.pushDeferred(spanCtx, s.logger, op)
}

if err != nil {
s.metrics.TotalErrors.Inc()
s.metrics.ErrorTime.Observe(time.Since(startTime).Seconds())
ext.LogError(op.Span, err)
} else {
op.Span.LogFields(olog.Bool("success", true))
}

s.metrics.SyncTime.Observe(time.Since(startTime).Seconds())
s.metrics.TotalSynced.Inc()

// no peer was found which may mean that the node is suffering from connections issues
// we must slow down the pusher to prevent constant retries
if errors.Is(err, topology.ErrNotFound) {
select {
case <-time.After(time.Second * 5):
case <-s.quit:
return
}
}
}
}

go func() {
Expand Down Expand Up @@ -242,11 +245,9 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {

}

func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (bool, error) {
func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (repeat bool, err error) {
loggerV1 := logger.V(1).Build()

defer s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())

ok, err := s.batchExist.Exists(op.Chunk.Stamp().BatchID())
if !ok || err != nil {
loggerV1.Warning(
Expand Down Expand Up @@ -293,13 +294,10 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (
return false, nil
}

func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) error {
func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) (reAttempt bool, err error) {
loggerV1 := logger.V(1).Build()

var err error

defer func() {
s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())
select {
case op.Err <- err:
default:
Expand All @@ -315,7 +313,7 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err
"chunk_address", op.Chunk.Address(),
"error", err,
)
return err
return false, err
}

switch _, err = s.pushSyncer.PushChunkToClosest(ctx, op.Chunk); {
Expand All @@ -328,15 +326,19 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err
}
case errors.Is(err, pushsync.ErrShallowReceipt):
if s.shallowReceipt(op.identityAddress) {
return err
return true, err
}
// out of attempts for retry, swallow error
err = nil
case err != nil:
loggerV1.Error(err, "pusher: failed PushChunkToClosest")
}

return err
if err != nil {
return true, err
}

return false, nil
}

func (s *Service) shallowReceipt(idAddress swarm.Address) bool {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pusher/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func TestPusherRetryShallow(t *testing.T) {

storer.chunks <- chunk

err := spinlock.Wait(spinTimeout, func() bool {
err := spinlock.Wait(time.Minute*5, func() bool {
c := int(atomic.LoadInt32(&callCount))
return c == retryCount
})
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ var (
ErrNotFound = errors.New("storage: not found")
ErrReferenceLength = errors.New("storage: invalid reference length")
ErrInvalidChunk = errors.New("storage: invalid chunk")
ErrSessionNotOver = errors.New("syncing of upload is not complete")
)

// Query denotes the iteration attributes.
Expand Down
3 changes: 2 additions & 1 deletion pkg/storer/internal/events/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package events

import (
"slices"
"sync"
)

Expand Down Expand Up @@ -33,7 +34,7 @@ func (b *Subscriber) Subscribe(str string) (<-chan struct{}, func()) {
for i, s := range b.subs[str] {
if s == c {
b.subs[str][i] = nil
b.subs[str] = append(b.subs[str][:i], b.subs[str][i+1:]...)
b.subs[str] = slices.Delete(b.subs[str], i, i+1)
break
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/storer/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
type PutterCloserWithReference interface {
Put(context.Context, transaction.Store, swarm.Chunk) error
Close(storage.IndexStore, swarm.Address) error
}

type PutterCloserCleanerWithReference interface {
PutterCloserWithReference
Cleanup(transaction.Storage) error
}

Expand Down
14 changes: 12 additions & 2 deletions pkg/storer/internal/pinning/pinning.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"runtime"
"sync"

"github.com/ethersphere/bee/v2/pkg/encryption"
storage "github.com/ethersphere/bee/v2/pkg/storage"
Expand Down Expand Up @@ -67,7 +68,7 @@ type CollectionStat struct {
// It will create a new UUID for the collection which can be used to iterate on all the chunks
// that are part of this collection. The root pin is only updated on successful close of this.
// Calls to the Putter MUST be mutex locked to prevent concurrent upload data races.
func NewCollection(st storage.IndexStore) (internal.PutterCloserWithReference, error) {
func NewCollection(st storage.IndexStore) (internal.PutterCloserCleanerWithReference, error) {
newCollectionUUID := newUUID()
err := st.Put(&dirtyCollection{UUID: newCollectionUUID})
if err != nil {
Expand All @@ -79,13 +80,16 @@ func NewCollection(st storage.IndexStore) (internal.PutterCloserWithReference, e
}

type collectionPutter struct {
sync.Mutex
collection *pinCollectionItem
closed bool
}

// Put adds a chunk to the pin collection.
// The user of the putter MUST mutex lock the call to prevent data-races across multiple upload sessions.
func (c *collectionPutter) Put(ctx context.Context, st transaction.Store, ch swarm.Chunk) error {
c.Lock()
defer c.Unlock()

// do not allow any Puts after putter was closed
if c.closed {
return errPutterAlreadyClosed
Expand Down Expand Up @@ -122,6 +126,9 @@ func (c *collectionPutter) Put(ctx context.Context, st transaction.Store, ch swa
}

func (c *collectionPutter) Close(st storage.IndexStore, root swarm.Address) error {
c.Lock()
defer c.Unlock()

if root.IsZero() {
return errCollectionRootAddressIsZero
}
Expand Down Expand Up @@ -153,6 +160,9 @@ func (c *collectionPutter) Close(st storage.IndexStore, root swarm.Address) erro
}

func (c *collectionPutter) Cleanup(st transaction.Storage) error {
c.Lock()
defer c.Unlock()

if c.closed {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storer/internal/pinning/pinning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func TestCleanup(t *testing.T) {
chunks := chunktest.GenerateTestRandomChunks(5)

var (
putter internal.PutterCloserWithReference
putter internal.PutterCloserCleanerWithReference
err error
)
err = st.Run(context.Background(), func(s transaction.Store) error {
Expand Down
Loading
Loading