Skip to content

Commit d1dab20

Browse files
authored
[APIE-720,APIE-724] Flink Shell: don't fetch statement results if the statement phase is failed (#3229)
1 parent 6bebc69 commit d1dab20

File tree

4 files changed

+41
-3
lines changed

4 files changed

+41
-3
lines changed

pkg/cmd/authenticated_cli_command.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func (c *AuthenticatedCLICommand) getGatewayUrlForComputePool(access, id string)
120120
return privateURL, nil
121121
}
122122
if access == "" {
123-
output.Printf(c.Config.EnableColor, "No Flink endpoint is specified, defaulting to public endpoint: `%s`\n", publicURL)
123+
output.ErrPrintf(c.Config.EnableColor, "No Flink endpoint is specified, defaulting to public endpoint: `%s`\n", publicURL)
124124
}
125125
return publicURL, nil
126126
}
@@ -146,7 +146,7 @@ func (c *AuthenticatedCLICommand) getGatewayUrlForRegion(accessType, provider, r
146146
return "", errors.NewErrorWithSuggestions("invalid region", "Please select a valid region - use `confluent flink region list` to see available regions")
147147
}
148148
if accessType == "" {
149-
output.Printf(c.Config.EnableColor, "No Flink endpoint is specified, defaulting to public endpoint: `%s`\n", hostUrl)
149+
output.ErrPrintf(c.Config.EnableColor, "No Flink endpoint is specified, defaulting to public endpoint: `%s`\n", hostUrl)
150150
}
151151

152152
u, err := purl.Parse(hostUrl)

pkg/flink/internal/results/result_fetcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func (t *ResultFetcher) updateState(newResults *types.ProcessedStatement, err *t
120120
return
121121
}
122122

123-
if err != nil {
123+
if err != nil || (newResults != nil && newResults.Status == types.FAILED) {
124124
t.refreshState.setState(types.Failed)
125125
return
126126
}

pkg/flink/internal/store/store.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,12 @@ func (s *Store) FetchStatementResults(statement types.ProcessedStatement) (*type
146146
return &statement, nil
147147
}
148148

149+
statementObj, err := s.authenticatedGatewayClient().GetStatement(s.appOptions.GetEnvironmentId(), statement.StatementName, s.appOptions.GetOrganizationId())
150+
if err != nil {
151+
return nil, &types.StatementError{Message: err.Error()}
152+
}
153+
statement.Status = types.PHASE(statementObj.Status.GetPhase())
154+
149155
// Process remote statements that are now running or completed
150156
statementResultObj, err := s.authenticatedGatewayClient().GetStatementResults(s.appOptions.GetEnvironmentId(), statement.StatementName, s.appOptions.GetOrganizationId(), statement.PageToken)
151157
if err != nil {

pkg/flink/internal/store/store_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1268,6 +1268,14 @@ func (s *StoreTestSuite) TestFetchResultsNoRetryWithCompletedStatement() {
12681268
userProperties := NewUserProperties(&appOptions)
12691269
store := NewStore(client, mockAppController.ExitApplication, userProperties, &appOptions, tokenRefreshFunc)
12701270

1271+
statementObj := flinkgatewayv1.SqlV1Statement{
1272+
Name: flinkgatewayv1.PtrString(testStatementName),
1273+
Status: &flinkgatewayv1.SqlV1StatementStatus{
1274+
Phase: "COMPLETED",
1275+
},
1276+
}
1277+
client.EXPECT().GetStatement("envId", statement.StatementName, "orgId").Return(statementObj, nil)
1278+
12711279
statementResultObj := flinkgatewayv1.SqlV1StatementResult{
12721280
Metadata: flinkgatewayv1.ResultListMeta{},
12731281
Results: &flinkgatewayv1.SqlV1StatementResultResults{},
@@ -1319,6 +1327,14 @@ func (s *StoreTestSuite) TestFetchResultsWithRunningStatement() {
13191327
userProperties := NewUserProperties(&appOptions)
13201328
store := NewStore(client, mockAppController.ExitApplication, userProperties, &appOptions, tokenRefreshFunc)
13211329

1330+
statementObj := flinkgatewayv1.SqlV1Statement{
1331+
Name: flinkgatewayv1.PtrString(testStatementName),
1332+
Status: &flinkgatewayv1.SqlV1StatementStatus{
1333+
Phase: "RUNNING",
1334+
},
1335+
}
1336+
client.EXPECT().GetStatement("envId", statement.StatementName, "orgId").Return(statementObj, nil)
1337+
13221338
statementResultObj := flinkgatewayv1.SqlV1StatementResult{
13231339
Metadata: flinkgatewayv1.ResultListMeta{},
13241340
Results: &flinkgatewayv1.SqlV1StatementResultResults{},
@@ -1374,6 +1390,14 @@ func (s *StoreTestSuite) TestFetchResultsNoRetryWhenPageTokenExists() {
13741390
userProperties := NewUserProperties(&appOptions)
13751391
store := NewStore(client, mockAppController.ExitApplication, userProperties, &appOptions, tokenRefreshFunc)
13761392

1393+
statementObj := flinkgatewayv1.SqlV1Statement{
1394+
Name: flinkgatewayv1.PtrString(testStatementName),
1395+
Status: &flinkgatewayv1.SqlV1StatementStatus{
1396+
Phase: "RUNNING",
1397+
},
1398+
}
1399+
client.EXPECT().GetStatement("envId", statement.StatementName, "orgId").Return(statementObj, nil)
1400+
13771401
nextPage := "https://devel.cpdev.cloud/some/results?page_token=eyJWZX"
13781402
statementResultObj := flinkgatewayv1.SqlV1StatementResult{
13791403
Metadata: flinkgatewayv1.ResultListMeta{Next: &nextPage},
@@ -1441,6 +1465,14 @@ func (s *StoreTestSuite) TestFetchResultsNoRetryWhenResultsExist() {
14411465
userProperties := NewUserProperties(&appOptions)
14421466
store := NewStore(client, mockAppController.ExitApplication, userProperties, &appOptions, tokenRefreshFunc)
14431467

1468+
statementObj := flinkgatewayv1.SqlV1Statement{
1469+
Name: flinkgatewayv1.PtrString(testStatementName),
1470+
Status: &flinkgatewayv1.SqlV1StatementStatus{
1471+
Phase: "RUNNING",
1472+
},
1473+
}
1474+
client.EXPECT().GetStatement("envId", statement.StatementName, "orgId").Return(statementObj, nil)
1475+
14441476
statementResultObj := flinkgatewayv1.SqlV1StatementResult{
14451477
Metadata: flinkgatewayv1.ResultListMeta{},
14461478
Results: &flinkgatewayv1.SqlV1StatementResultResults{Data: &[]any{map[string]any{"op": 0}}},

0 commit comments

Comments
 (0)