11package execution
22
33import (
4+ "bytes"
5+ "encoding/json"
46 "fmt"
57 "io"
68 "net/http"
@@ -11,8 +13,10 @@ import (
1113 "github.com/stackql/any-sdk/pkg/client"
1214 "github.com/stackql/any-sdk/pkg/dto"
1315 "github.com/stackql/any-sdk/pkg/httpelement"
16+ "github.com/stackql/any-sdk/pkg/local_template_executor"
1417 "github.com/stackql/any-sdk/pkg/logging"
1518 "github.com/stackql/any-sdk/pkg/response"
19+ "github.com/stackql/any-sdk/pkg/stream_transform"
1620 "github.com/stackql/any-sdk/pkg/streaming"
1721 "github.com/stackql/stackql-parser/go/vt/sqlparser"
1822 "github.com/stackql/stackql/internal/stackql/acid/binlog"
@@ -1232,7 +1236,7 @@ func (sp *standardProcessor) Process() ProcessorResponse {
12321236 return newHTTPProcessorResponse (nil , reversalStream , false , nil )
12331237}
12341238
1235- //nolint:revive // TODO: investigate
1239+ //nolint:revive,nestif,funlen,gocognit // TODO: investigate
12361240func (mv * monoValentExecution ) GetExecutor () (func (pc primitive.IPrimitiveCtx ) internaldto.ExecutorOutput , error ) {
12371241 prov , err := mv .tableMeta .GetProvider ()
12381242 if err != nil {
@@ -1261,47 +1265,150 @@ func (mv *monoValentExecution) GetExecutor() (func(pc primitive.IPrimitiveCtx) i
12611265 polyHandler := NewStandardPolyHandler (
12621266 mv .handlerCtx ,
12631267 )
1264- agnosticatePayload := newHTTPAgnosticatePayload (
1265- mv .tableMeta ,
1266- provider ,
1267- m ,
1268- tableName ,
1269- authCtx ,
1270- mv .handlerCtx .GetRuntimeContext (),
1271- mv .handlerCtx .GetOutErrFile (),
1272- mr ,
1273- mv .elideActionIfPossible (
1274- currentTcc ,
1275- tableName ,
1276- "" , // late binding, should remove AOT reference
1277- ),
1278- true ,
1279- polyHandler ,
1280- mv .tableMeta .GetSelectItemsKey (),
1281- mv ,
1282- mv .isSkipResponse ,
1283- mv .isMutation ,
1284- )
1285- processorResponse , agnosticErr := agnosticate (agnosticatePayload )
1286- if agnosticErr != nil {
1287- return internaldto .NewErroneousExecutorOutput (agnosticErr )
1288- }
1289- messages := polyHandler .GetMessages ()
1290- var castMessages internaldto.BackendMessages
1291- if len (messages ) > 0 {
1292- castMessages = internaldto .NewBackendMessages (messages )
1268+ protocolType , protocolTypeErr := provider .GetProtocolType ()
1269+ if protocolTypeErr != nil {
1270+ return internaldto .NewErroneousExecutorOutput (protocolTypeErr )
12931271 }
1294- if processorResponse != nil && len (processorResponse .GetSuccessMessages ()) > 0 {
1295- if len (messages ) == 0 {
1296- castMessages = internaldto .NewBackendMessages (processorResponse .GetSuccessMessages ())
1297- } else {
1298- castMessages .AppendMessages (processorResponse .GetSuccessMessages ())
1272+ //nolint:exhaustive // acceptable for now
1273+ switch protocolType {
1274+ case client .LocalTemplated :
1275+ inlines := m .GetInline ()
1276+ if len (inlines ) == 0 {
1277+ return internaldto .NewErroneousExecutorOutput (fmt .Errorf ("no inlines found" ))
12991278 }
1279+ executor := local_template_executor .NewLocalTemplateExecutor (
1280+ inlines [0 ],
1281+ inlines [1 :],
1282+ nil ,
1283+ )
1284+ armoury , armouryErr := mv .tableMeta .GetHTTPArmoury ()
1285+ if armouryErr != nil {
1286+ return internaldto .NewErroneousExecutorOutput (armouryErr )
1287+ }
1288+ requestParams := armoury .GetRequestParams ()
1289+ logging .GetLogger ().Infoln (fmt .Sprintf ("requestParams = %v" , requestParams ))
1290+ flatInlineParams := make (map [string ]interface {})
1291+ for _ , p := range requestParams {
1292+ foundInlineParams , loopErr := p .GetParameters ().GetInlineParameterFlatMap ()
1293+ if loopErr == nil {
1294+ flatInlineParams = foundInlineParams
1295+ }
1296+ break //nolint:staticcheck // acceptable for now
1297+ }
1298+ // if mapsErr != nil {
1299+ // return internaldto.NewErroneousExecutorOutput(mapsErr)
1300+ // }
1301+ // paramMap := interestingMaps.getParameterMap()
1302+ // params := paramMap[0]
1303+ resp , exErr := executor .Execute (
1304+ map [string ]any {"parameters" : flatInlineParams },
1305+ )
1306+ if exErr != nil {
1307+ return internaldto .NewErroneousExecutorOutput (exErr )
1308+ }
1309+ var backendMessages []string
1310+ stdOut , stdOutExists := resp .GetStdOut ()
1311+ var stdoutStr string
1312+ if stdOutExists {
1313+ stdoutStr = stdOut .String ()
1314+ expectedResponse , isExpectedResponse := m .GetResponse ()
1315+ if isExpectedResponse {
1316+ responseTransform , responseTransformExists := expectedResponse .GetTransform ()
1317+ if responseTransformExists && responseTransform .GetType () == "golang_template_v0.1.0" {
1318+ input := stdoutStr
1319+ tmpl := responseTransform .GetBody ()
1320+ inStream := stream_transform .NewTextReader (bytes .NewBufferString (input ))
1321+ outStream := bytes .NewBuffer (nil )
1322+ tfm , setupErr := stream_transform .NewTemplateStreamTransformer (tmpl , inStream , outStream )
1323+ if setupErr != nil {
1324+ return internaldto .NewErroneousExecutorOutput (fmt .Errorf ("template stream transform error: %w" , setupErr ))
1325+ }
1326+ if tfErr := tfm .Transform (); tfErr != nil {
1327+ return internaldto .NewErroneousExecutorOutput (fmt .Errorf ("failed to transform: %w" , tfErr ))
1328+ }
1329+ outputStr := outStream .String ()
1330+ stdoutStr = outputStr
1331+ }
1332+ }
1333+ var res []map [string ]interface {}
1334+ resErr := json .Unmarshal ([]byte (stdoutStr ), & res )
1335+ itemisationResult := itemise (res , resErr , "" )
1336+ insertPrepResult := mv .ActionInsertPreparation (
1337+ newHTTPActionInsertPayload (
1338+ itemisationResult ,
1339+ false ,
1340+ tableName ,
1341+ flatInlineParams ,
1342+ "" ,
1343+ ),
1344+ )
1345+ insertErr , hasErr := insertPrepResult .GetError ()
1346+ if hasErr {
1347+ return internaldto .NewErroneousExecutorOutput (insertErr )
1348+ }
1349+ // fmt.Fprintf(os.Stdout, "%s", stdoutStr)
1350+ }
1351+ // if stdOutExists {
1352+ // backendMessages = append(backendMessages, stdOut.String())
1353+ // }
1354+ stdErr , stdErrExists := resp .GetStdErr ()
1355+ if stdErrExists {
1356+ backendMessages = append (backendMessages , stdErr .String ())
1357+ }
1358+ backendMessages = append (backendMessages , "OK" )
1359+ return internaldto .NewExecutorOutput (
1360+ nil ,
1361+ nil ,
1362+ nil ,
1363+ internaldto .NewBackendMessages (backendMessages ),
1364+ nil ,
1365+ )
1366+ case client .HTTP :
1367+ agnosticatePayload := newHTTPAgnosticatePayload (
1368+ mv .tableMeta ,
1369+ provider ,
1370+ m ,
1371+ tableName ,
1372+ authCtx ,
1373+ mv .handlerCtx .GetRuntimeContext (),
1374+ mv .handlerCtx .GetOutErrFile (),
1375+ mr ,
1376+ mv .elideActionIfPossible (
1377+ currentTcc ,
1378+ tableName ,
1379+ "" , // late binding, should remove AOT reference
1380+ ),
1381+ true ,
1382+ polyHandler ,
1383+ mv .tableMeta .GetSelectItemsKey (),
1384+ mv ,
1385+ mv .isSkipResponse ,
1386+ mv .isMutation ,
1387+ )
1388+ processorResponse , agnosticErr := agnosticate (agnosticatePayload )
1389+ if agnosticErr != nil {
1390+ return internaldto .NewErroneousExecutorOutput (agnosticErr )
1391+ }
1392+ messages := polyHandler .GetMessages ()
1393+ var castMessages internaldto.BackendMessages
1394+ if len (messages ) > 0 {
1395+ castMessages = internaldto .NewBackendMessages (messages )
1396+ }
1397+ if processorResponse != nil && len (processorResponse .GetSuccessMessages ()) > 0 {
1398+ if len (messages ) == 0 {
1399+ castMessages = internaldto .NewBackendMessages (processorResponse .GetSuccessMessages ())
1400+ } else {
1401+ castMessages .AppendMessages (processorResponse .GetSuccessMessages ())
1402+ }
1403+ }
1404+ if processorResponse == nil {
1405+ return internaldto .NewExecutorOutput (nil , nil , nil , castMessages , nil )
1406+ }
1407+ return internaldto .NewExecutorOutput (nil , processorResponse .GetSingletonBody (), nil , castMessages , err )
1408+ default :
1409+ return internaldto .NewErroneousExecutorOutput (
1410+ fmt .Errorf ("unsupported protocol type '%v'" , protocolType ))
13001411 }
1301- if processorResponse == nil {
1302- return internaldto .NewExecutorOutput (nil , nil , nil , castMessages , nil )
1303- }
1304- return internaldto .NewExecutorOutput (nil , processorResponse .GetSingletonBody (), nil , castMessages , err )
13051412 }
13061413 return ex , nil
13071414}
0 commit comments