Skip to content

Commit 974ef4a

Browse files
craig[bot]shubhamdhamayuzefovichjeffswensonnormanchenn
committed
143354: server: enable TestServerControllerLoginLogout for secondary tenants r=dhartunian,cthumuluru-crdb a=shubhamdhama Fixes: #110002 Epic: CRDB-38970 Release note: none 143990: distsql: cancel remote flows on quiesce r=yuzefovich a=yuzefovich We recently saw a test timeout where the cluster shutdown was blocked on the outbox goroutine attempting to set up a connection to one of the nodes. We blocked there indefinitely since the remote flows previously didn't respect quiesce signal (we're using the background context there because the remote flows outlive the SetupFlow RPC handler context). This is now fixed. The bug has been present since forever, and given that we haven't hit this before, it seems like an extreme edge case, so I decided to omit the release note. Fixes: #143875. Release note: None 144007: logical: have tombstone updater accept datums r=jeffswenson a=jeffswenson Previously, the tombstone deleter accepted a cdcevent.Row. Now, it accepts datums derived from the cdcevent.Row. This allows the tombstone updater to be used by the crud sql writer which internally expects datums. This also removes an extra `isLwwLoser(err)` check that caused the tombstone updater to silently drop errors. This bug was caught by testing in #143988. Release Note: none Epic: CRDB-48647 144113: util/json: remove `AllPathsWithDepth` r=normanchenn a=normanchenn This commit reverts a65115d. `AllPathsWithDepth` was initially created to iterate through the children of a json array or object, but due to performance reasons (jsonEncoded would decode the entire object on each call), another approach was taken to use `json.ObjectIter` and `json.FetchValIdx`, thus eliminating the need for `AllPathsWithDepth`. Epic: None Release note: None 144165: workflows: run update_releases on release-25.2 r=iskettaneh a=iskettaneh Epic: none Release note: None Co-authored-by: Shubham Dhama <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]> Co-authored-by: Jeff Swenson <[email protected]> Co-authored-by: Norman Chen <[email protected]> Co-authored-by: Ibrahim Kettaneh <[email protected]>
6 parents a53735e + bb339e1 + 27a65c9 + 691f423 + 0e28b1e + f0c390e commit 974ef4a

File tree

8 files changed

+78
-193
lines changed

8 files changed

+78
-193
lines changed

.github/workflows/update_releases.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ jobs:
3131
- "release-24.1"
3232
- "release-24.3"
3333
- "release-25.1"
34+
- "release-25.2"
3435
name: Update pkg/testutils/release/cockroach_releases.yaml on ${{ matrix.branch }}
3536
runs-on: ubuntu-latest
3637
steps:

