Skip to content

Commit 9fd30ab

Browse files
authored
Merge pull request #1535 from 0chain/feat/clear-precommit
Clean precommit dir
2 parents d150b9b + 4fa159f commit 9fd30ab

File tree

4 files changed

+61
-3
lines changed

4 files changed

+61
-3
lines changed

code/go/0chain.net/blobbercore/allocation/allocationchange.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,11 @@ func (a *AllocationChangeCollector) MoveToFilestore(ctx context.Context, allocat
449449
logging.Logger.Error("Error while moving to filestore", zap.Error(err))
450450
return err
451451
}
452+
err = filestore.GetFileStore().DeletePreCommitDir(a.AllocationID)
453+
if err != nil {
454+
logging.Logger.Error("Error while deleting precommit dir", zap.Error(err))
455+
return err
456+
}
452457

453458
return tx.Exec("UPDATE reference_objects SET is_precommit=?, prev_validation_root=validation_root, prev_thumbnail_hash=thumbnail_hash WHERE allocation_id=? AND is_precommit=? AND deleted_at is NULL", false, a.AllocationID, true).Error
454459
})

code/go/0chain.net/blobbercore/allocation/connection.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,13 @@ func DeleteConnectionObjEntry(connectionID string) {
244244
connectionObj, ok := connectionProcessor[connectionID]
245245
if ok {
246246
connectionObj.cnclCtx()
247+
for _, change := range connectionObj.changes {
248+
if change.seqPQ != nil {
249+
change.seqPQ.Done(seqpriorityqueue.UploadData{}, 1)
250+
}
251+
}
252+
delete(connectionProcessor, connectionID)
247253
}
248-
delete(connectionProcessor, connectionID)
249254
connectionObjMutex.Unlock()
250255
}
251256

code/go/0chain.net/blobbercore/filestore/storage.go

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
4747
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
4848
"github.com/0chain/gosdk/core/util"
49+
"github.com/remeh/sizedwaitgroup"
4950
"go.uber.org/zap"
5051
"golang.org/x/crypto/sha3"
5152
"golang.org/x/sys/unix"
@@ -201,12 +202,55 @@ func (fs *FileStore) DeleteFromFilestore(allocID, hash string, version int) erro
201202
}
202203

203204
func (fs *FileStore) DeletePreCommitDir(allocID string) error {
204-
205+
now := time.Now()
205206
preCommitDir := fs.getPreCommitDir(allocID)
206-
err := os.RemoveAll(preCommitDir)
207+
swg := sizedwaitgroup.New(5)
208+
ctx, cancel := context.WithCancelCause(context.Background())
209+
defer cancel(nil)
210+
211+
entries, err := os.ReadDir(preCommitDir)
212+
if err != nil {
213+
if errors.Is(err, os.ErrNotExist) {
214+
return nil
215+
}
216+
return common.NewError("pre_commit_dir_read_error", err.Error())
217+
}
218+
219+
for _, entry := range entries {
220+
entryPath := filepath.Join(preCommitDir, entry.Name())
221+
if entry.IsDir() {
222+
swg.Add()
223+
go func() {
224+
select {
225+
case <-ctx.Done():
226+
return
227+
default:
228+
err := os.RemoveAll(entryPath)
229+
if err != nil {
230+
logging.Logger.Error("failed to remove directory", zap.String("path", entryPath), zap.Error(err))
231+
cancel(err)
232+
}
233+
}
234+
swg.Done()
235+
}()
236+
} else {
237+
if err := os.Remove(entryPath); err != nil {
238+
return fmt.Errorf("failed to remove file %s: %w", entryPath, err)
239+
}
240+
}
241+
}
242+
swg.Wait()
243+
select {
244+
case <-ctx.Done():
245+
return common.NewError("pre_commit_dir_deletion_error", ctx.Err().Error())
246+
default:
247+
}
248+
249+
err = os.Remove(preCommitDir)
207250
if err != nil {
208251
return common.NewError("pre_commit_dir_deletion_error", err.Error())
209252
}
253+
logging.Logger.Debug("pre_commit_dir_deleted", zap.String("allocation_id", allocID), zap.Duration("elapsed", time.Since(now)))
210254
return nil
211255
}
212256

code/go/0chain.net/blobbercore/handler/worker.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ func cleanupTempFiles(ctx context.Context) {
8383

8484
for i := 0; i < len(openConnectionsToDelete); i++ {
8585
connection := &openConnectionsToDelete[i]
86+
processor := allocation.GetConnectionProcessor(connection.ID)
87+
if processor != nil {
88+
continue
89+
}
8690
logging.Logger.Info("Deleting temp files for the connection", zap.Any("connection", connection.ID))
8791
connection.ComputeProperties()
8892

0 commit comments

Comments
 (0)