Skip to content

Commit 2693914

Browse files
committed
fix: locks
1 parent 99c46b4 commit 2693914

File tree

3 files changed

+41
-40
lines changed

3 files changed

+41
-40
lines changed

pkg/storer/internal/pinning/pinning.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ func (c *collectionPutter) Close(st storage.IndexStore, root swarm.Address) erro
160160
}
161161

162162
func (c *collectionPutter) Cleanup(st transaction.Storage) error {
163+
c.Lock()
164+
defer c.Unlock()
165+
163166
if c.closed {
164167
return nil
165168
}

pkg/storer/pinstore.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func (db *DB) NewCollection(ctx context.Context) (PutterSession, error) {
4141
func(ctx context.Context, chunk swarm.Chunk) error {
4242
unlock := db.Lock(addrKey(chunk.Address()))
4343
defer unlock()
44+
4445
return db.storage.Run(ctx, func(s transaction.Store) error {
4546
return pinningPutter.Put(ctx, s, chunk)
4647
})
@@ -52,13 +53,15 @@ func (db *DB) NewCollection(ctx context.Context) (PutterSession, error) {
5253
done: func(address swarm.Address) error {
5354
unlock := db.Lock(uploadsLock)
5455
defer unlock()
56+
5557
return db.storage.Run(ctx, func(s transaction.Store) error {
5658
return pinningPutter.Close(s.IndexStore(), address)
5759
})
5860
},
5961
cleanup: func() error {
6062
unlock := db.Lock(uploadsLock)
6163
defer unlock()
64+
6265
return pinningPutter.Cleanup(db.storage)
6366
},
6467
}, nil

pkg/storer/uploadstore.go

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -162,22 +162,21 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession
162162
return &putterSession{
163163
Putter: putterWithMetrics{
164164
storage.PutterFunc(func(ctx context.Context, chunk swarm.Chunk) error {
165-
unlock := db.Lock(addrKey(chunk.Address())) // protect against multiple upload of same chunk
165+
unlock := db.Lock(addrKey(chunk.Address())) // protect against multiple uploads of same chunk
166166
defer unlock()
167167

168-
return errors.Join(
169-
db.storage.Run(ctx, func(s transaction.Store) error {
170-
return uploadPutter.Put(ctx, s, chunk)
171-
}),
172-
func() error {
173-
if pinningPutter != nil {
174-
return db.storage.Run(ctx, func(s transaction.Store) error {
175-
return pinningPutter.Put(ctx, s, chunk)
176-
})
177-
}
178-
return nil
179-
}(),
180-
)
168+
if err := db.storage.Run(ctx, func(s transaction.Store) error {
169+
return uploadPutter.Put(ctx, s, chunk)
170+
}); err != nil {
171+
return err
172+
}
173+
174+
if pinningPutter != nil {
175+
return db.storage.Run(ctx, func(s transaction.Store) error {
176+
return pinningPutter.Put(ctx, s, chunk)
177+
})
178+
}
179+
return nil
181180
}),
182181
db.metrics,
183182
"uploadstore",
@@ -187,38 +186,34 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession
187186
unlock := db.Lock(sessionKey(tagID))
188187
defer unlock()
189188

190-
return errors.Join(
191-
db.storage.Run(ctx, func(s transaction.Store) error {
192-
return uploadPutter.Close(s.IndexStore(), address)
193-
}),
194-
func() error {
195-
if pinningPutter != nil {
196-
pinErr := db.storage.Run(ctx, func(s transaction.Store) error {
197-
return pinningPutter.Close(s.IndexStore(), address)
198-
})
199-
if errors.Is(pinErr, pinstore.ErrDuplicatePinCollection) {
200-
pinErr = pinningPutter.Cleanup(db.storage)
201-
}
202-
return pinErr
203-
}
204-
return nil
205-
}(),
206-
)
189+
if err := db.storage.Run(ctx, func(s transaction.Store) error {
190+
return uploadPutter.Close(s.IndexStore(), address)
191+
}); err != nil {
192+
return err
193+
}
194+
if pinningPutter != nil {
195+
err := db.storage.Run(ctx, func(s transaction.Store) error {
196+
return pinningPutter.Close(s.IndexStore(), address)
197+
})
198+
if errors.Is(err, pinstore.ErrDuplicatePinCollection) {
199+
return pinningPutter.Cleanup(db.storage)
200+
}
201+
return err
202+
}
203+
return nil
207204
},
208205
cleanup: func() error {
209206
defer db.events.Trigger(subscribePushEventKey)
210207
unlock := db.Lock(sessionKey(tagID))
211208
defer unlock()
212209

213-
return errors.Join(
214-
upload.Cleanup(db.storage, tagID),
215-
func() error {
216-
if pinningPutter != nil {
217-
return pinningPutter.Cleanup(db.storage)
218-
}
219-
return nil
220-
}(),
221-
)
210+
if err := upload.Cleanup(db.storage, tagID); err != nil {
211+
return err
212+
}
213+
if pinningPutter != nil {
214+
return pinningPutter.Cleanup(db.storage)
215+
}
216+
return nil
222217
},
223218
}, nil
224219
}

0 commit comments

Comments
 (0)