Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func NewTest(t *testing.T) *Test {
}

// delete one series
if err := seriesFile.DeleteSeriesID(ids[0]); err != nil {
if _, err := seriesFile.DeleteSeriesID(ids[0], tsdb.Flush); err != nil {
return err
}

Expand Down
12 changes: 11 additions & 1 deletion tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1798,15 +1798,25 @@ func (e *Engine) deleteSeriesRange(ctx context.Context, seriesKeys [][]byte, min
// Remove the remaining ids from the series file as they no longer exist
// in any shard.
var err error
var partitionIDs = make(map[int]struct{}, tsdb.SeriesFilePartitionN)
ids.ForEach(func(id uint64) {
if err1 := e.sfile.DeleteSeriesID(id); err1 != nil {
part, err1 := e.sfile.DeleteSeriesID(id, tsdb.NoFlush)
if err1 != nil {
err = err1
return
}
partitionIDs[part.ID()] = struct{}{}
})
if err != nil {
return err
}

if err := e.sfile.FlushSegments(partitionIDs); err != nil {
e.sfile.Logger.Error(
"error while flushing a series file segment",
zap.String("series_file_path", e.sfile.Path()),
zap.Error(err))
}
}

return nil
Expand Down
42 changes: 38 additions & 4 deletions tsdb/series_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const SeriesIDSize = 8
const (
	// SeriesFilePartitionN is the number of partitions a series file is split into.
	SeriesFilePartitionN = 8
	// Flush requests that DeleteSeriesID fsync the active segment before returning.
	Flush = true
	// NoFlush requests that DeleteSeriesID skip the fsync; callers batch-deleting
	// series flush the affected partitions once afterwards via FlushSegments.
	NoFlush = false
)

// SeriesFile represents the section of the index that holds series data.
Expand Down Expand Up @@ -194,13 +198,43 @@ func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []mod
}

// DeleteSeriesID flags a series as permanently deleted.
// If the series is reintroduced later then it must create a new id.
func (f *SeriesFile) DeleteSeriesID(id uint64) error {
// If the series is reintroduced later than it must create a new id.
// Setting flush will indicate whether this method triggers a fsync.
func (f *SeriesFile) DeleteSeriesID(id uint64, flush bool) (*SeriesPartition, error) {
p := f.SeriesIDPartition(id)
if p == nil {
return ErrInvalidSeriesPartitionID
return nil, ErrInvalidSeriesPartitionID
}
return p.DeleteSeriesID(id)
return p, p.DeleteSeriesID(id, flush)
}

// FlushSegments fsyncs the active segment of every partition listed in
// partitionIDs, running the flushes concurrently (one goroutine per
// partition). All flush errors are combined into the returned error.
// Out-of-range partition IDs are reported as errors instead of panicking,
// since the set is built by callers from arbitrary series IDs.
func (f *SeriesFile) FlushSegments(partitionIDs map[int]struct{}) error {
	var (
		wg   sync.WaitGroup
		errs = make([]error, 0, SeriesFilePartitionN)
	)
	// Buffer by the real partition count so goroutine sends never block.
	errCh := make(chan error, len(f.partitions))

	for id := range partitionIDs {
		// Guard the index: a bad ID from the caller must not panic.
		if id < 0 || id >= len(f.partitions) {
			errs = append(errs, fmt.Errorf("invalid series file partition id: %d", id))
			continue
		}
		p := f.partitions[id]
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.mu.Lock()
			defer p.mu.Unlock()
			if segment := p.activeSegment(); segment != nil {
				if err := segment.Flush(); err != nil {
					errCh <- fmt.Errorf("unable to flush segment %s: %w", segment.file.Name(), err)
				}
			}
		}()
	}

	wg.Wait()
	close(errCh)

	for err := range errCh {
		errs = append(errs, err)
	}
	return errors.Join(errs...)
}

// IsDeleted returns true if the ID has been deleted before.
Expand Down
112 changes: 83 additions & 29 deletions tsdb/series_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,37 +192,57 @@ func TestSeriesFileCompactor(t *testing.T) {

// Ensure series file deletions persist across compactions.
func TestSeriesFile_DeleteSeriesID(t *testing.T) {
sfile := MustOpenSeriesFile(t)
defer sfile.Close()

ids0, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil})
if err != nil {
t.Fatal(err)
} else if _, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m2")}, []models.Tags{nil}); err != nil {
t.Fatal(err)
} else if err := sfile.ForceCompact(); err != nil {
t.Fatal(err)
deleteTestFn := func(flush bool) {
sfile := MustOpenSeriesFile(t)
defer func(sfile *SeriesFile) {
err := sfile.Close()
require.NoError(t, err, "close sfile")
}(sfile)

ids0, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil})
require.NoError(t, err, "create series list")
_, err = sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m2")}, []models.Tags{nil})
require.NoError(t, err, "create series list")
err = sfile.ForceCompact()
require.NoError(t, err, "force compact")

