Skip to content

Commit bc38021

Browse files
- Tests pass locally.
1 parent 4023176 commit bc38021

17 files changed

+99
-232
lines changed

internal/stackql/bundle/bundle.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package bundle
22

33
import (
4+
"io"
5+
46
"github.com/stackql/any-sdk/pkg/dto"
57
"github.com/stackql/stackql-parser/go/vt/sqlparser"
68
"github.com/stackql/stackql/internal/stackql/acid/txn_context"
@@ -30,6 +32,10 @@ type Bundle interface {
3032
GetTxnCoordinatorContext() txn_context.ITransactionCoordinatorContext
3133
GetTypingConfig() typing.Config
3234
GetSessionContext() dto.SessionContext
35+
GetStdOut() (io.Writer, bool)
36+
GetStdErr() (io.Writer, bool)
37+
WithStdOut(io.Writer) Bundle
38+
WithStdErr(io.Writer) Bundle
3339
}
3440

3541
func NewBundle(
@@ -80,6 +86,26 @@ type simpleBundle struct {
8086
authContexts map[string]*dto.AuthCtx
8187
txnCoordintatorContext txn_context.ITransactionCoordinatorContext
8288
sessionCtx dto.SessionContext
89+
stdOut io.Writer
90+
stdErr io.Writer
91+
}
92+
93+
func (sb *simpleBundle) WithStdOut(w io.Writer) Bundle {
94+
sb.stdOut = w
95+
return sb
96+
}
97+
98+
func (sb *simpleBundle) WithStdErr(w io.Writer) Bundle {
99+
sb.stdErr = w
100+
return sb
101+
}
102+
103+
func (sb *simpleBundle) GetStdOut() (io.Writer, bool) {
104+
return sb.stdOut, sb.stdOut != nil
105+
}
106+
107+
func (sb *simpleBundle) GetStdErr() (io.Writer, bool) {
108+
return sb.stdErr, sb.stdErr != nil
83109
}
84110

85111
func (sb *simpleBundle) GetSessionContext() dto.SessionContext {

internal/stackql/cmd/exec.go

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"github.com/stackql/stackql/internal/stackql/entryutil"
2828
"github.com/stackql/stackql/internal/stackql/handler"
2929
"github.com/stackql/stackql/internal/stackql/iqlerror"
30-
"github.com/stackql/stackql/internal/stackql/writer"
3130
)
3231

3332
// execCmd represents the exec command.
@@ -82,23 +81,12 @@ stackql exec -i iqlscripts/create-disk.iql --credentialsfilepath /mnt/c/tmp/stac
8281
iqlerror.PrintErrorAndExitOneIfError(err)
8382
iqlerror.PrintErrorAndExitOneIfNil(handlerCtx, "Handler context error")
8483
cr := newCommandRunner()
85-
cr.RunCommand(handlerCtx, nil, nil)
84+
cr.RunCommand(handlerCtx)
8685
},
8786
}
8887

89-
func getOutputFile(filename string) (*os.File, error) {
90-
switch filename {
91-
case "stdout":
92-
return os.Stdout, nil
93-
case "stderr":
94-
return os.Stderr, nil
95-
default:
96-
return os.Create(filename)
97-
}
98-
}
99-
10088
type commandRunner interface {
101-
RunCommand(handlerCtx handler.HandlerContext, outfile io.Writer, outErrFile io.Writer)
89+
RunCommand(handlerCtx handler.HandlerContext)
10290
}
10391

