Skip to content

Commit ae97d02

Browse files
authored
Stop retrying get-workflow-history with an impossibly-short timeout (#1171)
* Stop retrying get-workflow-history with an impossibly-short timeout

Previously, when a `GetWorkflow(...).Get(...)` call timed out while waiting for the workflow to complete, this happened:
- N-1 requests of the default long-poll timeout occurred, and it correctly retried
- The final request would have something like 5 seconds left, so it performs that call
- The server gives up the request *slightly before* that timeout so it can return the next page token for a future request to use (as happened in the N-1 earlier requests)
- Since no history was received and there's still time left in the context (~50ms internally), another impossibly-short request was sent
- This request fails immediately with an "insufficient time for long poll request" error.
- *This bad-request error* is what is returned from `Get(...)`

This is pretty clearly sub-optimal, both because we sent a request that is almost guaranteed to fail, and because the error returned to the caller is fairly generic looking / doesn't describe what happened.

What happened is that we ran out of time. So this now returns a DeadlineExceeded error, like any other timeout, and does not cause that final request to occur.
1 parent 2f29329 commit ae97d02

File tree

2 files changed

+75
-0
lines changed

2 files changed

+75
-0
lines changed

internal/internal_workflow_client.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,12 +512,18 @@ func (wc *workflowClient) GetWorkflowHistory(
512512
var err error
513513
Loop:
514514
for {
515+
var isFinalLongPoll bool
515516
err = backoff.Retry(ctx,
516517
func() error {
517518
var err1 error
518519
tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags, func(builder *contextBuilder) {
519520
if isLongPoll {
520521
builder.Timeout = defaultGetHistoryTimeoutInSecs * time.Second
522+
deadline, ok := ctx.Deadline()
523+
if ok && deadline.Before(time.Now().Add(builder.Timeout)) {
524+
// insufficient time for another poll, so this needs to be the last attempt
525+
isFinalLongPoll = true
526+
}
521527
}
522528
})
523529
defer cancel()
@@ -546,6 +552,13 @@ func (wc *workflowClient) GetWorkflowHistory(
546552
return nil, err
547553
}
548554
if isLongPoll && len(response.History.Events) == 0 && len(response.NextPageToken) != 0 {
555+
if isFinalLongPoll {
556+
// essentially a deadline exceeded, the last attempt did not get a result.
557+
// this is necessary because the server does not know if we are able to try again,
558+
// so it returns an empty result slightly before a timeout occurs, so the next
559+
// attempt's token can be returned if it wishes to retry.
560+
return nil, fmt.Errorf("timed out waiting for the workflow to finish: %w", context.DeadlineExceeded)
561+
}
549562
request.NextPageToken = response.NextPageToken
550563
continue Loop
551564
}

internal/internal_workflow_client_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"time"
3131

3232
"go.uber.org/cadence/internal/common/serializer"
33+
"go.uber.org/yarpc"
3334

3435
"go.uber.org/cadence/.gen/go/cadence/workflowservicetest"
3536
"go.uber.org/cadence/.gen/go/shared"
@@ -293,6 +294,67 @@ func (s *historyEventIteratorSuite) TestIterator_Error() {
293294
s.NotNil(err)
294295
}
295296

297+
// TestIterator_StopsTryingNearTimeout drives a long-poll history iterator with a context whose
// reported deadline is shortened during the first mocked request. Both mocked requests return an
// empty history plus a next-page token. The test expects exactly two requests to be sent, and the
// iterator to yield no event and an error that wraps context.DeadlineExceeded and mentions
// "waiting for the workflow to finish".
func (s *historyEventIteratorSuite) TestIterator_StopsTryingNearTimeout() {
298+
// ensuring "when GetWorkflow().Get(...) times out while waiting", we return a timed-out error of some kind,
299+
// and stop sending requests rather than trying again and getting some other kind of error.
300+
// historically this led to a "bad request", insufficient time left for long poll, which was confusing and noisy.
301+
302+
filterType := shared.HistoryEventFilterTypeCloseEvent
303+
reqNormal := getGetWorkflowExecutionHistoryRequest(filterType)
304+
reqFinal := getGetWorkflowExecutionHistoryRequest(filterType)
305+
306+
// all items filtered out for both requests
307+
resEmpty := &shared.GetWorkflowExecutionHistoryResponse{
308+
History: &shared.History{Events: nil}, // this or RawHistory must be non-nil, but they can be empty
309+
NextPageToken: []byte{1, 2, 3, 4, 5},
310+
}
311+
reqFinal.NextPageToken = resEmpty.NextPageToken
312+
313+
s.True(time.Second < (defaultGetHistoryTimeoutInSecs*time.Second), "sanity check: default timeout must be longer than how long we extend the timeout for tests")
314+
315+
// begin with a deadline that is long enough to allow 2 requests
316+
d := time.Now().Add((defaultGetHistoryTimeoutInSecs * time.Second) + time.Second)
317+
baseCtx, cancel := context.WithDeadline(context.Background(), d)
318+
defer cancel()
319+
// wrap baseCtx so the deadline reported by Deadline() can be rewritten mid-test;
// everything else (Done, Err, Value) is still served by the embedded baseCtx.
ctx := &fakeDeadlineContext{baseCtx, d}
320+
321+
// prep the iterator
322+
iter := s.wfClient.GetWorkflowHistory(ctx, workflowID, runID, true, filterType)
323+
324+
// first attempt should occur, and trigger a second request with less than the requested timeout
325+
s.workflowServiceClient.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), reqNormal, gomock.Any()).DoAndReturn(func(_ context.Context, _ *shared.GetWorkflowExecutionHistoryRequest, _ ...yarpc.CallOption) (*shared.GetWorkflowExecutionHistoryResponse, error) {
326+
// first request is being sent, modify the context to simulate time passing,
327+
// and give the second request insufficient time to trigger another poll.
328+
//
329+
// without this, you should see an attempt at a third call, as the context is not canceled,
330+
// and this mock took no time to respond.
331+
ctx.d = time.Now().Add(time.Second)
332+
return resEmpty, nil
333+
}).Times(1)
334+
// second request should occur, but not another
335+
s.workflowServiceClient.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), reqFinal, gomock.Any()).Return(resEmpty, nil).Times(1)
336+
337+
// trigger both paginated requests as part of the single HasNext call
338+
s.True(iter.HasNext())
339+
event, err := iter.Next()
340+
s.Nil(event, "iterator should not have returned any events")
341+
s.Error(err, "iterator should have errored")
342+
// canceled may also be appropriate, but currently this is true
343+
s.Truef(errors.Is(err, context.DeadlineExceeded), "iterator should have returned a deadline-exceeded error, but returned a: %#v", err)
344+
s.Contains(err.Error(), "waiting for the workflow to finish", "should be descriptive of what happened")
345+
}
346+
347+
// fakeDeadlineContext is a minor helper type to allow faking deadlines between calls, as we
// cannot normally modify a context that way. It embeds a real context.Context and carries a
// separately mutable deadline value in d, which tests may reassign at any point.
348+
type fakeDeadlineContext struct {
349+
context.Context
350+
351+
d time.Time
352+
}
353+
354+
// Deadline returns the stored fake deadline f.d and always reports that a deadline is set,
// taking the place of the embedded context's own Deadline method.
func (f *fakeDeadlineContext) Deadline() (time.Time, bool) {
355+
return f.d, true
356+
}
357+
296358
// workflowRunSuite
297359

298360
type (

0 commit comments

Comments
 (0)