// Delete and ensure deletion.
_, err = sfile.DeleteSeriesID(ids0[0], flush)
require.NoError(t, err, "delete series list")
_, err = sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil})
require.NoError(t, err, "create series list")
require.True(t, sfile.IsDeleted(ids0[0]), "expected deleted")

err = sfile.ForceCompact()
require.NoError(t, err, "force compact")
require.True(t, sfile.IsDeleted(ids0[0]), "expected deleted")

err = sfile.Reopen()
require.NoError(t, err, "reopen")
require.True(t, sfile.IsDeleted(ids0[0]), "expected deleted")
}

// Delete and ensure deletion.
if err := sfile.DeleteSeriesID(ids0[0]); err != nil {
t.Fatal(err)
} else if _, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}); err != nil {
t.Fatal(err)
} else if !sfile.IsDeleted(ids0[0]) {
t.Fatal("expected deletion before compaction")
}

if err := sfile.ForceCompact(); err != nil {
t.Fatal(err)
} else if !sfile.IsDeleted(ids0[0]) {
t.Fatal("expected deletion after compaction")
tests := []struct {
name string
fn func()
}{{
name: "delete series with flush",
fn: func() {
deleteTestFn(tsdb.Flush)
},
},
{
name: "delete series with no flush",
fn: func() {
deleteTestFn(tsdb.NoFlush)
},
},
}

if err := sfile.Reopen(); err != nil {
t.Fatal(err)
} else if !sfile.IsDeleted(ids0[0]) {
t.Fatal("expected deletion after reopen")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.fn()
})
}
}

Expand Down Expand Up @@ -261,7 +281,10 @@ func TestSeriesFile_Compaction(t *testing.T) {
// Delete a subset of keys.
for i, id := range ids {
if i%10 == 0 {
require.NoError(t, sfile.DeleteSeriesID(id))
if _, err := sfile.DeleteSeriesID(id, tsdb.Flush); err != nil {
t.Fatal(err)
}
require.NoError(t, err)
}
}

Expand Down Expand Up @@ -329,7 +352,7 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) {

// Delete a subset of keys.
for i := 0; i < len(ids); i += 10 {
if err := sfile.DeleteSeriesID(ids[i]); err != nil {
if _, err := sfile.DeleteSeriesID(ids[i], tsdb.Flush); err != nil {
b.Fatal(err)
}
}
Expand Down Expand Up @@ -357,6 +380,37 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) {
}
}

// Ensure flushing the segments of every populated partition succeeds and
// does not disturb the stored series.
func TestSeriesFile_FlushSegments(t *testing.T) {
	const seriesN = 100

	sfile := MustOpenSeriesFile(t)
	defer func() {
		require.NoError(t, sfile.Close(), "series file close")
	}()

	// Populate the segments with series data.
	names := make([][]byte, seriesN)
	tagsSlice := make([]models.Tags, seriesN)
	for i := range names {
		names[i] = []byte(fmt.Sprintf("measurement%d", i))
		tagsSlice[i] = models.NewTags(map[string]string{"tag": "value"})
	}
	ids, err := sfile.CreateSeriesListIfNotExists(names, tagsSlice)
	require.NoError(t, err)

	// Determine which partitions received series.
	partitionIDs := make(map[int]struct{}, tsdb.SeriesFilePartitionN)
	for _, id := range ids {
		partitionIDs[sfile.SeriesIDPartitionID(id)] = struct{}{}
	}

	// Flushing those partitions must succeed...
	require.NoError(t, sfile.FlushSegments(partitionIDs))

	// ...and must not lose any series.
	require.Equal(t, uint64(seriesN), sfile.SeriesCount())
}

