Skip to content

Commit e7bdf19

Browse files
craig[bot]alyshanjahani-crlZhouXing19wenyihu6
committed
152986: workload: set allow_unsafe_internals for workload commands r=alyshanjahani-crl a=alyshanjahani-crl Previously some workload `run` and `init` commands would fail if and when they try to access crdb_internal or system tables (if the `allow_unsafe_internals` session var is globally defaulted to false - which will be the case in 26.1) This commit modifies all connections that workload commands use to have the `allow_unsafe_internals` session variable set to on. Fixes: #152755 Release note: None 153059: jsonpath: add err check for jsonValue Iterator retrieval r=ZhouXing19 a=ZhouXing19 There are 2 types that can return `ObjectJSONType` as the type: `json.jsonObject` or `json.jsonEncoded`. The latter can return not-nil error in their implementation of `ObjectIter()`. This commit is to add error check for this case. Epic: none Release note (bug fix): adding error check for jsonValue Iterator when evaluating a jsonpath on a json value. 153061: jsonpath/eval: graceful type unmatched handling r=ZhouXing19 a=ZhouXing19 Previously, if the type is not found in the switch case, we panic with a unhandled type error. Now we implement a more graceful way for erroring out. Epic: none Release note: None 153160: allocator: add doc.md r=tbg a=wenyihu6 This commit adds a notes page covering some parts of the allocator code from my code reading. Epic: none Release note: none 153281: cdctest/validator: handle error early r=wenyihu6 a=wenyihu6 This commit properly handles errors from `rowDatums.FetchValKey`. Epic: none Release note: none Co-authored-by: Alyshan Jahani <[email protected]> Co-authored-by: ZhouXing19 <[email protected]> Co-authored-by: wenyihu6 <[email protected]>
6 parents 0317bff + f73fb81 + 9e7f8b3 + b451c16 + 4b3d00d + 33b0f95 commit e7bdf19

File tree

6 files changed

+63
-14
lines changed

6 files changed

+63
-14
lines changed

pkg/ccl/changefeedccl/cdctest/validator.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,14 +321,13 @@ func (v *beforeAfterValidator) checkRowAt(
321321
stmtBuf.WriteString(` AND `)
322322
}
323323
jsonCol, err := rowDatums.FetchValKey(col)
324-
324+
if err != nil {
325+
return err
326+
}
325327
if jsonCol == nil || jsonCol.Type() == json.NullJSONType {
326328
fmt.Fprintf(&stmtBuf, `%s IS NULL`, col)
327329
} else {
328330
fmt.Fprintf(&stmtBuf, `to_json(%s)::TEXT = $%d`, col, i+1)
329-
if err != nil {
330-
return err
331-
}
332331
args = append(args, jsonCol.String())
333332
}
334333
}

