55 "context"
66 "errors"
77 "fmt"
8+ "math"
9+ "slices"
810 "sync"
911 "time"
1012
@@ -15,13 +17,37 @@ import (
1517 "go.uber.org/zap"
1618)
1719
18- type MetaClient interface {
20+ type OSSMetaClient interface {
1921 Databases () []meta.DatabaseInfo
2022 DeleteShardGroup (database , policy string , id uint64 ) error
2123 DropShard (id uint64 ) error
2224 PruneShardGroups () error
2325}
2426
27+ type MetaClient interface {
28+ OSSMetaClient
29+ NodeID () uint64
30+ }
31+
32+ const (
33+ // ossNodeID is a special node ID for OSS nodes. No enterprise node will ever have this node ID.
34+ // 0 can not be used because there is a brief period on startup before meta-client initialization
35+ // and before joining a cluster that NodeID() == 0.
36+ ossNodeID uint64 = math .MaxUint64
37+ )
38+
39+ // ossMetaClientAdapter adds methods the retention service needs to the OSS meta.Client implementation.
40+ // OSSMetaClient is decorated with methods needed for the Enterprise retention service instead of adding
41+ // them to the OSS MetaClient to avoid polluting the OSS MetaClient namespace.
42+ type ossMetaClientAdapter struct {
43+ OSSMetaClient
44+ }
45+
46+ // NodeID returns the magic ossNodeID identifier.
47+ func (c * ossMetaClientAdapter ) NodeID () uint64 {
48+ return ossNodeID
49+ }
50+
2551// Service represents the retention policy enforcement service.
2652type Service struct {
2753 MetaClient
@@ -58,12 +84,20 @@ func NewService(c Config) *Service {
5884}
5985
6086// OSSDropShardMetaRef creates a closure appropriate for OSS to use as DropShardMetaRef.
61- func OSSDropShardMetaRef (mc MetaClient ) func (uint64 , []uint64 ) error {
87+ func OSSDropShardMetaRef (mc OSSMetaClient ) func (uint64 , []uint64 ) error {
6288 return func (shardID uint64 , owners []uint64 ) error {
6389 return mc .DropShard (shardID )
6490 }
6591}
6692
93+ func (s * Service ) SetMetaClient (c MetaClient ) {
94+ s .MetaClient = c
95+ }
96+
97+ func (s * Service ) SetOSSMetaClient (c OSSMetaClient ) {
98+ s .SetMetaClient (& ossMetaClientAdapter {OSSMetaClient : c })
99+ }
100+
67101// Open starts retention policy enforcement.
68102func (s * Service ) Open (ctx context.Context ) error {
69103 if ! s .config .Enabled || s .cancel != nil {
@@ -148,6 +182,10 @@ func (s *Service) run(ctx context.Context) {
148182 }
149183}
150184
185+ func (s * Service ) isOSS () bool {
186+ return s .NodeID () == ossNodeID
187+ }
188+
151189func (s * Service ) DeletionCheck (ctx context.Context ) {
152190 log , logEnd := logger .NewOperation (ctx , s .logger , "Retention policy deletion check" , "retention_delete_check" )
153191 defer logEnd ()
@@ -276,16 +314,21 @@ func (s *Service) DeletionCheck(ctx context.Context) {
276314
277315 // Check for expired phantom shards that exist in the metadata but not in the store.
278316 for id , info := range deletedShardIDs {
279- func () {
280- log , logEnd := logger .NewOperation (ctx , log , "Drop phantom shard references" , "retention_drop_phantom_refs" ,
281- logger .Database (info .db ), logger .Shard (id ), logger .RetentionPolicy (info .rp ), zap .Uint64s ("owners" , info .owners ))
282- defer logEnd ()
283- log .Warn ("Expired phantom shard detected during retention check, removing from metadata" )
284- if err := s .DropShardMetaRef (id , info .owners ); err != nil {
285- log .Error ("Error dropping shard meta reference for phantom shard" , zap .Error (err ))
286- retryNeeded = true
287- }
288- }()
317+ // Enterprise tracks shard ownership while OSS does not because it is single node. A shard not in the
318+ // TSDB but in the metadata is always a phantom shard for OSS. For enterprise, it is only a phantom shard
319+ // if this node is supposed to own the shard according to the metadata.
320+ if s .isOSS () || slices .Contains (info .owners , s .NodeID ()) {
321+ func () {
322+ log , logEnd := logger .NewOperation (ctx , log , "Drop phantom shard references" , "retention_drop_phantom_refs" ,
323+ logger .Database (info .db ), logger .Shard (id ), logger .RetentionPolicy (info .rp ), zap .Uint64s ("owners" , info .owners ))
324+ defer logEnd ()
325+ log .Warn ("Expired phantom shard detected during retention check, removing from metadata" )
326+ if err := s .DropShardMetaRef (id , info .owners ); err != nil {
327+ log .Error ("Error dropping shard meta reference for phantom shard" , zap .Error (err ))
328+ retryNeeded = true
329+ }
330+ }()
331+ }
289332 }
290333
291334 func () {
0 commit comments