Skip to content

Commit 5b6e9e9

Browse files
GuptaManan100arthurschreiber
authored andcommitted
Fix race condition that prevents queries from being buffered after vtgate startup (vitessio#16655)
Signed-off-by: Manan Gupta <[email protected]> Signed-off-by: Arthur Schreiber <[email protected]>
1 parent 5dcf247 commit 5b6e9e9

File tree

7 files changed

+297
-61
lines changed

7 files changed

+297
-61
lines changed

go/vt/discovery/keyspace_events.go

Lines changed: 82 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ package discovery
1919
import (
2020
"context"
2121
"fmt"
22+
"slices"
2223
"sync"
24+
"time"
2325

2426
"golang.org/x/sync/errgroup"
2527
"google.golang.org/protobuf/proto"
@@ -37,6 +39,11 @@ import (
3739
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
3840
)
3941

42+
var (
43+
// waitConsistentKeyspacesCheck is the amount of time to wait for between checks to verify the keyspace is consistent.
44+
waitConsistentKeyspacesCheck = 100 * time.Millisecond
45+
)
46+
4047
// KeyspaceEventWatcher is an auxiliary watcher that watches all availability incidents
4148
// for all keyspaces in a Vitess cell and notifies listeners when the events have been resolved.
4249
// Right now this is capable of detecting the end of failovers, both planned and unplanned,
@@ -692,29 +699,53 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(ctx context.Context, tar
692699
return ks.beingResharded(target.Shard)
693700
}
694701

695-
// PrimaryIsNotServing checks if the reason why the given target is not accessible right now is
696-
// that the primary tablet for that shard is not serving. This is possible during a Planned
697-
// Reparent Shard operation. Just as the operation completes, a new primary will be elected, and
702+
// ShouldStartBufferingForTarget checks if we should be starting buffering for the given target.
703+
// We check the following things before we start buffering -
704+
// 1. The shard must have a primary.
705+
// 2. The primary must be non-serving.
706+
// 3. The keyspace must be marked inconsistent.
707+
//
708+
// This buffering is meant to kick in during a Planned Reparent Shard operation.
709+
// As part of that operation the old primary will become non-serving. At that point
710+
// this code should return true to start buffering requests.
711+
// Just as the PRS operation completes, a new primary will be elected, and
698712
// it will send its own healthcheck stating that it is serving. We should buffer requests until
699-
// that point. There are use cases where people do not run with a Primary server at all, so we must
713+
// that point.
714+
//
715+
// There are use cases where people do not run with a Primary server at all, so we must
700716
// verify that we only start buffering when a primary was present, and it went not serving.
701717
// The shard state keeps track of the current primary and the last externally reparented time, which
702718
// we can use to determine that there was a serving primary which now became non serving. This is
703719
// only possible in a DemotePrimary RPC which are only called from ERS and PRS. So buffering will
704-
// stop when these operations succeed. We return the tablet alias of the primary if it is serving.
705-
func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) {
720+
// stop when these operations succeed. We also return the tablet alias of the primary if it is serving.
721+
func (kew *KeyspaceEventWatcher) ShouldStartBufferingForTarget(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) {
706722
if target.TabletType != topodatapb.TabletType_PRIMARY {
723+
// We don't support buffering for any target tablet type other than the primary.
707724
return nil, false
708725
}
709726
ks := kew.getKeyspaceStatus(ctx, target.Keyspace)
710727
if ks == nil {
728+
// If the keyspace status is nil, then the keyspace must be deleted.
729+
// The user query is trying to access a keyspace that has been deleted.
730+
// There is no reason to buffer this query.
711731
return nil, false
712732
}
713733
ks.mu.Lock()
714734
defer ks.mu.Unlock()
715735
if state, ok := ks.shards[target.Shard]; ok {
716-
// If the primary tablet was present then externallyReparented will be non-zero and
717-
// currentPrimary will be not nil.
736+
// As described in the function comment, we only want to start buffering when all the following conditions are met -
737+
// 1. The shard must have a primary. We check this by checking the currentPrimary and externallyReparented fields being non-empty.
738+
// They are set the first time the shard registers an update from a serving primary and are never cleared out after that.
739+
// If the user has configured vtgates to wait for the primary tablet healthchecks before starting query service, this condition
740+
// will always be true.
741+
// 2. The primary must be non-serving. We check this by checking the serving field in the shard state.
742+
// When a primary becomes non-serving, it also marks the keyspace inconsistent. So the next check is only added
743+
// for being defensive against any bugs.
744+
// 3. The keyspace must be marked inconsistent. We check this by checking the consistent field in the keyspace state.
745+
//
746+
// The reason we need all the three checks is that we want to be very defensive in when we start buffering.
747+
// We don't want to start buffering when we don't know for sure if the primary
748+
// is not serving and we will receive an update that stops buffering soon.
718749
return state.currentPrimary, !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil
719750
}
720751
return nil, false
@@ -766,3 +797,46 @@ func (kew *KeyspaceEventWatcher) MarkShardNotServing(ctx context.Context, keyspa
766797
}
767798
return true
768799
}
800+
801+
// WaitForConsistentKeyspaces waits for the given set of keyspaces to be marked consistent.
802+
func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, ksList []string) error {
803+
// We don't want to change the original keyspace list that we receive so we clone it
804+
// before we empty it elements down below.
805+
keyspaces := slices.Clone(ksList)
806+
for {
807+
// We empty keyspaces as we find them to be consistent.
808+
allConsistent := true
809+
for i, ks := range keyspaces {
810+
if ks == "" {
811+
continue
812+
}
813+
814+
// Get the keyspace status and see it is consistent yet or not.
815+
kss := kew.getKeyspaceStatus(ctx, ks)
816+
// If kss is nil, then it must be deleted. In that case too it is fine for us to consider
817+
// it consistent since the keyspace has been deleted.
818+
if kss == nil || kss.consistent {
819+
keyspaces[i] = ""
820+
} else {
821+
allConsistent = false
822+
}
823+
}
824+
825+
if allConsistent {
826+
// all the keyspaces are consistent.
827+
return nil
828+
}
829+
830+
// Unblock after the sleep or when the context has expired.
831+
select {
832+
case <-ctx.Done():
833+
for _, ks := range keyspaces {
834+
if ks != "" {
835+
log.Infof("keyspace %v didn't become consistent", ks)
836+
}
837+
}
838+
return ctx.Err()
839+
case <-time.After(waitConsistentKeyspacesCheck):
840+
}
841+
}
842+
}

go/vt/discovery/keyspace_events_test.go

Lines changed: 100 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -148,11 +148,11 @@ func TestKeyspaceEventTypes(t *testing.T) {
148148
kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell)
149149

150150
type testCase struct {
151-
name string
152-
kss *keyspaceState
153-
shardToCheck string
154-
expectResharding bool
155-
expectPrimaryNotServing bool
151+
name string
152+
kss *keyspaceState
153+
shardToCheck string
154+
expectResharding bool
155+
expectShouldBuffer bool
156156
}
157157

158158
testCases := []testCase{
@@ -189,9 +189,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
189189
},
190190
consistent: false,
191191
},
192-
shardToCheck: "-",
193-
expectResharding: true,
194-
expectPrimaryNotServing: false,
192+
shardToCheck: "-",
193+
expectResharding: true,
194+
expectShouldBuffer: false,
195195
},
196196
{
197197
name: "two to four resharding in progress",
@@ -250,9 +250,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
250250
},
251251
consistent: false,
252252
},
253-
shardToCheck: "-80",
254-
expectResharding: true,
255-
expectPrimaryNotServing: false,
253+
shardToCheck: "-80",
254+
expectResharding: true,
255+
expectShouldBuffer: false,
256256
},
257257
{
258258
name: "unsharded primary not serving",
@@ -276,9 +276,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
276276
},
277277
consistent: false,
278278
},
279-
shardToCheck: "-",
280-
expectResharding: false,
281-
expectPrimaryNotServing: true,
279+
shardToCheck: "-",
280+
expectResharding: false,
281+
expectShouldBuffer: true,
282282
},
283283
{
284284
name: "sharded primary not serving",
@@ -310,9 +310,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
310310
},
311311
consistent: false,
312312
},
313-
shardToCheck: "-80",
314-
expectResharding: false,
315-
expectPrimaryNotServing: true,
313+
shardToCheck: "-80",
314+
expectResharding: false,
315+
expectShouldBuffer: true,
316316
},
317317
}
318318

