Skip to content

Commit a9aa871

Browse files
committed
forward: add retry logic
1 parent 0d5abdd commit a9aa871

File tree

1 file changed

+49
-14
lines changed

1 file changed

+49
-14
lines changed

cmd/forward/main.go

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"path/filepath"
1010
"pbench/log"
1111
"pbench/presto"
12+
"pbench/presto/query_json"
1213
"pbench/utils"
1314
"regexp"
1415
"sync"
@@ -36,6 +37,16 @@ var (
3637
forwarded atomic.Uint32
3738
)
3839

40+
const maxRetry = 10
41+
42+
func waitForNextPoll(ctx context.Context) {
43+
timer := time.NewTimer(PollInterval)
44+
select {
45+
case <-ctx.Done():
46+
case <-timer.C:
47+
}
48+
}
49+
3950
func Run(_ *cobra.Command, _ []string) {
4051
OutputPath = filepath.Join(OutputPath, RunName)
4152
utils.PrepareOutputDirectory(OutputPath)
@@ -102,19 +113,29 @@ func Run(_ *cobra.Command, _ []string) {
102113

103114
sourceClient := clients[0]
104115
trueValue := true
116+
// We do not need query text from the queryState because we will need to query the detailed info for session params anyway.
117+
queryTextSizeLimit := 1
105118
// lastQueryStateCheckCutoffTime is the query create time of the most recent query in the previous batch.
106119
// We only look at queries created later than this timestamp in the following batch.
107120
lastQueryStateCheckCutoffTime := time.Time{}
108121
// GetQueryState API will always return the full query history for all queries that have not yet been expired by the
109122
// Presto server. This can be a huge list. We do not want to do forwarding for queries that were already submitted
110123
// before this program started. So we need to skip the forwarding for the API result we got in the first batch.
111124
firstBatch := true
112-
// Keep running until the source cluster becomes unavailable or the user interrupts or quits using Ctrl + C or Ctrl + D.
113-
for ctx.Err() == nil {
114-
states, _, err := sourceClient.GetQueryState(ctx, &presto.GetQueryStatsOptions{IncludeAllQueries: &trueValue})
125+
// Keep running until user interrupts or quits using Ctrl + C or Ctrl + D.
126+
// When the cluster is unavailable to return the running queries, wait and retry for at most 10 times before quitting.
127+
for attempt := 1; ctx.Err() == nil && attempt <= maxRetry; {
128+
states, _, err := sourceClient.GetQueryState(ctx, &presto.GetQueryStatsOptions{
129+
IncludeAllQueries: &trueValue,
130+
QueryTextSizeLimit: &queryTextSizeLimit,
131+
})
115132
if err != nil {
116-
log.Error().Err(err).Msgf("failed to get query states")
117-
break
133+
log.Error().Err(err).Msgf("failed to get query states, attempt %d/%d", attempt, maxRetry)
134+
attempt++
135+
waitForNextPoll(ctx)
136+
continue
137+
} else {
138+
attempt = 1
118139
}
119140
// GetQueryState API does not return results (queries) in chronological order. Therefore, we cannot update
120141
// lastQueryStateCheckCutoffTime directly because we may update it to be too recent so some queries we do need
@@ -136,11 +157,12 @@ func Run(_ *cobra.Command, _ []string) {
136157
}
137158
firstBatch = false
138159
lastQueryStateCheckCutoffTime = newCutoffTime
139-
timer := time.NewTimer(PollInterval)
140-
select {
141-
case <-ctx.Done():
142-
case <-timer.C:
143-
}
160+
waitForNextPoll(ctx)
161+
}
162+
if ctx.Err() == nil {
163+
// We exited the loop because the source server is not able to return the query states after maxRetry attempts.
164+
// Not because of user interruption.
165+
log.Error().Msgf("failed to get query state info from the source cluster after %d attempts", maxRetry)
144166
}
145167
runningTasks.Wait()
146168
// This causes the signal handler to exit.
@@ -151,10 +173,23 @@ func Run(_ *cobra.Command, _ []string) {
151173

152174
func forwardQuery(ctx context.Context, queryState *presto.QueryStateInfo, clients []*presto.Client) {
153175
defer runningTasks.Done()
154-
queryInfo, _, queryInfoErr := clients[0].GetQueryInfo(ctx, queryState.QueryId, false, nil)
155-
if queryInfoErr != nil {
156-
log.Error().Str("source_query_id", queryState.QueryId).Err(queryInfoErr).
157-
Msg("failed to get query info for forwarding")
176+
var (
177+
queryInfo *query_json.QueryInfo
178+
queryInfoErr error
179+
)
180+
for attempt := 1; attempt <= maxRetry; attempt++ {
181+
queryInfo, _, queryInfoErr = clients[0].GetQueryInfo(ctx, queryState.QueryId, false, nil)
182+
if queryInfoErr != nil {
183+
log.Error().Str("source_query_id", queryState.QueryId).Err(queryInfoErr).
184+
Msgf("failed to get query info for forwarding, attempt %d/%d", attempt, maxRetry)
185+
waitForNextPoll(ctx)
186+
} else {
187+
break
188+
}
189+
}
190+
if queryInfo == nil {
191+
log.Error().Str("source_query_id", queryState.QueryId).
192+
Msgf("cannot get query info for forwarding after %d retries, skipping", maxRetry)
158193
failedToForward.Add(1)
159194
return
160195
}

0 commit comments

Comments
 (0)