Skip to content

Commit 320998e

Browse files
- Added robot test Insert Async Returning Simple Projection.
1 parent 61f29f8 commit 320998e

File tree

20 files changed

+173
-50
lines changed

20 files changed

+173
-50
lines changed

.vscode/launch.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@
176176
"select rings.projectsId as project, rings.locationsId as locale, split_part(rings.name, '/', -1) as key_ring_name, split_part(keys.name, '/', -1) as key_name, json_extract(keys.\"versionTemplate\", '$.algorithm') as key_algorithm, json_extract(keys.\"versionTemplate\", '$.protectionLevel') as key_protection_level from google.cloudkms.key_rings rings inner join google.cloudkms.crypto_keys keys on keys.keyRingsId = split_part(rings.name, '/', -1) and keys.projectsId = rings.projectsId and keys.locationsId = rings.locationsId where rings.projectsId in ('testing-project', 'testing-project-two', 'testing-project-three') and rings.locationsId in ('global', 'australia-southeast1', 'australia-southeast2') order by project, locale, key_name ;",
177177
"delete from aws.cloud_control.resources where region = 'ap-southeast-1' and data__TypeName = 'AWS::Logs::LogGroup' and data__Identifier = 'LogGroupResourceExampleThird' ;",
178178
"insert into google.storage.buckets( project, data__name) select 'testing-project', 'silly-bucket' returning projectNumber;",
179+
"insert /*+ AWAIT */ into google.compute.networks(project, data__name, data__autoCreateSubnetworks) select 'mutable-project', 'auto-test-01', false returning creationTimestamp, name;",
179180
],
180181
"default": "show providers;"
181182
},

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ require (
1919
github.com/spf13/cobra v1.4.0
2020
github.com/spf13/pflag v1.0.5
2121
github.com/spf13/viper v1.10.1
22-
github.com/stackql/any-sdk v0.1.3-beta02
22+
github.com/stackql/any-sdk v0.1.4-alpha05
2323
github.com/stackql/go-suffix-map v0.0.1-alpha01
2424
github.com/stackql/psql-wire v0.1.1-beta23
2525
github.com/stackql/stackql-parser v0.0.15-alpha06

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -484,8 +484,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
484484
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
485485
github.com/spf13/viper v1.10.1 h1:nuJZuYpG7gTj/XqiUwg8bA0cp1+M2mC3J4g5luUYBKk=
486486
github.com/spf13/viper v1.10.1/go.mod h1:IGlFPqhNAPKRxohIzWpI5QEy4kuI7tcl5WvR+8qy1rU=
487-
github.com/stackql/any-sdk v0.1.3-beta02 h1:0jSwyYFddjAN++U+yNNLF7SMLPIIaRhv522UzeWDf2E=
488-
github.com/stackql/any-sdk v0.1.3-beta02/go.mod h1:AKS/g28y7m4SWL/YW8veE9MCNy8XJgaicVibemVE9e8=
487+
github.com/stackql/any-sdk v0.1.4-alpha05 h1:QWMTCU4OHJdsRI3zT4pmM8L+Opgf1RUmg2MPXDjAtzQ=
488+
github.com/stackql/any-sdk v0.1.4-alpha05/go.mod h1:AKS/g28y7m4SWL/YW8veE9MCNy8XJgaicVibemVE9e8=
489489
github.com/stackql/go-suffix-map v0.0.1-alpha01 h1:TDUDS8bySu41Oo9p0eniUeCm43mnRM6zFEd6j6VUaz8=
490490
github.com/stackql/go-suffix-map v0.0.1-alpha01/go.mod h1:QAi+SKukOyf4dBtWy8UMy+hsXXV+yyEE4vmBkji2V7g=
491491
github.com/stackql/psql-wire v0.1.1-beta23 h1:1ayYMjZArfDcIMyEOKnm+Bp1zRCISw8pguvTFuUhhVQ=

internal/stackql/primitivebuilder/async_compose.go renamed to internal/stackql/asynccompose/async_compose.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,24 @@
1-
package primitivebuilder
1+
package asynccompose
22

33
import (
44
"github.com/stackql/any-sdk/anysdk"
55
"github.com/stackql/stackql-parser/go/vt/sqlparser"
6+
"github.com/stackql/stackql/internal/stackql/drm"
67
"github.com/stackql/stackql/internal/stackql/handler"
78
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
89
"github.com/stackql/stackql/internal/stackql/primitive"
910
"github.com/stackql/stackql/internal/stackql/provider"
1011
)
1112

12-
func composeAsyncMonitor(
13+
func ComposeAsyncMonitor(
1314
handlerCtx handler.HandlerContext,
1415
precursor primitive.IPrimitive,
1516
prov provider.IProvider,
1617
method anysdk.OperationStore,
1718
commentDirectives sqlparser.CommentDirectives,
1819
isReturning bool,
20+
insertCtx drm.PreparedStatementCtx,
21+
drmCfg drm.Config,
1922
) (primitive.IPrimitive, error) {
2023
asm, err := NewAsyncMonitor(handlerCtx, prov, method, isReturning)
2124
if err != nil {
@@ -32,7 +35,8 @@ func composeAsyncMonitor(
3235
handlerCtx.GetOutfile(),
3336
handlerCtx.GetOutErrFile(),
3437
)
35-
primitive, err := asm.GetMonitorPrimitive(prov, method, precursor, pl, commentDirectives, isReturning)
38+
primitive, err := asm.GetMonitorPrimitive(
39+
prov, method, precursor, pl, commentDirectives, isReturning, insertCtx, drmCfg)
3640
if err != nil {
3741
return nil, err
3842
}

internal/stackql/primitivebuilder/asyncmonitor.go renamed to internal/stackql/asynccompose/asyncmonitor.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
package primitivebuilder
1+
package asynccompose
22

33
import (
44
"fmt"
55
"strings"
66

77
"github.com/stackql/any-sdk/anysdk"
88
"github.com/stackql/stackql/internal/stackql/acid/binlog"
9+
"github.com/stackql/stackql/internal/stackql/drm"
910
"github.com/stackql/stackql/internal/stackql/execution"
1011
"github.com/stackql/stackql/internal/stackql/handler"
1112
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
@@ -23,6 +24,8 @@ type IAsyncMonitor interface {
2324
initialCtx primitive.IPrimitiveCtx,
2425
comments sqlparser.CommentDirectives,
2526
isReturning bool,
27+
insertCtx drm.PreparedStatementCtx,
28+
drmCfg drm.Config,
2629
) (primitive.IPrimitive, error)
2730
}
2831

@@ -164,11 +167,13 @@ func (gm *DefaultGoogleAsyncMonitor) GetMonitorPrimitive(
164167
initialCtx primitive.IPrimitiveCtx,
165168
comments sqlparser.CommentDirectives,
166169
isReturning bool,
170+
insertCtx drm.PreparedStatementCtx,
171+
drmCfg drm.Config,
167172
) (primitive.IPrimitive, error) {
168173
//nolint:gocritic,staticcheck //TODO: refactor
169174
switch strings.ToLower(prov.GetVersion()) {
170175
default:
171-
return gm.getV1Monitor(prov, op, precursor, initialCtx, comments, isReturning)
176+
return gm.getV1Monitor(prov, op, precursor, initialCtx, comments, isReturning, insertCtx, drmCfg)
172177
}
173178
}
174179

@@ -179,6 +184,8 @@ func (gm *DefaultGoogleAsyncMonitor) getV1Monitor(
179184
initialCtx primitive.IPrimitiveCtx,
180185
comments sqlparser.CommentDirectives,
181186
isReturning bool,
187+
insertCtx drm.PreparedStatementCtx,
188+
drmCfg drm.Config,
182189
) (primitive.IPrimitive, error) {
183190
provider, providerErr := prov.GetProvider()
184191
if providerErr != nil {
@@ -192,6 +199,8 @@ func (gm *DefaultGoogleAsyncMonitor) getV1Monitor(
192199
initialCtx,
193200
comments,
194201
isReturning,
202+
insertCtx,
203+
drmCfg,
195204
)
196205
if exPrepErr != nil {
197206
return nil, exPrepErr

internal/stackql/dependencyplanner/dependencyplanner.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,7 @@ func (dp *standardDependencyPlanner) orchestrate(
485485
insPsc,
486486
nil,
487487
outStream,
488+
false,
488489
)
489490
}
490491
dp.execSlice = append(dp.execSlice, builder)

internal/stackql/execution/mono_valent_execution.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,6 +1130,7 @@ func (sp *standardProcessor) Process() ProcessorResponse {
11301130
if httpResponseErr != nil {
11311131
return newHTTPProcessorResponse(nil, reversalStream, false, httpResponseErr)
11321132
}
1133+
// TODO: add async monitor here
11331134
processed, resErr := method.ProcessResponse(httpResponse)
11341135
if resErr != nil {
11351136
if isSkipResponse && isMutation && httpResponse.StatusCode < 300 {
@@ -1479,6 +1480,8 @@ func GetMonitorExecutor(
14791480
initialCtx primitive.IPrimitiveCtx,
14801481
comments sqlparser.CommentDirectives,
14811482
isReturning bool,
1483+
insertCtx drm.PreparedStatementCtx,
1484+
drmCfg drm.Config,
14821485
) (primitive.IPrimitive, error) {
14831486
m := op
14841487
// tableName, err := mv.tableMeta.GetTableName()
@@ -1498,6 +1501,8 @@ func GetMonitorExecutor(
14981501
elapsedSeconds: 0,
14991502
pollIntervalSeconds: MonitorPollIntervalSeconds,
15001503
comments: comments,
1504+
insertCtx: insertCtx,
1505+
drmCfg: drmCfg,
15011506
}
15021507
if comments != nil {
15031508
asyncPrim.noStatus = comments.IsSet("NOSTATUS")
@@ -1567,6 +1572,29 @@ func GetMonitorExecutor(
15671572
if targetErr != nil {
15681573
return internaldto.NewExecutorOutput(nil, nil, nil, nil, targetErr)
15691574
}
1575+
// TODO: insert into table here
1576+
if isReturning {
1577+
if asyncPrim.insertCtx != nil {
1578+
_, rErr := asyncPrim.drmCfg.ExecuteInsertDML(
1579+
handlerCtx.GetSQLEngine(),
1580+
asyncPrim.insertCtx,
1581+
target,
1582+
"", // TODO: figure out how on earth to compute this encoding
1583+
)
1584+
if rErr != nil {
1585+
return internaldto.NewExecutorOutput(nil, nil, nil, nil, rErr)
1586+
}
1587+
}
1588+
if asyncPrim.selectCtx != nil {
1589+
_, rErr := asyncPrim.drmCfg.QueryDML(
1590+
handlerCtx.GetSQLEngine(),
1591+
drm.NewPreparedStatementParameterized(asyncPrim.selectCtx, nil, true),
1592+
)
1593+
if rErr != nil {
1594+
return internaldto.NewExecutorOutput(nil, nil, nil, nil, rErr)
1595+
}
1596+
}
1597+
}
15701598
return prepareResultSet(&asyncPrim, pc, target, operationDescriptor)
15711599
}
15721600
return prepareResultSet(&asyncPrim, pc, body, operationDescriptor)
@@ -1718,6 +1746,9 @@ type asyncHTTPMonitorPrimitive struct {
17181746
noStatus bool
17191747
id int64
17201748
comments sqlparser.CommentDirectives
1749+
insertCtx drm.PreparedStatementCtx
1750+
selectCtx drm.PreparedStatementCtx
1751+
drmCfg drm.Config
17211752
}
17221753

17231754
func (pr *asyncHTTPMonitorPrimitive) SetTxnID(_ int) {

internal/stackql/internal_data_transfer/builder_input/builder_input.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"github.com/stackql/any-sdk/pkg/streaming"
66
"github.com/stackql/stackql-parser/go/vt/sqlparser"
77
"github.com/stackql/stackql/internal/stackql/astanalysis/annotatedast"
8+
"github.com/stackql/stackql/internal/stackql/drm"
89
"github.com/stackql/stackql/internal/stackql/handler"
910
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
1011
"github.com/stackql/stackql/internal/stackql/primitivegraph"
@@ -56,6 +57,8 @@ type BuilderInput interface {
5657
GetTxnCtrlCtrs() (internaldto.TxnControlCounters, bool)
5758
GetTableInsertionContainer() (tableinsertioncontainer.TableInsertionContainer, bool)
5859
SetTableInsertionContainer(tableinsertioncontainer.TableInsertionContainer)
60+
SetInsertCtx(insertCtx drm.PreparedStatementCtx)
61+
GetInsertCtx() (drm.PreparedStatementCtx, bool)
5962
}
6063

6164
type builderInput struct {
@@ -79,6 +82,7 @@ type builderInput struct {
7982
isTargetPhysical bool
8083
txnCtrlCtrs internaldto.TxnControlCounters
8184
tableInsertionContainer tableinsertioncontainer.TableInsertionContainer
85+
insertCtx drm.PreparedStatementCtx
8286
}
8387

8488
func NewBuilderInput(
@@ -95,6 +99,17 @@ func NewBuilderInput(
9599
}
96100
}
97101

102+
func (bi *builderInput) SetInsertCtx(insertCtx drm.PreparedStatementCtx) {
103+
bi.insertCtx = insertCtx
104+
}
105+
106+
func (bi *builderInput) GetInsertCtx() (drm.PreparedStatementCtx, bool) {
107+
if bi.insertCtx == nil {
108+
return nil, false
109+
}
110+
return bi.insertCtx, true
111+
}
112+
98113
func (bi *builderInput) IsReturning() bool {
99114
return bi.isReturning
100115
}
@@ -271,5 +286,7 @@ func (bi *builderInput) Clone() BuilderInput {
271286
isTargetPhysical: bi.isTargetPhysical,
272287
annotatedAst: bi.annotatedAst,
273288
txnCtrlCtrs: bi.txnCtrlCtrs,
289+
isReturning: bi.isReturning,
290+
insertCtx: bi.insertCtx,
274291
}
275292
}

internal/stackql/planbuilder/plan_builder.go

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/stackql/any-sdk/anysdk"
1111

1212
"github.com/stackql/any-sdk/pkg/logging"
13+
"github.com/stackql/any-sdk/pkg/streaming"
1314
"github.com/stackql/stackql/internal/stackql/acid/txn_context"
1415
"github.com/stackql/stackql/internal/stackql/astanalysis/routeanalysis"
1516
"github.com/stackql/stackql/internal/stackql/handler"
@@ -921,10 +922,7 @@ func (pgb *standardPlanGraphBuilder) handleInsert(pbi planbuilderinput.PlanBuild
921922
if isPhysicalTable {
922923
bldrInput.SetIsTargetPhysicalTable(true)
923924
}
924-
//nolint:stylecheck // not in the mood
925-
var bldr primitivebuilder.Builder = primitivebuilder.NewInsertOrUpdate(
926-
bldrInput,
927-
)
925+
var bldr primitivebuilder.Builder
928926
if len(node.SelectExprs) > 0 {
929927
// Two cases:
930928
// 1. Synchronous. Equivalent to select.
@@ -943,25 +941,57 @@ func (pgb *standardPlanGraphBuilder) handleInsert(pbi planbuilderinput.PlanBuild
943941
return rcErr
944942
}
945943
bldrInput.SetTableInsertionContainer(rc)
946-
//nolint:stylecheck // not in the mood
947-
var returningBldr primitivebuilder.Builder = primitivebuilder.NewSingleAcquireAndSelect(
948-
bldrInput,
949-
primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx(),
950-
primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(),
951-
nil,
952-
)
944+
bldrInput.SetIsReturning(true)
953945
if !isAwait {
954-
bldr = returningBldr
955-
} else {
956-
rhsBldr := primitivebuilder.NewSingleAcquireAndSelect(
946+
bldr = primitivebuilder.NewSingleAcquireAndSelect(
957947
bldrInput,
958948
primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx(),
959949
primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(),
960950
nil,
961951
)
952+
} else {
953+
// bldr = returningBldr
954+
// TODO: stuff the async output into an appropriate table
955+
// rhsBldr := primitivebuilder.NewSingleSelect(
956+
// pgb.planGraphHolder,
957+
// handlerCtx,
958+
// primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(),
959+
// []tableinsertioncontainer.TableInsertionContainer{rc},
960+
// nil,
961+
// streaming.NewNopMapStream(),
962+
// )
963+
bldrInput.SetIsAwait(true)
964+
bldrInput.SetIsReturning(true)
965+
bldrInput.SetInsertCtx(primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx())
966+
lhsBldr := primitivebuilder.NewInsertOrUpdate(
967+
bldrInput,
968+
)
969+
newBldrInput := builder_input.NewBuilderInput(
970+
pgb.planGraphHolder,
971+
handlerCtx,
972+
tbl,
973+
)
974+
newBldrInput.SetParserNode(node)
975+
newBldrInput.SetAnnotatedAST(pbi.GetAnnotatedAST())
976+
newBldrInput.SetTxnCtrlCtrs(pbi.GetTxnCtrlCtrs())
977+
newBldrInput.SetTableInsertionContainer(rc)
978+
newBldrInput.SetDependencyNode(selectPrimitiveNode)
979+
newBldrInput.SetIsAwait(isAwait)
980+
// rhsBldr := primitivebuilder.NewSingleAcquireAndSelect(
981+
// newBldrInput,
982+
// primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx(),
983+
// primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(),
984+
// nil,
985+
// )
986+
rhsBldr := primitivebuilder.NewSingleSelect(
987+
pgb.planGraphHolder, handlerCtx, primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(),
988+
[]tableinsertioncontainer.TableInsertionContainer{rc},
989+
nil,
990+
streaming.NewNopMapStream(),
991+
)
962992
bldr = primitivebuilder.NewDependencySubDAGBuilder(
963993
pgb.planGraphHolder,
964-
[]primitivebuilder.Builder{bldr},
994+
[]primitivebuilder.Builder{lhsBldr},
965995
rhsBldr,
966996
)
967997
}

internal/stackql/primitivebuilder/delete.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package primitivebuilder
33
import (
44
"github.com/stackql/any-sdk/pkg/streaming"
55
"github.com/stackql/stackql-parser/go/vt/sqlparser"
6+
"github.com/stackql/stackql/internal/stackql/asynccompose"
67
"github.com/stackql/stackql/internal/stackql/drm"
78
"github.com/stackql/stackql/internal/stackql/execution"
89
"github.com/stackql/stackql/internal/stackql/handler"
@@ -93,8 +94,8 @@ func (ss *Delete) Build() error {
9394
primitive_context.NewPrimitiveContext(),
9495
)
9596
if ss.isAwait {
96-
deletePrimitive, err = composeAsyncMonitor(
97-
handlerCtx, deletePrimitive, prov, method, nil, false) // isReturning hardcoded to false for now
97+
deletePrimitive, err = asynccompose.ComposeAsyncMonitor(
98+
handlerCtx, deletePrimitive, prov, method, nil, false, nil, nil) // isReturning hardcoded to false for now
9899
}
99100
if err != nil {
100101
return err

0 commit comments

Comments
 (0)