
Commit 3a393c1

craig[bot], Eric Harmeling, DrewKimball, yuzefovich, and nvanbenschoten committed

107199: cli: simplify output assertion in TestUnavailableZip r=ericharmeling a=ericharmeling

This commit removes the `datadriven.RunTest` call from `TestUnavailableZip`, as this unit test is meant to verify that debug zip can produce output while nodes are unavailable, not to assert on specific, non-deterministic output.

Fixes cockroachdb#106961.

Release note: None

108211: plpgsql: add support for ELSIF branches r=DrewKimball a=DrewKimball

This patch adds support for executing PLpgSQL `IF` statements with `ELSIF` branches (else if). `IF` statements were already executed as CASE statements under the hood, so this change only requires building the `ELSIF` branches and appending them to the `whens` list.

Informs cockroachdb#105254

Release note (sql change): Added support for specifying PLpgSQL `IF` statements with `ELSIF` branches.

108538: internal/sqlsmith: don't generate UDFs with collated strings r=yuzefovich a=yuzefovich

Collated strings are not a valid return type for UDFs in Postgres, so they are disallowed. We already had the check in most places; the only exception was when we created a mutation with a single RETURNING column of collated string type. This omission is now fixed.

Fixes: cockroachdb#108006.

Release note: None

108550: kv: add error return from Txn.PrepareForRetry r=nvanbenschoten a=nvanbenschoten

Instead of using `log.Fatal`, return assertion errors.

Epic: None

Release note: None

108558: kvstreamer: fix up some comments r=yuzefovich a=yuzefovich

As of d564a0c we no longer use FIFO order in the OutOfOrder requests provider, so this commit adjusts the comments accordingly.

Epic: None

Release note: None

Co-authored-by: Eric Harmeling <[email protected]>
Co-authored-by: Drew Kimball <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
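The PLpgSQL change in 108211 is easiest to picture as a lowering step: the IF condition is the first WHEN arm of the CASE-like statement that already backs IF execution, and each ELSIF branch is simply appended as another WHEN arm. The following Go sketch is a minimal, self-contained illustration of that idea; the branch, ifStmt, and lower names are hypothetical stand-ins, not the actual PLpgSQL builder types.

package main

import "fmt"

// branch is a hypothetical (condition, body) pair, standing in for one WHEN arm.
type branch struct {
	cond string
	body string
}

// ifStmt is a hypothetical IF ... ELSIF ... ELSE ... END IF statement.
type ifStmt struct {
	then   branch   // the initial IF condition and body
	elsifs []branch // zero or more ELSIF branches
	els    string   // the ELSE body, possibly empty
}

// lower flattens the IF statement into a single ordered list of WHEN arms plus
// an ELSE body, mirroring how ELSIF branches can simply be appended to the
// existing "whens" list of a CASE-style statement.
func lower(s ifStmt) (whens []branch, els string) {
	whens = append(whens, s.then)
	whens = append(whens, s.elsifs...)
	return whens, s.els
}

func main() {
	s := ifStmt{
		then:   branch{cond: "x > 10", body: "RETURN 'big';"},
		elsifs: []branch{{cond: "x > 5", body: "RETURN 'medium';"}},
		els:    "RETURN 'small';",
	}
	whens, els := lower(s)
	for _, w := range whens {
		fmt.Printf("WHEN %s THEN %s\n", w.cond, w.body)
	}
	fmt.Printf("ELSE %s\n", els)
}

Running it prints one WHEN line per IF/ELSIF branch followed by the ELSE body, which is the shape the whens list takes once the ELSIF branches are appended.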
6 parents: ce570fe + 381b0c1 + b9cdf07 + 7d10ef1 + a3f2cf9 + 36bb0aa; commit 3a393c1

File tree

14 files changed: +425 additions, -339 deletions


pkg/cli/testdata/zip/unavailable

Lines changed: 0 additions & 255 deletions
This file was deleted.

pkg/cli/zip_test.go

Lines changed: 77 additions & 39 deletions
@@ -28,6 +28,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
@@ -299,7 +300,6 @@ create table defaultdb."../system"(x int);
 func TestUnavailableZip(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
-	skip.WithIssue(t, 107738, "flaky")
 	skip.UnderShort(t)
 	// Race builds make the servers so slow that they report spurious
 	// unavailability.
@@ -317,35 +317,51 @@ func TestUnavailableZip(t *testing.T) {
 	close(closedCh)
 	unavailableCh.Store(closedCh)
 	knobs := &kvserver.StoreTestingKnobs{
-		TestingRequestFilter: func(ctx context.Context, _ *kvpb.BatchRequest) *kvpb.Error {
-			select {
-			case <-unavailableCh.Load().(chan struct{}):
-			case <-ctx.Done():
+		TestingRequestFilter: func(ctx context.Context,
+			br *kvpb.BatchRequest) *kvpb.Error {
+			if br.Header.GatewayNodeID == 2 {
+				// For node 2 connections, block all replica requests.
+				select {
+				case <-unavailableCh.Load().(chan struct{}):
+				case <-ctx.Done():
+				}
+			} else if br.Header.GatewayNodeID == 1 {
+				// For node 1 connections, only block requests to table data ranges.
+				if br.Requests[0].GetInner().Header().Key.Compare(keys.
+					TableDataMin) >= 0 {
+					select {
+					case <-unavailableCh.Load().(chan struct{}):
+					case <-ctx.Done():
+					}
+				}
 			}
 			return nil
 		},
 	}
 
-	// Make a 2-node cluster, with an option to make the first node unavailable.
-	tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
-		ServerArgsPerNode: map[int]base.TestServerArgs{
-			0: {Insecure: true, Knobs: base.TestingKnobs{Store: knobs}},
-			1: {Insecure: true},
-		},
-	})
+	// Make a 3-node cluster, with an option to block replica requests.
+	tc := testcluster.StartTestCluster(t, 3,
+		base.TestClusterArgs{ServerArgs: base.TestServerArgs{Insecure: true,
			Knobs: base.TestingKnobs{Store: knobs}},
+		})
 	defer tc.Stopper().Stop(context.Background())
 
-	// Sanity test: check that a simple operation works.
+	// Sanity test: check that a simple SQL operation works against node 1.
 	if _, err := tc.ServerConn(0).Exec("SELECT * FROM system.users"); err != nil {
 		t.Fatal(err)
 	}
 
-	// Make the first two nodes unavailable.
+	// Block querying table data from node 1.
+	// Block all replica requests from node 2.
 	ch := make(chan struct{})
 	unavailableCh.Store(ch)
 	defer close(ch)
 
-	// Zip it. We fake a CLI test context for this.
+	// Run debug zip against node 1.
+	debugZipCommand :=
+		"debug zip --concurrency=1 --cpu-profile-duration=0 " + os.
+			DevNull + " --timeout=.25s"
+
 	c := TestCLI{
 		t:      t,
 		Server: tc.Server(0),
@@ -354,20 +370,57 @@ func TestUnavailableZip(t *testing.T) {
 	defer func(prevStderr *os.File) { stderr = prevStderr }(stderr)
 	stderr = os.Stdout
 
-	// Keep the timeout short so that the test doesn't take forever.
-	out, err := c.RunWithCapture("debug zip --concurrency=1 --cpu-profile-duration=0 " + os.DevNull + " --timeout=.5s")
+	out, err := c.RunWithCapture(debugZipCommand)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	// Strip any non-deterministic messages.
-	out = eraseNonDeterministicZipOutput(out)
-	out = eraseNonDeterministicErrors(out)
+	// Assert debug zip output for cluster, node 1, node 2, node 3.
+	assert.NotEmpty(t, out)
+	assert.Contains(t, out, "[cluster] requesting nodes... received response...")
+	assert.Contains(t, out, "[cluster] requesting liveness... received response...")
+	for i := 1; i < tc.NumServers()+1; i++ {
+		assert.Contains(t, out, fmt.Sprintf("[node %d] using SQL connection URL",
+			i))
+		assert.Contains(t, out, fmt.Sprintf("[node %d] retrieving SQL data",
+			i))
+		assert.Contains(t, out, fmt.Sprintf("[node %d] requesting stacks... received response...",
+			i))
+		assert.Contains(t, out, fmt.Sprintf("[node %d] requesting stacks with labels... received response...",
+			i))
+		assert.Contains(t, out, fmt.Sprintf("[node %d] requesting heap file list... received response...",
+			i))
+		assert.Contains(t, out, fmt.Sprintf("[node %d] requesting goroutine dump list... received response...",
+			i))
+		assert.Contains(t, out, fmt.Sprintf("[node %d] requesting log files list... received response...",
+			i))
+		assert.Contains(t, out, fmt.Sprintf("[node %d] requesting ranges... received response...",
+			i))
+	}
+
+	// Run debug zip against node 2.
+	c = TestCLI{
+		t:        t,
+		Server:   tc.Server(1),
+		Insecure: true,
+	}
+
+	out, err = c.RunWithCapture(debugZipCommand)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Assert debug zip output for cluster, node 2.
+	assert.NotEmpty(t, out)
+	assert.Contains(t, out, "[cluster] requesting nodes... received response...")
+	assert.Contains(t, out, "[cluster] requesting liveness... received response...")
+	assert.Contains(t, out, "[node 2] using SQL connection URL")
+	assert.Contains(t, out, "[node 2] retrieving SQL data")
+	assert.Contains(t, out, "[node 2] requesting ranges... received response...")
+	assert.Contains(t, out, "[node 2] writing ranges...")
+	assert.NotContains(t, out, "[node 1]")
+	assert.NotContains(t, out, "[node 3]")
 
-	datadriven.RunTest(t, datapathutils.TestDataPath(t, "zip", "unavailable"),
-		func(t *testing.T, td *datadriven.TestData) string {
-			return out
-		})
 }
 
 func eraseNonDeterministicZipOutput(out string) string {
@@ -407,21 +460,6 @@ func eraseNonDeterministicZipOutput(out string) string {
 	return out
 }
 
-func eraseNonDeterministicErrors(out string) string {
-	// In order to avoid non-determinism here, we erase the output of
-	// the range retrieval.
-	re := regexp.MustCompile(`(?m)^(requesting ranges.*found|writing: debug/nodes/\d+/ranges).*\n`)
-	out = re.ReplaceAllString(out, ``)
-
-	re = regexp.MustCompile(`(?m)^\[cluster\] requesting data for debug\/settings.*\n`)
-	out = re.ReplaceAllString(out, ``)
-
-	// In order to avoid non-determinism here, we truncate error messages.
-	re = regexp.MustCompile(`(?m)last request failed: .*$`)
-	out = re.ReplaceAllString(out, `last request failed: ...`)
-	return out
-}
-
 // This tests the operation of zip over partial clusters.
 //
 // We cannot combine this test with TestZip above because TestPartialZip

pkg/internal/sqlsmith/relational.go

Lines changed: 1 addition & 1 deletion
@@ -1043,7 +1043,7 @@ func (s *Smither) makeCreateFunc() (cf *tree.CreateRoutine, ok bool) {
 		// If the rtype isn't a RECORD, change it to rrefs or RECORD depending
 		// how many columns there are to avoid return type mismatch errors.
 		if rtyp != types.AnyTuple {
-			if len(rrefs) == 1 && s.coin() {
+			if len(rrefs) == 1 && s.coin() && rrefs[0].typ.Family() != types.CollatedStringFamily {
 				rtyp = rrefs[0].typ
 			} else {
 				rtyp = types.AnyTuple

pkg/kv/db_test.go

Lines changed: 1 addition & 1 deletion
@@ -872,7 +872,7 @@ func TestPreservingSteppingOnSenderReplacement(t *testing.T) {
 	require.NotEqual(t, pErr.TxnID, pErr.Transaction.ID)
 
 	// Reset the handle in order to get a new sender.
-	txn.PrepareForRetry(ctx)
+	require.NoError(t, txn.PrepareForRetry(ctx))
 
 	// Make sure we have a new txn ID.
 	require.NotEqual(t, pErr.TxnID, txn.ID())

pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go

Lines changed: 3 additions & 1 deletion
@@ -127,7 +127,9 @@ func TestSavepoints(t *testing.T) {
 
 		case "reset":
 			prevID := txn.ID()
-			txn.PrepareForRetry(ctx)
+			if err := txn.PrepareForRetry(ctx); err != nil {
+				t.Fatal(err)
+			}
 			changed := "changed"
 			if prevID == txn.ID() {
 				changed = "not changed"

pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go

Lines changed: 3 additions & 1 deletion
@@ -912,7 +912,9 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
 			stopper.Stop(ctx)
 
 			if test.callPrepareForRetry {
-				txn.PrepareForRetry(ctx)
+				if err := txn.PrepareForRetry(ctx); err != nil {
+					t.Fatal(err)
+				}
 			}
 			if test.name != "nil" && err == nil {
 				t.Fatalf("expected an error")

pkg/kv/kvclient/kvstreamer/requests_provider.go

Lines changed: 5 additions & 2 deletions
@@ -244,8 +244,7 @@ func (b *requestsProviderBase) close() {
 }
 
 // outOfOrderRequestsProvider is a requestProvider that returns requests in an
-// arbitrary order (namely in the same order as the requests are enqueued and
-// added).
+// arbitrary order (namely in the LIFO order of requests being enqueued).
 type outOfOrderRequestsProvider struct {
 	*requestsProviderBase
 }
@@ -283,6 +282,8 @@ func (p *outOfOrderRequestsProvider) nextLocked() singleRangeBatch {
 	if len(p.requests) == 0 {
 		panic(errors.AssertionFailedf("nextLocked called when requestsProvider is empty"))
 	}
+	// Use the last request so that we could reuse its slot if resume request is
+	// added.
 	return p.requests[len(p.requests)-1]
 }
 
@@ -291,6 +292,8 @@ func (p *outOfOrderRequestsProvider) removeNextLocked() {
 	if len(p.requests) == 0 {
 		panic(errors.AssertionFailedf("removeNextLocked called when requestsProvider is empty"))
 	}
+	// Use the last request so that we could reuse its slot if resume request is
+	// added.
 	p.requests = p.requests[:len(p.requests)-1]
 }

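The comment fix-ups in 108558 hinge on the provider being LIFO: taking and removing the last element of the slice frees exactly the slot that a subsequent resume request can occupy on append, so the backing array does not need to grow. Below is a minimal, hypothetical sketch of that slice pattern; provider, next, remove, and enqueue are illustrative names, not the actual kvstreamer requestsProvider API.

package main

import "fmt"

// provider is a toy stand-in for an out-of-order requests provider that hands
// out requests in LIFO order.
type provider struct {
	requests []string
}

// next peeks at the last enqueued request (LIFO order).
func (p *provider) next() string {
	return p.requests[len(p.requests)-1]
}

// remove drops the last request; its slice slot stays within capacity, so an
// appended resume request reuses it without growing the backing array.
func (p *provider) remove() {
	p.requests = p.requests[:len(p.requests)-1]
}

// enqueue appends a request (e.g. a resume request for partial results).
func (p *provider) enqueue(r string) {
	p.requests = append(p.requests, r)
}

func main() {
	p := &provider{requests: []string{"scan-a", "scan-b", "scan-c"}}
	r := p.next() // "scan-c": the most recently enqueued request
	p.remove()
	p.enqueue(r + "-resume") // reuses the slot just vacated by "scan-c"
	fmt.Println(p.requests)  // [scan-a scan-b scan-c-resume]
}

The design point is that pairing tail removal with a follow-up append keeps the resume request in the slot just vacated, which is why the comments now describe the order as LIFO rather than FIFO.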
pkg/kv/txn.go

Lines changed: 20 additions & 15 deletions
@@ -994,15 +994,17 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
 			break
 		}
 
-		txn.PrepareForRetry(ctx)
+		if err := txn.PrepareForRetry(ctx); err != nil {
+			return err
+		}
 	}
 
 	return err
 }
 
 // PrepareForRetry needs to be called before a retry to perform some
 // book-keeping and clear errors when possible.
-func (txn *Txn) PrepareForRetry(ctx context.Context) {
+func (txn *Txn) PrepareForRetry(ctx context.Context) error {
 	// Reset commit triggers. These must be reconfigured by the client during the
 	// next retry.
 	txn.commitTriggers = nil
@@ -1012,11 +1014,11 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) {
 
 	retryErr := txn.mu.sender.GetTxnRetryableErr(ctx)
 	if retryErr == nil {
-		return
+		return nil
 	}
 	if txn.typ != RootTxn {
-		panic(errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf(
-			retryErr, "PrepareForRetry() called on leaf txn"), ctx))
+		return errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf(
+			retryErr, "PrepareForRetry() called on leaf txn"), ctx)
 	}
 	log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s",
 		txn.debugNameLocked(), retryErr)
@@ -1029,23 +1031,24 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) {
 		// txn IDs. However, at no point can both the old and new incarnation of a
 		// transaction be active at the same time -- this would constitute a
 		// programming error.
-		log.Fatalf(
-			ctx,
-			"unexpected retryable error for old incarnation of the transaction %s; current incarnation %s",
-			retryErr.TxnID,
-			txn.mu.ID,
-		)
+		return errors.WithContextTags(
+			errors.NewAssertionErrorWithWrappedErrf(
+				retryErr,
+				"unexpected retryable error for old incarnation of the transaction %s; current incarnation %s",
+				retryErr.TxnID,
+				txn.mu.ID,
+			), ctx)
 	}
 
 	if !retryErr.PrevTxnAborted() {
 		// If the retryable error doesn't correspond to an aborted transaction,
 		// there's no need to switch out the transaction. We simply clear the
 		// retryable error and proceed.
 		txn.mu.sender.ClearTxnRetryableErr(ctx)
-		return
+		return nil
 	}
 
-	txn.handleTransactionAbortedErrorLocked(ctx, retryErr)
+	return txn.handleTransactionAbortedErrorLocked(ctx, retryErr)
 }
 
 // Send runs the specified calls synchronously in a single batch and
@@ -1354,10 +1357,11 @@ func (txn *Txn) UpdateStateOnRemoteRetryableErr(ctx context.Context, pErr *kvpb.
 // the current one with it.
 func (txn *Txn) handleTransactionAbortedErrorLocked(
 	ctx context.Context, retryErr *kvpb.TransactionRetryWithProtoRefreshError,
-) {
+) error {
 	if !retryErr.PrevTxnAborted() {
 		// Sanity check we're dealing with a TransactionAbortedError.
-		log.Fatalf(ctx, "cannot replace root sender if txn not aborted: %v", retryErr)
+		return errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf(
+			retryErr, "cannot replace root sender if txn not aborted"), ctx)
 	}
 
 	// The transaction we had been using thus far has been aborted. The proto
@@ -1368,6 +1372,7 @@ func (txn *Txn) handleTransactionAbortedErrorLocked(
 	prevSteppingMode := txn.mu.sender.GetSteppingMode(ctx)
 	txn.mu.sender = txn.db.factory.RootTransactionalSender(newTxn, txn.mu.userPriority)
 	txn.mu.sender.ConfigureStepping(ctx, prevSteppingMode)
+	return nil
 }
 
 // SetFixedTimestamp makes the transaction run in an unusual way, at a "fixed

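For callers, the practical effect of 108550 is that an internal assertion failure in Txn.PrepareForRetry now comes back as an error to propagate rather than terminating the process via log.Fatal. The sketch below shows the resulting caller-side shape under assumed, simplified types; txn, run, prepareForRetry, and exec here are hypothetical stand-ins, not CockroachDB's client API.

package main

import (
	"context"
	"errors"
	"fmt"
)

// errRetryable marks errors that the retry loop is allowed to retry on.
var errRetryable = errors.New("retryable")

// txn is a hypothetical stand-in for a transaction handle whose
// prepareForRetry, like Txn.PrepareForRetry after this change, returns an
// error instead of fatally logging on assertion failures.
type txn struct{ attempts int }

// run fails with a retryable error on the first two attempts.
func (t *txn) run(ctx context.Context) error {
	t.attempts++
	if t.attempts < 3 {
		return errRetryable
	}
	return nil
}

// prepareForRetry would clear the retryable error and possibly replace the
// sender; any assertion failure now surfaces as a returned error.
func (t *txn) prepareForRetry(ctx context.Context) error {
	return nil
}

// exec mirrors the retry-loop shape: retryable errors loop, while an error
// from prepareForRetry aborts the loop and is returned to the caller.
func exec(ctx context.Context, t *txn) error {
	for {
		err := t.run(ctx)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errRetryable) {
			return err
		}
		if err := t.prepareForRetry(ctx); err != nil {
			return err // previously this path would have crashed the process
		}
	}
}

func main() {
	fmt.Println(exec(context.Background(), &txn{})) // <nil> after two retries
}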
pkg/sql/conn_fsm.go

Lines changed: 10 additions & 4 deletions
@@ -552,11 +552,14 @@ func cleanupAndFinishOnError(args fsm.Args) error {
 
 func prepareTxnForRetry(args fsm.Args) error {
 	ts := args.Extended.(*txnState)
-	func() {
+	err := func() error {
 		ts.mu.Lock()
 		defer ts.mu.Unlock()
-		ts.mu.txn.PrepareForRetry(ts.Ctx)
+		return ts.mu.txn.PrepareForRetry(ts.Ctx)
 	}()
+	if err != nil {
+		return err
+	}
 	ts.setAdvanceInfo(
 		advanceOne,
 		noRewind,
@@ -587,13 +590,16 @@ func moveToCommitWaitAfterInternalCommit(args fsm.Args) error {
 func prepareTxnForRetryWithRewind(args fsm.Args) error {
 	pl := args.Payload.(eventRetriableErrPayload)
 	ts := args.Extended.(*txnState)
-	func() {
+	err := func() error {
 		ts.mu.Lock()
 		defer ts.mu.Unlock()
-		ts.mu.txn.PrepareForRetry(ts.Ctx)
 		ts.mu.autoRetryReason = pl.err
 		ts.mu.autoRetryCounter++
+		return ts.mu.txn.PrepareForRetry(ts.Ctx)
 	}()
+	if err != nil {
+		return err
+	}
 	// The caller will call rewCap.rewindAndUnlock().
 	ts.setAdvanceInfo(
 		rewind,