pkg/kv/kvserver/allocator/doc.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Allocator
2+
3+
### Replica Scanner And Base Queue
4+
5+
- Every store has a replica scanner that periodically scans and iterates over every replica of stores. It calls every base queues’s [`MaybeAddAsync`](https://github.com/cockroachdb/cockroach/blob/308665f4fb5176f8f7dcefdbfe8b9eee669e9f5a/pkg/kv/kvserver/queue.go#L635) with every replica. Every base queue’s `MaybeAdd` then calls into base queue’s `replicaCanBeProcessed`, shouldQueue. If `shouldQueue` returns true, it [adds](https://github.com/cockroachdb/cockroach/blob/308665f4fb5176f8f7dcefdbfe8b9eee669e9f5a/pkg/kv/kvserver/queue.go#L763) the replica to the queue with the priority given by `shouldQueue`. Skips if `shouldQueue` returns false.
6+
- Every base queue implements `bq.processLoop` which pops items added by the replica scanner and calls into `bq.processOneAsyncAndReleaseSem`. `bq.processOneAsyncAndReleaseSem` then calls `bq.processReplica` async with a time out. `bq.processReplica` then calls `bq.process`. If replicas [fail](https://github.com/cockroachdb/cockroach/blob/308665f4fb5176f8f7dcefdbfe8b9eee669e9f5a/pkg/kv/kvserver/queue.go#L1141-L1143) to be processed, they will be sent to the purgatory queue.
7+
- Base queue includes: consistency queue, lease queue, merge queue, mvcc gc queue, raft log queue, raft snapshot queue, replica gc queue, replicate queue, split queue, timeseries maintenance queue.
8+
9+
10+
## Replicate Queue
11+
### High Level
12+
1. `ReplicateQueue.shouldQueue` calls into `ReplicaPlanner.ShouldPlanChange` which decides whether the replica scanner adds the replica to the queue.
13+
2. Replica scanner adds replicas that need to be queued to the base queue.
14+
3. Inside `replicateQueue.process` with one replica, a) grab the range token b) calls into `rq.processOneChange`.
15+
4. `processOneChange` calls into `ReplicaPlanner.PlanOneChange` which plans a replicate change or an error. `PlanOneChange` calls into `allocator.ComputeAction` which returns an enum action. This enum action tells replicate queue to consider add / remove / replace replicas. `ComputeAction` gives an action that should be done.
16+
The separation is here because `planner.ShouldPlanChange` calls into `ComputeAction` as well, and we want a cheap check for the queue. Note that actions returned from `ComputeAction` in `ShouldPlanChange` v.s. `PlanOneChange` may be different.
17+
5. Replica planner then switches and computes what replicate changes need to be done based on the action.
18+
19+
### shouldQueue: whether queue replicas to replicate queue
20+
1. `shouldQueue` takes a replica and calls into [`ReplicaPlanner.ShouldPlanChange`](https://github.com/cockroachdb/cockroach/blob/5c2a22f84f0581df39239bc890b9d7d08c6e020a/pkg/kv/kvserver/allocator/plan/replicate.go#L145) which then calls into the allocator via `rp.allocator.ComputeAction`.
21+
2. [`ComputeAction`](https://github.com/cockroachdb/cockroach/blob/ad4d1478461f2edfa49d1cadd34cad910f5f5b10/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go#L913) computes an action that needs to be done on this replica.
22+
+ It first checks if a repair action is needed: under-replicated, quorum, dead/decommissioning replicas, over-replicated. Note that this does not include constraint repair.
23+
+ If no repair needs to be done on this range, it would fall back and return `AllocatorConsiderRebalance` by default.
24+
3. After ComputeAction gives `ReplicaPlanner.ShouldPlanChange` the action, it would return true if a repair action is needed. Otherwise, it checks if a rebalance target can be found via [Allocator.RebalanceTarget](https://github.com/cockroachdb/cockroach/blob/ad4d1478461f2edfa49d1cadd34cad910f5f5b10/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go#L1784). We will go into the details of `RebalanceTarget` below.
25+
26+
### processOneChange: plan and process a change on the replica
27+
1. [`replicateQueue.processOneChange`](https://github.com/cockroachdb/cockroach/blob/a96cb747d2043d2959c1d44e29064c57d7aacde8/pkg/kv/kvserver/replicate_queue.go#L941) happens when the base queue pops the replica off the queue and actually start processing the replica.
28+
2. `replicateQueue.processOneChange` calls into `ReplicaPlanner.PlanOneChange` which then calls into `ComputeAction` as well. Based on the action, it would call into ReplicaPlanner's helper functions (such as [`addOrReplaceVoters`](https://github.com/cockroachdb/cockroach/blob/5c2a22f84f0581df39239bc890b9d7d08c6e020a/pkg/kv/kvserver/allocator/plan/replicate.go#L355)) to compute the exact allocation operation out (such as a lease transfer, change of replicas).
29+
3. For now, lets focus on how [`considerRebalance`](https://github.com/cockroachdb/cockroach/blob/5c2a22f84f0581df39239bc890b9d7d08c6e020a/pkg/kv/kvserver/allocator/plan/replicate.go#L773). As a reminder, this action is returned by default for every replica when no other repair action applies. Note that this action does help repair zone constraints.
30+
31+
### considerRebalance: find one rebalancing target for any of the existing replica
32+
1. `ReplicaPlanner` first [picks](https://github.com/cockroachdb/cockroach/blob/5c2a22f84f0581df39239bc890b9d7d08c6e020a/pkg/kv/kvserver/allocator/plan/replicate.go#L789) which scorer options to be used for the allocator decision. It uses the `RangeCountScorerOptions` by default and `ScatterScorerOptions` if this action originates from a scatter operation. [ScorerOptions](https://github.com/cockroachdb/cockroach/blob/bc9fdcd029eae1f30a5ea82e44b60a629d452f6c/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go#L266) define the heuristics used to evaluate candidate replicas, helping the allocator guide decisions toward specific goals such as range count convergence.
33+
2. `ReplicaPlanner` then calls into the allocator to find the best rebalancing target via [`Allocator.RebalanceTarget`](https://github.com/cockroachdb/cockroach/blob/ad4d1478461f2edfa49d1cadd34cad910f5f5b10/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go#L1784).
34+
3. `Allocator.RebalanceTarget` first [builds](https://github.com/cockroachdb/cockroach/blob/ad4d1478461f2edfa49d1cadd34cad910f5f5b10/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go#L1818-L1820) constraint checkers, which are passed to rankedCandidateListForRebalancing to determine which existing replicas / stores are valid or required based on the range’s constraints.
35+
4. `Allocator.RebalanceTarget` then calls into [`rankedCandidateListForRebalancing`](https://github.com/cockroachdb/cockroach/blob/ad4d1478461f2edfa49d1cadd34cad910f5f5b10/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go#L1848) to find a list of rebalance options. Each rebalance option consists of an existing replica and a set of candidate stores that can replace it.
36+
- `rankedCandidateListForRebalancing` first [builds](https://github.com/cockroachdb/cockroach/blob/bc9fdcd029eae1f30a5ea82e44b60a629d452f6c/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go#L1513) a map of existing store ids to their corresponding candidate entries - annotated with attributes such as valid, necessary, disk fullness, I/O overload, and diversity score.
37+
- For each existing replica store, it then constructs an [`equivalence class`](https://github.com/cockroachdb/cockroach/blob/bc9fdcd029eae1f30a5ea82e44b60a629d452f6c/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go#L1579), containing the store itself along with a list of comparable candidates.
38+
- This is how each equivalence class is constraucted: for every existing replica, all other stores are considered as potential candidates. During this iteration, some [filtering](https://github.com/cockroachdb/cockroach/blob/bc9fdcd029eae1f30a5ea82e44b60a629d452f6c/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go#L1650) is applied based on the state of other existing replicas. If a candidate store is [not worse](https://github.com/cockroachdb/cockroach/blob/bc9fdcd029eae1f30a5ea82e44b60a629d452f6c/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go#L1671) than the existing store, it is added to the list of comparable candidates. From all considered candidates, the best set is selected, and an equivalance class is constructed using existing replica and these best candidates. Note that at this stage, only attributes such as valid, necessary (constraint conformance), diversity score, and disk fullness are considered. At this point, range count convergence has not been considered yet.
39+
40+
- (TODO(wenyihu6 during review): why ioOverloaded is always false here there is a comment https://github.com/cockroachdb/cockroach/blob/bc9fdcd029eae1f30a5ea82e44b60a629d452f6c/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go#L1550-L1553 but idu).
41+
42+
- (TODO(wenyihu6 during review) looks like at this stage we do not exclude other existing replicas stores - we filter it out below when iterating over comparable candidates but why)
43+
44+
- At this point, we have an equivalence class for every existing replica. We then examine candidates’ range count statistics within each class to help break ties.
45+
- For each equivalence class, ScorerOptions [populate](https://github.com/cockroachdb/cockroach/blob/bc9fdcd029eae1f30a5ea82e44b60a629d452f6c/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go#L1775) the convergence score, balance score, and range count for existing store and potential candidates. The candidate set is then further filtered to include only those [better](https://github.com/cockroachdb/cockroach/blob/bc9fdcd029eae1f30a5ea82e44b60a629d452f6c/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go#L1792) than the existing store.
46+
5. Using the results from rankedCandidateListForRebalancing, `Allocator.RebalanceTarget` iterates over all rebalanceOptions to [identify](https://github.com/cockroachdb/cockroach/blob/bc9fdcd029eae1f30a5ea82e44b60a629d452f6c/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go#L1826) the option that offers the largest improvement. As a reminder, each `rebalanceOption` consists of an existing replica and a list of candidate stores. For each option, it selects the best candidate set, randomly [chooses](https://github.com/cockroachdb/cockroach/blob/bc9fdcd029eae1f30a5ea82e44b60a629d452f6c/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go#L1821) one candidate, and then determines the overall best rebalance option by comparing all rebalanceOptions.

pkg/util/jsonpath/eval/eval.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,14 +267,17 @@ func (ctx *jsonpathCtx) executeAnyItem(
267267
}
268268
}
269269
case json.ObjectJSONType:
270-
iter, _ := jsonValue.ObjectIter()
270+
iter, err := jsonValue.ObjectIter()
271+
if err != nil {
272+
return nil, errors.Wrapf(err, "getting iterator for json object")
273+
}
271274
for iter.Next() {
272275
if err := processItem(iter.Value()); err != nil {
273276
return nil, err
274277
}
275278
}
276279
default:
277-
panic(errors.AssertionFailedf("executeAnyItem called with type: %s", jsonValue.Type()))
280+
return nil, errors.AssertionFailedf("executeAnyItem called with type: %s", jsonValue.Type())
278281
}
279282
return agg, nil
280283
}

pkg/util/jsonpath/eval/method.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (ctx *jsonpathCtx) evalNumericMethod(
8383
dec.Negative = false
8484
}
8585
default:
86-
panic(errors.Newf("unimplemented method: %s", method.Type))
86+
return nil, errors.Newf("unsupported jsonpath method type %d for numeric evaluation", method.Type)
8787
}
8888
if err != nil {
8989
return nil, err

pkg/util/jsonpath/eval/operation.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (ctx *jsonpathCtx) evalOperation(
7575
case jsonpath.OpPlus, jsonpath.OpMinus:
7676
return ctx.evalUnaryArithmetic(op, jsonValue)
7777
default:
78-
panic(errors.AssertionFailedf("unhandled operation type"))
78+
return nil, errors.AssertionFailedf("unhandled operation type")
7979
}
8080
}
8181

@@ -98,7 +98,7 @@ func (ctx *jsonpathCtx) evalBoolean(
9898
case jsonpath.OpStartsWith:
9999
return ctx.evalPredicate(op, jsonValue, evalStartsWithFunc, true /* evalRight */, false /* unwrapRight */)
100100
default:
101-
panic(errors.AssertionFailedf("unhandled operation type"))
101+
return jsonpathBoolUnknown, errors.AssertionFailedf("unhandled operation type")
102102
}
103103
}
104104

@@ -224,7 +224,7 @@ func (ctx *jsonpathCtx) evalLogical(
224224
}
225225
return jsonpathBoolTrue, nil
226226
default:
227-
panic(errors.AssertionFailedf("unhandled logical operation type"))
227+
return jsonpathBoolUnknown, errors.AssertionFailedf("unhandled logical operation type")
228228
}
229229

230230
rightOp, ok := op.Right.(jsonpath.Operation)
@@ -247,7 +247,7 @@ func (ctx *jsonpathCtx) evalLogical(
247247
}
248248
return rightBool, nil
249249
default:
250-
panic(errors.AssertionFailedf("unhandled logical operation type"))
250+
return jsonpathBoolUnknown, errors.AssertionFailedf("unhandled logical operation type")
251251
}
252252
}
253253

@@ -359,7 +359,7 @@ func evalComparisonFunc(operation jsonpath.Operation, l, r json.JSON) (jsonpathB
359359
// Don't evaluate non-scalar types.
360360
return jsonpathBoolUnknown, nil
361361
default:
362-
panic(errors.AssertionFailedf("unhandled json type"))
362+
return jsonpathBoolUnknown, errors.AssertionFailedf("unhandled json type")
363363
}
364364

365365
var res bool
@@ -377,7 +377,7 @@ func evalComparisonFunc(operation jsonpath.Operation, l, r json.JSON) (jsonpathB
377377
case jsonpath.OpCompGreaterEqual:
378378
res = cmp >= 0
379379
default:
380-
panic(errors.AssertionFailedf("unhandled jsonpath comparison type"))
380+
return jsonpathBoolUnknown, errors.AssertionFailedf("unhandled jsonpath comparison type")
381381
}
382382
if res {
383383
return jsonpathBoolTrue, nil
@@ -473,7 +473,7 @@ func performArithmetic(
473473
}
474474
_, err = tree.DecimalCtx.Rem(&res, leftNum, rightNum)
475475
default:
476-
panic(errors.AssertionFailedf("unhandled jsonpath arithmetic type"))
476+
return nil, errors.AssertionFailedf("unhandled jsonpath arithmetic type")
477477
}
478478
if err != nil {
479479
return nil, err

pkg/workload/connection.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ func SanitizeUrls(gen Generator, connFlags *ConnFlags, urls []string) (string, e
118118
func SetUrlConnVars(gen Generator, connFlags *ConnFlags, urls []string) error {
119119
vars := make(map[string]string)
120120
vars["application_name"] = gen.Meta().Name
121+
vars["allow_unsafe_internals"] = "true"
121122
if connFlags != nil {
122123
if connFlags.IsoLevel != "" {
123124
// As a convenience, replace underscores with spaces. This allows users of

0 commit comments

Comments
 (0)