Skip to content

Commit a575233

Browse files
insert-returning-asynchronous
Summary: - Support for asynchronous `INSERT RETURNINNG` semantics. - Update docs. - Flask app uplift. - Added robot test `Insert Async Returning Simple Projection`. - Async progress messages now routed to `stderr`, prior case was `stdout`.
1 parent 18933ee commit a575233

File tree

29 files changed

+399
-43
lines changed

29 files changed

+399
-43
lines changed

.github/workflows/build.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1461,6 +1461,8 @@ jobs:
14611461

14621462

14631463
dockertest:
1464+
env:
1465+
IS_DOCKER: 'true'
14641466
name: Docker Test
14651467
needs:
14661468
- dockerbuild

.vscode/launch.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@
176176
"select rings.projectsId as project, rings.locationsId as locale, split_part(rings.name, '/', -1) as key_ring_name, split_part(keys.name, '/', -1) as key_name, json_extract(keys.\"versionTemplate\", '$.algorithm') as key_algorithm, json_extract(keys.\"versionTemplate\", '$.protectionLevel') as key_protection_level from google.cloudkms.key_rings rings inner join google.cloudkms.crypto_keys keys on keys.keyRingsId = split_part(rings.name, '/', -1) and keys.projectsId = rings.projectsId and keys.locationsId = rings.locationsId where rings.projectsId in ('testing-project', 'testing-project-two', 'testing-project-three') and rings.locationsId in ('global', 'australia-southeast1', 'australia-southeast2') order by project, locale, key_name ;",
177177
"delete from aws.cloud_control.resources where region = 'ap-southeast-1' and data__TypeName = 'AWS::Logs::LogGroup' and data__Identifier = 'LogGroupResourceExampleThird' ;",
178178
"insert into google.storage.buckets( project, data__name) select 'testing-project', 'silly-bucket' returning projectNumber;",
179+
"insert /*+ AWAIT */ into google.compute.networks(project, data__name, data__autoCreateSubnetworks) select 'mutable-project', 'auto-test-01', false returning creationTimestamp, name;",
179180
],
180181
"default": "show providers;"
181182
},

docs/developer_guide.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ https://docs.aws.amazon.com/sdk-for-go/api/aws/signer/v4/
274274
`INSERT RETURNING` can function in two mechanisms:
275275

