Skip to content

Commit 5d5addc

Browse files
- Robot tests pass locally.
1 parent 048df4f commit 5d5addc

File tree

4 files changed

+119
-155
lines changed

4 files changed

+119
-155
lines changed

internal/stackql/parserutil/parser_util.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -274,10 +274,6 @@ func ExtractInsertValColumnsPlusPlaceHolders(insStmt *sqlparser.Insert) (map[int
274274
return extractInsertValColumns(insStmt, false)
275275
}
276276

277-
// func ExtractUpdateValColumnsPlusPlaceHolders(updateStmt *sqlparser.Update) (map[int]map[int]interface{}, int, error) {
278-
// return extractUpdateValColumnsArray(updateStmt, false)
279-
// }
280-
281277
func extractInsertValColumns(
282278
insStmt *sqlparser.Insert,
283279
includePlaceholders bool,

internal/stackql/planbuilder/plan_builder.go

Lines changed: 108 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -949,75 +949,110 @@ func (pgb *standardPlanGraphBuilder) handleInsert(pbi planbuilderinput.PlanBuild
949949
if isPhysicalTable {
950950
bldrInput.SetIsTargetPhysicalTable(true)
951951
}
952-
var bldr primitivebuilder.Builder
953-
if len(node.SelectExprs) > 0 {
954-
// Two cases:
955-
// 1. Synchronous. Equivalent to select.
956-
// 2. Asynchronous. Whole other story.
957-
tableMeta, tableMetaExists := bldrInput.GetTableMetadata()
958-
if !tableMetaExists {
959-
return fmt.Errorf("could not obtain table metadata for node '%s'", node.Action)
960-
}
961-
rc, rcErr := tableinsertioncontainer.NewTableInsertionContainer(
962-
tableMeta,
963-
handlerCtx.GetSQLEngine(),
964-
handlerCtx.GetTxnCounterMgr(),
952+
return pgb.handleMutationOperation(
953+
handlerCtx,
954+
pbi,
955+
primitiveGenerator,
956+
node,
957+
tbl,
958+
selectPrimitiveNode,
959+
bldrInput,
960+
isAwait,
961+
)
962+
}
963+
pr := primitive.NewGenericPrimitive(nil, nil, nil, primitive_context.NewPrimitiveContext())
964+
pgb.planGraphHolder.CreatePrimitiveNode(pr)
965+
return nil
966+
}
967+
968+
func (pgb *standardPlanGraphBuilder) handleMutationOperation(
969+
handlerCtx handler.HandlerContext,
970+
pbi planbuilderinput.PlanBuilderInput,
971+
primitiveGenerator primitivegenerator.PrimitiveGenerator,
972+
node sqlparser.SQLNode,
973+
tbl tablemetadata.ExtendedTableMetadata,
974+
selectPrimitiveNode primitivegraph.PrimitiveNode,
975+
bldrInput builder_input.BuilderInput,
976+
isAwait bool,
977+
) error {
978+
var returningExpressions sqlparser.SelectExprs
979+
var inputAction string
980+
var bldr primitivebuilder.Builder
981+
switch n := node.(type) {
982+
case *sqlparser.Insert:
983+
returningExpressions = n.SelectExprs
984+
inputAction = n.Action
985+
case *sqlparser.Update:
986+
returningExpressions = n.SelectExprs
987+
inputAction = n.Action
988+
default:
989+
return fmt.Errorf("unsupported mutation operation of type '%T'", node)
990+
}
991+
//nolint:nestif // acceptable complexity
992+
if len(returningExpressions) > 0 {
993+
// Two cases:
994+
// 1. Synchronous. Equivalent to select.
995+
// 2. Asynchronous. Whole other story.
996+
tableMeta, tableMetaExists := bldrInput.GetTableMetadata()
997+
if !tableMetaExists {
998+
return fmt.Errorf("could not obtain table metadata for node '%s'", inputAction)
999+
}
1000+
rc, rcErr := tableinsertioncontainer.NewTableInsertionContainer(
1001+
tableMeta,
1002+
handlerCtx.GetSQLEngine(),
1003+
handlerCtx.GetTxnCounterMgr(),
1004+
)
1005+
if rcErr != nil {
1006+
return rcErr
1007+
}
1008+
bldrInput.SetTableInsertionContainer(rc)
1009+
bldrInput.SetIsReturning(true)
1010+
if !isAwait {
1011+
bldr = primitivebuilder.NewSingleAcquireAndSelect(
1012+
bldrInput,
1013+
primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx(),
1014+
primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(),
1015+
nil,
9651016
)
966-
if rcErr != nil {
967-
return rcErr
968-
}
969-
bldrInput.SetTableInsertionContainer(rc)
970-
bldrInput.SetIsReturning(true)
971-
if !isAwait {
972-
bldr = primitivebuilder.NewSingleAcquireAndSelect(
973-
bldrInput,
974-
primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx(),
975-
primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(),
976-
nil,
977-
)
978-
} else {
979-
bldrInput.SetIsAwait(true)
980-
bldrInput.SetIsReturning(true)
981-
bldrInput.SetInsertCtx(primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx())
982-
lhsBldr := primitivebuilder.NewInsertOrUpdate(
983-
bldrInput,
984-
)
985-
newBldrInput := builder_input.NewBuilderInput(
986-
pgb.planGraphHolder,
987-
handlerCtx,
988-
tbl,
989-
)
990-
newBldrInput.SetParserNode(node)
991-
newBldrInput.SetAnnotatedAST(pbi.GetAnnotatedAST())
992-
newBldrInput.SetTxnCtrlCtrs(pbi.GetTxnCtrlCtrs())
993-
newBldrInput.SetTableInsertionContainer(rc)
994-
newBldrInput.SetDependencyNode(selectPrimitiveNode)
995-
newBldrInput.SetIsAwait(isAwait)
996-
rhsBldr := primitivebuilder.NewSingleSelect(
997-
pgb.planGraphHolder, handlerCtx, primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(),
998-
[]tableinsertioncontainer.TableInsertionContainer{rc},
999-
nil,
1000-
streaming.NewNopMapStream(),
1001-
)
1002-
bldr = primitivebuilder.NewDependencySubDAGBuilder(
1003-
pgb.planGraphHolder,
1004-
[]primitivebuilder.Builder{lhsBldr},
1005-
rhsBldr,
1006-
)
1007-
}
10081017
} else {
1009-
bldr = primitivebuilder.NewInsertOrUpdate(
1018+
bldrInput.SetIsAwait(true)
1019+
bldrInput.SetIsReturning(true)
1020+
bldrInput.SetInsertCtx(primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx())
1021+
lhsBldr := primitivebuilder.NewInsertOrUpdate(
10101022
bldrInput,
10111023
)
1024+
newBldrInput := builder_input.NewBuilderInput(
1025+
pgb.planGraphHolder,
1026+
handlerCtx,
1027+
tbl,
1028+
)
1029+
newBldrInput.SetParserNode(node)
1030+
newBldrInput.SetAnnotatedAST(pbi.GetAnnotatedAST())
1031+
newBldrInput.SetTxnCtrlCtrs(pbi.GetTxnCtrlCtrs())
1032+
newBldrInput.SetTableInsertionContainer(rc)
1033+
newBldrInput.SetDependencyNode(selectPrimitiveNode)
1034+
newBldrInput.SetIsAwait(isAwait)
1035+
rhsBldr := primitivebuilder.NewSingleSelect(
1036+
pgb.planGraphHolder, handlerCtx, primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(),
1037+
[]tableinsertioncontainer.TableInsertionContainer{rc},
1038+
nil,
1039+
streaming.NewNopMapStream(),
1040+
)
1041+
bldr = primitivebuilder.NewDependencySubDAGBuilder(
1042+
pgb.planGraphHolder,
1043+
[]primitivebuilder.Builder{lhsBldr},
1044+
rhsBldr,
1045+
)
10121046
}
1013-
err = bldr.Build()
1014-
if err != nil {
1015-
return err
1016-
}
1017-
return nil
1047+
} else {
1048+
bldr = primitivebuilder.NewInsertOrUpdate(
1049+
bldrInput,
1050+
)
1051+
}
1052+
err := bldr.Build()
1053+
if err != nil {
1054+
return err
10181055
}
1019-
pr := primitive.NewGenericPrimitive(nil, nil, nil, primitive_context.NewPrimitiveContext())
1020-
pgb.planGraphHolder.CreatePrimitiveNode(pr)
10211056
return nil
10221057
}
10231058

@@ -1073,72 +1108,16 @@ func (pgb *standardPlanGraphBuilder) handleUpdate(pbi planbuilderinput.PlanBuild
10731108
bldrInput.SetIsTargetPhysicalTable(true)
10741109
}
10751110
isAwait := primitiveGenerator.GetPrimitiveComposer().IsAwait()
1076-
var bldr primitivebuilder.Builder
1077-
if len(node.SelectExprs) > 0 {
1078-
// Two cases:
1079-
// 1. Synchronous. Equivalent to select.
1080-
// 2. Asynchronous. Whole other story.
1081-
tableMeta, tableMetaExists := bldrInput.GetTableMetadata()
1082-
if !tableMetaExists {
1083-
return fmt.Errorf("could not obtain table metadata for node '%s'", node.Action)
1084-
}
1085-
rc, rcErr := tableinsertioncontainer.NewTableInsertionContainer(
1086-
tableMeta,
1087-
handlerCtx.GetSQLEngine(),
1088-
handlerCtx.GetTxnCounterMgr(),
1089-
)
1090-
if rcErr != nil {
1091-
return rcErr
1092-
}
1093-
bldrInput.SetTableInsertionContainer(rc)
1094-
bldrInput.SetIsReturning(true)
1095-
if !isAwait {
1096-
bldr = primitivebuilder.NewSingleAcquireAndSelect(
1097-
bldrInput,
1098-
primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx(),
1099-
primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(),
1100-
nil,
1101-
)
1102-
} else {
1103-
bldrInput.SetIsAwait(true)
1104-
bldrInput.SetIsReturning(true)
1105-
bldrInput.SetInsertCtx(primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx())
1106-
lhsBldr := primitivebuilder.NewInsertOrUpdate(
1107-
bldrInput,
1108-
)
1109-
newBldrInput := builder_input.NewBuilderInput(
1110-
pgb.planGraphHolder,
1111-
handlerCtx,
1112-
tbl,
1113-
)
1114-
newBldrInput.SetParserNode(node)
1115-
newBldrInput.SetAnnotatedAST(pbi.GetAnnotatedAST())
1116-
newBldrInput.SetTxnCtrlCtrs(pbi.GetTxnCtrlCtrs())
1117-
newBldrInput.SetTableInsertionContainer(rc)
1118-
newBldrInput.SetDependencyNode(selectPrimitiveNode)
1119-
newBldrInput.SetIsAwait(isAwait)
1120-
rhsBldr := primitivebuilder.NewSingleSelect(
1121-
pgb.planGraphHolder, handlerCtx, primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(),
1122-
[]tableinsertioncontainer.TableInsertionContainer{rc},
1123-
nil,
1124-
streaming.NewNopMapStream(),
1125-
)
1126-
bldr = primitivebuilder.NewDependencySubDAGBuilder(
1127-
pgb.planGraphHolder,
1128-
[]primitivebuilder.Builder{lhsBldr},
1129-
rhsBldr,
1130-
)
1131-
}
1132-
} else {
1133-
bldr = primitivebuilder.NewInsertOrUpdate(
1134-
bldrInput,
1135-
)
1136-
}
1137-
err = bldr.Build()
1138-
if err != nil {
1139-
return err
1140-
}
1141-
return nil
1111+
return pgb.handleMutationOperation(
1112+
handlerCtx,
1113+
pbi,
1114+
primitiveGenerator,
1115+
node,
1116+
tbl,
1117+
selectPrimitiveNode,
1118+
bldrInput,
1119+
isAwait,
1120+
)
11421121
}
11431122
pr := primitive.NewGenericPrimitive(nil, nil, nil, primitive_context.NewPrimitiveContext())
11441123
pgb.planGraphHolder.CreatePrimitiveNode(pr)

internal/stackql/primitivegenerator/statement_analyzer.go

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -975,7 +975,7 @@ func (pb *standardPrimitiveGenerator) buildRequestContext(
975975
}
976976

977977
func (pb *standardPrimitiveGenerator) buildRequestContextFromMapArray(
978-
node sqlparser.SQLNode,
978+
node sqlparser.SQLNode, //nolint:revive,unparam // TODO: review
979979
meta tablemetadata.ExtendedTableMetadata,
980980
execContext anysdk.ExecContext,
981981
paramMapArray map[int]map[string]interface{},
@@ -1173,6 +1173,7 @@ func (pb *standardPrimitiveGenerator) AnalyzeInsert(pbi planbuilderinput.PlanBui
11731173
return nil
11741174
}
11751175

1176+
//nolint:funlen,gocognit // TODO: refactor
11761177
func (pb *standardPrimitiveGenerator) AnalyzeUpdate(pbi planbuilderinput.PlanBuilderInput) error {
11771178
handlerCtx := pbi.GetHandlerCtx()
11781179
node, ok := pbi.GetUpdate()
@@ -1195,19 +1196,6 @@ func (pb *standardPrimitiveGenerator) AnalyzeUpdate(pbi planbuilderinput.PlanBui
11951196
return nil
11961197
}
11971198

1198-
// prov, err := tbl.GetProvider()
1199-
// if err != nil {
1200-
// return err
1201-
// }
1202-
// currentService, err := tbl.GetServiceStr()
1203-
// if err != nil {
1204-
// return err
1205-
// }
1206-
// currentResource, err := tbl.GetResourceStr()
1207-
// if err != nil {
1208-
// return err
1209-
// }
1210-
12111199
pb.parseComments(node.Comments)
12121200

12131201
method, err := tbl.GetMethod()

test/robot/functional/stackql_mocked_from_cmd_line.robot

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8676,7 +8676,9 @@ Update Async Returning Simple Projection
86768676
... |-----------|-----------------|
86778677
... |${SPACE}INGRESS${SPACE}${SPACE}${SPACE}|${SPACE}My${SPACE}test${SPACE}fw${SPACE}rule${SPACE}|
86788678
... |-----------|-----------------|
8679-
${inputStr} = Set Variable If "${SQL_BACKEND}" == "postgres_tcp" ${inputStrPostgres} ${inputStrSQLite}
8679+
${stdErrStr} = Catenate SEPARATOR=\n
8680+
... compute#operation: insert in progress, 10 seconds elapsed
8681+
... compute#operation: insert complete
86808682
Should Stackql Exec Inline Equal Both Streams
86818683
... ${STACKQL_EXE}
86828684
... ${OKTA_SECRET_STR}
@@ -8687,7 +8689,7 @@ Update Async Returning Simple Projection
86878689
... ${SQL_BACKEND_CFG_STR_CANONICAL}
86888690
... ${inputStr}
86898691
... ${outputStr}
8690-
... ${EMPTY}
8692+
... ${stdErrStr}
86918693
... stdout=${CURDIR}/tmp/Update-Async-Returning-Simple-Projection.tmp
86928694
... stderr=${CURDIR}/tmp/Update-Async-Returning-Simple-Projection-stderr.tmp
86938695

@@ -8704,9 +8706,6 @@ Replace Returning Simple Projection
87048706
... |----------------------|---------|
87058707
... |${SPACE}[email protected]${SPACE}|${SPACE}RUNNING${SPACE}|
87068708
... |----------------------|---------|
8707-
${stdErrStr} = Catenate SEPARATOR=\n
8708-
... compute#operation: insert in progress, 10 seconds elapsed
8709-
... compute#operation: insert complete
87108709
Should Stackql Exec Inline Equal Both Streams
87118710
... ${STACKQL_EXE}
87128711
... ${OKTA_SECRET_STR}
@@ -8717,7 +8716,7 @@ Replace Returning Simple Projection
87178716
... ${SQL_BACKEND_CFG_STR_CANONICAL}
87188717
... ${inputStr}
87198718
... ${outputStr}
8720-
... ${stdErrStr}
8719+
... ${EMPTY}
87218720
... stdout=${CURDIR}/tmp/Replace-Returning-Simple-Projection.tmp
87228721
... stderr=${CURDIR}/tmp/Replace-Returning-Simple-Projection-stderr.tmp
87238722

@@ -8734,7 +8733,9 @@ Replace Async Returning Simple Projection
87348733
... |-----------|-----------------|
87358734
... |${SPACE}INGRESS${SPACE}${SPACE}${SPACE}|${SPACE}My${SPACE}test${SPACE}fw${SPACE}rule${SPACE}|
87368735
... |-----------|-----------------|
8737-
${inputStr} = Set Variable If "${SQL_BACKEND}" == "postgres_tcp" ${inputStrPostgres} ${inputStrSQLite}
8736+
${stdErrStr} = Catenate SEPARATOR=\n
8737+
... compute#operation: insert in progress, 10 seconds elapsed
8738+
... compute#operation: insert complete
87388739
Should Stackql Exec Inline Equal Both Streams
87398740
... ${STACKQL_EXE}
87408741
... ${OKTA_SECRET_STR}
@@ -8745,6 +8746,6 @@ Replace Async Returning Simple Projection
87458746
... ${SQL_BACKEND_CFG_STR_CANONICAL}
87468747
... ${inputStr}
87478748
... ${outputStr}
8748-
... ${EMPTY}
8749+
... ${stdErrStr}
87498750
... stdout=${CURDIR}/tmp/Replace-Async-Returning-Simple-Projection.tmp
87508751
... stderr=${CURDIR}/tmp/Replace-Async-Returning-Simple-Projection-stderr.tmp

0 commit comments

Comments
 (0)