pkg/ccl/serverccl/server_controller_test.go

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -563,16 +563,18 @@ func TestServerControllerLoginLogout(t *testing.T) {
563563

564564
ctx := context.Background()
565565

566-
srv := serverutils.StartServerOnly(t, base.TestServerArgs{
567-
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(110002),
568-
})
566+
srv := serverutils.StartServerOnly(t, base.TestServerArgs{})
569567
defer srv.Stopper().Stop(ctx)
570568
s := srv.ApplicationLayer()
569+
isExternal := srv.DeploymentMode().IsExternal()
571570

572-
client, err := s.GetAuthenticatedHTTPClient(false, serverutils.SingleTenantSession)
571+
sessionType := serverutils.SingleTenantSession
572+
client, err := s.GetAuthenticatedHTTPClient(false, sessionType)
573573
require.NoError(t, err)
574574

575-
resp, err := client.Post(s.AdminURL().WithPath("/logout").String(), "", nil)
575+
// Using `Get` here instead of `Post` because in external process mode, the
576+
// server returns a `StatusMethodNotAllowed` error when using `Post`.
577+
resp, err := client.Get(s.AdminURL().WithPath("/logout").String())
576578
require.NoError(t, err)
577579
defer resp.Body.Close()
578580

@@ -582,18 +584,33 @@ func TestServerControllerLoginLogout(t *testing.T) {
582584
for i, c := range resp.Cookies() {
583585
cookieNames[i] = c.Name
584586
cookieValues[i] = c.Value
585-
require.True(t, c.Secure)
587+
// Secure isn't set in external-mode but it is set in other modes.
588+
// See https://github.com/cockroachdb/cockroach/pull/143354#pullrequestreview-2751413632.
589+
if !isExternal {
590+
require.True(t, c.Secure)
591+
}
586592
if c.Name == "session" {
587593
require.True(t, c.HttpOnly)
588594
}
589595
}
590-
require.ElementsMatch(t, []string{"session", "tenant"}, cookieNames)
591-
require.ElementsMatch(t, []string{"", ""}, cookieValues)
592-
596+
expectedCookies := []string{"session"}
597+
expectedValues := []string{""}
598+
if !isExternal {
599+
expectedCookies = append(expectedCookies, "tenant")
600+
expectedValues = append(expectedValues, "")
601+
}
602+
require.ElementsMatch(t, expectedCookies, cookieNames)
603+
require.ElementsMatch(t, expectedValues, cookieValues)
604+
605+
// This part of the test doesn't run in external-process mode since we can't
606+
// set a session cookie with name `tenant`—that's only supported by HTTP
607+
// servers in the system tenant, which also handle routing for secondary
608+
// tenants.
609+
if isExternal {
610+
return
611+
}
593612
// Need a new server because the HTTP Client is memoized.
594-
srv2 := serverutils.StartServerOnly(t, base.TestServerArgs{
595-
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(110002),
596-
})
613+
srv2 := serverutils.StartServerOnly(t, base.TestServerArgs{})
597614
defer srv2.Stopper().Stop(ctx)
598615
s2 := srv2.ApplicationLayer()
599616

@@ -623,7 +640,7 @@ func TestServerControllerLoginLogout(t *testing.T) {
623640
require.NoError(t, err)
624641
cookieJar.SetCookies(s2.AdminURL().URL, []*http.Cookie{
625642
{
626-
Name: "multitenant-session",
643+
Name: "tenant",
627644
Value: "abc-123",
628645
},
629646
})

pkg/crosscluster/logical/lww_row_processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -711,7 +711,7 @@ func (lww *lwwQuerier) DeleteRow(
711711
// NOTE: at this point we don't know if we are updating a tombstone or if
712712
// we are losing LWW. As long as it is a LWW loss or a tombstone update,
713713
// updateTombstone will return okay.
714-
return lww.tombstoneUpdaters[row.TableID].updateTombstone(ctx, txn, row)
714+
return lww.tombstoneUpdaters[row.TableID].updateTombstoneAny(ctx, txn, row.MvccTimestamp, datums)
715715
}
716716
return batchStats{}, nil
717717
}

pkg/crosscluster/logical/tombstone_updater.go

Lines changed: 24 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package logical
88
import (
99
"context"
1010

11-
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
1211
"github.com/cockroachdb/cockroach/pkg/keys"
1312
"github.com/cockroachdb/cockroach/pkg/kv"
1413
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -19,6 +18,7 @@ import (
1918
"github.com/cockroachdb/cockroach/pkg/sql/row"
2019
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2120
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
21+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2222
)
2323

2424
// tombstoneUpdater is a helper for updating the mvcc origin timestamp assigned
@@ -40,12 +40,8 @@ type tombstoneUpdater struct {
4040
// deleter is a row.Deleter that uses the leased descriptor. Callers should
4141
// use getDeleter to ensure the lease is valid for the current transaction.
4242
deleter row.Deleter
43-
// columns are the name of the columns that are expected in the cdc row.
44-
columns []string
4543
}
4644

47-
// scratch is a scratch buffer for the tombstone updater. This is reused
48-
// across calls to addToBatch to avoid allocations.
4945
scratch []tree.Datum
5046
}
5147

@@ -56,7 +52,6 @@ func (c *tombstoneUpdater) ReleaseLeases(ctx context.Context) {
5652
c.leased.descriptor.Release(ctx)
5753
c.leased.descriptor = nil
5854
c.leased.deleter = row.Deleter{}
59-
c.leased.columns = c.leased.columns[:0]
6055
}
6156
}
6257

@@ -78,19 +73,31 @@ func newTombstoneUpdater(
7873
}
7974
}
8075

