Skip to content

Commit 88e72fe

Browse files
committed
isession: create internal session
This introduces the InternalSession. The immediate motivation for this is to improve the performance of LDR. LDR currently depends on the internal executor. Every statement executed with the internal executor creates its own connExecutor instance. This prevents LDR from taking advantage of prepared statements and generic query plans. Long term, the idea is to have InternalSession replace the internal executor. But it was developed as a stand alone conn executor wrapper in order to minimize the risk of breaking existing clients of the internal executor. One key design feature of the InternalSession is the state machine was moved from a separate goroutine to the goroutine that is requesting the statement execution. This makes CPU profiles for internal processes much easier to read because it is obvious which call stack is using the SQL layer. It may also help with integrating LDR's SQL CPU usage with elastic admission control since the CPU time is accounted to the calling goroutine. Release note: none Informs: #148310
1 parent d36e5f2 commit 88e72fe

24 files changed

+1639
-18
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ ALL_TESTS = [
512512
"//pkg/sql/inspect:inspect_test",
513513
"//pkg/sql/inverted:inverted_disallowed_imports_test",
514514
"//pkg/sql/inverted:inverted_test",
515+
"//pkg/sql/isession:isession_test",
515516
"//pkg/sql/lex:lex_disallowed_imports_test",
516517
"//pkg/sql/lex:lex_test",
517518
"//pkg/sql/lexbase:lexbase_test",
@@ -2076,6 +2077,8 @@ GO_TARGETS = [
20762077
"//pkg/sql/inspect:inspect_test",
20772078
"//pkg/sql/inverted:inverted",
20782079
"//pkg/sql/inverted:inverted_test",
2080+
"//pkg/sql/isession:isession",
2081+
"//pkg/sql/isession:isession_test",
20792082
"//pkg/sql/isql:isql",
20802083
"//pkg/sql/lex:lex",
20812084
"//pkg/sql/lex:lex_test",

pkg/server/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ go_library(
245245
"//pkg/sql/idxusage",
246246
"//pkg/sql/importer",
247247
"//pkg/sql/inspect",
248+
"//pkg/sql/isession",
248249
"//pkg/sql/isql",
249250
"//pkg/sql/lexbase",
250251
"//pkg/sql/optionalnodeliveness",

pkg/server/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ import (
101101
_ "github.com/cockroachdb/cockroach/pkg/sql/gcjob" // register jobs declared outside of pkg/sql
102102
_ "github.com/cockroachdb/cockroach/pkg/sql/importer" // register jobs/planHooks declared outside of pkg/sql
103103
_ "github.com/cockroachdb/cockroach/pkg/sql/inspect" // register job and planHook declared outside of pkg/sql
104+
_ "github.com/cockroachdb/cockroach/pkg/sql/isession" // register isession constructor hook
104105
"github.com/cockroachdb/cockroach/pkg/sql/optionalnodeliveness"
105106
"github.com/cockroachdb/cockroach/pkg/sql/pgwire"
106107
_ "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scjob" // register jobs declared outside of pkg/sql

pkg/sql/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ go_library(
145145
"instrumentation.go",
146146
"internal.go",
147147
"internal_result_channel.go",
148+
"internal_session.go",
148149
"inverted_filter.go",
149150
"inverted_join.go",
150151
"job_exec_context.go",

pkg/sql/conn_executor.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3503,9 +3503,9 @@ var retryableMinTimestampBoundUnsatisfiableError = errors.Newf(
35033503
"retryable MinTimestampBoundUnsatisfiableError",
35043504
)
35053505

3506-
// errIsRetryable is true if the error is a client-visible retry error
3506+
// ErrIsRetryable is true if the error is a client-visible retry error
35073507
// or the error is a special error that is handled internally and retried.
3508-
func errIsRetryable(err error) bool {
3508+
func ErrIsRetryable(err error) bool {
35093509
return errors.HasInterface(err, (*pgerror.ClientVisibleRetryError)(nil)) ||
35103510
errors.Is(err, retryableMinTimestampBoundUnsatisfiableError) ||
35113511
descs.IsTwoVersionInvariantViolationError(err)
@@ -3563,7 +3563,7 @@ func (ex *connExecutor) makeErrEvent(err error, stmt tree.Statement) (fsm.Event,
35633563
}
35643564
}
35653565

3566-
retryable := errIsRetryable(err)
3566+
retryable := ErrIsRetryable(err)
35673567
if retryable {
35683568
var rc rewindCapability
35693569
var canAutoRetry bool

pkg/sql/conn_executor_prepare.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -388,13 +388,13 @@ func (ex *connExecutor) execBind(
388388
// Decode the arguments, except for internal queries for which we just verify
389389
// that the arguments match what's expected.
390390
qargs := make(tree.QueryArguments, numQArgs)
391-
if bindCmd.internalArgs != nil {
392-
if len(bindCmd.internalArgs) != int(numQArgs) {
391+
if bindCmd.InternalArgs != nil {
392+
if len(bindCmd.InternalArgs) != int(numQArgs) {
393393
return retErr(
394394
pgwirebase.NewProtocolViolationErrorf(
395-
"expected %d arguments, got %d", numQArgs, len(bindCmd.internalArgs)))
395+
"expected %d arguments, got %d", numQArgs, len(bindCmd.InternalArgs)))
396396
}
397-
for i, datum := range bindCmd.internalArgs {
397+
for i, datum := range bindCmd.InternalArgs {
398398
t := ps.InferredTypes[i]
399399
if oid := datum.ResolvedType().Oid(); datum != tree.DNull && oid != t {
400400
return retErr(

pkg/sql/conn_executor_savepoints.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (ex *connExecutor) execRelease(
147147
}
148148
// Committing the transaction failed. We'll go to state RestartWait if
149149
// it's a retryable error, or to state RollbackWait otherwise.
150-
if errIsRetryable(err) {
150+
if ErrIsRetryable(err) {
151151
// For certain retryable errors, we should turn them into client visible
152152
// errors, since the client needs to retry now.
153153
var conversionError error

pkg/sql/conn_executor_show_commit_timestamp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (ex *connExecutor) execShowCommitTimestampInOpenState(
8383

8484
// Committing the transaction failed. We'll go to state RestartWait if
8585
// it's a retryable error, or to state RollbackWait otherwise.
86-
if errIsRetryable(err) {
86+
if ErrIsRetryable(err) {
8787
rc, canAutoRetry := ex.getRewindTxnCapability()
8888
ev := eventRetryableErr{
8989
IsCommit: fsm.FromBool(false /* isCommit */),

pkg/sql/conn_io.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,14 +281,14 @@ type BindStmt struct {
281281
// code, in which case that code will be applied to all arguments.
282282
ArgFormatCodes []pgwirebase.FormatCode
283283

284-
// internalArgs, if not nil, represents the arguments for the prepared
284+
// InternalArgs, if not nil, represents the arguments for the prepared
285285
// statements as produced by the internal clients. These don't need to go
286286
// through encoding/decoding of the args. However, the types of the datums
287287
// must correspond exactly to the inferred types (but note that the types of
288288
// the datums are passes as type hints to the PrepareStmt command, so the
289289
// inferred types should reflect that).
290-
// If internalArgs is specified, Args and ArgFormatCodes are ignored.
291-
internalArgs []tree.Datum
290+
// If InternalArgs is specified, Args and ArgFormatCodes are ignored.
291+
InternalArgs []tree.Datum
292292
}
293293

294294
// command implements the Command interface.
@@ -558,6 +558,23 @@ func (buf *StmtBuf) CurCmd() (Command, CmdPos, error) {
558558
}
559559
}
560560

561+
// Empty returns true if there are no unprocessed commands in the buffer.
562+
// If the buffer is closed, it returns io.EOF.
563+
func (buf *StmtBuf) Empty() (bool, error) {
564+
buf.mu.Lock()
565+
defer buf.mu.Unlock()
566+
567+
if buf.mu.closed {
568+
return false, io.EOF
569+
}
570+
curPos := buf.mu.curPos
571+
cmdIdx, err := buf.translatePosLocked(curPos)
572+
if err != nil {
573+
return false, err
574+
}
575+
return !(cmdIdx < buf.mu.data.Len()), nil
576+
}
577+
561578
// translatePosLocked translates an absolute position of a command (counting
562579
// from the connection start) to the index of the respective command in the
563580
// buffer (so, it returns an index relative to the start of the buffer).

pkg/sql/conn_io_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cockroachdb/cockroach/pkg/sql/parser"
1515
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
1616
"github.com/cockroachdb/cockroach/pkg/util/log"
17+
"github.com/stretchr/testify/require"
1718
)
1819

1920
func assertStmt(t *testing.T, cmd Command, exp string) {
@@ -342,3 +343,52 @@ func TestStmtBufBatching(t *testing.T) {
342343
t.Fatalf("expected pos to be %d, got: %d", 9, pos)
343344
}
344345
}
346+
347+
func TestStmtBufEmpty(t *testing.T) {
348+
defer leaktest.AfterTest(t)()
349+
defer log.Scope(t).Close(t)
350+
351+
ctx := context.Background()
352+
353+
t.Run("EmptyBuffer", func(t *testing.T) {
354+
buf := NewStmtBuf(0)
355+
empty, err := buf.Empty()
356+
require.NoError(t, err)
357+
require.True(t, empty)
358+
})
359+
360+
t.Run("NonEmptyBuffer", func(t *testing.T) {
361+
buf := NewStmtBuf(0)
362+
s1, err := parser.ParseOne("SELECT 1")
363+
require.NoError(t, err)
364+
mustPush(ctx, t, buf, ExecStmt{Statement: s1})
365+
366+
empty, err := buf.Empty()
367+
require.NoError(t, err)
368+
require.False(t, empty)
369+
})
370+
371+
t.Run("EmptyAfterAdvancing", func(t *testing.T) {
372+
buf := NewStmtBuf(0)
373+
s1, err := parser.ParseOne("SELECT 1")
374+
require.NoError(t, err)
375+
mustPush(ctx, t, buf, ExecStmt{Statement: s1})
376+
377+
empty, err := buf.Empty()
378+
require.NoError(t, err)
379+
require.False(t, empty)
380+
381+
buf.AdvanceOne()
382+
empty, err = buf.Empty()
383+
require.NoError(t, err)
384+
require.True(t, empty)
385+
})
386+
387+
t.Run("ClosedBuffer", func(t *testing.T) {
388+
buf := NewStmtBuf(0)
389+
buf.Close()
390+
391+
_, err := buf.Empty()
392+
require.Equal(t, io.EOF, err)
393+
})
394+
}

0 commit comments

Comments
 (0)