@@ -327,8 +327,89 @@ func TestKeyspaceEventTypes(t *testing.T) {
327327
resharding := kew.TargetIsBeingResharded(ctx, tc.kss.shards[tc.shardToCheck].target)
328328
require.Equal(t, resharding, tc.expectResharding, "TargetIsBeingResharded should return %t", tc.expectResharding)
329329

330-
_, primaryDown := kew.PrimaryIsNotServing(ctx, tc.kss.shards[tc.shardToCheck].target)
331-
require.Equal(t, primaryDown, tc.expectPrimaryNotServing, "PrimaryIsNotServing should return %t", tc.expectPrimaryNotServing)
330+
_, shouldBuffer := kew.ShouldStartBufferingForTarget(ctx, tc.kss.shards[tc.shardToCheck].target)
331+
require.Equal(t, shouldBuffer, tc.expectShouldBuffer, "ShouldStartBufferingForTarget should return %t", tc.expectShouldBuffer)
332+
})
333+
}
334+
}
335+
336+
// TestWaitForConsistentKeyspaces tests the behaviour of WaitForConsistent for different scenarios.
337+
func TestWaitForConsistentKeyspaces(t *testing.T) {
338+
testcases := []struct {
339+
name string
340+
ksMap map[string]*keyspaceState
341+
ksList []string
342+
errExpected string
343+
}{
344+
{
345+
name: "Empty keyspace list",
346+
ksList: nil,
347+
ksMap: map[string]*keyspaceState{
348+
"ks1": {},
349+
},
350+
errExpected: "",
351+
},
352+
{
353+
name: "All keyspaces consistent",
354+
ksList: []string{"ks1", "ks2"},
355+
ksMap: map[string]*keyspaceState{
356+
"ks1": {
357+
consistent: true,
358+
},
359+
"ks2": {
360+
consistent: true,
361+
},
362+
},
363+
errExpected: "",
364+
},
365+
{
366+
name: "One keyspace inconsistent",
367+
ksList: []string{"ks1", "ks2"},
368+
ksMap: map[string]*keyspaceState{
369+
"ks1": {
370+
consistent: true,
371+
},
372+
"ks2": {
373+
consistent: false,
374+
},
375+
},
376+
errExpected: "context canceled",
377+
},
378+
{
379+
name: "One deleted keyspace - consistent",
380+
ksList: []string{"ks1", "ks2"},
381+
ksMap: map[string]*keyspaceState{
382+
"ks1": {
383+
consistent: true,
384+
},
385+
"ks2": {
386+
deleted: true,
387+
},
388+
},
389+
errExpected: "",
390+
},
391+
}
392+
393+
for _, tt := range testcases {
394+
t.Run(tt.name, func(t *testing.T) {
395+
// We create a cancelable context and immediately cancel it.
396+
// We don't want the unit tests to wait, so we only test the first
397+
// iteration of whether the keyspace event watcher returns
398+
// that the keyspaces are consistent or not.
399+
ctx, cancel := context.WithCancel(context.Background())
400+
cancel()
401+
kew := KeyspaceEventWatcher{
402+
keyspaces: tt.ksMap,
403+
mu: sync.Mutex{},
404+
ts: &fakeTopoServer{},
405+
}
406+
err := kew.WaitForConsistentKeyspaces(ctx, tt.ksList)
407+
if tt.errExpected != "" {
408+
require.ErrorContains(t, err, tt.errExpected)
409+
} else {
410+
require.NoError(t, err)
411+
}
412+
332413
})
333414
}
334415
}

