
Commit 0e9251d

craig[bot], dt, spilchen, wenyihu6, and andyyang890 committed
150535: ui: show tenant dropdown in insecure mode when multiple tenants exist r=dt a=dt

Previously, the tenant dropdown would not appear in insecure mode because it required a tenant cookie to be set, which only happens after authentication. This change modifies the dropdown logic to show the dropdown when running in insecure mode and the virtual_clusters endpoint returns more than one tenant. In insecure mode, the dropdown now displays "default" when no tenant cookie is set, allowing users to select from the available virtual clusters without requiring authentication.

Subsumes #150143. Closes #150143.

Release note (ui change): The tenant dropdown now appears in insecure mode when multiple virtual clusters are available.

Epic: none

150771: sql/ttl: improve TTL replan decision logic r=spilchen a=spilchen

Replace calculatePlanGrowth with detectNodeAvailabilityChanges to make TTL job replanning less sensitive to span changes. The new logic focuses specifically on detecting when nodes become unavailable rather than reacting to all plan differences.

The previous implementation would trigger replans for span splits/merges that don't actually indicate beneficial restart scenarios. The new approach only considers nodes missing from the original plan, which typically indicates node failures where work redistribution would benefit from restarting the job. It also adds a stability window, so replan decisions must fire consecutively before taking effect. This should help alleviate plan changes caused by range cache issues.

Fixes #150343

Epic: none

Release note (ops change): The 'sql.ttl.replan_flow_threshold' setting may have been set to 0 to work around the TTL replanner being too sensitive. This fix alleviates that sensitivity, and any cluster that had set replan_flow_threshold to 0 can reset it back to the default.

150947: asim: port over some trivial commits from prototype r=tbg a=wenyihu6

This commit ports over some trivial changes from the mma prototype branch.

Epic: CRDB-25222

Release note: none

150949: span: clean up test code after llrb frontier deletion r=rharding6373 a=andyyang890

This patch cleans up some code in the unit tests that previously would toggle the frontier type but now just leads to each test being run twice with the exact same configuration.

Informs #141668

Release note: None

Co-authored-by: David Taylor <[email protected]>
Co-authored-by: Matt Spilchen <[email protected]>
Co-authored-by: wenyihu6 <[email protected]>
Co-authored-by: Andy Yang <[email protected]>
5 parents a617840 + 411c0f2 + 706d137 + 169b7ab + 4cf5a10 commit 0e9251d
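
Operators who applied the workaround described in the ops release note above (setting the threshold to 0) can return the setting to its default once this fix is deployed. As a one-line illustration using standard cluster-setting syntax, not part of this patch:

RESET CLUSTER SETTING sql.ttl.replan_flow_threshold;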

File tree: 12 files changed, +648 -333 lines changed

pkg/kv/kvserver/asim/metrics/series.go

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ func MakeTS(metrics [][]StoreMetrics) map[string][][]float64 {
 	}
 
 	stores := len(metrics[0])
-	// TODO(kvoli): Either begin resuing prometheus metric definitions with a
+	// TODO(kvoli): Either begin reusing prometheus metric definitions with a
 	// custom scraper or provide definitions for each metric available. These
 	// are partially duplicated with the cluster tracker.
 	ret["qps"] = make([][]float64, stores)

pkg/kv/kvserver/asim/state/load.go

Lines changed: 0 additions & 4 deletions
@@ -39,10 +39,6 @@ func LoadEventQPS(le workload.LoadEvent) float64 {
 
 // ReplicaLoadCounter is the sum of all key accesses and size of bytes, both written
 // and read.
-// TODO(kvoli): In the non-simulated code, replica_stats currently maintains
-// this structure, which is rated. This datastructure needs to be adapated by
-// the user to be rated over time. In the future we should introduce a better
-// general pupose stucture that enables rating.
 type ReplicaLoadCounter struct {
 	WriteKeys  int64
 	WriteBytes int64

pkg/kv/kvserver/asim/state/parser_replica_placement.go

Lines changed: 3 additions & 0 deletions
@@ -121,6 +121,9 @@ func ParseReplicaPlacement(input string) ReplicaPlacement {
 		}
 		// For matches[0] and stores[0], storeID will be 1.
 		storeID, _ := strconv.Atoi(parts[0][1:])
+		if storeID == 0 {
+			panic(fmt.Sprintf("unable to parse store id: %s", parts[0][1:]))
+		}
 		storeSet = append(storeSet, storeID)
 
 		// If the replica type or leaseholder is not specified, artificially
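
A side note on the new guard: the Atoi error is discarded at the call site, so a zero result is the only visible sign of a malformed token (store IDs are 1-based, per the comment above). A minimal standalone Go sketch, not part of the patch, illustrating that behavior:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// A well-formed token such as "s1" parses to a positive ID once the
	// leading character is stripped.
	id, err := strconv.Atoi("s1"[1:])
	fmt.Println(id, err) // 1 <nil>

	// A malformed token yields 0 plus an error; since the caller drops the
	// error, the new storeID == 0 check is what surfaces the bad input.
	id, err = strconv.Atoi("sx"[1:])
	fmt.Println(id, err) // 0 strconv.Atoi: parsing "x": invalid syntax
}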

pkg/kv/kvserver/asim/tests/datadriven_simulation_test.go

Lines changed: 8 additions & 5 deletions
@@ -145,13 +145,16 @@ var runAsimTests = envutil.EnvOrDefaultBool("COCKROACH_RUN_ASIM_TESTS", false)
 //     constraints(violating) at the end of the evaluation.
 //
 //   - "setting" [replicate_queue_enabled=bool] [lease_queue_enabled=bool]
-//     [split_queue_enabled=bool] [rebalance_mode=<int>] [rebalance_interval=<duration>]
-//     [rebalance_qps_threshold=<float>] [split_qps_threshold=<float>]
+//     [split_queue_enabled=bool] [rebalance_interval=<duration>]
+//     [rebalance_mode=<int>] [split_qps_threshold=<float>]
 //     [rebalance_range_threshold=<float>] [gossip_delay=<duration>]
+//     [rebalance_objective=<int>]
 //     Configure the simulation's various settings. The default values are:
-//     rebalance_mode=2 (leases and replicas) rebalance_interval=1m (1 minute)
-//     rebalance_qps_threshold=0.1 split_qps_threshold=2500
-//     rebalance_range_threshold=0.05 gossip_delay=500ms.
+//     replicate_queue_enabled=true lease_queue_enabled=true
+//     split_queue_enabled=true rebalance_interval=1m (1 minute)
+//     rebalance_mode=2 (leases and replicas) split_qps_threshold=2500
+//     rebalance_range_threshold=0.05 gossip_delay=500ms rebalance_objective=0
+//     (QPS) (1=CPU)
 //
 //   - "eval" [duration=<string>] [samples=<int>] [seed=<int>]
 //     Run samples (e.g. samples=5) number of simulations for duration (e.g.
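
For reference, a hypothetical testdata directive using the settings documented above might look like the following. This is illustrative only and not taken from an actual testdata file; the empty block after the separator assumes the harness prints nothing for this command:

setting rebalance_interval=2m gossip_delay=100ms rebalance_objective=1
----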

pkg/server/server.go

Lines changed: 1 addition & 0 deletions
@@ -1303,6 +1303,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
 		tenantCapabilitiesWatcher,
 		cfg.DisableSQLServer,
 		cfg.BaseConfig.DisableTLSForHTTP,
+		cfg.Insecure,
 	)
 	drain.serverCtl = sc

pkg/server/server_controller.go

Lines changed: 4 additions & 0 deletions
@@ -100,6 +100,8 @@ type serverController struct {
 
 	disableTLSForHTTP bool
 
+	insecure bool
+
 	mu struct {
 		syncutil.RWMutex
 
@@ -136,6 +138,7 @@ func newServerController(
 	watcher *tenantcapabilitieswatcher.Watcher,
 	disableSQLServer bool,
 	disableTLSForHTTP bool,
+	insecure bool,
 ) *serverController {
 	c := &serverController{
 		AmbientContext: ambientCtx,
@@ -150,6 +153,7 @@ func newServerController(
 		drainCh:           make(chan struct{}),
 		disableSQLServer:  disableSQLServer,
 		disableTLSForHTTP: disableTLSForHTTP,
+		insecure:          insecure,
 	}
 	c.orchestrator = newChannelOrchestrator(parentStopper, c)
 	c.mu.servers = map[roachpb.TenantName]*serverState{

pkg/server/server_controller_http.go

Lines changed: 28 additions & 0 deletions
@@ -7,6 +7,7 @@ package server
 
 import (
 	"bytes"
+	"encoding/json"
 	"io"
 	"net/http"
 	"strconv"
@@ -53,6 +54,26 @@ var ServerHTTPBasePath = settings.RegisterStringSetting(
 	settings.WithPublic,
 )
 
+func (c *serverController) insecureVirtualClusterList(w http.ResponseWriter, r *http.Request) {
+	tenantNames := c.getCurrentTenantNames()
+	tenants := make([]string, len(tenantNames))
+	for i, name := range tenantNames {
+		tenants[i] = string(name)
+	}
+
+	resp := &virtualClustersResp{
+		VirtualClusters: tenants,
+	}
+	respBytes, err := json.Marshal(resp)
+	if err != nil {
+		http.Error(w, "unable to marshal virtual clusters JSON", http.StatusInternalServerError)
+		return
+	}
+	if _, err := w.Write(respBytes); err != nil {
+		log.Errorf(r.Context(), "unable to write virtual clusters response: %s", err.Error())
+	}
+}
+
 // httpMux redirects incoming HTTP requests to the server selected by
 // the special HTTP request header.
 // If no tenant is specified, the default tenant is used.
@@ -78,6 +99,13 @@ func (c *serverController) httpMux(w http.ResponseWriter, r *http.Request) {
 		c.attemptLogoutFromAllTenants().ServeHTTP(w, r)
 		return
 	}
+	if c.insecure && r.URL.Path == virtualClustersPath {
+		// If the insecure flag is set, the virtual clusters endpoint should just
+		// return all tennants instead of delegating to a tenant server to read the
+		// auth cookie which doesn't exist.
+		c.insecureVirtualClusterList(w, r)
+		return
+	}
 	tenantName := getTenantNameFromHTTPRequest(c.st, r)
 	noFallback := false
 	if noFallbackValue := r.URL.Query().Get(NoFallbackParam); noFallbackValue != "" {
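
A minimal client-side sketch of how the new insecure path could be exercised. It assumes the node runs with --insecure on the default HTTP port, that virtualClustersPath resolves to /virtual_clusters, and that the VirtualClusters field marshals as the virtual_clusters JSON key; none of this is taken from the patch itself:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// virtualClustersResp mirrors the response shape suggested by the diff above;
// the JSON tag is an assumption.
type virtualClustersResp struct {
	VirtualClusters []string `json:"virtual_clusters"`
}

func main() {
	// Hypothetical local node started with --insecure.
	resp, err := http.Get("http://localhost:8080/virtual_clusters")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var vc virtualClustersResp
	if err := json.NewDecoder(resp.Body).Decode(&vc); err != nil {
		panic(err)
	}
	// When more than one name comes back, the console shows the tenant
	// dropdown and displays "default" until a tenant cookie is set.
	fmt.Println(vc.VirtualClusters)
}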

pkg/sql/ttl/ttljob/ttljob.go

Lines changed: 98 additions & 3 deletions
@@ -8,6 +8,7 @@ package ttljob
 import (
 	"context"
 	"math/rand"
+	"sync/atomic"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -55,6 +56,14 @@ var replanFrequency = settings.RegisterDurationSetting(
 	settings.PositiveDuration,
 )
 
+var replanStabilityWindow = settings.RegisterIntSetting(
+	settings.ApplicationLevel,
+	"sql.ttl.replan_stability_window",
+	"number of consecutive replan evaluations required before triggering a replan; set to 1 to disable stability window",
+	2,
+	settings.PositiveInt,
+)
+
 // rowLevelTTLResumer implements the TTL job. The job can run on any node, but
 // the job node distributes SELECT/DELETE work via DistSQL to ttlProcessor
 // nodes. DistSQL divides work into spans that each ttlProcessor scans in a
@@ -65,6 +74,9 @@ type rowLevelTTLResumer struct {
 	physicalPlan *sql.PhysicalPlan
 	planCtx      *sql.PlanningCtx
 
+	// consecutiveReplanDecisions tracks how many consecutive times replan was deemed necessary.
+	consecutiveReplanDecisions *atomic.Int64
+
 	mu struct {
 		syncutil.Mutex
 		// lastUpdateTime is the wall time of the last job progress update.
@@ -296,7 +308,10 @@ func (t *rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (r
 	// the TTL job to utilize those nodes for parallel work.
 	replanChecker, cancelReplanner := sql.PhysicalPlanChangeChecker(
 		ctx, t.physicalPlan, makePlan, jobExecCtx,
-		sql.ReplanOnChangedFraction(func() float64 { return replanThreshold.Get(&execCfg.Settings.SV) }),
+		replanDecider(t.consecutiveReplanDecisions,
+			func() int64 { return replanStabilityWindow.Get(&execCfg.Settings.SV) },
+			func() float64 { return replanThreshold.Get(&execCfg.Settings.SV) },
+		),
 		func() time.Duration { return replanFrequency.Get(&execCfg.Settings.SV) },
 	)
 
@@ -507,11 +522,91 @@ func (t *rowLevelTTLResumer) refreshProgress(
 	return newProgress, nil
 }
 
+// replanDecider returns a function that determines whether a TTL job should be
+// replanned based on changes in the physical execution plan. It compares the
+// old and new plans to detect node availability changes and decides if the
+// benefit of replanning (better parallelization) outweighs the cost of
+// restarting the job. It implements a stability window to avoid replanning
+// due to transient changes.
+func replanDecider(
+	consecutiveReplanDecisions *atomic.Int64,
+	stabilityWindowFn func() int64,
+	thresholdFn func() float64,
+) sql.PlanChangeDecision {
+	return func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool {
+		changed, growth := detectNodeAvailabilityChanges(oldPlan, newPlan)
+		threshold := thresholdFn()
+		shouldReplan := threshold != 0.0 && growth > threshold
+
+		stabilityWindow := stabilityWindowFn()
+
+		var currentDecisions int64
+		if shouldReplan {
+			currentDecisions = consecutiveReplanDecisions.Add(1)
+		} else {
+			consecutiveReplanDecisions.Store(0)
+			currentDecisions = 0
+		}
+
+		// If stability window is 1, replan immediately. Otherwise, require
+		// consecutive decisions to meet the window threshold.
+		replan := currentDecisions >= stabilityWindow
+
+		// Reset the counter when we decide to replan, since the job will restart
+		if replan {
+			consecutiveReplanDecisions.Store(0)
+		}
+
+		if shouldReplan || growth > 0.1 || log.V(1) {
+			log.Infof(ctx, "Re-planning would add or alter flows on %d nodes / %.2f, threshold %.2f, consecutive decisions %d/%d, replan %v",
+				changed, growth, threshold, currentDecisions, stabilityWindow, replan)
+		}
+
+		return replan
+	}
+}
+
+// detectNodeAvailabilityChanges analyzes differences between two physical plans
+// to determine if nodes have become unavailable. It returns the number of nodes
+// that are no longer available and the fraction of the original plan affected.
+//
+// The function focuses on detecting when nodes from the original plan are missing
+// from the new plan, which typically indicates node failures. When nodes fail,
+// their work gets redistributed to remaining nodes, making a job restart
+// beneficial for better parallelization. We ignore newly added nodes since
+// continuing the current job on existing nodes is usually more efficient than
+// restarting to incorporate new capacity.
+func detectNodeAvailabilityChanges(before, after *sql.PhysicalPlan) (int, float64) {
+	var changed int
+	beforeSpecs, beforeCleanup := before.GenerateFlowSpecs()
+	defer beforeCleanup(beforeSpecs)
+	afterSpecs, afterCleanup := after.GenerateFlowSpecs()
+	defer afterCleanup(afterSpecs)
+
+	// Count nodes from the original plan that are no longer present in the new plan.
+	// We only check nodes in beforeSpecs because we specifically want to detect
+	// when nodes that were doing work are no longer available, which typically
+	// indicates beneficial restart scenarios (node failures where work can be
+	// redistributed more efficiently).
+	for n := range beforeSpecs {
+		if _, ok := afterSpecs[n]; !ok {
+			changed++
+		}
+	}
+
+	var frac float64
+	if changed > 0 {
+		frac = float64(changed) / float64(len(beforeSpecs))
+	}
+	return changed, frac
+}
+
 func init() {
 	jobs.RegisterConstructor(jobspb.TypeRowLevelTTL, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
 		return &rowLevelTTLResumer{
-			job: job,
-			st:  settings,
+			job:                        job,
+			st:                         settings,
+			consecutiveReplanDecisions: &atomic.Int64{},
 		}
 	}, jobs.UsesTenantCostControl)
}
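
To make the stability window concrete, here is a simplified, standalone Go sketch of the counter logic in replanDecider. The names and the inline growth values are illustrative; the real decider derives growth from detectNodeAvailabilityChanges over physical plans rather than taking it as an argument:

package main

import (
	"fmt"
	"sync/atomic"
)

// shouldReplan mirrors the counting behavior: a positive decision increments
// the consecutive counter, a negative one resets it, and a replan fires only
// once the counter reaches the stability window.
func shouldReplan(consecutive *atomic.Int64, window int64, threshold, growth float64) bool {
	var current int64
	if threshold != 0 && growth > threshold {
		current = consecutive.Add(1)
	} else {
		consecutive.Store(0)
	}
	replan := current >= window
	if replan {
		// The job restarts, so counting starts over.
		consecutive.Store(0)
	}
	return replan
}

func main() {
	var consecutive atomic.Int64
	window, threshold := int64(2), 0.1

	// One of five nodes disappears from the plan, so growth = 0.2. With a
	// window of 2, the first observation only arms the counter; the second
	// consecutive one triggers the replan.
	fmt.Println(shouldReplan(&consecutive, window, threshold, 0.2)) // false
	fmt.Println(shouldReplan(&consecutive, window, threshold, 0.2)) // true

	// A transient blip that is not seen twice in a row never replans.
	fmt.Println(shouldReplan(&consecutive, window, threshold, 0.2)) // false (counter restarted after the replan)
	fmt.Println(shouldReplan(&consecutive, window, threshold, 0.0)) // false (counter reset)
}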
