Skip to content

Commit 090988c

Browse files
authored
Memberlist ignore old tombstones (#4420)
* Memberlist KV will no longer consider old tombstones as a "change" and will not gossip about them.
  Signed-off-by: Peter Štibraný <[email protected]>
* CHANGELOG.md
  Signed-off-by: Peter Štibraný <[email protected]>
1 parent 399c860 commit 090988c

File tree

3 files changed

+114
-2
lines changed

3 files changed

+114
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
* [CHANGE] Some files and directories created by Mimir components on local disk now have stricter permissions, and are only readable by owner, but not group or others. #4394
2222
* [CHANGE] Compactor: compactor will no longer try to compact blocks that are already marked for deletion. Previously compactor would consider blocks marked for deletion within `-compactor.deletion-delay / 2` period as eligible for compaction. #4328
2323
* [CHANGE] Memberlist: forward only changes, not entire original message. #4419
24+
* [CHANGE] Memberlist: don't accept old tombstones as incoming change, and don't forward such messages to other gossip members. #4420
2425
* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
2526
* [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341
2627
* [ENHANCEMENT] Ring: allow experimental configuration of disabling of heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342

pkg/ring/kv/memberlist/memberlist_client.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1201,6 +1201,17 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
12011201
total, removed := result.RemoveTombstones(limit)
12021202
m.storeTombstones.WithLabelValues(key).Set(float64(total))
12031203
m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed))
1204+
1205+
// Remove tombstones from change too. If change turns out to be empty after this,
1206+
// we don't need to change local value either!
1207+
//
1208+
// Note that "result" and "change" may actually be the same Mergeable. That is why we
1209+
// call RemoveTombstones on "result" first, so that we get the correct metrics. Calling
1210+
// RemoveTombstones twice with the same limit should be a no-op.
1211+
change.RemoveTombstones(limit)
1212+
if len(change.MergeContent()) == 0 {
1213+
return nil, 0, nil
1214+
}
12041215
}
12051216

12061217
newVersion := curr.version + 1

pkg/ring/kv/memberlist/memberlist_client_test.go

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,19 @@ func (d *data) MergeContent() []string {
8484
return out
8585
}
8686

87-
func (d *data) RemoveTombstones(limit time.Time) (_, _ int) {
88-
// nothing to do
87+
// This method deliberately ignores zero limit, so that tests can observe LEFT state as well.
88+
func (d *data) RemoveTombstones(limit time.Time) (total, removed int) {
89+
for n, m := range d.Members {
90+
if m.State == LEFT {
91+
if time.Unix(m.Timestamp, 0).Before(limit) {
92+
// remove it
93+
delete(d.Members, n)
94+
removed++
95+
} else {
96+
total++
97+
}
98+
}
99+
}
89100
return
90101
}
91102

@@ -1137,6 +1148,95 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) {
11371148
}}, d)
11381149
}
11391150

1151+
func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) {
1152+
codec := dataCodec{}
1153+
1154+
cfg := KVConfig{}
1155+
// We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor.
1156+
cfg.RetransmitMult = 1
1157+
cfg.LeftIngestersTimeout = 5 * time.Minute
1158+
cfg.Codecs = append(cfg.Codecs, codec)
1159+
1160+
kv := NewKV(cfg, log.NewNopLogger())
1161+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv))
1162+
defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck
1163+
1164+
client, err := NewClient(kv, codec)
1165+
require.NoError(t, err)
1166+
1167+
now := time.Now()
1168+
1169+
// No broadcast messages from KV at the beginning.
1170+
require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32)))
1171+
1172+
for _, tc := range []struct {
1173+
name string
1174+
valueBeforeSend *data // value in KV store before sending message
1175+
msgToSend *data
1176+
valueAfterSend *data // value in KV store after sending message
1177+
broadcastMessage *data // broadcasted change, if not nil
1178+
}{
1179+
// These test cases build on each other: the end state of the KV store after one case is the starting state of the next.
1180+
{
1181+
name: "old tombstone, empty KV",
1182+
valueBeforeSend: nil,
1183+
msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() - int64(2*cfg.LeftIngestersTimeout.Seconds()), State: LEFT}}},
1184+
valueAfterSend: nil, // no change to KV
1185+
broadcastMessage: nil, // no new message
1186+
},
1187+
1188+
{
1189+
name: "recent tombstone, empty KV",
1190+
valueBeforeSend: nil,
1191+
msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT}}},
1192+
broadcastMessage: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT}}},
1193+
valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}},
1194+
},
1195+
1196+
{
1197+
name: "old tombstone, KV contains tombstone already",
1198+
valueBeforeSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}},
1199+
msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() - 10, State: LEFT}}},
1200+
broadcastMessage: nil,
1201+
valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}},
1202+
},
1203+
1204+
{
1205+
name: "fresh tombstone, KV contains tombstone already",
1206+
valueBeforeSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}},
1207+
msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT}}},
1208+
broadcastMessage: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT}}},
1209+
valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT, Tokens: []uint32{}}}},
1210+
},
1211+
} {
1212+
t.Run(tc.name, func(t *testing.T) {
1213+
d := getData(t, client, key)
1214+
if tc.valueBeforeSend == nil {
1215+
require.True(t, d == nil || len(d.Members) == 0)
1216+
} else {
1217+
require.Equal(t, tc.valueBeforeSend, d, "valueBeforeSend")
1218+
}
1219+
1220+
kv.NotifyMsg(marshalKeyValuePair(t, key, codec, tc.msgToSend))
1221+
1222+
bs := kv.GetBroadcasts(0, math.MaxInt32)
1223+
if tc.broadcastMessage == nil {
1224+
require.Equal(t, 0, len(bs), "expected no broadcast message")
1225+
} else {
1226+
require.Equal(t, 1, len(bs), "expected broadcast message")
1227+
require.Equal(t, tc.broadcastMessage, decodeDataFromMarshalledKeyValuePair(t, bs[0], key, codec))
1228+
}
1229+
1230+
d = getData(t, client, key)
1231+
if tc.valueAfterSend == nil {
1232+
require.True(t, d == nil || len(d.Members) == 0)
1233+
} else {
1234+
require.Equal(t, tc.valueAfterSend, d, "valueAfterSend")
1235+
}
1236+
})
1237+
}
1238+
}
1239+
11401240
func decodeDataFromMarshalledKeyValuePair(t *testing.T, marshalledKVP []byte, key string, codec dataCodec) *data {
11411241
kvp := KeyValuePair{}
11421242
require.NoError(t, kvp.Unmarshal(marshalledKVP))

0 commit comments

Comments
 (0)