// Series represents name/tagset pairs that are used in testing.
type Series struct {
Name []byte
Expand Down
15 changes: 9 additions & 6 deletions tsdb/series_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio
// Flush active segment writes so we can access data in mmap.
if segment := p.activeSegment(); segment != nil {
if err := segment.Flush(); err != nil {
return err
return fmt.Errorf("unable to flush segment %s: %w", segment.file.Name(), err)
}
}

Expand Down Expand Up @@ -315,8 +315,9 @@ func (p *SeriesPartition) Compacting() bool {
}

// DeleteSeriesID flags a series as permanently deleted.
// If the series is reintroduced later then it must create a new id.
func (p *SeriesPartition) DeleteSeriesID(id uint64) error {
// If the series is reintroduced later then it must create a new id.
// Setting flush will indicate whether this method triggers a fsync.
func (p *SeriesPartition) DeleteSeriesID(id uint64, flush bool) error {
p.mu.Lock()
defer p.mu.Unlock()

Expand All @@ -336,9 +337,11 @@ func (p *SeriesPartition) DeleteSeriesID(id uint64) error {
}

// Flush active segment write.
if segment := p.activeSegment(); segment != nil {
if err := segment.Flush(); err != nil {
return err
if flush {
if segment := p.activeSegment(); segment != nil {
if err := segment.Flush(); err != nil {
return fmt.Errorf("unable to flush segment %s: %w", segment.file.Name(), err)
}
}
}

Expand Down
34 changes: 32 additions & 2 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

errors3 "github.com/influxdata/influxdb/v2/pkg/errors"
Expand Down Expand Up @@ -1067,18 +1068,47 @@ func (s *Store) DeleteShard(shardID uint64) error {

// Remove any remaining series in the set from the series file, as they don't
// exist in any of the database's remaining shards.
if ss.Cardinality() > 0 {
seriesCount := ss.Cardinality()
if seriesCount > 0 {
const DeleteLogTrigger = 10_000
deleteStart := time.Now()
var deletedCount atomic.Uint64
var partitionIDs = make(map[int]struct{}, SeriesFilePartitionN)
sfile := s.seriesFile(db)
if sfile != nil {
ss.ForEach(func(id uint64) {
if err := sfile.DeleteSeriesID(id); err != nil {
p, err := sfile.DeleteSeriesID(id, NoFlush)
if err != nil {
sfile.Logger.Error(
"cannot delete series in shard",
zap.Uint64("series_id", id),
zap.Uint64("shard_id", shardID),
zap.String("series_file_path", sfile.Path()),
zap.Error(err))
} else {
partitionIDs[p.id] = struct{}{}
deleted := deletedCount.Add(1)

if deleted%DeleteLogTrigger == 0 {
s.Logger.Info(fmt.Sprintf("DeleteShard: %d series deleted", DeleteLogTrigger),
zap.String("db", db),
zap.Uint64("shard_id", shardID),
zap.String("series_file_path", sfile.Path()),
zap.Uint64("deleted", deleted),
zap.Uint64("remaining", seriesCount-deleted),
zap.Uint64("total", seriesCount),
zap.Duration("elapsed", time.Since(deleteStart)))
}
}
})

if err := sfile.FlushSegments(partitionIDs); err != nil {
sfile.Logger.Error(
"error while flushing a series file segment",
zap.Uint64("shard_id", shardID),
zap.String("series_file_path", sfile.Path()),
zap.Error(err))
}
}
}

Expand Down