Skip to content

Commit 63eebf3

Browse files
committed
sql: unconditionally check DistSQL supportability
This commit hardens c17591e. In that change we disabled the usage of the Streamer whenever some part of the plan isn't distributable. However, population of `distSQLProhibitedErr` was left on a best-effort basis. Specifically, if we have `distsql=off` (as well as a couple of other conditions), we'd skip the supportability check. This meant that we could still end up using the Streamer in some illegal cases - one example that I came up with is a plan with a routine (which currently prohibits DistSQL): the routine might access the RootTxn concurrently with the LeafTxn access by the Streamer, which is illegal. This commit makes it so that we perform the DistSQL supportability check unconditionally. Note that this shouldn't really have a performance impact - after all, we do expect this check to be done pretty much all the time (unless someone runs with `distsql=off`). Additionally, this change happens to fix a nil pointer error around the recent fix to top-level query stats in the presence of routines. Namely, that patch only set the metadata forwarder for local plans if we end up using the RootTxn, but in a query where we happened to use the Streamer with routines, we'd end up (before this patch) with a LeafTxn and an unset metadata forwarder. Now the Streamer will be disabled, so we'll have the RootTxn and the forwarder will be set. Release note: None
1 parent 3a397eb commit 63eebf3

File tree

6 files changed

+62
-39
lines changed

6 files changed

+62
-39
lines changed

