Skip to content

Commit a35db22

Browse files
insert-returning-asynchronous
Summary: - Support for asynchronous `INSERT RETURNINNG` semantics.
1 parent 18933ee commit a35db22

File tree

1 file changed

+68
-2
lines changed

1 file changed

+68
-2
lines changed

internal/stackql/execution/mono_valent_execution.go

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1444,6 +1444,32 @@ func (mv *monoValentExecution) GetExecutor() (func(pc primitive.IPrimitiveCtx) i
14441444
return ex, nil
14451445
}
14461446

1447+
func shimProcessHTTP(
1448+
url string,
1449+
rtCtx dto.RuntimeCtx,
1450+
authCtx *dto.AuthCtx,
1451+
provider anysdk.Provider,
1452+
m anysdk.OperationStore,
1453+
outErrFile io.Writer,
1454+
) (*http.Response, error) {
1455+
req, monitorReqErr := anysdk.GetMonitorRequest(url)
1456+
if monitorReqErr != nil {
1457+
return nil, monitorReqErr
1458+
}
1459+
cc := anysdk.NewAnySdkClientConfigurator(rtCtx, provider.GetName())
1460+
anySdkResponse, apiErr := anysdk.CallFromSignature(
1461+
cc, rtCtx, authCtx, authCtx.Type, false, outErrFile, provider, anysdk.NewAnySdkOpStoreDesignation(m), req)
1462+
1463+
if apiErr != nil {
1464+
return nil, apiErr
1465+
}
1466+
httpResponse, httpResponseErr := anySdkResponse.GetHttpResponse()
1467+
if httpResponseErr != nil {
1468+
return nil, httpResponseErr
1469+
}
1470+
return httpResponse, nil
1471+
}
1472+
14471473
//nolint:funlen,gocognit // acceptable for now
14481474
func GetMonitorExecutor(
14491475
handlerCtx handler.HandlerContext,
@@ -1498,8 +1524,49 @@ func GetMonitorExecutor(
14981524

14991525
operationDescriptor := getOpDescriptor(body)
15001526
endTime, endTimeOk := body["endTime"]
1527+
prStr := provider.GetName()
15011528
if endTimeOk && endTime != "" {
1502-
return prepareResultSet(&asyncPrim, pc, body, operationDescriptor)
1529+
targetLink, targetLinkOK := body["targetLink"]
1530+
if targetLinkOK {
1531+
authCtx, authErr := pc.GetAuthContext(prStr)
1532+
if authErr != nil {
1533+
return internaldto.NewExecutorOutput(nil, nil, nil, nil, authErr)
1534+
}
1535+
if authCtx == nil {
1536+
return internaldto.NewExecutorOutput(nil, nil, nil, nil, fmt.Errorf("cannot execute monitor: no auth context"))
1537+
}
1538+
targetLinkStr, targetLinkStrOk := targetLink.(string)
1539+
if !targetLinkStrOk {
1540+
return internaldto.NewExecutorOutput(
1541+
nil,
1542+
nil,
1543+
nil,
1544+
nil,
1545+
fmt.Errorf("cannot execute monitor: 'targetLink' is not a string"),
1546+
)
1547+
}
1548+
httpResponse, httpResponseErr := shimProcessHTTP(
1549+
targetLinkStr,
1550+
rtCtx,
1551+
authCtx,
1552+
provider,
1553+
m,
1554+
outErrFile,
1555+
)
1556+
if httpResponseErr != nil {
1557+
return internaldto.NewExecutorOutput(nil, nil, nil, nil, httpResponseErr)
1558+
}
1559+
1560+
if httpResponse != nil && httpResponse.Body != nil {
1561+
defer httpResponse.Body.Close()
1562+
}
1563+
target, targetErr := m.DeprecatedProcessResponse(httpResponse)
1564+
handlerCtx.LogHTTPResponseMap(target)
1565+
if targetErr != nil {
1566+
return internaldto.NewExecutorOutput(nil, nil, nil, nil, targetErr)
1567+
}
1568+
return prepareResultSet(&asyncPrim, pc, target, operationDescriptor)
1569+
}
15031570
}
15041571
url, ok := body["selfLink"]
15051572
if !ok {
@@ -1511,7 +1578,6 @@ func GetMonitorExecutor(
15111578
fmt.Errorf("cannot execute monitor: no 'selfLink' property present"),
15121579
)
15131580
}
1514-
prStr := provider.GetName()
15151581
authCtx, authErr := pc.GetAuthContext(prStr)
15161582
if authErr != nil {
15171583
return internaldto.NewExecutorOutput(nil, nil, nil, nil, authErr)

0 commit comments

Comments
 (0)