[Do Not Merge] v20.x #152

Open. Wants to merge 24 commits into base: release-20.0.

Commits (24):
5b6e9e9  Fix race condition that prevents queries from being buffered after vt… (GuptaManan100, Aug 29, 2024)
9815ca8  VReplication: Estimate lag when workflow fully throttled (#16577) (#149) (mhamza15, Apr 14, 2025)
36832b1  Merge remote-tracking branch 'upstream/release-20.0' into release-20.… (mhamza15, Aug 7, 2025)
f828b54  Set parsed comments in operator for subqueries (#18369) (davidpiegza, Jul 1, 2025)
fae60c5  sqlparser: Remove unneeded escaping (#16255) (dbussink, Jun 24, 2024)
0a327cb  remove skip_e2e (mhamza15, Aug 7, 2025)
0bd5a8d  fix escape in tests (mhamza15, Aug 7, 2025)
cb5a0a0  fix new tests (mhamza15, Aug 7, 2025)
81279a7  Fix formatting (mhamza15, Aug 7, 2025)
d422f04  Fix formatting (mhamza15, Aug 7, 2025)
b7050ae  Fix table formatting in select_cases.json (mhamza15, Aug 7, 2025)
2bd3d6b  Fix JSON formatting in select_cases.json (mhamza15, Aug 7, 2025)
c703f98  Fail loading an ACL config if the provided file is empty and enforceT… (mhamza15, Aug 7, 2025)
ae90711  Revert "VReplication: Estimate lag when workflow fully throttled (#16… (mhamza15, Aug 7, 2025)
59a7d3a  Merge branch 'release-20.0-github' into mhamza/backport-18369 (mhamza15, Aug 7, 2025)
0a3b142  Merge branch 'release-20.0-github' into mhamza/backport-16255 (mhamza15, Aug 7, 2025)
afff925  fix runs on in workflows (mhamza15, Apr 2, 2025)
50a5578  fix actions (mhamza15, Apr 2, 2025)
6d47856  Merge branch 'release-20.0-github' into mhamza/backport-18369 (mhamza15, Aug 7, 2025)
793357a  Merge branch 'release-20.0-github' into mhamza/backport-16255 (mhamza15, Aug 7, 2025)
0a4023d  VReplication: Estimate lag when workflow fully throttled (#16577) (mattlord, Aug 15, 2024)
aa56cad  Merge pull request #174 from github/mhamza/backport-18369 (arthurschreiber, Aug 7, 2025)
4fe826d  Merge pull request #175 from github/mhamza/backport-16255 (arthurschreiber, Aug 8, 2025)
ecaf20e  Merge pull request #178 from github/mhamza/backport-16577 (arthurschreiber, Aug 8, 2025)
90 changes: 82 additions & 8 deletions go/vt/discovery/keyspace_events.go
@@ -19,7 +19,9 @@ package discovery
import (
"context"
"fmt"
"slices"
"sync"
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
@@ -37,6 +39,11 @@ import (
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

var (
// waitConsistentKeyspacesCheck is the amount of time to wait between checks that verify the keyspace is consistent.
waitConsistentKeyspacesCheck = 100 * time.Millisecond
)

// KeyspaceEventWatcher is an auxiliary watcher that watches all availability incidents
// for all keyspaces in a Vitess cell and notifies listeners when the events have been resolved.
// Right now this is capable of detecting the end of failovers, both planned and unplanned,
@@ -692,29 +699,53 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(ctx context.Context, tar
return ks.beingResharded(target.Shard)
}

// PrimaryIsNotServing checks if the reason why the given target is not accessible right now is
// that the primary tablet for that shard is not serving. This is possible during a Planned
// Reparent Shard operation. Just as the operation completes, a new primary will be elected, and
// ShouldStartBufferingForTarget checks if we should be starting buffering for the given target.
// We check the following things before we start buffering -
// 1. The shard must have a primary.
// 2. The primary must be non-serving.
// 3. The keyspace must be marked inconsistent.
//
// This buffering is meant to kick in during a Planned Reparent Shard operation.
// As part of that operation the old primary will become non-serving. At that point
// this code should return true to start buffering requests.
// Just as the PRS operation completes, a new primary will be elected, and
// it will send its own healthcheck stating that it is serving. We should buffer requests until
// that point. There are use cases where people do not run with a Primary server at all, so we must
// that point.
//
// There are use cases where people do not run with a Primary server at all, so we must
// verify that we only start buffering when a primary was present and then stopped serving.
// The shard state keeps track of the current primary and the last externally reparented time, which
// we can use to determine that there was a serving primary which then became non-serving. This is
// only possible via a DemotePrimary RPC, which is only called from ERS and PRS. So buffering will
// stop when these operations succeed. We return the tablet alias of the primary if it is serving.
func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) {
// stop when these operations succeed. We also return the tablet alias of the primary if it is serving.
func (kew *KeyspaceEventWatcher) ShouldStartBufferingForTarget(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) {
if target.TabletType != topodatapb.TabletType_PRIMARY {
// We don't support buffering for any target tablet type other than the primary.
return nil, false
}
ks := kew.getKeyspaceStatus(ctx, target.Keyspace)
if ks == nil {
// If the keyspace status is nil, then the keyspace must be deleted.
// The user query is trying to access a keyspace that has been deleted.
// There is no reason to buffer this query.
return nil, false
}
ks.mu.Lock()
defer ks.mu.Unlock()
if state, ok := ks.shards[target.Shard]; ok {
// If the primary tablet was present then externallyReparented will be non-zero and
// currentPrimary will be not nil.
// As described in the function comment, we only want to start buffering when all of the following conditions are met -
// 1. The shard must have a primary. We check that the currentPrimary and externallyReparented fields are non-empty.
// They are set the first time the shard registers an update from a serving primary and are never cleared out after that.
// If the user has configured vtgates to wait for the primary tablet healthchecks before starting query service, this condition
// will always be true.
// 2. The primary must be non-serving. We check the serving field in the shard state.
// When a primary becomes non-serving, it also marks the keyspace inconsistent, so the next check is only added
// to be defensive against any bugs.
// 3. The keyspace must be marked inconsistent. We check the consistent field in the keyspace state.
//
// We need all three checks because we want to be very conservative about starting buffering:
// we should not start buffering unless we know for sure that the primary is not serving
// and that we will soon receive an update that stops buffering.
return state.currentPrimary, !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil
}
return nil, false
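
For readers following the rename: a minimal sketch of how a caller on the query-serving path might consult the new check. The package, maybeBufferQuery, and the log message are illustrative assumptions, not code from this PR:

```go
package example // illustrative sketch, not part of this PR

import (
	"context"

	"vitess.io/vitess/go/vt/discovery"
	"vitess.io/vitess/go/vt/log"
	querypb "vitess.io/vitess/go/vt/proto/query"
)

// maybeBufferQuery reports whether queries aimed at the given target should
// start being buffered, per the three conditions documented on
// ShouldStartBufferingForTarget.
func maybeBufferQuery(ctx context.Context, kew *discovery.KeyspaceEventWatcher, target *querypb.Target) bool {
	alias, shouldBuffer := kew.ShouldStartBufferingForTarget(ctx, target)
	if !shouldBuffer {
		// Not a primary target, no serving primary was ever seen,
		// or the keyspace is still consistent.
		return false
	}
	log.Infof("buffering queries for %s/%s; last serving primary: %v",
		target.Keyspace, target.Shard, alias)
	return true
}
```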
@@ -766,3 +797,46 @@ func (kew *KeyspaceEventWatcher) MarkShardNotServing(ctx context.Context, keyspa
}
return true
}

// WaitForConsistentKeyspaces waits for the given set of keyspaces to be marked consistent.
func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, ksList []string) error {
// We don't want to change the original keyspace list that we receive, so we clone it
// before we empty its elements down below.
keyspaces := slices.Clone(ksList)
Comment on lines +802 to +805
Copilot AI, Apr 11, 2025:

WaitForConsistentKeyspaces uses a fixed sleep interval in a loop which may lead to long waits if keyspaces never become consistent; consider introducing a configurable timeout or maximum iteration limit to avoid potential resource waste.

Suggested change
func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, ksList []string) error {
// We don't want to change the original keyspace list that we receive, so we clone it
// before we empty its elements down below.
keyspaces := slices.Clone(ksList)
func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, ksList []string, timeout time.Duration) error {
// We don't want to change the original keyspace list that we receive, so we clone it
// before we empty its elements down below.
keyspaces := slices.Clone(ksList)
timeoutChan := time.After(timeout)


for {
// We empty keyspaces as we find them to be consistent.
allConsistent := true
for i, ks := range keyspaces {
if ks == "" {
continue
}

// Get the keyspace status and see whether it is consistent yet.
kss := kew.getKeyspaceStatus(ctx, ks)
// If kss is nil, the keyspace must have been deleted. In that case too, it is fine
// to consider it consistent, since the keyspace no longer exists.
if kss == nil || kss.consistent {
keyspaces[i] = ""
} else {
allConsistent = false
}
}

if allConsistent {
// all the keyspaces are consistent.
return nil
}

// Unblock after the sleep or when the context has expired.
select {
case <-ctx.Done():
for _, ks := range keyspaces {
if ks != "" {
log.Infof("keyspace %v didn't become consistent", ks)
}
}
return ctx.Err()
case <-time.After(waitConsistentKeyspacesCheck):
}
}
}
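
Worth noting against the review comment above: because the wait loop selects on ctx.Done() every iteration, a caller can already bound the wait by deriving a deadline-bound context, without adding a timeout parameter. A minimal sketch; waitWithTimeout and the 30-second value are assumptions for illustration:

```go
package example // illustrative sketch, not part of this PR

import (
	"context"
	"time"

	"vitess.io/vitess/go/vt/discovery"
)

// waitWithTimeout bounds WaitForConsistentKeyspaces with a deadline.
// The function checks ctx.Done() on every iteration, so it returns
// ctx.Err() as soon as the deadline expires.
func waitWithTimeout(ctx context.Context, kew *discovery.KeyspaceEventWatcher, keyspaces []string) error {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	return kew.WaitForConsistentKeyspaces(ctx, keyspaces)
}
```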
119 changes: 100 additions & 19 deletions go/vt/discovery/keyspace_events_test.go
@@ -148,11 +148,11 @@ func TestKeyspaceEventTypes(t *testing.T) {
kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell)

type testCase struct {
name string
kss *keyspaceState
shardToCheck string
expectResharding bool
expectPrimaryNotServing bool
name string
kss *keyspaceState
shardToCheck string
expectResharding bool
expectShouldBuffer bool
}

testCases := []testCase{
@@ -189,9 +189,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-",
expectResharding: true,
expectPrimaryNotServing: false,
shardToCheck: "-",
expectResharding: true,
expectShouldBuffer: false,
},
{
name: "two to four resharding in progress",
@@ -250,9 +250,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-80",
expectResharding: true,
expectPrimaryNotServing: false,
shardToCheck: "-80",
expectResharding: true,
expectShouldBuffer: false,
},
{
name: "unsharded primary not serving",
@@ -276,9 +276,9 @@
},
consistent: false,
},
shardToCheck: "-",
expectResharding: false,
expectPrimaryNotServing: true,
shardToCheck: "-",
expectResharding: false,
expectShouldBuffer: true,
},
{
name: "sharded primary not serving",
@@ -310,9 +310,9 @@
},
consistent: false,
},
shardToCheck: "-80",
expectResharding: false,
expectPrimaryNotServing: true,
shardToCheck: "-80",
expectResharding: false,
expectShouldBuffer: true,
},
}

@@ -327,8 +327,89 @@
resharding := kew.TargetIsBeingResharded(ctx, tc.kss.shards[tc.shardToCheck].target)
require.Equal(t, resharding, tc.expectResharding, "TargetIsBeingResharded should return %t", tc.expectResharding)

_, primaryDown := kew.PrimaryIsNotServing(ctx, tc.kss.shards[tc.shardToCheck].target)
require.Equal(t, primaryDown, tc.expectPrimaryNotServing, "PrimaryIsNotServing should return %t", tc.expectPrimaryNotServing)
_, shouldBuffer := kew.ShouldStartBufferingForTarget(ctx, tc.kss.shards[tc.shardToCheck].target)
require.Equal(t, shouldBuffer, tc.expectShouldBuffer, "ShouldStartBufferingForTarget should return %t", tc.expectShouldBuffer)
})
}
}

// TestWaitForConsistentKeyspaces tests the behaviour of WaitForConsistentKeyspaces in different scenarios.
func TestWaitForConsistentKeyspaces(t *testing.T) {
testcases := []struct {
name string
ksMap map[string]*keyspaceState
ksList []string
errExpected string
}{
{
name: "Empty keyspace list",
ksList: nil,
ksMap: map[string]*keyspaceState{
"ks1": {},
},
errExpected: "",
},
{
name: "All keyspaces consistent",
ksList: []string{"ks1", "ks2"},
ksMap: map[string]*keyspaceState{
"ks1": {
consistent: true,
},
"ks2": {
consistent: true,
},
},
errExpected: "",
},
{
name: "One keyspace inconsistent",
ksList: []string{"ks1", "ks2"},
ksMap: map[string]*keyspaceState{
"ks1": {
consistent: true,
},
"ks2": {
consistent: false,
},
},
errExpected: "context canceled",
},
{
name: "One deleted keyspace - consistent",
ksList: []string{"ks1", "ks2"},
ksMap: map[string]*keyspaceState{
"ks1": {
consistent: true,
},
"ks2": {
deleted: true,
},
},
errExpected: "",
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
// We create a cancelable context and immediately cancel it.
// We don't want the unit tests to wait, so we only exercise the first
// iteration and check whether the keyspace event watcher reports the
// keyspaces as consistent or not.
ctx, cancel := context.WithCancel(context.Background())
cancel()
kew := KeyspaceEventWatcher{
keyspaces: tt.ksMap,
mu: sync.Mutex{},
ts: &fakeTopoServer{},
}
err := kew.WaitForConsistentKeyspaces(ctx, tt.ksList)
if tt.errExpected != "" {
require.ErrorContains(t, err, tt.errExpected)
} else {
require.NoError(t, err)
}
})
}
}
14 changes: 7 additions & 7 deletions go/vt/srvtopo/discover.go
@@ -17,9 +17,8 @@ limitations under the License.
package srvtopo

import (
"sync"

"context"
"sync"

"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
@@ -29,15 +28,16 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// FindAllTargets goes through all serving shards in the topology for the provided keyspaces
// FindAllTargetsAndKeyspaces goes through all serving shards in the topology for the provided keyspaces
// and tablet types. If no keyspaces are provided all available keyspaces in the topo are
// fetched. It returns one Target object per keyspace/shard/matching TabletType.
func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, error) {
// It also returns all the keyspaces that it found.
func FindAllTargetsAndKeyspaces(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, []string, error) {
var err error
if len(keyspaces) == 0 {
keyspaces, err = ts.GetSrvKeyspaceNames(ctx, cell, true)
if err != nil {
return nil, err
return nil, nil, err
}
}

@@ -95,8 +95,8 @@ func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []str
}
wg.Wait()
if errRecorder.HasErrors() {
return nil, errRecorder.Error()
return nil, nil, errRecorder.Error()
}

return targets, nil
return targets, keyspaces, nil
}
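
A minimal sketch of calling the renamed helper; discoverPrimaries and the enclosing package are illustrative assumptions, while the signature matches the diff above (a nil keyspace list makes the helper fetch all keyspaces in the cell):

```go
package example // illustrative sketch, not part of this PR

import (
	"context"

	"vitess.io/vitess/go/vt/log"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
	"vitess.io/vitess/go/vt/srvtopo"
)

// discoverPrimaries lists every serving primary target in the cell.
// The second return value reports which keyspaces were actually scanned,
// which matters when nil is passed and the helper fetches all of them.
func discoverPrimaries(ctx context.Context, ts srvtopo.Server, cell string) error {
	targets, keyspaces, err := srvtopo.FindAllTargetsAndKeyspaces(
		ctx, ts, cell, nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
	if err != nil {
		return err
	}
	log.Infof("found %d primary targets across keyspaces %v", len(targets), keyspaces)
	return nil
}
```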