Skip to content

Commit aa4ec60

Browse files
authored
perf(uploadstore): use deletewithstamp instead and tag deletion case … (#5032)
1 parent fb62fa4 commit aa4ec60

File tree

2 files changed

+16
-47
lines changed

2 files changed

+16
-47
lines changed

pkg/storer/internal/upload/uploadstore.go

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,8 @@ func (u *uploadPutter) Close(s storage.IndexStore, addr swarm.Address) error {
459459
// If the tag is not found, it might have been removed or never existed.
460460
// In this case, there’s no need to update or delete it—so simply return.
461461
if errors.Is(err, storage.ErrNotFound) {
462-
return nil
462+
u.closed = true
463+
return s.Delete(&dirtyTagItem{TagID: u.tagID})
463464
}
464465
return fmt.Errorf("failed reading tag while closing: %w", err)
465466
}
@@ -565,61 +566,42 @@ func CleanupDirty(st transaction.Storage) error {
565566
// Report is the implementation of the PushReporter interface.
566567
func Report(ctx context.Context, st transaction.Store, chunk swarm.Chunk, state storage.ChunkState) error {
567568

568-
ui := &uploadItem{Address: chunk.Address(), BatchID: chunk.Stamp().BatchID()}
569-
570569
indexStore := st.IndexStore()
571570

571+
ui := &uploadItem{Address: chunk.Address(), BatchID: chunk.Stamp().BatchID()}
572+
err := indexStore.Get(ui)
573+
if err != nil {
574+
// because of the nature of the feed mechanism of the uploadstore/pusher, a chunk that is in inflight may be sent more than once to the pusher.
575+
// this is because the chunks are removed from the queue only when they are synced, not at the start of the upload
576+
if errors.Is(err, storage.ErrNotFound) {
577+
return nil
578+
}
579+
return fmt.Errorf("failed to read uploadItem %x: %w", ui.BatchID, err)
580+
}
581+
572582
// Once the chunk is stored/synced/failed to sync, it is deleted from the upload store as
573583
// we no longer need to keep track of this chunk. We also need to cleanup
574584
// the pushItem.
575585
deleteFunc := func() error {
576586
if state == storage.ChunkSent {
577587
return nil
578588
}
579-
580-
pi := &pushItem{
581-
Timestamp: ui.Uploaded,
582-
Address: chunk.Address(),
583-
BatchID: chunk.Stamp().BatchID(),
584-
}
585-
586589
return errors.Join(
587-
indexStore.Delete(pi),
588-
chunkstamp.Delete(indexStore, uploadScope, pi.Address, pi.BatchID),
590+
indexStore.Delete(&pushItem{Timestamp: ui.Uploaded, Address: chunk.Address(), BatchID: chunk.Stamp().BatchID()}),
591+
chunkstamp.DeleteWithStamp(indexStore, uploadScope, chunk.Address(), chunk.Stamp()),
589592
st.ChunkStore().Delete(ctx, chunk.Address()),
590593
indexStore.Delete(ui),
591594
)
592595
}
593596

594-
err := indexStore.Get(ui)
595-
if err != nil {
596-
// because of the nature of the feed mechanism of the uploadstore/pusher, a chunk that is in inflight may be sent more than once to the pusher.
597-
// this is because the chunks are removed from the queue only when they are synced, not at the start of the upload
598-
if errors.Is(err, storage.ErrNotFound) {
599-
return nil
600-
}
601-
602-
return fmt.Errorf("failed to read uploadItem %x: %w", ui.BatchID, err)
603-
}
604-
605-
// tag is missing
606-
if ui.TagID == 0 {
607-
return deleteFunc()
608-
}
609-
610597
ti := &TagItem{TagID: ui.TagID}
611598
err = indexStore.Get(ti)
612599
if err != nil {
613600
if !errors.Is(err, storage.ErrNotFound) {
614601
return fmt.Errorf("failed getting tag: %w", err)
615602
}
616603

617-
ui.TagID = 0
618-
err = indexStore.Put(ui)
619-
if err != nil {
620-
return fmt.Errorf("failed updating empty tag for chunk: %w", err)
621-
}
622-
604+
// tag is missing, no need update it
623605
return deleteFunc()
624606
}
625607

pkg/storer/internal/upload/uploadstore_test.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -878,19 +878,6 @@ func TestDeleteTagReporter(t *testing.T) {
878878
t.Run("mark sent", func(t *testing.T) {
879879
report(chunk, storage.ChunkSent)
880880
})
881-
882-
t.Run("verify internal state", func(t *testing.T) {
883-
884-
ui := &upload.UploadItem{Address: chunk.Address(), BatchID: chunk.Stamp().BatchID()}
885-
err := ts.IndexStore().Get(ui)
886-
if err != nil {
887-
t.Fatalf("Report(...): unexpected error: %v", err)
888-
}
889-
890-
if diff := cmp.Diff(uint64(0), ui.TagID); diff != "" {
891-
t.Fatalf("Get(...): unexpected TagItem (-want +have):\n%s", diff)
892-
}
893-
})
894881
})
895882

896883
t.Run("delete tag while uploading and not sent", func(t *testing.T) {

0 commit comments

Comments
 (0)