Skip to content

Commit efebf4d

Browse files
chore: add logging to Filestore.purger (#26089) (#26103)
Also fixes error type checks in TestCompactor_CompactFull_InProgress (cherry picked from commit 2ab5aad)
1 parent cb5072c commit efebf4d

File tree

4 files changed

+47
-30
lines changed

4 files changed

+47
-30
lines changed

tsdb/engine/tsm1/compact.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1064,7 +1064,11 @@ func (c *Compactor) RemoveTmpFiles(files []string) error {
10641064
func (c *Compactor) RemoveTmpFilesOnErr(files []string, originalErrs ...error) error {
10651065
removeErr := c.RemoveTmpFiles(files)
10661066
if removeErr == nil {
1067-
return errors.Join(originalErrs...)
1067+
if len(originalErrs) == 1 {
1068+
return originalErrs[0]
1069+
} else {
1070+
return errors.Join(originalErrs...)
1071+
}
10681072
} else if errJoin, ok := removeErr.(interface{ Unwrap() []error }); ok {
10691073
return errors.Join(append(originalErrs, errJoin.Unwrap()...)...)
10701074
} else {

tsdb/engine/tsm1/compact_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ func TestCompactor_DecodeError(t *testing.T) {
302302
_, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
303303

304304
require.ErrorContains(t, err, "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp")
305-
tsm1.MoveTsmOnReadErr(err, zap.NewNop(), func(strings []string, strings2 []string, f func([]tsm1.TSMFile)) error {
305+
tsm1.MoveTsmOnReadErr(err, zap.NewNop(), func(strings []string, strings2 []string) error {
306306
require.Equal(t, 1, len(strings))
307307
require.Equal(t, strings[0], f3)
308308
return nil
@@ -1142,11 +1142,10 @@ func TestCompactor_CompactFull_InProgress(t *testing.T) {
11421142
}()
11431143
_, err = compactor.CompactFull([]string{f2Name}, zap.NewNop())
11441144
assert.Errorf(t, err, "expected an error writing snapshot for %s", f2Name)
1145-
e := errors.Unwrap(err)
1146-
assert.NotNil(t, e, "expected an error wrapped by errCompactionInProgress")
1147-
assert.Truef(t, errors.Is(e, fs.ErrExist), "error did not indicate file existence: %v", e)
1145+
assert.ErrorContainsf(t, err, "file exists", "unexpected error writing snapshot for %s", f2Name)
1146+
assert.Truef(t, errors.Is(err, fs.ErrExist), "error did not indicate file existence: %v", err)
11481147
pathErr := &os.PathError{}
1149-
assert.Truef(t, errors.As(e, &pathErr), "expected path error, got %v", e)
1148+
assert.Truef(t, errors.As(err, &pathErr), "expected path error, got %v", err)
11501149
assert.Truef(t, errors.Is(pathErr, fs.ErrExist), "error did not indicate file existence: %v", pathErr)
11511150
}
11521151

tsdb/engine/tsm1/engine.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2241,15 +2241,15 @@ func (s *compactionStrategy) compactGroup() {
22412241

22422242
log.Warn("Error compacting TSM files", zap.Error(err))
22432243

2244-
MoveTsmOnReadErr(err, log, s.fileStore.ReplaceWithCallback)
2244+
MoveTsmOnReadErr(err, log, s.fileStore.Replace)
22452245

22462246
s.errorStat.Inc()
22472247
time.Sleep(time.Second)
22482248
return
22492249
}
22502250

2251-
if err := s.fileStore.ReplaceWithCallback(group, files, nil); err != nil {
2252-
log.Info("Error replacing new TSM files", zap.Error(err))
2251+
if err := s.fileStore.Replace(group, files); err != nil {
2252+
log.Error("Error replacing new TSM files", zap.Error(err))
22532253
s.errorStat.Inc()
22542254
time.Sleep(time.Second)
22552255

@@ -2269,13 +2269,13 @@ func (s *compactionStrategy) compactGroup() {
22692269
zap.Int("tsm1_files_n", len(files)))
22702270
}
22712271

2272-
func MoveTsmOnReadErr(err error, log *zap.Logger, ReplaceWithCallback func([]string, []string, func([]TSMFile)) error) {
2272+
func MoveTsmOnReadErr(err error, log *zap.Logger, replaceFn func([]string, []string) error) {
22732273
var blockReadErr errBlockRead
22742274
// We hit a bad TSM file - rename so the next compaction can proceed.
22752275
if ok := errors.As(err, &blockReadErr); ok {
22762276
path := blockReadErr.file
22772277
log.Info("Renaming a corrupt TSM file due to compaction error", zap.String("file", path), zap.Error(err))
2278-
if err := ReplaceWithCallback([]string{path}, nil, nil); err != nil {
2278+
if err := replaceFn([]string{path}, nil); err != nil {
22792279
log.Info("Error removing bad TSM file", zap.String("file", path), zap.Error(err))
22802280
} else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil {
22812281
log.Info("Error renaming corrupt TSM file", zap.String("file", path), zap.Error(err))

tsdb/engine/tsm1/file_store.go

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
"github.com/influxdata/influxdb/v2/influxql/query"
22+
"github.com/influxdata/influxdb/v2/logger"
2223
"github.com/influxdata/influxdb/v2/pkg/file"
2324
"github.com/influxdata/influxdb/v2/pkg/limiter"
2425
"github.com/influxdata/influxdb/v2/pkg/metrics"
@@ -868,11 +869,6 @@ func (f *FileStore) Stats() []FileStat {
868869
return f.lastFileStats
869870
}
870871

871-
// ReplaceWithCallback replaces oldFiles with newFiles and calls updatedFn with the files to be added the FileStore.
872-
func (f *FileStore) ReplaceWithCallback(oldFiles, newFiles []string, updatedFn func(r []TSMFile)) error {
873-
return f.replace(oldFiles, newFiles, updatedFn)
874-
}
875-
876872
// Replace replaces oldFiles with newFiles.
877873
func (f *FileStore) Replace(oldFiles, newFiles []string) error {
878874
return f.replace(oldFiles, newFiles, nil)
@@ -907,18 +903,19 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
907903
// The new TSM files have a tmp extension. First rename them.
908904
newName = file[:len(file)-4]
909905
if err := os.Rename(oldName, newName); err != nil {
910-
return err
906+
return fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err)
911907
}
912908
}
913909

914-
// Any error after this point should result in the file being bein named
910+
// Any error after this point should result in the file being named
915911
// back to the original name. The caller then has the opportunity to
916912
// remove it.
917913
fd, err := os.Open(newName)
918914
if err != nil {
915+
err = fmt.Errorf("failed opening %s: %w", newName, err)
919916
if newName != oldName {
920917
if err1 := os.Rename(newName, oldName); err1 != nil {
921-
return err1
918+
return errors.Join(err, fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err1))
922919
}
923920
}
924921
return err
@@ -933,9 +930,10 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
933930

934931
tsm, err := NewTSMReader(fd, f.readerOptions...)
935932
if err != nil {
933+
err = fmt.Errorf("failed creating TSMReader for %s: %w", newName, err)
936934
if newName != oldName {
937935
if err1 := os.Rename(newName, oldName); err1 != nil {
938-
return err1
936+
return errors.Join(err, fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err1))
939937
}
940938
}
941939
return err
@@ -998,14 +996,14 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
998996
// Rename the TSM file used by this reader
999997
tempPath := fmt.Sprintf("%s.%s", file.Path(), TmpTSMFileExtension)
1000998
if err := file.Rename(tempPath); err != nil {
1001-
return err
999+
return fmt.Errorf("failed renaming open TSM file to %s: %w", tempPath, err)
10021000
}
10031001

10041002
// Remove the old file and tombstones. We can't use the normal TSMReader.Remove()
10051003
// because it now refers to our temp file which we can't remove.
10061004
for _, f := range deletes {
10071005
if err := os.Remove(f); err != nil {
1008-
return err
1006+
return fmt.Errorf("failed removing old file %s: %w", f, err)
10091007
}
10101008
}
10111009

@@ -1014,11 +1012,11 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
10141012
}
10151013

10161014
if err := file.Close(); err != nil {
1017-
return err
1015+
return fmt.Errorf("failed to close TSM file %s: %w", file.Path(), err)
10181016
}
10191017

10201018
if err := file.Remove(); err != nil {
1021-
return err
1019+
return fmt.Errorf("failed to remove TSM file %s: %w", file.Path(), err)
10221020
}
10231021
break
10241022
}
@@ -1030,7 +1028,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
10301028
}
10311029

10321030
if err := file.SyncDir(f.dir); err != nil {
1033-
return err
1031+
return fmt.Errorf("failed to sync directory %s: %w", f.dir, err)
10341032
}
10351033

10361034
// Tell the purger about our in-use files we need to remove
@@ -1614,24 +1612,38 @@ type purger struct {
16141612
}
16151613

16161614
func (p *purger) add(files []TSMFile) {
1615+
var fileNames []string
16171616
p.mu.Lock()
16181617
for _, f := range files {
1619-
p.files[f.Path()] = f
1618+
fileName := f.Path()
1619+
fileNames = append(fileNames, fileName)
1620+
p.files[fileName] = f
16201621
}
16211622
p.mu.Unlock()
1622-
p.purge()
1623+
p.purge(fileNames)
16231624
}
16241625

1625-
func (p *purger) purge() {
1626+
func (p *purger) purge(fileNames []string) {
1627+
logger, logEndOp := logger.NewOperation(context.Background(), p.logger, "Purge held files", "filestore_purger")
1628+
1629+
logger.Info("added", zap.Int("count", len(fileNames)))
1630+
logger.Debug("purging", zap.Strings("files", fileNames))
16261631
p.mu.Lock()
16271632
if p.running {
16281633
p.mu.Unlock()
1634+
logger.Info("already running, files added to previous operation")
1635+
logEndOp()
16291636
return
16301637
}
16311638
p.running = true
16321639
p.mu.Unlock()
16331640

16341641
go func() {
1642+
var purgeCount int
1643+
defer func() {
1644+
logger.Info("removed", zap.Int("files", purgeCount))
1645+
logEndOp()
1646+
}()
16351647
for {
16361648
p.mu.Lock()
16371649
for k, v := range p.files {
@@ -1642,15 +1654,17 @@ func (p *purger) purge() {
16421654
// we allow calls to Ref and Unref under the read lock and no lock at all respectively.
16431655
if !v.InUse() {
16441656
if err := v.Close(); err != nil {
1645-
p.logger.Info("Purge: close file", zap.Error(err))
1657+
logger.Error("close file failed", zap.String("file", k), zap.Error(err))
16461658
continue
16471659
}
16481660

16491661
if err := v.Remove(); err != nil {
1650-
p.logger.Info("Purge: remove file", zap.Error(err))
1662+
logger.Error("remove file failed", zap.String("file", k), zap.Error(err))
16511663
continue
16521664
}
1665+
logger.Debug("successfully removed", zap.String("file", k))
16531666
delete(p.files, k)
1667+
purgeCount++
16541668
}
16551669
}
16561670

0 commit comments

Comments
 (0)