276276
- Synchronous responses, such as [`google.storage.buckets`](https://cloud.google.com/storage/docs/json_api/v1/buckets/insert). The returning clause is a projection on the immediately available reponse body.
277-
- Asynchronous responses, such as [`google.compute.instances`](https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert). The returning clause is a projection on the reponse body **after** the await flow has concluded.
277+
- Asynchronous responses, such as [`google.compute.instances`](https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert) and [`google.compute.networks`](https://cloud.google.com/compute/docs/reference/rest/v1/networks/insert). The returning clause is a projection on the reponse body **after** the await flow has concluded.
278278

279279
Future use cases for `UPDATE RETURNING`, `REPLACE RETURNING` and `DELETE RETURNING` will function the same observable fashion.
280280

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ require (
1919
github.com/spf13/cobra v1.4.0
2020
github.com/spf13/pflag v1.0.5
2121
github.com/spf13/viper v1.10.1
22-
github.com/stackql/any-sdk v0.1.3-beta02
22+
github.com/stackql/any-sdk v0.1.4-alpha06
2323
github.com/stackql/go-suffix-map v0.0.1-alpha01
2424
github.com/stackql/psql-wire v0.1.1-beta23
2525
github.com/stackql/stackql-parser v0.0.15-alpha06

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -484,8 +484,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
484484
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
485485
github.com/spf13/viper v1.10.1 h1:nuJZuYpG7gTj/XqiUwg8bA0cp1+M2mC3J4g5luUYBKk=
486486
github.com/spf13/viper v1.10.1/go.mod h1:IGlFPqhNAPKRxohIzWpI5QEy4kuI7tcl5WvR+8qy1rU=
487-
github.com/stackql/any-sdk v0.1.3-beta02 h1:0jSwyYFddjAN++U+yNNLF7SMLPIIaRhv522UzeWDf2E=
488-
github.com/stackql/any-sdk v0.1.3-beta02/go.mod h1:AKS/g28y7m4SWL/YW8veE9MCNy8XJgaicVibemVE9e8=
487+
github.com/stackql/any-sdk v0.1.4-alpha06 h1:QJPf3ehPrRqmYZR+TmD897AsmsaOamHErLhaE5B9v/w=
488+
github.com/stackql/any-sdk v0.1.4-alpha06/go.mod h1:AKS/g28y7m4SWL/YW8veE9MCNy8XJgaicVibemVE9e8=
489489
github.com/stackql/go-suffix-map v0.0.1-alpha01 h1:TDUDS8bySu41Oo9p0eniUeCm43mnRM6zFEd6j6VUaz8=
490490
github.com/stackql/go-suffix-map v0.0.1-alpha01/go.mod h1:QAi+SKukOyf4dBtWy8UMy+hsXXV+yyEE4vmBkji2V7g=
491491
github.com/stackql/psql-wire v0.1.1-beta23 h1:1ayYMjZArfDcIMyEOKnm+Bp1zRCISw8pguvTFuUhhVQ=

internal/stackql/primitivebuilder/async_compose.go renamed to internal/stackql/asynccompose/async_compose.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,26 @@
1-
package primitivebuilder
1+
package asynccompose
22

33
import (
44
"github.com/stackql/any-sdk/anysdk"
55
"github.com/stackql/stackql-parser/go/vt/sqlparser"
6+
"github.com/stackql/stackql/internal/stackql/drm"
67
"github.com/stackql/stackql/internal/stackql/handler"
78
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
89
"github.com/stackql/stackql/internal/stackql/primitive"
910
"github.com/stackql/stackql/internal/stackql/provider"
1011
)
1112

12-
func composeAsyncMonitor(
13+
func ComposeAsyncMonitor(
1314
handlerCtx handler.HandlerContext,
1415
precursor primitive.IPrimitive,
1516
prov provider.IProvider,
1617
method anysdk.OperationStore,
1718
commentDirectives sqlparser.CommentDirectives,
19+
isReturning bool,
20+
insertCtx drm.PreparedStatementCtx,
21+
drmCfg drm.Config,
1822
) (primitive.IPrimitive, error) {
19-
asm, err := NewAsyncMonitor(handlerCtx, prov, method)
23+
asm, err := NewAsyncMonitor(handlerCtx, prov, method, isReturning)
2024
if err != nil {
2125
return nil, err
2226
}
@@ -31,7 +35,8 @@ func composeAsyncMonitor(
3135
handlerCtx.GetOutfile(),
3236
handlerCtx.GetOutErrFile(),
3337
)
34-
primitive, err := asm.GetMonitorPrimitive(prov, method, precursor, pl, commentDirectives)
38+
primitive, err := asm.GetMonitorPrimitive(
39+
prov, method, precursor, pl, commentDirectives, isReturning, insertCtx, drmCfg)
3540
if err != nil {
3641
return nil, err
3742
}

internal/stackql/primitivebuilder/asyncmonitor.go renamed to internal/stackql/asynccompose/asyncmonitor.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
package primitivebuilder
1+
package asynccompose
22

33
import (
44
"fmt"
55
"strings"
66

77
"github.com/stackql/any-sdk/anysdk"
88
"github.com/stackql/stackql/internal/stackql/acid/binlog"
9+
"github.com/stackql/stackql/internal/stackql/drm"
910
"github.com/stackql/stackql/internal/stackql/execution"
1011
"github.com/stackql/stackql/internal/stackql/handler"
1112
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
@@ -22,6 +23,9 @@ type IAsyncMonitor interface {
2223
precursor primitive.IPrimitive,
2324
initialCtx primitive.IPrimitiveCtx,
2425
comments sqlparser.CommentDirectives,
26+
isReturning bool,
27+
insertCtx drm.PreparedStatementCtx,
28+
drmCfg drm.Config,
2529
) (primitive.IPrimitive, error)
2630
}
2731

@@ -120,11 +124,12 @@ func NewAsyncMonitor(
120124
handlerCtx handler.HandlerContext,
121125
prov provider.IProvider,
122126
op anysdk.OperationStore,
127+
isReturning bool,
123128
) (IAsyncMonitor, error) {
124129
//nolint:gocritic //TODO: refactor
125130
switch prov.GetProviderString() {
126131
case "google":
127-
return newGoogleAsyncMonitor(handlerCtx, prov, op, prov.GetVersion())
132+
return newGoogleAsyncMonitor(handlerCtx, prov, op, prov.GetVersion(), isReturning)
128133
}
129134
return nil, fmt.Errorf(
130135
"async operation monitor for provider = '%s', api version = '%s' currently not supported",
@@ -136,6 +141,7 @@ func newGoogleAsyncMonitor(
136141
prov provider.IProvider,
137142
op anysdk.OperationStore,
138143
version string, //nolint:unparam // TODO: refactor
144+
isReturning bool, //nolint:unparam,revive // TODO: refactor
139145
) (IAsyncMonitor, error) {
140146
//nolint:gocritic //TODO: refactor
141147
switch version {
@@ -160,11 +166,14 @@ func (gm *DefaultGoogleAsyncMonitor) GetMonitorPrimitive(
160166
precursor primitive.IPrimitive,
161167
initialCtx primitive.IPrimitiveCtx,
162168
comments sqlparser.CommentDirectives,
169+
isReturning bool,
170+
insertCtx drm.PreparedStatementCtx,
171+
drmCfg drm.Config,
163172
) (primitive.IPrimitive, error) {
164173
//nolint:gocritic,staticcheck //TODO: refactor
165174
switch strings.ToLower(prov.GetVersion()) {
166175
default:
167-
return gm.getV1Monitor(prov, op, precursor, initialCtx, comments)
176+
return gm.getV1Monitor(prov, op, precursor, initialCtx, comments, isReturning, insertCtx, drmCfg)
168177
}
169178
}
170179

@@ -174,6 +183,9 @@ func (gm *DefaultGoogleAsyncMonitor) getV1Monitor(
174183
precursor primitive.IPrimitive,
175184
initialCtx primitive.IPrimitiveCtx,
176185
comments sqlparser.CommentDirectives,
186+
isReturning bool,
187+
insertCtx drm.PreparedStatementCtx,
188+
drmCfg drm.Config,
177189
) (primitive.IPrimitive, error) {
178190
provider, providerErr := prov.GetProvider()
179191
if providerErr != nil {
@@ -186,6 +198,9 @@ func (gm *DefaultGoogleAsyncMonitor) getV1Monitor(
186198
precursor,
187199
initialCtx,
188200
comments,
201+
isReturning,
202+
insertCtx,
203+
drmCfg,
189204
)
190205
if exPrepErr != nil {
191206
return nil, exPrepErr

internal/stackql/dependencyplanner/dependencyplanner.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,7 @@ func (dp *standardDependencyPlanner) orchestrate(
485485
insPsc,
486486
nil,
487487
outStream,
488+
false, // returning hardcoded to false for now
488489
)
489490
}
490491
dp.execSlice = append(dp.execSlice, builder)

internal/stackql/driver/dependent_simple_integration_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestSimpleInsertDependentGoogleComputeDiskAsync(t *testing.T) {
3838
if err != nil {
3939
t.Fatalf("Test failed: %v", err)
4040
}
41-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, rdr, lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
41+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, rdr, lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdErr(outFile), true)
4242
if err != nil {
4343
t.Fatalf("Test failed: %v", err)
4444
}
@@ -76,7 +76,7 @@ func TestSimpleInsertDependentGoogleComputeDiskAsyncReversed(t *testing.T) {
7676
if err != nil {
7777
t.Fatalf("Test failed: %v", err)
7878
}
79-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, rdr, lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
79+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, rdr, lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdErr(outFile), true)
8080
if err != nil {
8181
t.Fatalf("Test failed: %v", err)
8282
}

internal/stackql/driver/driver_integration_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func TestSimpleInsertGoogleComputeNetworkAsync(t *testing.T) {
180180
}
181181

182182
testSubject := func(t *testing.T, outFile *bufio.Writer) {
183-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
183+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdErr(outFile), true)
184184
if err != nil {
185185
t.Fatalf("Test failed: %v", err)
186186
}
@@ -226,7 +226,7 @@ func TestK8sTheHardWayAsync(t *testing.T) {
226226
runtimeCtx.InfilePath = k8sthwRenderedFile
227227
runtimeCtx.CSVHeadersDisable = true
228228

229-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
229+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdErr(outFile), true)
230230
if err != nil {
231231
t.Fatalf("Test failed: %v", err)
232232
}

0 commit comments

Comments
 (0)