44 "fmt"
55
66 "github.com/stackql/any-sdk/anysdk"
7+ "github.com/stackql/any-sdk/pkg/client"
78 "github.com/stackql/any-sdk/pkg/constants"
9+ "github.com/stackql/any-sdk/pkg/local_template_executor"
810 "github.com/stackql/any-sdk/pkg/logging"
911 "github.com/stackql/stackql-parser/go/vt/sqlparser"
1012 "github.com/stackql/stackql/internal/stackql/acid/binlog"
@@ -201,131 +203,175 @@ func (gh *genericHTTPStreamInput) Build() error {
201203 if prErr != nil {
202204 return internaldto .NewErroneousExecutorOutput (prErr )
203205 }
206+ protocolType , protocolTypeErr := pr .GetProtocolType ()
207+ if protocolTypeErr != nil {
208+ return internaldto .NewErroneousExecutorOutput (protocolTypeErr )
209+ }
204210 interestingMaps , mapsErr := gh .getInterestingMaps (actionPrimitive )
205211 if mapsErr != nil {
206212 return internaldto .NewErroneousExecutorOutput (mapsErr )
207213 }
208- httpPreparator := anysdk .NewHTTPPreparator (
209- pr ,
210- svc ,
211- m ,
212- interestingMaps .getParameterMap (),
213- nil ,
214- nil ,
215- logging .GetLogger (),
216- )
217- httpArmoury , httpErr := httpPreparator .BuildHTTPRequestCtx ()
218- if httpErr != nil {
219- return internaldto .NewErroneousExecutorOutput (httpErr )
220- }
214+ paramMap := interestingMaps .getParameterMap ()
215+ params := paramMap [0 ]
216+ switch protocolType {
217+ case client .LocalTemplated :
218+ inlines := m .GetInline ()
219+ if len (inlines ) == 0 {
220+ return internaldto .NewErroneousExecutorOutput (fmt .Errorf ("no inlines found" ))
221+ }
222+ executor := local_template_executor .NewLocalTemplateExecutor (
223+ inlines [0 ],
224+ inlines [1 :],
225+ nil ,
226+ )
227+ resp , err := executor .Execute (
228+ map [string ]any {"parameters" : params },
229+ )
230+ if err != nil {
231+ return internaldto .NewErroneousExecutorOutput (err )
232+ }
233+ var backendMessages []string
234+ stdOut , stdOutExists := resp .GetStdOut ()
235+ if stdOutExists {
236+ backendMessages = append (backendMessages , stdOut .String ())
237+ }
238+ stdErr , stdErrExists := resp .GetStdErr ()
239+ if stdErrExists {
240+ backendMessages = append (backendMessages , stdErr .String ())
241+ }
242+ backendMessages = append (backendMessages , "OK" )
243+ return internaldto .NewExecutorOutput (
244+ nil ,
245+ nil ,
246+ nil ,
247+ internaldto .NewBackendMessages (backendMessages ),
248+ nil ,
249+ )
250+ case client .HTTP :
251+ httpPreparator := anysdk .NewHTTPPreparator (
252+ pr ,
253+ svc ,
254+ m ,
255+ paramMap ,
256+ nil ,
257+ nil ,
258+ logging .GetLogger (),
259+ )
260+ httpArmoury , httpErr := httpPreparator .BuildHTTPRequestCtx ()
261+ if httpErr != nil {
262+ return internaldto .NewErroneousExecutorOutput (httpErr )
263+ }
221264
222- tableName , _ := tbl .GetTableName ()
265+ tableName , _ := tbl .GetTableName ()
223266
224- // var reversalParameters []map[string]interface{}
225- // TODO: Implement reversal operations:
226- // - depending upon reversal operation, collect sequence of HTTP operations.
227- // - for each HTTP operation, collect context and store in *some object*
228- // then attach to reversal graph for later retrieval and execution
229- var nullaryExecutors []func () internaldto.ExecutorOutput
230- for _ , r := range httpArmoury .GetRequestParams () {
231- req := r
232- isSkipResponse := responseAnalysisErr != nil
233- polyHandler := execution .NewStandardPolyHandler (
234- handlerCtx ,
235- )
236- nullaryEx := func () internaldto.ExecutorOutput {
237- pp := execution .NewProcessorPayload (
238- req ,
239- execution .NewNilMethodElider (),
240- provider ,
241- m ,
242- tableName ,
243- rtCtx ,
244- authCtx ,
245- outErrFile ,
246- polyHandler ,
247- "" ,
248- nil ,
249- isSkipResponse ,
250- true ,
251- isAwait ,
252- gh .isUndo ,
253- gh .isMutation ,
254- "" ,
267+ // var reversalParameters []map[string]interface{}
268+ // TODO: Implement reversal operations:
269+ // - depending upon reversal operation, collect sequence of HTTP operations.
270+ // - for each HTTP operation, collect context and store in *some object*
271+ // then attach to reversal graph for later retrieval and execution
272+ var nullaryExecutors []func () internaldto.ExecutorOutput
273+ for _ , r := range httpArmoury .GetRequestParams () {
274+ req := r
275+ isSkipResponse := responseAnalysisErr != nil
276+ polyHandler := execution .NewStandardPolyHandler (
277+ handlerCtx ,
255278 )
256- processor := execution .NewProcessor (pp )
257- processorResponse := processor .Process ()
258- processorErr := processorResponse .GetError ()
259- singletonBody := processorResponse .GetSingletonBody ()
260- reversalStrem := processorResponse .GetReversalStream ()
261- for {
262- rev , isRevExistent := reversalStrem .Next ()
263- if ! isRevExistent {
264- break
279+ nullaryEx := func () internaldto.ExecutorOutput {
280+ pp := execution .NewProcessorPayload (
281+ req ,
282+ execution .NewNilMethodElider (),
283+ provider ,
284+ m ,
285+ tableName ,
286+ rtCtx ,
287+ authCtx ,
288+ outErrFile ,
289+ polyHandler ,
290+ "" ,
291+ nil ,
292+ isSkipResponse ,
293+ true ,
294+ isAwait ,
295+ gh .isUndo ,
296+ gh .isMutation ,
297+ "" ,
298+ )
299+ processor := execution .NewProcessor (pp )
300+ processorResponse := processor .Process ()
301+ processorErr := processorResponse .GetError ()
302+ singletonBody := processorResponse .GetSingletonBody ()
303+ reversalStrem := processorResponse .GetReversalStream ()
304+ for {
305+ rev , isRevExistent := reversalStrem .Next ()
306+ if ! isRevExistent {
307+ break
308+ }
309+ revErr := gh .appendReversalData (rev )
310+ if revErr != nil {
311+ return internaldto .NewErroneousExecutorOutput (revErr )
312+ }
265313 }
266- revErr := gh .appendReversalData (rev )
267- if revErr != nil {
268- return internaldto .NewErroneousExecutorOutput (revErr )
314+ // if processorResponse.IsFailed() && !gh.isAwait {
315+ // processorErr = fmt.Errorf(processorResponse.GetFailedMessage())
316+ // }
317+ return gh .decorateOutput (
318+ internaldto .NewExecutorOutput (
319+ nil ,
320+ singletonBody ,
321+ nil ,
322+ internaldto .NewBackendMessages (processorResponse .GetSuccessMessages ()),
323+ processorErr ,
324+ ),
325+ tableName ,
326+ )
327+ }
328+
329+ nullaryExecutors = append (nullaryExecutors , nullaryEx )
330+ }
331+ resultSet := internaldto .NewErroneousExecutorOutput (fmt .Errorf ("no executions detected" ))
332+ if ! isAwait {
333+ for _ , ei := range nullaryExecutors {
334+ execInstance := ei
335+ aPrioriMessages := resultSet .GetMessages ()
336+ resultSet = execInstance ()
337+ resultSet .AppendMessages (aPrioriMessages )
338+ if resultSet .GetError () != nil {
339+ return resultSet
269340 }
270341 }
271- // if processorResponse.IsFailed() && !gh.isAwait {
272- // processorErr = fmt.Errorf(processorResponse.GetFailedMessage())
273- // }
274- return gh .decorateOutput (
275- internaldto .NewExecutorOutput (
276- nil ,
277- singletonBody ,
278- nil ,
279- internaldto .NewBackendMessages (processorResponse .GetSuccessMessages ()),
280- processorErr ,
281- ),
282- tableName ,
283- )
342+ return resultSet
284343 }
285-
286- nullaryExecutors = append (nullaryExecutors , nullaryEx )
287- }
288- resultSet := internaldto .NewErroneousExecutorOutput (fmt .Errorf ("no executions detected" ))
289- if ! isAwait {
290- for _ , ei := range nullaryExecutors {
291- execInstance := ei
292- aPrioriMessages := resultSet .GetMessages ()
293- resultSet = execInstance ()
294- resultSet .AppendMessages (aPrioriMessages )
344+ for _ , eI := range nullaryExecutors {
345+ execInstance := eI
346+ dependentInsertPrimitive := primitive .NewGenericPrimitive (
347+ nil ,
348+ nil ,
349+ nil ,
350+ primitive_context .NewPrimitiveContext (),
351+ )
352+ //nolint:revive // no big deal
353+ err = dependentInsertPrimitive .SetExecutor (func (pc primitive.IPrimitiveCtx ) internaldto.ExecutorOutput {
354+ return execInstance ()
355+ })
356+ if err != nil {
357+ return internaldto .NewErroneousExecutorOutput (err )
358+ }
359+ execPrim , execErr := composeAsyncMonitor (handlerCtx , dependentInsertPrimitive , prov , m , commentDirectives )
360+ if execErr != nil {
361+ return internaldto .NewErroneousExecutorOutput (execErr )
362+ }
363+ resultSet = execPrim .Execute (pc )
295364 if resultSet .GetError () != nil {
296365 return resultSet
297366 }
298367 }
299- return resultSet
300- }
301- for _ , eI := range nullaryExecutors {
302- execInstance := eI
303- dependentInsertPrimitive := primitive .NewGenericPrimitive (
304- nil ,
305- nil ,
306- nil ,
307- primitive_context .NewPrimitiveContext (),
368+ return gh .decorateOutput (
369+ resultSet ,
370+ tableName ,
308371 )
309- //nolint:revive // no big deal
310- err = dependentInsertPrimitive .SetExecutor (func (pc primitive.IPrimitiveCtx ) internaldto.ExecutorOutput {
311- return execInstance ()
312- })
313- if err != nil {
314- return internaldto .NewErroneousExecutorOutput (err )
315- }
316- execPrim , execErr := composeAsyncMonitor (handlerCtx , dependentInsertPrimitive , prov , m , commentDirectives )
317- if execErr != nil {
318- return internaldto .NewErroneousExecutorOutput (execErr )
319- }
320- resultSet = execPrim .Execute (pc )
321- if resultSet .GetError () != nil {
322- return resultSet
323- }
372+ default :
373+ return internaldto .NewErroneousExecutorOutput (fmt .Errorf ("unsupported protocol type: %v" , protocolType ))
324374 }
325- return gh .decorateOutput (
326- resultSet ,
327- tableName ,
328- )
329375 }
330376 err = actionPrimitive .SetExecutor (ex )
331377 if err != nil {
@@ -341,4 +387,5 @@ func (gh *genericHTTPStreamInput) Build() error {
341387 gh .root = gh .dependencyNode
342388
343389 return nil
390+
344391}
0 commit comments