Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 36 additions & 40 deletions pkg/storer/internal/upload/uploadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,10 +564,14 @@ func Report(ctx context.Context, st transaction.Store, chunk swarm.Chunk, state

indexStore := st.IndexStore()

// Once the chunk is stored/synced/failed to sync, it is deleted from the upload store as
// we no longer need to keep track of this chunk. We also need to cleanup
// the pushItem.
deleteFunc := func() error {
// Once the chunk is stored/synced/failed to sync, it is deleted from the upload store as
// we no longer need to keep track of this chunk. We also need to cleanup
// the pushItem.
if state == storage.ChunkSent {
return nil
}

pi := &pushItem{
Timestamp: ui.Uploaded,
Address: chunk.Address(),
Expand All @@ -593,49 +597,41 @@ func Report(ctx context.Context, st transaction.Store, chunk swarm.Chunk, state
return fmt.Errorf("failed to read uploadItem %x: %w", ui.BatchID, err)
}

if ui.TagID > 0 {
ti := &TagItem{TagID: ui.TagID}
err = indexStore.Get(ti)
if err != nil {
if errors.Is(err, storage.ErrNotFound) { // tag was deleted by user
if state == storage.ChunkSent {
ui.TagID = 0
err = indexStore.Put(ui)
if err != nil {
return fmt.Errorf("failed updating empty tag for chunk: %w", err)
}

return nil
}

// state != sent
return deleteFunc()

} else {
return fmt.Errorf("failed getting tag: %w", err)
}
}
switch state {
case storage.ChunkSent:
ti.Sent++
case storage.ChunkStored:
ti.Stored++
// also mark it as synced
fallthrough
case storage.ChunkSynced:
ti.Synced++
case storage.ChunkCouldNotSync:
break
// tag is missing
if ui.TagID == 0 {
return deleteFunc()
}

ti := &TagItem{TagID: ui.TagID}
err = indexStore.Get(ti)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return fmt.Errorf("failed getting tag: %w", err)
}

err = indexStore.Put(ti)
ui.TagID = 0
err = indexStore.Put(ui)
if err != nil {
return fmt.Errorf("failed updating tag: %w", err)
return fmt.Errorf("failed updating empty tag for chunk: %w", err)
}

return deleteFunc()
}

if state == storage.ChunkSent {
return nil
// update the tag
switch state {
case storage.ChunkSent:
ti.Sent++
case storage.ChunkStored:
ti.Stored++
ti.Synced++
case storage.ChunkSynced:
ti.Synced++
}

err = indexStore.Put(ti)
if err != nil {
return fmt.Errorf("failed updating tag: %w", err)
}

return deleteFunc()
Expand Down
Loading