go/vt/srvtopo/discover.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ limitations under the License.
1717
package srvtopo
1818

1919
import (
20-
"sync"
21-
2220
"context"
21+
"sync"
2322

2423
"vitess.io/vitess/go/vt/concurrency"
2524
"vitess.io/vitess/go/vt/log"
@@ -29,15 +28,16 @@ import (
2928
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
3029
)
3130

32-
// FindAllTargets goes through all serving shards in the topology for the provided keyspaces
31+
// FindAllTargetsAndKeyspaces goes through all serving shards in the topology for the provided keyspaces
3332
// and tablet types. If no keyspaces are provided all available keyspaces in the topo are
3433
// fetched. It returns one Target object per keyspace/shard/matching TabletType.
35-
func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, error) {
34+
// It also returns all the keyspaces that it found.
35+
func FindAllTargetsAndKeyspaces(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, []string, error) {
3636
var err error
3737
if len(keyspaces) == 0 {
3838
keyspaces, err = ts.GetSrvKeyspaceNames(ctx, cell, true)
3939
if err != nil {
40-
return nil, err
40+
return nil, nil, err
4141
}
4242
}
4343

@@ -95,8 +95,8 @@ func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []str
9595
}
9696
wg.Wait()
9797
if errRecorder.HasErrors() {
98-
return nil, errRecorder.Error()
98+
return nil, nil, errRecorder.Error()
9999
}
100100

101-
return targets, nil
101+
return targets, keyspaces, nil
102102
}

0 commit comments

Comments
 (0)