10492
func newCommandRunner() commandRunner {
@@ -108,16 +96,7 @@ func newCommandRunner() commandRunner {
10896
type commandRunnerImpl struct {
10997
}
11098

111-
func (cr *commandRunnerImpl) RunCommand(handlerCtx handler.HandlerContext, outfile io.Writer, outErrFile io.Writer) {
112-
defer iqlerror.HandlePanic(outErrFile)
113-
if outfile == nil {
114-
outfile, _ = getOutputFile(handlerCtx.GetRuntimeContext().OutfilePath)
115-
}
116-
if outErrFile == nil {
117-
outErrFile, _ = getOutputFile(writer.StdErrStr)
118-
}
119-
handlerCtx.SetOutfile(outfile)
120-
handlerCtx.SetOutErrFile(outErrFile)
99+
func (cr *commandRunnerImpl) RunCommand(handlerCtx handler.HandlerContext) {
121100
stackqlDriver, err := driver.NewStackQLDriver(handlerCtx)
122101
iqlerror.PrintErrorAndExitOneIfError(err)
123102
if handlerCtx.GetRuntimeContext().DryRunFlag {

internal/stackql/cmd/registry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ var registryCmd = &cobra.Command{
9191
iqlerror.PrintErrorAndExitOneIfError(err)
9292
iqlerror.PrintErrorAndExitOneIfNil(handlerCtx, "Handler context error")
9393
cr := newCommandRunner()
94-
cr.RunCommand(handlerCtx, nil, nil)
94+
cr.RunCommand(handlerCtx)
9595

9696
},
9797
}

internal/stackql/cmd/shell.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ var shellCmd = &cobra.Command{
125125

126126
inputBundle, err := entryutil.BuildInputBundle(runtimeCtx)
127127
iqlerror.PrintErrorAndExitOneIfError(err)
128+
inputBundle.WithStdOut(outfile).WithStdErr(outErrFile)
128129

129130
handlerCtx, handlerrErr := handler.NewHandlerCtx("", runtimeCtx, queryCache, inputBundle)
130131
if handlerrErr != nil {
@@ -260,21 +261,6 @@ func newSessionRunner(
260261
outfile io.Writer,
261262
outErrFile io.Writer,
262263
) (sessionRunner, error) {
263-
var err error
264-
if outfile == nil {
265-
outfile, err = getOutputFile(handlerCtx.GetRuntimeContext().OutfilePath)
266-
if err != nil {
267-
return nil, err
268-
}
269-
}
270-
if outErrFile == nil {
271-
outErrFile, err = getOutputFile(writer.StdErrStr)
272-
if err != nil {
273-
return nil, err
274-
}
275-
}
276-
handlerCtx.SetOutfile(outfile)
277-
handlerCtx.SetOutErrFile(outErrFile)
278264
stackqlDriver, driverErr := driver.NewStackQLDriver(handlerCtx)
279265
if driverErr != nil {
280266
return nil, driverErr

internal/stackql/driver/aggregation_compute_disks_integration_test.go

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,7 @@ func TestSelectComputeDisksOrderByCrtTmstpAsc(t *testing.T) {
3535
}
3636

3737
testSubject := func(t *testing.T, outFile *bufio.Writer) {
38-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle, true)
39-
handlerCtx.SetOutfile(os.Stdout)
40-
handlerCtx.SetOutErrFile(os.Stderr)
38+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
4139
if err != nil {
4240
t.Fatalf("Test failed: %v", err)
4341
}
@@ -54,7 +52,6 @@ func TestSelectComputeDisksOrderByCrtTmstpAsc(t *testing.T) {
5452
t.Fatalf("Test failed: %v", prepareErr)
5553
}
5654
response := querySubmitter.SubmitQuery()
57-
handlerCtx.SetOutfile(outFile)
5855
responsehandler.HandleResponse(handlerCtx, response)
5956

6057
dr.ProcessQuery(handlerCtx.GetRawQuery())
@@ -76,9 +73,7 @@ func TestSelectComputeDisksAggOrderBySizeAsc(t *testing.T) {
7673
}
7774

7875
testSubject := func(t *testing.T, outFile *bufio.Writer) {
79-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle, true)
80-
handlerCtx.SetOutfile(os.Stdout)
81-
handlerCtx.SetOutErrFile(os.Stderr)
76+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
8277
if err != nil {
8378
t.Fatalf("Test failed: %v", err)
8479
}
@@ -95,7 +90,6 @@ func TestSelectComputeDisksAggOrderBySizeAsc(t *testing.T) {
9590
t.Fatalf("Test failed: %v", prepareErr)
9691
}
9792
response := querySubmitter.SubmitQuery()
98-
handlerCtx.SetOutfile(outFile)
9993
responsehandler.HandleResponse(handlerCtx, response)
10094

10195
dr.ProcessQuery(handlerCtx.GetRawQuery())
@@ -117,9 +111,7 @@ func TestSelectComputeDisksAggOrderBySizeDesc(t *testing.T) {
117111
}
118112

119113
testSubject := func(t *testing.T, outFile *bufio.Writer) {
120-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle, true)
121-
handlerCtx.SetOutfile(os.Stdout)
122-
handlerCtx.SetOutErrFile(os.Stderr)
114+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
123115
if err != nil {
124116
t.Fatalf("Test failed: %v", err)
125117
}
@@ -136,7 +128,6 @@ func TestSelectComputeDisksAggOrderBySizeDesc(t *testing.T) {
136128
t.Fatalf("Test failed: %v", prepareErr)
137129
}
138130
response := querySubmitter.SubmitQuery()
139-
handlerCtx.SetOutfile(outFile)
140131
responsehandler.HandleResponse(handlerCtx, response)
141132

142133
dr.ProcessQuery(handlerCtx.GetRawQuery())
@@ -158,9 +149,7 @@ func TestSelectComputeDisksAggTotalSize(t *testing.T) {
158149
}
159150

160151
testSubject := func(t *testing.T, outFile *bufio.Writer) {
161-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle, true)
162-
handlerCtx.SetOutfile(os.Stdout)
163-
handlerCtx.SetOutErrFile(os.Stderr)
152+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
164153
if err != nil {
165154
t.Fatalf("Test failed: %v", err)
166155
}
@@ -177,7 +166,6 @@ func TestSelectComputeDisksAggTotalSize(t *testing.T) {
177166
t.Fatalf("Test failed: %v", prepareErr)
178167
}
179168
response := querySubmitter.SubmitQuery()
180-
handlerCtx.SetOutfile(outFile)
181169
responsehandler.HandleResponse(handlerCtx, response)
182170

183171
dr.ProcessQuery(handlerCtx.GetRawQuery())
@@ -199,9 +187,7 @@ func TestSelectComputeDisksAggTotalString(t *testing.T) {
199187
}
200188

201189
testSubject := func(t *testing.T, outFile *bufio.Writer) {
202-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle, true)
203-
handlerCtx.SetOutfile(os.Stdout)
204-
handlerCtx.SetOutErrFile(os.Stderr)
190+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
205191
if err != nil {
206192
t.Fatalf("Test failed: %v", err)
207193
}
@@ -218,7 +204,6 @@ func TestSelectComputeDisksAggTotalString(t *testing.T) {
218204
t.Fatalf("Test failed: %v", prepareErr)
219205
}
220206
response := querySubmitter.SubmitQuery()
221-
handlerCtx.SetOutfile(outFile)
222207
responsehandler.HandleResponse(handlerCtx, response)
223208

224209
dr.ProcessQuery(handlerCtx.GetRawQuery())

internal/stackql/driver/aggregation_container_subnetworks_integration_test.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package driver_test
22

33
import (
44
"bufio"
5-
"os"
65
"strings"
76
"testing"
87

@@ -27,16 +26,11 @@ func TestSimpleAggGoogleContainerSubnetworksGroupedAllowedDriverOutputAsc(t *tes
2726
}
2827

2928
testSubject := func(t *testing.T, outFile *bufio.Writer) {
30-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle, true)
31-
handlerCtx.SetOutfile(os.Stdout)
32-
handlerCtx.SetOutErrFile(os.Stderr)
29+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
3330
if err != nil {
3431
t.Fatalf("Test failed: %v", err)
3532
}
3633

37-
handlerCtx.SetOutfile(outFile)
38-
handlerCtx.SetOutErrFile(os.Stderr)
39-
4034
if err != nil {
4135
t.Fatalf("Test failed: %v", err)
4236
}
@@ -48,7 +42,6 @@ func TestSimpleAggGoogleContainerSubnetworksGroupedAllowedDriverOutputAsc(t *tes
4842
t.Fatalf("Test failed: %v", prepareErr)
4943
}
5044
response := querySubmitter.SubmitQuery()
51-
handlerCtx.SetOutfile(outFile)
5245
responsehandler.HandleResponse(handlerCtx, response)
5346
}
5447

@@ -68,16 +61,11 @@ func TestSimpleAggGoogleContainerSubnetworksGroupedAllowedDriverOutputDesc(t *te
6861
}
6962

7063
testSubject := func(t *testing.T, outFile *bufio.Writer) {
71-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle, true)
72-
handlerCtx.SetOutfile(os.Stdout)
73-
handlerCtx.SetOutErrFile(os.Stderr)
64+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
7465
if err != nil {
7566
t.Fatalf("Test failed: %v", err)
7667
}
7768

78-
handlerCtx.SetOutfile(outFile)
79-
handlerCtx.SetOutErrFile(os.Stderr)
80-
8169
if err != nil {
8270
t.Fatalf("Test failed: %v", err)
8371
}
@@ -89,7 +77,6 @@ func TestSimpleAggGoogleContainerSubnetworksGroupedAllowedDriverOutputDesc(t *te
8977
t.Fatalf("Test failed: %v", prepareErr)
9078
}
9179
response := querySubmitter.SubmitQuery()
92-
handlerCtx.SetOutfile(outFile)
9380
responsehandler.HandleResponse(handlerCtx, response)
9481
}
9582

internal/stackql/driver/aggregation_paginated_compute_disks_integration_test.go

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package driver_test
22

33
import (
44
"bufio"
5-
"os"
65
"strings"
76
"testing"
87

@@ -29,9 +28,7 @@ func TestSelectComputeDisksOrderByCrtTmstpAscPaginated(t *testing.T) {
2928
}
3029

3130
testSubject := func(t *testing.T, outFile *bufio.Writer) {
32-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle, true)
33-
handlerCtx.SetOutfile(os.Stdout)
34-
handlerCtx.SetOutErrFile(os.Stderr)
31+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
3532
if err != nil {
3633
t.Fatalf("Test failed: %v", err)
3734
}
@@ -48,7 +45,6 @@ func TestSelectComputeDisksOrderByCrtTmstpAscPaginated(t *testing.T) {
4845
t.Fatalf("Test failed: %v", prepareErr)
4946
}
5047
response := querySubmitter.SubmitQuery()
51-
handlerCtx.SetOutfile(outFile)
5248
responsehandler.HandleResponse(handlerCtx, response)
5349

5450
dr.ProcessQuery(handlerCtx.GetRawQuery())
@@ -71,9 +67,7 @@ func TestSelectComputeDisksAggOrderBySizeAscPaginated(t *testing.T) {
7167
}
7268

7369
testSubject := func(t *testing.T, outFile *bufio.Writer) {
74-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle, true)
75-
handlerCtx.SetOutfile(os.Stdout)
76-
handlerCtx.SetOutErrFile(os.Stderr)
70+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
7771
if err != nil {
7872
t.Fatalf("Test failed: %v", err)
7973
}
@@ -90,7 +84,6 @@ func TestSelectComputeDisksAggOrderBySizeAscPaginated(t *testing.T) {
9084
t.Fatalf("Test failed: %v", prepareErr)
9185
}
9286
response := querySubmitter.SubmitQuery()
93-
handlerCtx.SetOutfile(outFile)
9487
responsehandler.HandleResponse(handlerCtx, response)
9588

9689
dr.ProcessQuery(handlerCtx.GetRawQuery())
@@ -113,9 +106,7 @@ func TestSelectComputeDisksAggOrderBySizeDescPaginated(t *testing.T) {
113106
}
114107

115108
testSubject := func(t *testing.T, outFile *bufio.Writer) {
116-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle, true)
117-
handlerCtx.SetOutfile(os.Stdout)
118-
handlerCtx.SetOutErrFile(os.Stderr)
109+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
119110
if err != nil {
120111
t.Fatalf("Test failed: %v", err)
121112
}
@@ -132,7 +123,6 @@ func TestSelectComputeDisksAggOrderBySizeDescPaginated(t *testing.T) {
132123
t.Fatalf("Test failed: %v", prepareErr)
133124
}
134125
response := querySubmitter.SubmitQuery()
135-
handlerCtx.SetOutfile(outFile)
136126
responsehandler.HandleResponse(handlerCtx, response)
137127

138128
dr.ProcessQuery(handlerCtx.GetRawQuery())
@@ -155,9 +145,7 @@ func TestSelectComputeDisksAggTotalSizePaginated(t *testing.T) {
155145
}
156146

157147
testSubject := func(t *testing.T, outFile *bufio.Writer) {
158-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle, true)
159-
handlerCtx.SetOutfile(os.Stdout)
160-
handlerCtx.SetOutErrFile(os.Stderr)
148+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
161149
if err != nil {
162150
t.Fatalf("Test failed: %v", err)
163151
}
@@ -174,7 +162,6 @@ func TestSelectComputeDisksAggTotalSizePaginated(t *testing.T) {
174162
t.Fatalf("Test failed: %v", prepareErr)
175163
}
176164
response := querySubmitter.SubmitQuery()
177-
handlerCtx.SetOutfile(outFile)
178165
responsehandler.HandleResponse(handlerCtx, response)
179166

180167
dr.ProcessQuery(handlerCtx.GetRawQuery())
@@ -197,9 +184,7 @@ func TestSelectComputeDisksAggTotalStringPaginated(t *testing.T) {
197184
}
198185

199186
testSubject := func(t *testing.T, outFile *bufio.Writer) {
200-
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle, true)
201-
handlerCtx.SetOutfile(os.Stdout)
202-
handlerCtx.SetOutErrFile(os.Stderr)
187+
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
203188
if err != nil {
204189
t.Fatalf("Test failed: %v", err)
205190
}
@@ -216,7 +201,6 @@ func TestSelectComputeDisksAggTotalStringPaginated(t *testing.T) {
216201
t.Fatalf("Test failed: %v", prepareErr)
217202
}
218203
response := querySubmitter.SubmitQuery()
219-
handlerCtx.SetOutfile(outFile)
220204
responsehandler.HandleResponse(handlerCtx, response)
221205

222206
dr.ProcessQuery(handlerCtx.GetRawQuery())

0 commit comments

Comments
 (0)