76+
// updateTombstoneAny is an `updateTombstone` wrapper that accepts the []any
77+
// datum slice from the original sql writer's datum builder.
78+
func (tu *tombstoneUpdater) updateTombstoneAny(
79+
ctx context.Context, txn isql.Txn, mvccTimestamp hlc.Timestamp, datums []any,
80+
) (batchStats, error) {
81+
tu.scratch = tu.scratch[:0]
82+
for _, datum := range datums {
83+
tu.scratch = append(tu.scratch, datum.(tree.Datum))
84+
}
85+
return tu.updateTombstone(ctx, txn, mvccTimestamp, tu.scratch)
86+
}
87+
8188
// updateTombstone attempts to update the tombstone for the given row. This is
8289
// expected to always succeed. The delete will only return zero rows if the
8390
// operation loses LWW or the row does not exist. So if the cput fails on a
8491
// condition, it should also fail on LWW, which is treated as a success.
8592
func (tu *tombstoneUpdater) updateTombstone(
86-
ctx context.Context, txn isql.Txn, afterRow cdcevent.Row,
93+
ctx context.Context, txn isql.Txn, mvccTimestamp hlc.Timestamp, afterRow []tree.Datum,
8794
) (batchStats, error) {
8895
err := func() error {
8996
if txn != nil {
9097
// If updateTombstone is called in a transaction, create and run a batch
9198
// in the transaction.
9299
batch := txn.KV().NewBatch()
93-
if err := tu.addToBatch(ctx, txn.KV(), batch, afterRow); err != nil {
100+
if err := tu.addToBatch(ctx, txn.KV(), batch, mvccTimestamp, afterRow); err != nil {
94101
return err
95102
}
96103
return txn.KV().Run(ctx, batch)
@@ -99,13 +106,13 @@ func (tu *tombstoneUpdater) updateTombstone(
99106
// 1pc transaction.
100107
return tu.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
101108
batch := txn.NewBatch()
102-
if err := tu.addToBatch(ctx, txn, batch, afterRow); err != nil {
109+
if err := tu.addToBatch(ctx, txn, batch, mvccTimestamp, afterRow); err != nil {
103110
return err
104111
}
105112
return txn.CommitInBatch(ctx, batch)
106113
})
107114
}()
108-
if err != nil && isLwwLoser(err) {
115+
if err != nil {
109116
if isLwwLoser(err) {
110117
return batchStats{kvWriteTooOld: 1}, nil
111118
}
@@ -115,44 +122,28 @@ func (tu *tombstoneUpdater) updateTombstone(
115122
}
116123

117124
func (tu *tombstoneUpdater) addToBatch(
118-
ctx context.Context, txn *kv.Txn, batch *kv.Batch, afterRow cdcevent.Row,
125+
ctx context.Context,
126+
txn *kv.Txn,
127+
batch *kv.Batch,
128+
mvccTimestamp hlc.Timestamp,
129+
afterRow []tree.Datum,
119130
) error {
120131
deleter, err := tu.getDeleter(ctx, txn)
121132
if err != nil {
122133
return err
123134
}
124135

125-
tu.scratch = tu.scratch[:0]
126-
127-
// Note that the columns in the cdcevent row are decoded using a descriptor
128-
// for the source column, whereas the row.Deleter column list is initialized
129-
// using columns from the destination table. This is funky, but it works
130-
// because we validate LDR schemas are compatible.
131-
132-
datums, err := afterRow.DatumsNamed(tu.leased.columns)
133-
if err != nil {
134-
return err
135-
}
136-
if err := datums.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
137-
tu.scratch = append(tu.scratch, d)
138-
return nil
139-
}); err != nil {
140-
return err
141-
}
142-
143-
// the index helpers are never really used since we are always updating a
144-
// tombstone.
145136
var ph row.PartialIndexUpdateHelper
146137
var vh row.VectorIndexUpdateHelper
147138

148139
return deleter.DeleteRow(
149140
ctx,
150141
batch,
151-
tu.scratch,
142+
afterRow,
152143
ph,
153144
vh,
154145
&row.OriginTimestampCPutHelper{
155-
OriginTimestamp: afterRow.MvccTimestamp,
146+
OriginTimestamp: mvccTimestamp,
156147
PreviousWasDeleted: true,
157148
},
158149
false, /* mustValidateOldPKValues */
@@ -176,10 +167,6 @@ func (tu *tombstoneUpdater) getDeleter(ctx context.Context, txn *kv.Txn) (row.De
176167
return row.Deleter{}, err
177168
}
178169

179-
for _, col := range cols {
180-
tu.leased.columns = append(tu.leased.columns, col.GetName())
181-
}
182-
183170
tu.leased.deleter = row.MakeDeleter(tu.codec, tu.leased.descriptor.Underlying().(catalog.TableDescriptor), nil /* lockedIndexes */, cols, tu.sd, &tu.settings.SV, nil /* metrics */)
184171
}
185172
if err := txn.UpdateDeadline(ctx, tu.leased.descriptor.Expiration(ctx)); err != nil {

pkg/sql/distsql/server.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,10 @@ func (ds *ServerImpl) SetupFlow(
624624
// Note: the passed context will be canceled when this RPC completes, so we
625625
// can't associate it with the flow since it outlives the RPC.
626626
ctx = ds.AnnotateCtx(context.Background())
627+
// Ensure that the flow respects the node being shut down. Note that since
628+
// the flow outlives the RPC, we cannot defer the cancel function, so we
629+
// simply ignore it.
630+
ctx, _ = ds.Stopper.WithCancelOnQuiesce(ctx)
627631
if err := func() error {
628632
// Reserve some memory for this remote flow which is a poor man's
629633
// admission control based on the RAM usage.

pkg/util/json/encoded.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -797,12 +797,12 @@ func (j *jsonEncoded) numInvertedIndexEntries() (int, error) {
797797
return decoded.numInvertedIndexEntries()
798798
}
799799

800-
func (j *jsonEncoded) allPathsWithDepth(depth int) ([]JSON, error) {
800+
func (j *jsonEncoded) allPaths() ([]JSON, error) {
801801
decoded, err := j.decode()
802802
if err != nil {
803803
return nil, err
804804
}
805-
return decoded.allPathsWithDepth(depth)
805+
return decoded.allPaths()
806806
}
807807

808808
// HasContainerLeaf implements the JSON interface.

pkg/util/json/json.go

Lines changed: 16 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,10 @@ type JSON interface {
138138
// produced if this JSON gets included in an inverted index.
139139
numInvertedIndexEntries() (int, error)
140140

141-
// allPathsWithDepth returns a slice of new JSON documents, each a path
142-
// through the receiver. The depth parameter specifies the maximum depth of
143-
// the paths to return. If the depth is negative, all paths of any depth are
144-
// returned. If the depth is 0, the receiver itself is returned. Note that
145-
// leaves include the empty object and array in addition to scalars.
146-
allPathsWithDepth(depth int) ([]JSON, error)
141+
// allPaths returns a slice of new JSON documents, each a path to a leaf
142+
// through the receiver. Note that leaves include the empty object and array
143+
// in addition to scalars.
144+
allPaths() ([]JSON, error)
147145

148146
// FetchValKey implements the `->` operator for strings, returning nil if the
149147
// key is not found.
@@ -1795,51 +1793,36 @@ func (j jsonObject) numInvertedIndexEntries() (int, error) {
17951793
// through the input. Note that leaves include the empty object and array
17961794
// in addition to scalars.
17971795
func AllPaths(j JSON) ([]JSON, error) {
1798-
return j.allPathsWithDepth(-1)
1796+
return j.allPaths()
17991797
}
18001798

1801-
// AllPathsWithDepth returns a slice of new JSON documents, each a path
1802-
// through the receiver. The depth parameter specifies the maximum depth of
1803-
// the paths to return. If the depth is negative, all paths of any depth are
1804-
// returned. If the depth is 0, the receiver itself is returned. Note that
1805-
// leaves include the empty object and array in addition to scalars.
1806-
func AllPathsWithDepth(j JSON, depth int) ([]JSON, error) {
1807-
return j.allPathsWithDepth(depth)
1808-
}
1809-
1810-
func (j jsonNull) allPathsWithDepth(depth int) ([]JSON, error) {
1799+
func (j jsonNull) allPaths() ([]JSON, error) {
18111800
return []JSON{j}, nil
18121801
}
18131802

1814-
func (j jsonTrue) allPathsWithDepth(depth int) ([]JSON, error) {
1803+
func (j jsonTrue) allPaths() ([]JSON, error) {
18151804
return []JSON{j}, nil
18161805
}
18171806

1818-
func (j jsonFalse) allPathsWithDepth(depth int) ([]JSON, error) {
1807+
func (j jsonFalse) allPaths() ([]JSON, error) {
18191808
return []JSON{j}, nil
18201809
}
18211810

1822-
func (j jsonString) allPathsWithDepth(depth int) ([]JSON, error) {
1811+
func (j jsonString) allPaths() ([]JSON, error) {
18231812
return []JSON{j}, nil
18241813
}
18251814

1826-
func (j jsonNumber) allPathsWithDepth(depth int) ([]JSON, error) {
1815+
func (j jsonNumber) allPaths() ([]JSON, error) {
18271816
return []JSON{j}, nil
18281817
}
18291818

1830-
func (j jsonArray) allPathsWithDepth(depth int) ([]JSON, error) {
1831-
if len(j) == 0 || depth == 0 {
1819+
func (j jsonArray) allPaths() ([]JSON, error) {
1820+
if len(j) == 0 {
18321821
return []JSON{j}, nil
18331822
}
18341823
ret := make([]JSON, 0, len(j))
18351824
for i := range j {
1836-
var paths []JSON
1837-
var err error
1838-
if depth > 0 {
1839-
paths, err = j[i].allPathsWithDepth(depth - 1)
1840-
} else {
1841-
paths, err = j[i].allPathsWithDepth(depth)
1842-
}
1825+
paths, err := j[i].allPaths()
18431826
if err != nil {
18441827
return nil, err
18451828
}
@@ -1850,19 +1833,13 @@ func (j jsonArray) allPathsWithDepth(depth int) ([]JSON, error) {
18501833
return ret, nil
18511834
}
18521835

1853-
func (j jsonObject) allPathsWithDepth(depth int) ([]JSON, error) {
1854-
if len(j) == 0 || depth == 0 {
1836+
func (j jsonObject) allPaths() ([]JSON, error) {
1837+
if len(j) == 0 {
18551838
return []JSON{j}, nil
18561839
}
18571840
ret := make([]JSON, 0, len(j))
18581841
for i := range j {
1859-
var paths []JSON
1860-
var err error
1861-
if depth > 0 {
1862-
paths, err = j[i].v.allPathsWithDepth(depth - 1)
1863-
} else {
1864-
paths, err = j[i].v.allPathsWithDepth(depth)
1865-
}
1842+
paths, err := j[i].v.allPaths()
18661843
if err != nil {
18671844
return nil, err
18681845
}

0 commit comments

Comments
 (0)