pkg/sql/distsql_physical_planner.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -977,7 +977,8 @@ type PlanningCtx struct {
977977
// isLocal is set to true if we're planning this query on a single node.
978978
isLocal bool
979979
// distSQLProhibitedErr, if set, indicates why the plan couldn't be
980-
// distributed.
980+
// distributed. If any part of the plan isn't distributable, then this is
981+
// guaranteed to be non-nil.
981982
distSQLProhibitedErr error
982983
planner *planner
983984

pkg/sql/distsql_running.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -875,15 +875,10 @@ func (dsp *DistSQLPlanner) Run(
875875
// that we might have a plan where some expression (e.g. a cast to
876876
// an Oid type) uses the planner's txn (which is the RootTxn), so
877877
// it'd be illegal to use LeafTxns for a part of such plan.
878-
// TODO(yuzefovich): this check is both excessive and insufficient.
879-
// For example:
880-
// - it disables the usage of the Streamer when a subquery has an
881-
// Oid type, but that would have no impact on usage of the Streamer
882-
// in the main query;
883-
// - it might allow the usage of the Streamer even when the internal
884-
// executor is used by a part of the plan, and the IE would use the
885-
// RootTxn. Arguably, this would be a bug in not prohibiting the
886-
// DistSQL altogether.
878+
// TODO(yuzefovich): this check could be excessive. For example, it
879+
// disables the usage of the Streamer when a subquery has an Oid
880+
// type (due to a serialization issue), but that would have no
881+
// impact on usage of the Streamer in the main query.
887882
if !containsLocking && !mustUseRootTxn && planCtx.distSQLProhibitedErr == nil {
888883
if evalCtx.SessionData().StreamerEnabled {
889884
for _, proc := range plan.Processors {

pkg/sql/distsql_running_test.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1221,12 +1221,13 @@ func TestTopLevelQueryStats(t *testing.T) {
12211221
defer leaktest.AfterTest(t)()
12221222
defer log.Scope(t).Close(t)
12231223

1224+
ctx := context.Background()
12241225
// testQuery will be updated throughout the test to the current target.
12251226
var testQuery atomic.Value
12261227
// The callback will send number of rows read and rows written (for each
12271228
// ProducerMetadata.Metrics object) on these channels, respectively.
12281229
rowsReadCh, rowsWrittenCh := make(chan int64), make(chan int64)
1229-
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
1230+
srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
12301231
Knobs: base.TestingKnobs{
12311232
SQLExecutor: &ExecutorTestingKnobs{
12321233
DistSQLReceiverPushCallbackFactory: func(_ context.Context, query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) (rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
@@ -1244,11 +1245,13 @@ func TestTopLevelQueryStats(t *testing.T) {
12441245
},
12451246
},
12461247
})
1247-
defer s.Stopper().Stop(context.Background())
1248+
defer srv.Stopper().Stop(ctx)
1249+
conn, err := sqlDB.Conn(ctx)
1250+
require.NoError(t, err)
12481251

12491252
if _, err := sqlDB.Exec(`
1250-
CREATE TABLE t (k INT PRIMARY KEY);
1251-
INSERT INTO t SELECT generate_series(1, 10);
1253+
CREATE TABLE t (k INT PRIMARY KEY, i INT, v INT, INDEX(i));
1254+
INSERT INTO t SELECT i, 1, 1 FROM generate_series(1, 10) AS g(i);
12521255
CREATE FUNCTION no_reads() RETURNS INT AS 'SELECT 1' LANGUAGE SQL;
12531256
CREATE FUNCTION reads() RETURNS INT AS 'SELECT count(*) FROM t' LANGUAGE SQL;
12541257
CREATE FUNCTION write(x INT) RETURNS INT AS 'INSERT INTO t VALUES (x); SELECT x' LANGUAGE SQL;
@@ -1259,6 +1262,7 @@ CREATE FUNCTION write(x INT) RETURNS INT AS 'INSERT INTO t VALUES (x); SELECT x'
12591262
for _, tc := range []struct {
12601263
name string
12611264
query string
1265+
setup, cleanup string // optional
12621266
expRowsRead int64
12631267
expRowsWritten int64
12641268
}{
@@ -1268,6 +1272,16 @@ CREATE FUNCTION write(x INT) RETURNS INT AS 'INSERT INTO t VALUES (x); SELECT x'
12681272
expRowsRead: 10,
12691273
expRowsWritten: 0,
12701274
},
1275+
{
1276+
name: "routine and index join (used to be powered by streamer)",
1277+
query: "SELECT v FROM t@t_i_idx WHERE reads() > 0",
1278+
setup: "SET distsql=off",
1279+
cleanup: "RESET distsql",
1280+
// 10 rows for secondary index, 10 for index join into primary, and
1281+
// then for each row do ten-row-scan in the routine.
1282+
expRowsRead: 120,
1283+
expRowsWritten: 0,
1284+
},
12711285
{
12721286
name: "simple write",
12731287
query: "INSERT INTO t SELECT generate_series(11, 42)",
@@ -1309,13 +1323,23 @@ CREATE FUNCTION write(x INT) RETURNS INT AS 'INSERT INTO t VALUES (x); SELECT x'
13091323
},
13101324
} {
13111325
t.Run(tc.name, func(t *testing.T) {
1326+
if tc.setup != "" {
1327+
_, err := conn.ExecContext(ctx, tc.setup)
1328+
require.NoError(t, err)
1329+
}
1330+
if tc.cleanup != "" {
1331+
defer func() {
1332+
_, err := conn.ExecContext(ctx, tc.cleanup)
1333+
require.NoError(t, err)
1334+
}()
1335+
}
13121336
testQuery.Store(tc.query)
13131337
errCh := make(chan error)
13141338
// Spin up the worker goroutine which will actually execute the
13151339
// query.
13161340
go func() {
13171341
defer close(errCh)
1318-
_, err := sqlDB.Exec(tc.query)
1342+
_, err := conn.ExecContext(ctx, tc.query)
13191343
errCh <- err
13201344
}()
13211345
// In the main goroutine, loop until the query is completed while

pkg/sql/exec_util.go

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2225,8 +2225,10 @@ func shouldDistributeGivenRecAndMode(
22252225
// the plan.
22262226
//
22272227
// The returned error, if any, indicates why we couldn't distribute the plan.
2228-
// Note that it's possible that we choose to not distribute the plan while
2229-
// nil error is returned.
2228+
// Note that it's possible that we choose to not distribute the plan while nil
2229+
// error is returned (but it's guaranteed that if some part of the plan isn't
2230+
// distributable, then non-nil error is returned).
2231+
//
22302232
// WARNING: in some cases when this method returns
22312233
// physicalplan.FullyDistributedPlan, the plan might actually run locally. This
22322234
// is the case when
@@ -2245,23 +2247,9 @@ func (p *planner) getPlanDistribution(
22452247
return plan.physPlan.Distribution, nil
22462248
}
22472249

2248-
// If this transaction has modified or created any types, it is not safe to
2249-
// distribute due to limitations around leasing descriptors modified in the
2250-
// current transaction.
2251-
if p.Descriptors().HasUncommittedDescriptors() {
2252-
return physicalplan.LocalPlan, nil
2253-
}
2254-
2250+
// Check DistSQL-supportability as the first order of business in order to
2251+
// find whether usage of DistSQL is prohibited by features of the plan.
22552252
sd := p.SessionData()
2256-
if sd.DistSQLMode == sessiondatapb.DistSQLOff {
2257-
return physicalplan.LocalPlan, nil
2258-
}
2259-
2260-
// Don't try to run empty nodes (e.g. SET commands) with distSQL.
2261-
if _, ok := plan.planNode.(*zeroNode); ok {
2262-
return physicalplan.LocalPlan, nil
2263-
}
2264-
22652253
// Determine whether the txn has buffered some writes.
22662254
txnHasBufferedWrites := p.txn.HasBufferedWrites()
22672255
if sd.BufferedWritesEnabled && p.curPlan.main == plan {
@@ -2277,11 +2265,26 @@ func (p *planner) getPlanDistribution(
22772265
}
22782266
rec, err := checkSupportForPlanNode(ctx, plan.planNode, &p.distSQLVisitor, sd, txnHasBufferedWrites)
22792267
if err != nil {
2280-
// Don't use distSQL for this request.
22812268
log.VEventf(ctx, 1, "query not supported for distSQL: %s", err)
22822269
return physicalplan.LocalPlan, err
22832270
}
22842271

2272+
// If this transaction has modified or created any types, it is not safe to
2273+
// distribute due to limitations around leasing descriptors modified in the
2274+
// current transaction.
2275+
if p.Descriptors().HasUncommittedDescriptors() {
2276+
return physicalplan.LocalPlan, nil
2277+
}
2278+
2279+
if sd.DistSQLMode == sessiondatapb.DistSQLOff {
2280+
return physicalplan.LocalPlan, nil
2281+
}
2282+
2283+
// Don't try to run empty nodes (e.g. SET commands) with distSQL.
2284+
if _, ok := plan.planNode.(*zeroNode); ok {
2285+
return physicalplan.LocalPlan, nil
2286+
}
2287+
22852288
if shouldDistributeGivenRecAndMode(rec, sd.DistSQLMode) {
22862289
return physicalplan.FullyDistributedPlan, nil
22872290
}

pkg/sql/opt/exec/execbuilder/testdata/inverted_join_geospatial

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ quality of service: regular
336336
│ │ estimated max sql temp disk usage: 0 B
337337
│ │ group by: lk
338338
│ │
339-
│ └── • lookup join (left outer) (streamer)
339+
│ └── • lookup join (left outer)
340340
│ │ sql nodes: <hidden>
341341
│ │ regions: <hidden>
342342
│ │ actual row count: 0

pkg/sql/opt/exec/execbuilder/testdata/jsonb_path_query

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,19 +153,19 @@ query T kvtrace
153153
SELECT jsonb_path_query(b, '$.a.b') FROM json_tab@foo_inv WHERE jsonb_path_exists(b, '$.a.b') ORDER BY a;
154154
----
155155
Scan /Table/106/2/{???-"a"/Arr/Arr/}, /Table/106/2/{???-"a"/Arr/}
156-
Scan /Table/106/1/7/0, /Table/106/1/11/0, /Table/106/1/8/0, /Table/106/1/13/0, /Table/106/1/14/0, /Table/106/1/15/0, /Table/106/1/16/0, /Table/106/1/12/0, /Table/106/1/3/0, /Table/106/1/9/0, /Table/106/1/4/0, /Table/106/1/6/0
156+
Scan /Table/106/1/3/0, /Table/106/1/4/0, /Table/106/1/6/0, /Table/106/1/7/0, /Table/106/1/8/0, /Table/106/1/9/0, /Table/106/1/11/0, /Table/106/1/12/0, /Table/106/1/13/0, /Table/106/1/14/0, /Table/106/1/15/0, /Table/106/1/16/0
157157

158158
query T kvtrace
159159
SELECT jsonb_path_query(b, '$.a[*].b') FROM json_tab@foo_inv WHERE jsonb_path_exists(b, '$.a[*].b') ORDER BY a;
160160
----
161161
Scan /Table/106/2/{???-"a"/Arr/Arr/Arr/}, /Table/106/2/{???-"a"/Arr/Arr/}, /Table/106/2/{???-"a"/Arr/}
162-
Scan /Table/106/1/7/0, /Table/106/1/11/0, /Table/106/1/8/0, /Table/106/1/13/0, /Table/106/1/14/0, /Table/106/1/15/0, /Table/106/1/16/0, /Table/106/1/12/0, /Table/106/1/3/0, /Table/106/1/9/0, /Table/106/1/4/0, /Table/106/1/6/0
162+
Scan /Table/106/1/3/0, /Table/106/1/4/0, /Table/106/1/6/0, /Table/106/1/7/0, /Table/106/1/8/0, /Table/106/1/9/0, /Table/106/1/11/0, /Table/106/1/12/0, /Table/106/1/13/0, /Table/106/1/14/0, /Table/106/1/15/0, /Table/106/1/16/0
163163

164164
query T kvtrace
165165
SELECT jsonb_path_query(b, '$.a[*][*].b') FROM json_tab@foo_inv WHERE jsonb_path_exists(b, '$.a[*][*].b') ORDER BY a;
166166
----
167167
Scan /Table/106/2/{???-"a"/Arr/Arr/Arr/Arr/}, /Table/106/2/{???-"a"/Arr/Arr/Arr/}, /Table/106/2/{???-"a"/Arr/Arr/}, /Table/106/2/{???-"a"/Arr/}
168-
Scan /Table/106/1/7/0, /Table/106/1/11/0, /Table/106/1/8/0, /Table/106/1/13/0, /Table/106/1/14/0, /Table/106/1/15/0, /Table/106/1/16/0, /Table/106/1/12/0, /Table/106/1/3/0, /Table/106/1/9/0, /Table/106/1/4/0, /Table/106/1/6/0
168+
Scan /Table/106/1/3/0, /Table/106/1/4/0, /Table/106/1/6/0, /Table/106/1/7/0, /Table/106/1/8/0, /Table/106/1/9/0, /Table/106/1/11/0, /Table/106/1/12/0, /Table/106/1/13/0, /Table/106/1/14/0, /Table/106/1/15/0, /Table/106/1/16/0
169169

170170
query T kvtrace
171171
SELECT jsonb_path_query(b, '$.a.b[*].c') FROM json_tab@foo_inv WHERE jsonb_path_exists(b, '$.a.b[*].c') ORDER BY a;
@@ -234,7 +234,7 @@ query T kvtrace
234234
SELECT a, jsonb_path_query(b, '$.a.b') FROM json_tab@foo_inv WHERE jsonb_path_exists(b, '$.a.b') ORDER BY a;
235235
----
236236
Scan /Table/106/2/{???-"a"/Arr/Arr/}, /Table/106/2/{???-"a"/Arr/}
237-
Scan /Table/106/1/7/0, /Table/106/1/11/0, /Table/106/1/8/0, /Table/106/1/13/0, /Table/106/1/14/0, /Table/106/1/15/0, /Table/106/1/16/0, /Table/106/1/12/0, /Table/106/1/3/0, /Table/106/1/9/0, /Table/106/1/4/0, /Table/106/1/6/0
237+
Scan /Table/106/1/3/0, /Table/106/1/4/0, /Table/106/1/6/0, /Table/106/1/7/0, /Table/106/1/8/0, /Table/106/1/9/0, /Table/106/1/11/0, /Table/106/1/12/0, /Table/106/1/13/0, /Table/106/1/14/0, /Table/106/1/15/0, /Table/106/1/16/0
238238

239239
query T kvtrace
240240
SELECT a, jsonb_path_query(b, '$.a.b'), jsonb_path_query(b, '$.a.d') FROM json_tab@foo_inv WHERE jsonb_path_exists(b, '$.a.b') AND jsonb_path_exists(b, '$.a.d') ORDER BY a;

0 commit comments

Comments
 (0)