Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 14 additions & 33 deletions pkg/storer/internal/upload/uploadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,61 +565,42 @@ func CleanupDirty(st transaction.Storage) error {
// Report is the implementation of the PushReporter interface.
func Report(ctx context.Context, st transaction.Store, chunk swarm.Chunk, state storage.ChunkState) error {

ui := &uploadItem{Address: chunk.Address(), BatchID: chunk.Stamp().BatchID()}

indexStore := st.IndexStore()

ui := &uploadItem{Address: chunk.Address(), BatchID: chunk.Stamp().BatchID()}
err := indexStore.Get(ui)
if err != nil {
// because of the nature of the feed mechanism of the uploadstore/pusher, a chunk that is in inflight may be sent more than once to the pusher.
// this is because the chunks are removed from the queue only when they are synced, not at the start of the upload
if errors.Is(err, storage.ErrNotFound) {
return nil
}
return fmt.Errorf("failed to read uploadItem %x: %w", ui.BatchID, err)
}

// Once the chunk is stored/synced/failed to sync, it is deleted from the upload store as
// we no longer need to keep track of this chunk. We also need to cleanup
// the pushItem.
deleteFunc := func() error {
if state == storage.ChunkSent {
return nil
}

pi := &pushItem{
Timestamp: ui.Uploaded,
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
}

return errors.Join(
indexStore.Delete(pi),
chunkstamp.Delete(indexStore, uploadScope, pi.Address, pi.BatchID),
indexStore.Delete(&pushItem{Timestamp: ui.Uploaded, Address: chunk.Address(), BatchID: chunk.Stamp().BatchID()}),
chunkstamp.DeleteWithStamp(indexStore, uploadScope, chunk.Address(), chunk.Stamp()),
st.ChunkStore().Delete(ctx, chunk.Address()),
indexStore.Delete(ui),
)
}

err := indexStore.Get(ui)
if err != nil {
// because of the nature of the feed mechanism of the uploadstore/pusher, a chunk that is in inflight may be sent more than once to the pusher.
// this is because the chunks are removed from the queue only when they are synced, not at the start of the upload
if errors.Is(err, storage.ErrNotFound) {
return nil
}

return fmt.Errorf("failed to read uploadItem %x: %w", ui.BatchID, err)
}

// tag is missing
if ui.TagID == 0 {
return deleteFunc()
}

ti := &TagItem{TagID: ui.TagID}
err = indexStore.Get(ti)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return fmt.Errorf("failed getting tag: %w", err)
}

ui.TagID = 0
err = indexStore.Put(ui)
if err != nil {
return fmt.Errorf("failed updating empty tag for chunk: %w", err)
}

// tag is missing, no need update it
return deleteFunc()
}

Expand Down
13 changes: 0 additions & 13 deletions pkg/storer/internal/upload/uploadstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,19 +878,6 @@ func TestDeleteTagReporter(t *testing.T) {
t.Run("mark sent", func(t *testing.T) {
report(chunk, storage.ChunkSent)
})

t.Run("verify internal state", func(t *testing.T) {

ui := &upload.UploadItem{Address: chunk.Address(), BatchID: chunk.Stamp().BatchID()}
err := ts.IndexStore().Get(ui)
if err != nil {
t.Fatalf("Report(...): unexpected error: %v", err)
}

if diff := cmp.Diff(uint64(0), ui.TagID); diff != "" {
t.Fatalf("Get(...): unexpected TagItem (-want +have):\n%s", diff)
}
})
})

t.Run("delete tag while uploading and not sent", func(t *testing.T) {
Expand Down
Loading