diff --git a/enginetest/harness.go b/enginetest/harness.go index 2a28d882fd..8b8445e874 100644 --- a/enginetest/harness.go +++ b/enginetest/harness.go @@ -35,8 +35,6 @@ import ( // times during a single test run, and must return a "fresh" engine instance each time, i.e. an instance that contains // exactly the test data provided via other setup methods. type Harness interface { - // Parallelism returns how many parallel go routines to use when constructing an engine for test. - Parallelism() int // NewContext allows a harness to specify any sessions or context variables necessary for the proper functioning of // their engine implementation. Every harnessed engine test uses the context created by this method, with some // additional information (e.g. current DB) set uniformly. To replicate the behavior of tests during setup, diff --git a/enginetest/initialization.go b/enginetest/initialization.go index b9ac230f07..f9e1574ca2 100644 --- a/enginetest/initialization.go +++ b/enginetest/initialization.go @@ -37,11 +37,6 @@ func NewContextWithClient(harness ClientHarness, client sql.Client) *sql.Context return newContextSetup(harness.NewContextWithClient(client)) } -// TODO: remove -func NewContextWithEngine(harness Harness, engine QueryEngine) *sql.Context { - return NewContext(harness) -} - var pid uint64 func newContextSetup(ctx *sql.Context) *sql.Context { @@ -84,22 +79,15 @@ func NewBaseSession() *sql.BaseSession { // NewEngineWithProvider returns a new engine with the specified provider func NewEngineWithProvider(_ *testing.T, harness Harness, provider sql.DatabaseProvider) *sqle.Engine { - var a *analyzer.Analyzer - - if harness.Parallelism() > 1 { - a = analyzer.NewBuilder(provider).WithParallelism(harness.Parallelism()).Build() - } else { - a = analyzer.NewDefault(provider) - } + analyzer := analyzer.NewDefault(provider) // All tests will run with all privileges on the built-in root account - a.Catalog.MySQLDb.AddRootAccount() + analyzer.Catalog.MySQLDb.AddRootAccount() // Almost no tests require an information schema that can be updated, but test setup makes it difficult to not // provide everywhere - a.Catalog.InfoSchema = information_schema.NewInformationSchemaDatabase() - - engine := sqle.New(a, new(sqle.Config)) + analyzer.Catalog.InfoSchema = information_schema.NewInformationSchemaDatabase() + engine := sqle.New(analyzer, new(sqle.Config)) if idh, ok := harness.(IndexDriverHarness); ok { idh.InitializeIndexDriver(engine.Analyzer.Catalog.AllDatabases(NewContext(harness))) } diff --git a/enginetest/memory_engine_test.go b/enginetest/memory_engine_test.go index 1c075f88ed..06a9bb1915 100644 --- a/enginetest/memory_engine_test.go +++ b/enginetest/memory_engine_test.go @@ -484,10 +484,6 @@ func TestIndexQueryPlans(t *testing.T) { } } -func TestParallelismQueries(t *testing.T) { - enginetest.TestParallelismQueries(t, enginetest.NewMemoryHarness("default", 2, testNumPartitions, true, nil)) -} - func TestQueryErrors(t *testing.T) { enginetest.TestQueryErrors(t, enginetest.NewDefaultMemoryHarness()) } diff --git a/enginetest/parallelism.go b/enginetest/parallelism.go deleted file mode 100644 index 950091ce06..0000000000 --- a/enginetest/parallelism.go +++ /dev/null @@ -1,61 +0,0 @@ -package enginetest - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/dolthub/go-mysql-server/enginetest/queries" - "github.com/dolthub/go-mysql-server/enginetest/scriptgen/setup" - "github.com/dolthub/go-mysql-server/sql" - "github.com/dolthub/go-mysql-server/sql/plan" - "github.com/dolthub/go-mysql-server/sql/transform" -) - -func TestParallelismQueries(t *testing.T, harness Harness) { - harness.Setup(setup.XySetup...) - e := mustNewEngine(t, harness) - defer e.Close() - for _, tt := range queries.ParallelismTests { - t.Run(tt.Query, func(t *testing.T) { - evalParallelismTest(t, harness, e, tt.Query, tt.Parallel) - }) - } -} - -func evalParallelismTest(t *testing.T, harness Harness, e QueryEngine, query string, parallel bool) { - ctx := NewContext(harness) - ctx = ctx.WithQuery(query) - a, err := analyzeQuery(ctx, e, query) - require.NoError(t, err) - require.Equal(t, parallel, findExchange(a), fmt.Sprintf("expected exchange: %t\nplan:\n%s", parallel, sql.DebugString(a))) -} - -func findExchange(n sql.Node) bool { - return transform.InspectUp(n, func(n sql.Node) bool { - if n == nil { - return false - } - _, ok := n.(*plan.Exchange) - if ok { - return true - } - - if ex, ok := n.(sql.Expressioner); ok { - for _, e := range ex.Expressions() { - found := transform.InspectExpr(e, func(e sql.Expression) bool { - sq, ok := e.(*plan.Subquery) - if !ok { - return false - } - return findExchange(sq.Query) - }) - if found { - return true - } - } - } - return false - }) -} diff --git a/enginetest/plangen/cmd/plangen/main.go b/enginetest/plangen/cmd/plangen/main.go index 7779baf30b..79c18dafb3 100644 --- a/enginetest/plangen/cmd/plangen/main.go +++ b/enginetest/plangen/cmd/plangen/main.go @@ -164,7 +164,7 @@ func generatePlansForSuite(spec PlanSpec, w *bytes.Buffer) error { _, _ = w.WriteString("\n") if !tt.Skip { - ctx := enginetest.NewContextWithEngine(harness, engine) + ctx := enginetest.NewContext(harness) binder := planbuilder.New(ctx, engine.EngineAnalyzer().Catalog, sql.NewMysqlParser()) parsed, _, _, qFlags, err := binder.Parse(tt.Query, nil, false) if err != nil { diff --git a/sql/analyzer/analyzer.go b/sql/analyzer/analyzer.go index 712df34c3a..79dc70f382 100644 --- a/sql/analyzer/analyzer.go +++ b/sql/analyzer/analyzer.go @@ -74,7 +74,6 @@ type Builder struct { afterAllRules []Rule provider sql.DatabaseProvider debug bool - parallelism int } // NewBuilder creates a new Builder from a specific catalog. @@ -100,12 +99,6 @@ func (ab *Builder) WithDebug() *Builder { return ab } -// WithParallelism sets the parallelism level on the analyzer. -func (ab *Builder) WithParallelism(parallelism int) *Builder { - ab.parallelism = parallelism - return ab -} - // AddPreAnalyzeRule adds a new rule to the analyze before the standard analyzer rules. func (ab *Builder) AddPreAnalyzeRule(id RuleId, fn RuleFunc) *Builder { ab.preAnalyzeRules = append(ab.preAnalyzeRules, Rule{id, fn}) @@ -271,7 +264,6 @@ func (ab *Builder) Build() *Analyzer { contextStack: make([]string, 0), Batches: batches, Catalog: NewCatalog(ab.provider), - Parallelism: ab.parallelism, Coster: memo.NewDefaultCoster(), ExecBuilder: rowexec.DefaultBuilder, } @@ -286,7 +278,6 @@ type Analyzer struct { Verbose bool // A stack of debugger context. See PushDebugContext, PopDebugContext contextStack []string - Parallelism int // Batches of Rules to apply. Batches []*Batch // Catalog of databases and registered functions. @@ -304,7 +295,6 @@ type Analyzer struct { // To add custom rules, the easiest way is use the Builder. func NewDefault(provider sql.DatabaseProvider) *Analyzer { return NewBuilder(provider).Build() - } // NewDefaultWithVersion creates a default Analyzer instance either @@ -402,10 +392,8 @@ func NewProcRuleSelector(sel RuleSelector) RuleSelector { switch id { case pruneTablesId, unnestInSubqueriesId, - // once after default rules should only be run once - TrackProcessId, - parallelizeId: + TrackProcessId: return false } return sel(id) @@ -452,8 +440,7 @@ func NewFinalizeUnionSel(sel RuleSelector) RuleSelector { case // skip recursive resolve rules resolveSubqueriesId, - resolveUnionsId, - parallelizeId: + resolveUnionsId: return false case finalizeSubqueriesId, hoistOutOfScopeFiltersId: diff --git a/sql/analyzer/parallelize.go b/sql/analyzer/parallelize.go deleted file mode 100644 index 7f4b32f3d7..0000000000 --- a/sql/analyzer/parallelize.go +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright 2020-2021 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package analyzer - -import ( - "os" - "strconv" - - "github.com/go-kit/kit/metrics/discard" - - "github.com/dolthub/go-mysql-server/sql" - "github.com/dolthub/go-mysql-server/sql/plan" - "github.com/dolthub/go-mysql-server/sql/transform" -) - -func init() { - // check for single-threaded feature flag - if v, ok := os.LookupEnv(singleThreadFlag); ok && v != "" { - SingleThreadFeatureFlag = true - } -} - -const ( - singleThreadFlag = "GMS_SINGLE_THREAD" -) - -var ( - // ParallelQueryCounter describes a metric that accumulates - // number of parallel queries monotonically. - ParallelQueryCounter = discard.NewCounter() - - SingleThreadFeatureFlag = false -) - -func shouldParallelize(node sql.Node, scope *plan.Scope) bool { - if SingleThreadFeatureFlag { - return false - } - - // Don't parallelize subqueries, this can blow up the execution graph quickly - if !scope.IsEmpty() { - return false - } - - // Do not try to parallelize DDL or descriptive operations - return !plan.IsNoRowNode(node) -} - -func parallelize(ctx *sql.Context, a *Analyzer, node sql.Node, scope *plan.Scope, sel RuleSelector, qFlags *sql.QueryFlags) (sql.Node, transform.TreeIdentity, error) { - if a.Parallelism <= 1 || !node.Resolved() { - return node, transform.SameTree, nil - } - - if !shouldParallelize(node, scope) { - return node, transform.SameTree, nil - } - - foundOrderedDistinct := false - newNode, same, err := transform.NodeWithCtx(node, nil, func(c transform.Context) (sql.Node, transform.TreeIdentity, error) { - if _, ok := c.Node.(*plan.OrderedDistinct); ok { - foundOrderedDistinct = true - } else if !isParallelizable(c.Node) { - return c.Node, transform.SameTree, nil - } else if _, ok := c.Parent.(*plan.Max1Row); ok { - return c.Node, transform.SameTree, nil - } - ParallelQueryCounter.With("parallelism", strconv.Itoa(a.Parallelism)).Add(1) - - return plan.NewExchange(a.Parallelism, c.Node), transform.NewTree, nil - }) - if err != nil || bool(same) || foundOrderedDistinct { - return node, transform.SameTree, err - } - - newNode, _, err = transform.Node(newNode, removeRedundantExchanges) - if err != nil { - return nil, transform.SameTree, err - } - - return newNode, transform.NewTree, nil -} - -// removeRedundantExchanges removes all the exchanges except for the topmost -// of all. -func removeRedundantExchanges(node sql.Node) (sql.Node, transform.TreeIdentity, error) { - exchange, ok := node.(*plan.Exchange) - if !ok { - return node, transform.SameTree, nil - } - - var seenIta bool - child, same, err := transform.Node(exchange.Child, func(node sql.Node) (sql.Node, transform.TreeIdentity, error) { - if exchange, ok := node.(*plan.Exchange); ok { - return exchange.Child, transform.NewTree, nil - } else if ita, ok := node.(*plan.IndexedTableAccess); ok { - if !ita.IsStatic() { - // do not parallelize lookup join - // todo(max): more graceful top-down exchange application - seenIta = true - } - } - return node, transform.SameTree, nil - }) - if err != nil { - return nil, transform.SameTree, err - } - if seenIta { - return child, transform.NewTree, nil - } - if same { - return node, transform.SameTree, nil - } - node, err = exchange.WithChildren(child) - return node, transform.NewTree, err -} - -func isParallelizable(node sql.Node) bool { - var parallelizable = true - var tableSeen bool - var lastWasTable bool - - transform.Inspect(node, func(node sql.Node) bool { - if node == nil { - return true - } - - lastWasTable = false - if plan.IsBinary(node) { - parallelizable = false - return false - } - - switch node := node.(type) { - // These are the only unary nodes that can be parallelized. Any other - // unary nodes will not. - case *plan.TableAlias, *plan.Exchange: - // Some nodes may have subquery expressions that make them unparallelizable - case *plan.Project, *plan.Filter: - for _, e := range node.(sql.Expressioner).Expressions() { - sql.Inspect(e, func(e sql.Expression) bool { - if q, ok := e.(*plan.Subquery); ok { - subqueryParallelizable := true - transform.Inspect(q.Query, func(node sql.Node) bool { - if node == nil { - return true - } - subqueryParallelizable = isParallelizable(node) - return subqueryParallelizable - }) - if !subqueryParallelizable { - parallelizable = false - } - return true - } - return true - }) - } - // IndexedTablesAccess already uses an index for lookups, so parallelizing it won't help in most cases (and can - // blow up the query execution graph) - case *plan.IndexedTableAccess: - parallelizable = false - return false - // Foreign keys expect specific nodes as children and face issues when they're swapped with Exchange nodes - case *plan.ForeignKeyHandler: - parallelizable = false - return false - case *plan.JSONTable: - parallelizable = false - return false - case *plan.RecursiveCte: - parallelizable = false - return false - case sql.Table: - lastWasTable = true - tableSeen = true - case *plan.JoinNode: - if node.Op.IsFullOuter() { - parallelizable = false - lastWasTable = true - tableSeen = true - return false - } - default: - parallelizable = false - } - return true - }) - - return parallelizable && tableSeen && lastWasTable -} diff --git a/sql/analyzer/parallelize_test.go b/sql/analyzer/parallelize_test.go deleted file mode 100644 index 275c5237ef..0000000000 --- a/sql/analyzer/parallelize_test.go +++ /dev/null @@ -1,313 +0,0 @@ -// Copyright 2020-2021 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package analyzer - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/dolthub/go-mysql-server/memory" - "github.com/dolthub/go-mysql-server/sql" - "github.com/dolthub/go-mysql-server/sql/expression" - "github.com/dolthub/go-mysql-server/sql/expression/function/aggregation/window" - "github.com/dolthub/go-mysql-server/sql/plan" - "github.com/dolthub/go-mysql-server/sql/transform" - "github.com/dolthub/go-mysql-server/sql/types" -) - -func TestParallelize(t *testing.T) { - require := require.New(t) - db := memory.NewDatabase("db") - pro := memory.NewDBProvider(db) - ctx := newContext(pro) - - table := memory.NewTable(db, "t", sql.PrimaryKeySchema{}, nil) - rule := getRuleFrom(OnceAfterAll, parallelizeId) - node := plan.NewProject( - nil, - plan.NewInnerJoin( - plan.NewFilter( - expression.NewLiteral(1, types.Int64), - plan.NewResolvedTable(table, nil, nil), - ), - plan.NewFilter( - expression.NewLiteral(1, types.Int64), - plan.NewResolvedTable(table, nil, nil), - ), - expression.NewLiteral(1, types.Int64), - ), - ) - - expected := plan.NewProject( - nil, - plan.NewInnerJoin( - plan.NewExchange( - 2, - plan.NewFilter( - expression.NewLiteral(1, types.Int64), - plan.NewResolvedTable(table, nil, nil), - ), - ), - plan.NewExchange( - 2, - plan.NewFilter( - expression.NewLiteral(1, types.Int64), - plan.NewResolvedTable(table, nil, nil), - ), - ), - expression.NewLiteral(1, types.Int64), - ), - ) - - result, _, err := rule.Apply(ctx, &Analyzer{Parallelism: 2}, node, nil, DefaultRuleSelector, nil) - require.NoError(err) - require.Equal(expected, result) -} - -func TestParallelizeCreateIndex(t *testing.T) { - require := require.New(t) - db := memory.NewDatabase("db") - pro := memory.NewDBProvider(db) - ctx := newContext(pro) - - table := memory.NewTable(db, "t", sql.PrimaryKeySchema{}, nil) - rule := getRuleFrom(OnceAfterAll, parallelizeId) - node := plan.NewCreateIndex( - "", - plan.NewResolvedTable(table, nil, nil), - nil, - "", - nil, - ) - - result, _, err := rule.Apply(ctx, &Analyzer{Parallelism: 1}, node, nil, DefaultRuleSelector, nil) - require.NoError(err) - require.Equal(node, result) -} - -func TestIsParallelizable(t *testing.T) { - db := memory.NewDatabase("db") - table := memory.NewTable(db, "t", sql.PrimaryKeySchema{}, nil) - - testCases := []struct { - name string - node sql.Node - parallelizable bool - }{ - { - "just table", - plan.NewResolvedTable(table, nil, nil), - true, - }, - { - "filter", - plan.NewFilter( - expression.NewLiteral(1, types.Int64), - plan.NewResolvedTable(table, nil, nil), - ), - true, - }, - { - "filter with a subquery", - plan.NewFilter( - eq( - lit(1), - plan.NewSubquery( - plan.NewProject([]sql.Expression{lit(1)}, plan.NewResolvedTable(table, nil, nil)), "select 1 from table")), - plan.NewResolvedTable(table, nil, nil), - ), - true, - }, - { - "filter with an incompatible subquery", - plan.NewFilter( - eq( - lit(1), - plan.NewSubquery( - plan.NewProject([]sql.Expression{gf(0, "", "row_number()")}, - plan.NewWindow([]sql.Expression{window.NewRowNumber()}, plan.NewResolvedTable(table, nil, nil)), - ), - "select row_number over () from table", - ), - ), - plan.NewResolvedTable(table, nil, nil), - ), - false, - }, - { - "project", - plan.NewProject( - nil, - plan.NewFilter( - expression.NewLiteral(1, types.Int64), - plan.NewResolvedTable(table, nil, nil), - ), - ), - true, - }, - { - "project with a subquery", - plan.NewProject([]sql.Expression{ - plan.NewSubquery( - plan.NewProject([]sql.Expression{lit(1)}, plan.NewResolvedTable(table, nil, nil)), "select 1 from table"), - }, - plan.NewFilter( - expression.NewLiteral(1, types.Int64), - plan.NewResolvedTable(table, nil, nil), - ), - ), - true, - }, - { - "project with an incompatible subquery", - plan.NewProject([]sql.Expression{ - plan.NewSubquery( - plan.NewProject([]sql.Expression{gf(0, "", "row_number()")}, - plan.NewWindow([]sql.Expression{window.NewRowNumber()}, plan.NewResolvedTable(table, nil, nil)), - ), - "select row_number over () from table", - ), - }, - plan.NewFilter( - expression.NewLiteral(1, types.Int64), - plan.NewResolvedTable(table, nil, nil), - ), - ), - false, - }, - { - "join", - plan.NewInnerJoin( - plan.NewResolvedTable(table, nil, nil), - plan.NewResolvedTable(table, nil, nil), - expression.NewLiteral(1, types.Int64), - ), - false, - }, - { - "group by", - plan.NewGroupBy( - nil, - nil, - plan.NewResolvedTable(nil, nil, nil), - ), - false, - }, - { - "limit", - plan.NewLimit( - expression.NewLiteral(5, types.Int8), - plan.NewResolvedTable(nil, nil, nil), - ), - false, - }, - { - "offset", - plan.NewOffset( - expression.NewLiteral(5, types.Int8), - plan.NewResolvedTable(nil, nil, nil), - ), - false, - }, - { - "sort", - plan.NewSort( - nil, - plan.NewResolvedTable(nil, nil, nil), - ), - false, - }, - { - "distinct", - plan.NewDistinct( - plan.NewResolvedTable(nil, nil, nil), - ), - false, - }, - { - "ordered distinct", - plan.NewOrderedDistinct( - plan.NewResolvedTable(nil, nil, nil), - ), - false, - }, - } - - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - require.Equal(t, tt.parallelizable, isParallelizable(tt.node)) - }) - } -} - -func TestRemoveRedundantExchanges(t *testing.T) { - require := require.New(t) - db := memory.NewDatabase("db") - - table := memory.NewTable(db, "t", sql.PrimaryKeySchema{}, nil) - - node := plan.NewProject( - nil, - plan.NewInnerJoin( - plan.NewExchange( - 1, - plan.NewFilter( - expression.NewLiteral(1, types.Int64), - plan.NewExchange( - 1, - plan.NewResolvedTable(table, nil, nil), - ), - ), - ), - plan.NewExchange( - 1, - plan.NewFilter( - expression.NewLiteral(1, types.Int64), - plan.NewExchange( - 1, - plan.NewResolvedTable(table, nil, nil), - ), - ), - ), - expression.NewLiteral(1, types.Int64), - ), - ) - - expected := plan.NewProject( - nil, - plan.NewInnerJoin( - plan.NewExchange( - 1, - plan.NewFilter( - expression.NewLiteral(1, types.Int64), - plan.NewResolvedTable(table, nil, nil), - ), - ), - plan.NewExchange( - 1, - plan.NewFilter( - expression.NewLiteral(1, types.Int64), - plan.NewResolvedTable(table, nil, nil), - ), - ), - expression.NewLiteral(1, types.Int64), - ), - ) - - result, _, err := transform.Node(node, removeRedundantExchanges) - require.NoError(err) - require.Equal(expected, result) -} diff --git a/sql/analyzer/rule_ids.go b/sql/analyzer/rule_ids.go index 1c37156b98..97f923e4c9 100644 --- a/sql/analyzer/rule_ids.go +++ b/sql/analyzer/rule_ids.go @@ -88,5 +88,4 @@ const ( cacheSubqueryAliasesInJoinsId // cacheSubqueryAliasesInJoins BacktickDefaulColumnValueNamesId // backtickDefaultColumnValueNames TrackProcessId // trackProcess - parallelizeId // parallelize ) diff --git a/sql/analyzer/ruleid_string.go b/sql/analyzer/ruleid_string.go index 5446095fdd..7186bbc769 100755 --- a/sql/analyzer/ruleid_string.go +++ b/sql/analyzer/ruleid_string.go @@ -82,12 +82,11 @@ func _() { _ = x[cacheSubqueryAliasesInJoinsId-71] _ = x[BacktickDefaulColumnValueNamesId-72] _ = x[TrackProcessId-73] - _ = x[parallelizeId-74] } -const _RuleId_name = "applyDefaultSelectLimitvalidateOffsetAndLimitvalidateStarExpressionsvalidateCreateTablevalidateAlterTablevalidateExprSemloadStoredProceduresvalidateDropTablesresolveDropConstraintvalidateDropConstraintresolveCreateSelectresolveSubqueriesresolveUnionsresolveDescribeQueryvalidateColumnDefaultsvalidateCreateTriggervalidateCreateProcedurevalidateReadOnlyDatabasevalidateReadOnlyTransactionvalidateDatabaseSetvalidatePrivilegesapplyEventSchedulerflattenTableAliasespushdownSubqueryAliasFiltersvalidateCheckConstraintsreplaceCountStarreplaceCrossJoinsmoveJoinConditionsToFiltersimplifyFilterspushNotFiltershoistOutOfScopeFiltersunnestInSubqueriesunnestExistsSubqueriesfinalizeSubqueriesfinalizeUnionsloadTriggersprocessTruncateresolveAlterColumnstripTableNamesFromColumnDefaultsoptimizeJoinspushFiltersapplyIndexesFromOuterScopepruneTablesassignExecIndexesinlineSubqueryAliasRefseraseProjectionflattenDistinctreplaceAggreplaceIdxSortinsertTopNNodesreplaceIdxOrderByDistanceapplyHashInresolveInsertRowsapplyTriggersapplyProceduresassignRoutinesmodifyUpdateExprsForJoinapplyUpdateAccumulatorswrapWithRollbackapplyForeignKeysvalidateResolvedvalidateOrderByvalidateGroupByvalidateSchemaSourcevalidateIndexCreationvalidateOperandsvalidateIntervalUsagevalidateSubqueryColumnsvalidateUnionSchemasMatchvalidateAggregationsvalidateDeleteFromcacheSubqueryAliasesInJoinsbacktickDefaultColumnValueNamestrackProcessparallelize" +const _RuleId_name = "applyDefaultSelectLimitvalidateOffsetAndLimitvalidateStarExpressionsvalidateCreateTablevalidateAlterTablevalidateExprSemloadStoredProceduresvalidateDropTablesresolveDropConstraintvalidateDropConstraintresolveCreateSelectresolveSubqueriesresolveUnionsresolveDescribeQueryvalidateColumnDefaultsvalidateCreateTriggervalidateCreateProcedurevalidateReadOnlyDatabasevalidateReadOnlyTransactionvalidateDatabaseSetvalidatePrivilegesapplyEventSchedulerflattenTableAliasespushdownSubqueryAliasFiltersvalidateCheckConstraintsreplaceCountStarreplaceCrossJoinsmoveJoinConditionsToFiltersimplifyFilterspushNotFiltershoistOutOfScopeFiltersunnestInSubqueriesunnestExistsSubqueriesfinalizeSubqueriesfinalizeUnionsloadTriggersprocessTruncateresolveAlterColumnstripTableNamesFromColumnDefaultsoptimizeJoinspushFiltersapplyIndexesFromOuterScopepruneTablesassignExecIndexesinlineSubqueryAliasRefseraseProjectionflattenDistinctreplaceAggreplaceIdxSortinsertTopNNodesreplaceIdxOrderByDistanceapplyHashInresolveInsertRowsapplyTriggersapplyProceduresassignRoutinesmodifyUpdateExprsForJoinapplyUpdateAccumulatorswrapWithRollbackapplyForeignKeysvalidateResolvedvalidateOrderByvalidateGroupByvalidateSchemaSourcevalidateIndexCreationvalidateOperandsvalidateIntervalUsagevalidateSubqueryColumnsvalidateUnionSchemasMatchvalidateAggregationsvalidateDeleteFromcacheSubqueryAliasesInJoinsbacktickDefaultColumnValueNamestrackProcess" -var _RuleId_index = [...]uint16{0, 23, 45, 68, 87, 105, 120, 140, 158, 179, 201, 220, 237, 250, 270, 292, 313, 336, 360, 387, 406, 424, 443, 462, 490, 514, 530, 547, 573, 588, 602, 624, 642, 664, 682, 696, 708, 723, 741, 774, 787, 798, 824, 835, 852, 875, 890, 905, 915, 929, 944, 969, 980, 997, 1010, 1025, 1039, 1063, 1086, 1102, 1118, 1134, 1149, 1164, 1184, 1205, 1221, 1242, 1265, 1290, 1310, 1328, 1355, 1386, 1398, 1409} +var _RuleId_index = [...]uint16{0, 23, 45, 68, 87, 105, 120, 140, 158, 179, 201, 220, 237, 250, 270, 292, 313, 336, 360, 387, 406, 424, 443, 462, 490, 514, 530, 547, 573, 588, 602, 624, 642, 664, 682, 696, 708, 723, 741, 774, 787, 798, 824, 835, 852, 875, 890, 905, 915, 929, 944, 969, 980, 997, 1010, 1025, 1039, 1063, 1086, 1102, 1118, 1134, 1149, 1164, 1184, 1205, 1221, 1242, 1265, 1290, 1310, 1328, 1355, 1386, 1398} func (i RuleId) String() string { if i < 0 || i >= RuleId(len(_RuleId_index)-1) { diff --git a/sql/analyzer/rules.go b/sql/analyzer/rules.go index ac5826485e..dc7a431b09 100644 --- a/sql/analyzer/rules.go +++ b/sql/analyzer/rules.go @@ -27,7 +27,6 @@ func init() { {cacheSubqueryAliasesInJoinsId, cacheSubqueryAliasesInJoins}, {BacktickDefaulColumnValueNamesId, backtickDefaultColumnValueNames}, {TrackProcessId, trackProcess}, - {parallelizeId, parallelize}, } } diff --git a/sql/plan/exchange.go b/sql/plan/exchange.go deleted file mode 100644 index 29be162f53..0000000000 --- a/sql/plan/exchange.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2020-2021 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package plan - -import ( - "fmt" - - errors "gopkg.in/src-d/go-errors.v1" - - "github.com/dolthub/go-mysql-server/sql" -) - -// ErrNoPartitionable is returned when no Partitionable node is found -// in the Exchange tree. -var ErrNoPartitionable = errors.NewKind("no partitionable node found in exchange tree") - -// Exchange is a node that can parallelize the underlying tree iterating -// partitions concurrently. -type Exchange struct { - UnaryNode - Parallelism int -} - -var _ sql.Node = (*Exchange)(nil) -var _ sql.CollationCoercible = (*Exchange)(nil) - -// NewExchange creates a new Exchange node. -func NewExchange( - parallelism int, - child sql.Node, -) *Exchange { - return &Exchange{ - UnaryNode: UnaryNode{Child: child}, - Parallelism: parallelism, - } -} - -func (e *Exchange) String() string { - p := sql.NewTreePrinter() - _ = p.WriteNode("Exchange") - _ = p.WriteChildren(e.Child.String()) - return p.String() -} - -func (e *Exchange) DebugString() string { - p := sql.NewTreePrinter() - _ = p.WriteNode("Exchange(parallelism=%d)", e.Parallelism) - _ = p.WriteChildren(sql.DebugString(e.Child)) - return p.String() -} - -func (e *Exchange) IsReadOnly() bool { - return e.Child.IsReadOnly() -} - -// WithChildren implements the Node interface. -func (e *Exchange) WithChildren(children ...sql.Node) (sql.Node, error) { - if len(children) != 1 { - return nil, sql.ErrInvalidChildrenNumber.New(e, len(children), 1) - } - - return NewExchange(e.Parallelism, children[0]), nil -} - -// CheckPrivileges implements the interface sql.Node. -func (e *Exchange) CheckPrivileges(ctx *sql.Context, opChecker sql.PrivilegedOperationChecker) bool { - return e.Child.CheckPrivileges(ctx, opChecker) -} - -// CollationCoercibility implements the interface sql.CollationCoercible. -func (e *Exchange) CollationCoercibility(ctx *sql.Context) (collation sql.CollationID, coercibility byte) { - return sql.GetCoercibility(ctx, e.Child) -} - -type ExchangePartition struct { - sql.Partition - Table sql.Table -} - -var _ sql.Node = (*ExchangePartition)(nil) - -func (p *ExchangePartition) String() string { - return fmt.Sprintf("Partition(%s)", string(p.Key())) -} - -func (ExchangePartition) Children() []sql.Node { return nil } - -func (ExchangePartition) Resolved() bool { return true } -func (ExchangePartition) IsReadOnly() bool { return true } - -func (p *ExchangePartition) Schema() sql.Schema { - return p.Table.Schema() -} - -// WithChildren implements the Node interface. -func (p *ExchangePartition) WithChildren(children ...sql.Node) (sql.Node, error) { - if len(children) != 0 { - return nil, sql.ErrInvalidChildrenNumber.New(p, len(children), 0) - } - - return p, nil -} - -// CheckPrivileges implements the interface sql.Node. -func (p *ExchangePartition) CheckPrivileges(ctx *sql.Context, opChecker sql.PrivilegedOperationChecker) bool { - if node, ok := p.Table.(sql.Node); ok { - return node.CheckPrivileges(ctx, opChecker) - } - // If the table is not a TableNode or other such node, then I guess we'll return true as to not fail. - // This may not be the correct behavior though, as it's just a guess. - return true -} - -// CollationCoercibility implements the interface sql.CollationCoercible. -func (p *ExchangePartition) CollationCoercibility(ctx *sql.Context) (collation sql.CollationID, coercibility byte) { - // This is inspired by CheckPrivileges, although it may not be the desired behavior in all circumstances - if node, ok := p.Table.(sql.Node); ok { - return sql.GetCoercibility(ctx, node) - } - return sql.Collation_binary, 7 -} diff --git a/sql/plan/insert.go b/sql/plan/insert.go index 20ddb381cb..78bf38d9d2 100644 --- a/sql/plan/insert.go +++ b/sql/plan/insert.go @@ -387,8 +387,6 @@ func (id *InsertDestination) CollationCoercibility(ctx *sql.Context) (collation func GetInsertable(node sql.Node) (sql.InsertableTable, error) { switch node := node.(type) { - case *Exchange: - return GetInsertable(node.Child) case sql.InsertableTable: return node, nil case *ResolvedTable: diff --git a/sql/planbuilder/dml_validate.go b/sql/planbuilder/dml_validate.go index cf915f0b09..f7e3c04f44 100644 --- a/sql/planbuilder/dml_validate.go +++ b/sql/planbuilder/dml_validate.go @@ -177,10 +177,6 @@ func validGeneratedColumnValue(idx int, source sql.Node) bool { } func validateValueCount(columnNames []string, values sql.Node) error { - if exchange, ok := values.(*plan.Exchange); ok { - values = exchange.Child - } - switch node := values.(type) { case *plan.Values: for _, exprTuple := range node.ExpressionTuples { diff --git a/sql/rowexec/builder_gen_test.go b/sql/rowexec/builder_gen_test.go index 9000f0618c..5580264995 100644 --- a/sql/rowexec/builder_gen_test.go +++ b/sql/rowexec/builder_gen_test.go @@ -84,8 +84,6 @@ func TestGenBuilder(t *testing.T) { "SingleDropView": "*plan.SingleDropView", "DropView": "*plan.DropView", "EmptyTable": "*plan.EmptyTable", - "Exchange": "*plan.Exchange", - "exchangePartition": "*plan.exchangePartition", "ExternalProcedure": "*plan.ExternalProcedure", "Fetch": "*plan.Fetch", "Filter": "*plan.Filter", diff --git a/sql/rowexec/exchange_test.go b/sql/rowexec/exchange_test.go deleted file mode 100644 index 989b156869..0000000000 --- a/sql/rowexec/exchange_test.go +++ /dev/null @@ -1,266 +0,0 @@ -// Copyright 2020-2021 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rowexec - -import ( - "context" - "fmt" - "io" - "sync/atomic" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/dolthub/go-mysql-server/sql" - "github.com/dolthub/go-mysql-server/sql/expression" - "github.com/dolthub/go-mysql-server/sql/plan" - "github.com/dolthub/go-mysql-server/sql/types" -) - -func TestExchange(t *testing.T) { - children := plan.NewProject( - []sql.Expression{ - expression.NewGetField(0, types.Text, "partition", false), - expression.NewArithmetic( - expression.NewGetField(1, types.Int64, "val", false), - expression.NewLiteral(int64(1), types.Int64), - "+", - ), - }, - plan.NewFilter( - expression.NewLessThan( - expression.NewGetField(1, types.Int64, "val", false), - expression.NewLiteral(int64(4), types.Int64), - ), - &partitionable{nil, 3, 6}, - ), - ) - - expected := []sql.Row{ - {"1", int64(2)}, - {"1", int64(3)}, - {"1", int64(4)}, - {"2", int64(2)}, - {"2", int64(3)}, - {"2", int64(4)}, - {"3", int64(2)}, - {"3", int64(3)}, - {"3", int64(4)}, - } - - for i := 1; i <= 4; i++ { - t.Run(fmt.Sprint(i), func(t *testing.T) { - require := require.New(t) - - exchange := plan.NewExchange(i, children) - ctx := sql.NewEmptyContext() - iter, err := DefaultBuilder.Build(ctx, exchange, nil) - require.NoError(err) - - rows, err := sql.RowIterToRows(ctx, iter) - require.NoError(err) - require.ElementsMatch(expected, rows) - }) - } -} - -func TestExchangeCancelled(t *testing.T) { - children := plan.NewProject( - []sql.Expression{ - expression.NewGetField(0, types.Text, "partition", false), - expression.NewArithmetic( - expression.NewGetField(1, types.Int64, "val", false), - expression.NewLiteral(int64(1), types.Int64), - "+", - ), - }, - plan.NewFilter( - expression.NewLessThan( - expression.NewGetField(1, types.Int64, "val", false), - expression.NewLiteral(int64(4), types.Int64), - ), - &partitionable{nil, 3, 2048}, - ), - ) - - exchange := plan.NewExchange(3, children) - require := require.New(t) - - c, cancel := context.WithCancel(context.Background()) - ctx := sql.NewContext(c) - cancel() - - iter, err := DefaultBuilder.Build(ctx, exchange, nil) - require.NoError(err) - - _, err = iter.Next(ctx) - require.Equal(context.Canceled, err) -} - -func TestExchangeIterPartitionsPanic(t *testing.T) { - ctx := sql.NewContext(context.Background()) - piter, err := (&partitionable{nil, 3, 2048}).Partitions(ctx) - assert.NoError(t, err) - closedCh := make(chan sql.Partition) - close(closedCh) - err = iterPartitions(ctx, piter, closedCh) - assert.Error(t, err) - assert.Contains(t, err.Error(), "panic") - - openCh := make(chan sql.Partition) - err = iterPartitions(ctx, &partitionPanic{}, openCh) - assert.Error(t, err) - assert.Contains(t, err.Error(), "panic") -} - -func TestExchangeIterPartitionRowsPanic(t *testing.T) { - ctx := sql.NewContext(context.Background()) - partitions := make(chan sql.Partition, 1) - partitions <- Partition("test") - err := iterPartitionRows(ctx, func(*sql.Context, sql.Partition) (sql.RowIter, error) { - return &rowIterPanic{}, nil - }, partitions, nil) - assert.Error(t, err) - assert.Contains(t, err.Error(), "panic") - - closedCh := make(chan sql.Row) - close(closedCh) - partitions <- Partition("test") - err = iterPartitionRows(ctx, func(*sql.Context, sql.Partition) (sql.RowIter, error) { - return &partitionRows{Partition("test"), 10}, nil - }, partitions, closedCh) - assert.Error(t, err) - assert.Contains(t, err.Error(), "panic") -} - -type partitionable struct { - sql.Node - partitions int - rowsPerPartition int -} - -var _ sql.Table = partitionable{} -var _ sql.CollationCoercible = partitionable{} - -// WithChildren implements the Node interface. -func (p *partitionable) WithChildren(children ...sql.Node) (sql.Node, error) { - if len(children) != 0 { - return nil, sql.ErrInvalidChildrenNumber.New(p, len(children), 0) - } - - return p, nil -} - -// CheckPrivileges implements the interface sql.Node. -func (p *partitionable) CheckPrivileges(ctx *sql.Context, opChecker sql.PrivilegedOperationChecker) bool { - return p.Node.CheckPrivileges(ctx, opChecker) -} - -// CollationCoercibility implements the interface sql.CollationCoercible. -func (p partitionable) CollationCoercibility(ctx *sql.Context) (collation sql.CollationID, coercibility byte) { - return sql.GetCoercibility(ctx, p.Node) -} - -func (partitionable) Children() []sql.Node { return nil } - -func (p partitionable) Partitions(*sql.Context) (sql.PartitionIter, error) { - return &exchangePartitionIter{int32(p.partitions)}, nil -} - -func (p partitionable) PartitionRows(_ *sql.Context, part sql.Partition) (sql.RowIter, error) { - return &partitionRows{part, int32(p.rowsPerPartition)}, nil -} - -func (partitionable) Schema() sql.Schema { - return sql.Schema{ - {Name: "partition", Type: types.Text, Source: "foo"}, - {Name: "val", Type: types.Int64, Source: "foo"}, - } -} - -func (partitionable) Collation() sql.CollationID { - return sql.Collation_Default -} - -func (partitionable) Name() string { return "partitionable" } - -type Partition string - -func (p Partition) Key() []byte { - return []byte(p) -} - -type exchangePartitionIter struct { - num int32 -} - -func (i *exchangePartitionIter) Next(*sql.Context) (sql.Partition, error) { - new := atomic.AddInt32(&i.num, -1) - if new < 0 { - return nil, io.EOF - } - - return Partition(fmt.Sprint(new + 1)), nil -} - -func (i *exchangePartitionIter) Close(*sql.Context) error { - atomic.StoreInt32(&i.num, -1) - return nil -} - -type partitionRows struct { - sql.Partition - num int32 -} - -func (r *partitionRows) Next(*sql.Context) (sql.Row, error) { - new := atomic.AddInt32(&r.num, -1) - if new < 0 { - return nil, io.EOF - } - - return sql.NewRow(string(r.Key()), int64(new+1)), nil -} - -func (r *partitionRows) Close(*sql.Context) error { - atomic.StoreInt32(&r.num, -1) - return nil -} - -type rowIterPanic struct { -} - -func (*rowIterPanic) Next(*sql.Context) (sql.Row, error) { - panic("i panic") -} - -func (*rowIterPanic) Close(*sql.Context) error { - return nil -} - -type partitionPanic struct { - sql.Partition - closed bool -} - -func (*partitionPanic) Next(*sql.Context) (sql.Partition, error) { - panic("partitionPanic.Next") -} - -func (p *partitionPanic) Close(_ *sql.Context) error { - p.closed = true - return nil -} diff --git a/sql/rowexec/node_builder.gen.go b/sql/rowexec/node_builder.gen.go index 4ec32514df..f21c8a3ec7 100644 --- a/sql/rowexec/node_builder.gen.go +++ b/sql/rowexec/node_builder.gen.go @@ -378,10 +378,6 @@ func (b *BaseBuilder) buildNodeExecNoAnalyze(ctx *sql.Context, n sql.Node, row s return b.buildJSONTable(ctx, n, row) case *plan.UnlockTables: return b.buildUnlockTables(ctx, n, row) - case *plan.Exchange: - return b.buildExchange(ctx, n, row) - case *plan.ExchangePartition: - return b.buildExchangePartition(ctx, n, row) case *plan.HashLookup: return b.buildHashLookup(ctx, n, row) case *plan.Iterate: diff --git a/sql/rowexec/other.go b/sql/rowexec/other.go index f1f6fef3b6..0c9326024d 100644 --- a/sql/rowexec/other.go +++ b/sql/rowexec/other.go @@ -21,7 +21,6 @@ import ( "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/expression" "github.com/dolthub/go-mysql-server/sql/plan" - "github.com/dolthub/go-mysql-server/sql/transform" "github.com/dolthub/go-mysql-server/sql/types" ) @@ -110,73 +109,6 @@ func (b *BaseBuilder) buildNamedWindows(ctx *sql.Context, n *plan.NamedWindows, return nil, fmt.Errorf("%T has no execution iterator", n) } -func (b *BaseBuilder) buildExchange(ctx *sql.Context, n *plan.Exchange, row sql.Row) (sql.RowIter, error) { - var t sql.Table - transform.Inspect(n.Child, func(n sql.Node) bool { - if table, ok := n.(sql.Table); ok { - t = table - return false - } - return true - }) - if t == nil { - return nil, plan.ErrNoPartitionable.New() - } - - partitions, err := t.Partitions(ctx) - if err != nil { - return nil, err - } - - // How this is structured is a little subtle. A top-level - // errgroup run |iterPartitions| and listens on the shutdown - // hook. A different, dependent, errgroup runs - // |e.Parallelism| instances of |iterPartitionRows|. A - // goroutine within the top-level errgroup |Wait|s on the - // dependent errgroup and closes |rowsCh| once all its - // goroutines are completed. - - partitionsCh := make(chan sql.Partition) - rowsCh := make(chan sql.Row, n.Parallelism*16) - - eg, egCtx := ctx.NewErrgroup() - eg.Go(func() error { - defer close(partitionsCh) - return iterPartitions(egCtx, partitions, partitionsCh) - }) - - // Spawn |iterPartitionRows| goroutines in the dependent - // errgroup. - getRowIter := b.exchangeIterGen(n, row) - seg, segCtx := egCtx.NewErrgroup() - for i := 0; i < n.Parallelism; i++ { - seg.Go(func() error { - return iterPartitionRows(segCtx, getRowIter, partitionsCh, rowsCh) - }) - } - - eg.Go(func() error { - defer close(rowsCh) - err := seg.Wait() - if err != nil { - return err - } - // If everything in |seg| returned |nil|, - // |iterPartitions| is done, |partitionsCh| is closed, - // and every partition RowIter returned |EOF|. That - // means we're EOF here. - return io.EOF - }) - - waiter := func() error { return eg.Wait() } - shutdownHook := newShutdownHook(eg, egCtx) - return &exchangeRowIter{shutdownHook: shutdownHook, waiter: waiter, rows: rowsCh}, nil -} - -func (b *BaseBuilder) buildExchangePartition(ctx *sql.Context, n *plan.ExchangePartition, row sql.Row) (sql.RowIter, error) { - return n.Table.PartitionRows(ctx, n.Partition) -} - func (b *BaseBuilder) buildEmptyTable(ctx *sql.Context, n *plan.EmptyTable, row sql.Row) (sql.RowIter, error) { return sql.RowsToRowIter(), nil } diff --git a/sql/rowexec/other_iters.go b/sql/rowexec/other_iters.go index dcc2e9b713..afe1b2b53d 100644 --- a/sql/rowexec/other_iters.go +++ b/sql/rowexec/other_iters.go @@ -15,17 +15,11 @@ package rowexec import ( - "context" - "fmt" "io" "sync" - "go.opentelemetry.io/otel/attribute" - "golang.org/x/sync/errgroup" - "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/plan" - "github.com/dolthub/go-mysql-server/sql/transform" ) type analyzeTableIter struct { @@ -274,177 +268,6 @@ func (d *declareCursorIter) Close(ctx *sql.Context) error { return nil } -// iterPartitions will call Next() on |iter| and send every result it -// finds to |partitions|. Meant to be run as a goroutine in an -// errgroup, it returns a non-nil error if it gets an error and it -// return |ctx.Err()| if the context becomes Done(). -func iterPartitions(ctx *sql.Context, iter sql.PartitionIter, partitions chan<- sql.Partition) (rerr error) { - defer func() { - if r := recover(); r != nil { - rerr = fmt.Errorf("panic in iterPartitions: %v", r) - } - }() - defer func() { - cerr := iter.Close(ctx) - if rerr == nil { - rerr = cerr - } - }() - for { - p, err := iter.Next(ctx) - if err != nil { - if err == io.EOF { - return nil - } - return err - } - select { - case partitions <- p: - case <-ctx.Done(): - return ctx.Err() - } - } -} - -type rowIterPartitionFunc func(ctx *sql.Context, partition sql.Partition) (sql.RowIter, error) - -// iterPartitionRows is the parallel worker for an Exchange node. It -// is meant to be run as a goroutine in an errgroup.Group. It will -// values read off of |partitions|. For each value it reads, it will -// call |getRowIter| to get a row ProjectIter, and will then call |Next| on -// that row ProjectIter, passing every row it gets into |rows|. If it -// receives an error at any point, it returns it. |iterPartitionRows| -// stops iterating and returns |nil| when |partitions| is closed. -func iterPartitionRows(ctx *sql.Context, getRowIter rowIterPartitionFunc, partitions <-chan sql.Partition, rows chan<- sql.Row) (rerr error) { - defer func() { - if r := recover(); r != nil { - rerr = fmt.Errorf("panic in ExchangeIterPartitionRows: %v", r) - } - }() - for { - select { - case p, ok := <-partitions: - if !ok { - return nil - } - span, ctx := ctx.Span("exchange.IterPartition") - iter, err := getRowIter(ctx, p) - if err != nil { - return err - } - count, err := sendAllRows(ctx, iter, rows) - span.SetAttributes(attribute.Int("num_rows", count)) - span.End() - if err != nil { - return err - } - case <-ctx.Done(): - return ctx.Err() - } - } -} - -func sendAllRows(ctx *sql.Context, iter sql.RowIter, rows chan<- sql.Row) (rowCount int, rerr error) { - defer func() { - cerr := iter.Close(ctx) - if rerr == nil { - rerr = cerr - } - }() - for { - r, err := iter.Next(ctx) - if err == io.EOF { - return rowCount, nil - } - if err != nil { - return rowCount, err - } - rowCount++ - select { - case rows <- r: - case <-ctx.Done(): - return rowCount, ctx.Err() - } - } -} - -func (b *BaseBuilder) exchangeIterGen(e *plan.Exchange, row sql.Row) func(*sql.Context, sql.Partition) (sql.RowIter, error) { - return func(ctx *sql.Context, partition sql.Partition) (sql.RowIter, error) { - node, _, err := transform.Node(e.Child, func(n sql.Node) (sql.Node, transform.TreeIdentity, error) { - if t, ok := n.(sql.Table); ok { - return &plan.ExchangePartition{partition, t}, transform.NewTree, nil - } - return n, transform.SameTree, nil - }) - if err != nil { - return nil, err - } - return b.buildNodeExec(ctx, node, row) - } -} - -// exchangeRowIter implements sql.RowIter for an exchange -// node. Calling |Next| reads off of |rows|, while calling |Close| -// calls |shutdownHook| and waits for exchange node workers to -// shutdown. If |rows| is closed, |Next| returns the error returned by -// |waiter|. |Close| returns the error returned by |waiter|, except it -// returns |nil| if |waiter| returns |io.EOF| or |shutdownHookErr|. -type exchangeRowIter struct { - shutdownHook func() - waiter func() error - rows <-chan sql.Row - rows2 <-chan sql.Row2 -} - -var _ sql.RowIter = (*exchangeRowIter)(nil) - -func (i *exchangeRowIter) Next(ctx *sql.Context) (sql.Row, error) { - if i.rows == nil { - panic("Next called for a Next2 iterator") - } - r, ok := <-i.rows - if !ok { - return nil, i.waiter() - } - return r, nil -} - -func (i *exchangeRowIter) Close(ctx *sql.Context) error { - i.shutdownHook() - err := i.waiter() - if err == shutdownHookErr || err == io.EOF { - return nil - } - return err -} - -var shutdownHookErr = fmt.Errorf("shutdown hook") - -// newShutdownHook returns a |func()| that can be called to cancel the -// |ctx| associated with the supplied |eg|. It is safe to call the -// hook more than once. -// -// If an errgroup is shutdown with a shutdown hook, eg.Wait() will -// return |shutdownHookErr|. This can be used to consider requested -// shutdowns successful in some contexts, for example. -func newShutdownHook(eg *errgroup.Group, ctx context.Context) func() { - stop := make(chan struct{}) - eg.Go(func() error { - select { - case <-stop: - return shutdownHookErr - case <-ctx.Done(): - return nil - } - }) - shutdownOnce := &sync.Once{} - return func() { - shutdownOnce.Do(func() { - close(stop) - }) - } -} - type releaseIter struct { child sql.RowIter release func() diff --git a/sql/rowexec/show.go b/sql/rowexec/show.go index fc69e5df71..e74d256933 100644 --- a/sql/rowexec/show.go +++ b/sql/rowexec/show.go @@ -451,12 +451,8 @@ func (b *BaseBuilder) buildShowColumns(ctx *sql.Context, n *plan.ShowColumns, ro null = "YES" } - node := n.Child - if exchange, ok := node.(*plan.Exchange); ok { - node = exchange.Child - } key := "" - switch table := node.(type) { + switch table := n.Child.(type) { case *plan.ResolvedTable: if col.PrimaryKey { key = "PRI"