Skip to content

Commit 7a856d9

Browse files
- Working localy for synchronous.
1 parent 4e3796a commit 7a856d9

File tree

7 files changed

+110
-44
lines changed

7 files changed

+110
-44
lines changed

internal/stackql/internal_data_transfer/builder_input/builder_input.go

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
1010
"github.com/stackql/stackql/internal/stackql/primitivegraph"
1111
"github.com/stackql/stackql/internal/stackql/provider"
12+
"github.com/stackql/stackql/internal/stackql/tableinsertioncontainer"
1213
"github.com/stackql/stackql/internal/stackql/tablemetadata"
1314
)
1415

@@ -51,27 +52,30 @@ type BuilderInput interface {
5152
SetIsTargetPhysicalTable(isPhysical bool)
5253
SetTxnCtrlCtrs(internaldto.TxnControlCounters)
5354
GetTxnCtrlCtrs() (internaldto.TxnControlCounters, bool)
55+
GetTableInsertionContainer() (tableinsertioncontainer.TableInsertionContainer, bool)
56+
SetTableInsertionContainer(tableinsertioncontainer.TableInsertionContainer)
5457
}
5558

5659
type builderInput struct {
57-
graphHolder primitivegraph.PrimitiveGraphHolder
58-
handlerCtx handler.HandlerContext
59-
paramMap map[int]map[string]interface{}
60-
tbl tablemetadata.ExtendedTableMetadata
61-
dependencyNode primitivegraph.PrimitiveNode
62-
commentDirectives sqlparser.CommentDirectives
63-
isAwait bool
64-
verb string
65-
inputAlias string
66-
isUndo bool
67-
node sqlparser.SQLNode
68-
paramMapStream streaming.MapStream
69-
httpPrepStream anysdk.HttpPreparatorStream
70-
op anysdk.OperationStore
71-
prov provider.IProvider
72-
annotatedAst annotatedast.AnnotatedAst
73-
isTargetPhysical bool
74-
txnCtrlCtrs internaldto.TxnControlCounters
60+
graphHolder primitivegraph.PrimitiveGraphHolder
61+
handlerCtx handler.HandlerContext
62+
paramMap map[int]map[string]interface{}
63+
tbl tablemetadata.ExtendedTableMetadata
64+
dependencyNode primitivegraph.PrimitiveNode
65+
commentDirectives sqlparser.CommentDirectives
66+
isAwait bool
67+
verb string
68+
inputAlias string
69+
isUndo bool
70+
node sqlparser.SQLNode
71+
paramMapStream streaming.MapStream
72+
httpPrepStream anysdk.HttpPreparatorStream
73+
op anysdk.OperationStore
74+
prov provider.IProvider
75+
annotatedAst annotatedast.AnnotatedAst
76+
isTargetPhysical bool
77+
txnCtrlCtrs internaldto.TxnControlCounters
78+
tableInsertionContainer tableinsertioncontainer.TableInsertionContainer
7579
}
7680

7781
func NewBuilderInput(
@@ -88,6 +92,14 @@ func NewBuilderInput(
8892
}
8993
}
9094

95+
func (bi *builderInput) GetTableInsertionContainer() (tableinsertioncontainer.TableInsertionContainer, bool) {
96+
return bi.tableInsertionContainer, bi.tableInsertionContainer != nil
97+
}
98+
99+
func (bi *builderInput) SetTableInsertionContainer(ti tableinsertioncontainer.TableInsertionContainer) {
100+
bi.tableInsertionContainer = ti
101+
}
102+
91103
func (bi *builderInput) SetTxnCtrlCtrs(tcc internaldto.TxnControlCounters) {
92104
bi.txnCtrlCtrs = tcc
93105
}

internal/stackql/parserutil/parser_util.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,23 @@ func ExtractSelectColumnNames(selStmt *sqlparser.Select, formatter sqlparser.Nod
8888
return colNames, err
8989
}
9090

91+
func ExtractInsertReturningColumnNames(insertStmt *sqlparser.Insert, formatter sqlparser.NodeFormatter) ([]ColumnHandle, error) {
92+
var colNames []ColumnHandle
93+
var err error
94+
for _, node := range insertStmt.SelectExprs {
95+
switch node := node.(type) {
96+
case *sqlparser.AliasedExpr:
97+
cn, cErr := inferColNameFromExpr(node.Expr, formatter, node.As.GetRawVal())
98+
if cErr != nil {
99+
return nil, cErr
100+
}
101+
colNames = append(colNames, cn)
102+
case *sqlparser.StarExpr:
103+
}
104+
}
105+
return colNames, err
106+
}
107+
91108
func ExtractInsertColumnNames(insertStmt *sqlparser.Insert) ([]string, error) {
92109
var colNames []string
93110
var err error

internal/stackql/planbuilder/plan_builder.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/stackql/any-sdk/anysdk"
1111

1212
"github.com/stackql/any-sdk/pkg/logging"
13-
"github.com/stackql/any-sdk/pkg/streaming"
1413
"github.com/stackql/stackql/internal/stackql/acid/txn_context"
1514
"github.com/stackql/stackql/internal/stackql/astanalysis/routeanalysis"
1615
"github.com/stackql/stackql/internal/stackql/handler"
@@ -926,6 +925,7 @@ func (pgb *standardPlanGraphBuilder) handleInsert(pbi planbuilderinput.PlanBuild
926925
// Two cases:
927926
// 1. Synchronous. Equivalent to select.
928927
// 2. Asynchronous. Whole other story.
928+
// Synchronous only for now...
929929
tableMeta, tableMetaExists := bldrInput.GetTableMetadata()
930930
if !tableMetaExists {
931931
return fmt.Errorf("could not obtain table metadata for node '%s'", node.Action)
@@ -938,13 +938,12 @@ func (pgb *standardPlanGraphBuilder) handleInsert(pbi planbuilderinput.PlanBuild
938938
if rcErr != nil {
939939
return rcErr
940940
}
941-
bldr = primitivebuilder.NewSingleSelectAcquire(
942-
pgb.planGraphHolder,
943-
handlerCtx,
944-
rc,
941+
bldrInput.SetTableInsertionContainer(rc)
942+
bldr = primitivebuilder.NewSingleAcquireAndSelect(
943+
bldrInput,
945944
primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx(),
945+
primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(),
946946
nil,
947-
streaming.NewNopMapStream(),
948947
)
949948
} else {
950949
bldr = primitivebuilder.NewInsertOrUpdate(

internal/stackql/primitivebuilder/exec.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/stackql/stackql/internal/stackql/drm"
77
"github.com/stackql/stackql/internal/stackql/execution"
88
"github.com/stackql/stackql/internal/stackql/handler"
9+
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/builder_input"
910
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
1011
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/primitive_context"
1112
"github.com/stackql/stackql/internal/stackql/primitive"
@@ -92,11 +93,14 @@ func (ss *Exec) Build() error {
9293
return analysisErr
9394
}
9495
methodAnalysisOutput.GetInsertTabulation()
95-
deFactoSelectBuilder := NewSingleAcquireAndSelect(
96+
bldrInput := builder_input.NewBuilderInput(
9697
ss.graph,
97-
ss.tcc,
98-
ss.handlerCtx.Clone(),
99-
nil,
98+
handlerCtx.Clone(),
99+
tbl,
100+
)
101+
bldrInput.SetTxnCtrlCtrs(ss.tcc)
102+
deFactoSelectBuilder := NewSingleAcquireAndSelect(
103+
bldrInput,
100104
nil,
101105
nil,
102106
nil,

internal/stackql/primitivebuilder/single_acquire_and_select.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ package primitivebuilder
33
import (
44
"github.com/stackql/any-sdk/pkg/streaming"
55
"github.com/stackql/stackql/internal/stackql/drm"
6-
"github.com/stackql/stackql/internal/stackql/handler"
7-
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
6+
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/builder_input"
87
"github.com/stackql/stackql/internal/stackql/primitivegraph"
98
"github.com/stackql/stackql/internal/stackql/tableinsertioncontainer"
109
)
@@ -13,17 +12,24 @@ type SingleAcquireAndSelect struct {
1312
graph primitivegraph.PrimitiveGraphHolder
1413
acquireBuilder Builder
1514
selectBuilder Builder
15+
bldrInput builder_input.BuilderInput
16+
root primitivegraph.PrimitiveNode
1617
}
1718

1819
func NewSingleAcquireAndSelect(
19-
graph primitivegraph.PrimitiveGraphHolder,
20-
txnControlCounters internaldto.TxnControlCounters, //nolint:revive // future proofing
21-
handlerCtx handler.HandlerContext,
22-
insertContainer tableinsertioncontainer.TableInsertionContainer,
20+
// graph primitivegraph.PrimitiveGraphHolder,
21+
// txnControlCounters internaldto.TxnControlCounters, //nolint:revive // future proofing
22+
// handlerCtx handler.HandlerContext,
23+
// insertContainer tableinsertioncontainer.TableInsertionContainer,
24+
bldrInput builder_input.BuilderInput,
2325
insertCtx drm.PreparedStatementCtx,
2426
selectCtx drm.PreparedStatementCtx,
2527
rowSort func(map[string]map[string]interface{}) []string,
2628
) Builder {
29+
graph, _ := bldrInput.GetGraphHolder()
30+
// txnControlCounters, _ := bldrInput.GetTxnCtrlCtrs()
31+
handlerCtx, _ := bldrInput.GetHandlerContext()
32+
insertContainer, _ := bldrInput.GetTableInsertionContainer()
2733
return &SingleAcquireAndSelect{
2834
graph: graph,
2935
acquireBuilder: NewSingleSelectAcquire(
@@ -38,6 +44,7 @@ func NewSingleAcquireAndSelect(
3844
[]tableinsertioncontainer.TableInsertionContainer{insertContainer},
3945
rowSort,
4046
streaming.NewNopMapStream()),
47+
bldrInput: bldrInput,
4148
}
4249
}
4350

@@ -60,5 +67,14 @@ func (ss *SingleAcquireAndSelect) Build() error {
6067
}
6168
graph := ss.graph
6269
graph.NewDependency(ss.acquireBuilder.GetTail(), ss.selectBuilder.GetRoot(), 1.0)
70+
rootNode := ss.acquireBuilder.GetRoot()
71+
ss.root = rootNode
72+
dependencyNode, dependencyNodeExists := ss.bldrInput.GetDependencyNode()
73+
if dependencyNodeExists {
74+
//nolint:errcheck // TODO: fix this
75+
rootNode.SetInputAlias("", dependencyNode.ID())
76+
ss.graph.NewDependency(dependencyNode, rootNode, 1.0)
77+
// ss.root = dependencyNode // dont think this is needed
78+
}
6379
return nil
6480
}

internal/stackql/primitivegenerator/select.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/stackql/stackql/internal/stackql/astindirect"
1111
"github.com/stackql/stackql/internal/stackql/astvisit"
1212
"github.com/stackql/stackql/internal/stackql/dependencyplanner"
13+
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/builder_input"
1314
"github.com/stackql/stackql/internal/stackql/parserutil"
1415
"github.com/stackql/stackql/internal/stackql/planbuilderinput"
1516
"github.com/stackql/stackql/internal/stackql/primitivebuilder"
@@ -251,12 +252,16 @@ func (pb *standardPrimitiveGenerator) analyzeSelect(pbi planbuilderinput.PlanBui
251252
return indirectErr
252253
}
253254
annotatedAST.SetSelectIndirect(node, selIndirect)
255+
bldrInput := builder_input.NewBuilderInput(
256+
pChild.GetPrimitiveComposer().GetGraphHolder(),
257+
handlerCtx,
258+
tbl,
259+
)
260+
bldrInput.SetTxnCtrlCtrs(pChild.GetPrimitiveComposer().GetTxnCtrlCtrs())
261+
bldrInput.SetTableInsertionContainer(insertionContainer)
254262
pChild.GetPrimitiveComposer().SetBuilder(
255263
primitivebuilder.NewSingleAcquireAndSelect(
256-
pChild.GetPrimitiveComposer().GetGraphHolder(),
257-
pChild.GetPrimitiveComposer().GetTxnCtrlCtrs(),
258-
handlerCtx,
259-
insertionContainer,
264+
bldrInput,
260265
pChild.GetPrimitiveComposer().GetInsertPreparedStatementCtx(),
261266
pChild.GetPrimitiveComposer().GetSelectPreparedStatementCtx(),
262267
nil))

internal/stackql/primitivegenerator/statement_analyzer.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -704,14 +704,20 @@ func (pb *standardPrimitiveGenerator) analyzeExec(pbi planbuilderinput.PlanBuild
704704
return indirectErr
705705
}
706706
annotatedAST.SetExecIndirect(node, selIndirect)
707+
bldrInput := builder_input.NewBuilderInput(
708+
pb.PrimitiveComposer.GetGraphHolder(),
709+
handlerCtx,
710+
tbl,
711+
)
712+
bldrInput.SetTxnCtrlCtrs(pb.PrimitiveComposer.GetTxnCtrlCtrs())
713+
bldrInput.SetTableInsertionContainer(insertionContainer)
707714
pb.PrimitiveComposer.SetBuilder(
708715
primitivebuilder.NewSingleAcquireAndSelect(
709-
pb.PrimitiveComposer.GetGraphHolder(),
710-
pb.PrimitiveComposer.GetTxnCtrlCtrs(),
711-
handlerCtx,
712-
insertionContainer,
716+
bldrInput,
713717
pb.PrimitiveComposer.GetInsertPreparedStatementCtx(),
714-
pb.PrimitiveComposer.GetSelectPreparedStatementCtx(), nil))
718+
pb.PrimitiveComposer.GetSelectPreparedStatementCtx(),
719+
nil,
720+
))
715721
return nil
716722
}
717723

@@ -1127,13 +1133,20 @@ func (pb *standardPrimitiveGenerator) AnalyzeInsert(pbi planbuilderinput.PlanBui
11271133
if err != nil {
11281134
return err
11291135
}
1136+
columnHandles := []parserutil.ColumnHandle{}
1137+
if len(node.SelectExprs) > 0 {
1138+
columnHandles, err = parserutil.ExtractInsertReturningColumnNames(node, handlerCtx.GetASTFormatter())
1139+
if err != nil {
1140+
return err
1141+
}
1142+
}
11301143
err = pb.analyzeUnaryAction(
11311144
pbi,
11321145
handlerCtx,
11331146
node,
11341147
nil,
11351148
tbl,
1136-
[]parserutil.ColumnHandle{},
1149+
columnHandles,
11371150
methodAnalysisOutput,
11381151
)
11391152
if err != nil {

0 commit comments

Comments
 (0)