Skip to content

Commit 795053b

Browse files
committed
Refactor to use actionwait
1 parent cf3cfe7 commit 795053b

File tree

1 file changed

+52
-85
lines changed

1 file changed

+52
-85
lines changed

internal/service/transcribe/start_transcription_job_action.go

Lines changed: 52 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/hashicorp/terraform-plugin-framework/action/schema"
1616
"github.com/hashicorp/terraform-plugin-framework/types"
1717
"github.com/hashicorp/terraform-plugin-log/tflog"
18+
"github.com/hashicorp/terraform-provider-aws/internal/actionwait"
1819
"github.com/hashicorp/terraform-provider-aws/internal/framework"
1920
fwtypes "github.com/hashicorp/terraform-provider-aws/internal/framework/types"
2021
"github.com/hashicorp/terraform-provider-aws/names"
@@ -207,102 +208,68 @@ func (a *startTranscriptionJobAction) Invoke(ctx context.Context, req action.Inv
207208
return
208209
}
209210

210-
// Wait for job to be in progress or completed
211-
deadline := time.Now().Add(timeout)
212-
pollInterval := 5 * time.Second
213-
progressInterval := 30 * time.Second
214-
lastProgressUpdate := time.Now()
215-
216-
for {
217-
// Check context cancellation
218-
select {
219-
case <-ctx.Done():
220-
resp.Diagnostics.AddError(
221-
"Context Cancelled",
222-
"Transcription job start operation was cancelled",
223-
)
224-
return
225-
default:
211+
// Wait for job to move beyond QUEUED: treat IN_PROGRESS or COMPLETED as success, FAILED as failure, QUEUED transitional.
212+
fr, err := actionwait.WaitForStatus(ctx, func(ctx context.Context) (actionwait.FetchResult[*awstypes.TranscriptionJob], error) {
213+
getOutput, gerr := conn.GetTranscriptionJob(ctx, &transcribe.GetTranscriptionJobInput{TranscriptionJobName: aws.String(transcriptionJobName)})
214+
if gerr != nil {
215+
return actionwait.FetchResult[*awstypes.TranscriptionJob]{}, fmt.Errorf("get transcription job: %w", gerr)
226216
}
227-
228-
// Check timeout
229-
if time.Now().After(deadline) {
230-
resp.Diagnostics.AddError(
231-
"Timeout Waiting for Transcription Job",
232-
fmt.Sprintf("Transcription job %s did not start within %v", transcriptionJobName, timeout),
233-
)
234-
return
235-
}
236-
237-
// Get job status
238-
getInput := &transcribe.GetTranscriptionJobInput{
239-
TranscriptionJobName: aws.String(transcriptionJobName),
217+
if getOutput.TranscriptionJob == nil {
218+
return actionwait.FetchResult[*awstypes.TranscriptionJob]{}, fmt.Errorf("transcription job %s not found", transcriptionJobName)
240219
}
241-
242-
getOutput, err := conn.GetTranscriptionJob(ctx, getInput)
243-
if err != nil {
220+
status := getOutput.TranscriptionJob.TranscriptionJobStatus
221+
return actionwait.FetchResult[*awstypes.TranscriptionJob]{Status: actionwait.Status(status), Value: getOutput.TranscriptionJob}, nil
222+
}, actionwait.Options[*awstypes.TranscriptionJob]{
223+
Timeout: timeout,
224+
Interval: actionwait.FixedInterval(5 * time.Second),
225+
ProgressInterval: 30 * time.Second,
226+
SuccessStates: []actionwait.Status{
227+
actionwait.Status(awstypes.TranscriptionJobStatusInProgress),
228+
actionwait.Status(awstypes.TranscriptionJobStatusCompleted),
229+
},
230+
TransitionalStates: []actionwait.Status{
231+
actionwait.Status(awstypes.TranscriptionJobStatusQueued),
232+
},
233+
FailureStates: []actionwait.Status{
234+
actionwait.Status(awstypes.TranscriptionJobStatusFailed),
235+
},
236+
ProgressSink: func(fr actionwait.FetchResult[any], meta actionwait.ProgressMeta) {
237+
resp.SendProgress(action.InvokeProgressEvent{Message: fmt.Sprintf("Transcription job %s is currently %s", transcriptionJobName, fr.Status)})
238+
},
239+
})
240+
if err != nil {
241+
switch e := err.(type) {
242+
case *actionwait.ErrTimeout:
244243
resp.Diagnostics.AddError(
245-
"Failed to Get Transcription Job Status",
246-
fmt.Sprintf("Could not get status for transcription job %s: %s", transcriptionJobName, err),
244+
"Timeout Waiting for Transcription Job",
245+
fmt.Sprintf("Transcription job %s did not reach a running state within %v", transcriptionJobName, timeout),
247246
)
248-
return
249-
}
250-
251-
if getOutput.TranscriptionJob == nil {
247+
case *actionwait.ErrFailureState:
252248
resp.Diagnostics.AddError(
253-
"Transcription Job Not Found",
254-
fmt.Sprintf("Transcription job %s was not found", transcriptionJobName),
249+
"Transcription Job Failed",
250+
fmt.Sprintf("Transcription job %s failed: %s", transcriptionJobName, e.Status),
255251
)
256-
return
257-
}
258-
259-
status := getOutput.TranscriptionJob.TranscriptionJobStatus
260-
261-
// Send progress updates periodically
262-
if time.Since(lastProgressUpdate) >= progressInterval {
263-
resp.SendProgress(action.InvokeProgressEvent{
264-
Message: fmt.Sprintf("Transcription job %s is currently %s", transcriptionJobName, string(status)),
265-
})
266-
lastProgressUpdate = time.Now()
267-
}
268-
269-
// Check if job has started successfully
270-
switch status {
271-
case awstypes.TranscriptionJobStatusInProgress, awstypes.TranscriptionJobStatusCompleted:
272-
// Job has started successfully
273-
resp.SendProgress(action.InvokeProgressEvent{
274-
Message: fmt.Sprintf("Transcription job %s started successfully and is %s", transcriptionJobName, string(status)),
275-
})
276-
277-
tflog.Info(ctx, "Transcription job started successfully", map[string]any{
278-
"transcription_job_name": transcriptionJobName,
279-
"job_status": string(status),
280-
names.AttrCreationTime: getOutput.TranscriptionJob.CreationTime,
281-
})
282-
return
283-
284-
case awstypes.TranscriptionJobStatusFailed:
285-
failureReason := ""
286-
if getOutput.TranscriptionJob.FailureReason != nil {
287-
failureReason = aws.ToString(getOutput.TranscriptionJob.FailureReason)
288-
}
252+
case *actionwait.ErrUnexpectedState:
289253
resp.Diagnostics.AddError(
290-
"Transcription Job Failed",
291-
fmt.Sprintf("Transcription job %s failed: %s", transcriptionJobName, failureReason),
254+
"Unexpected Transcription Job Status",
255+
fmt.Sprintf("Transcription job %s entered unexpected status: %s", transcriptionJobName, e.Status),
292256
)
293-
return
294-
295-
case awstypes.TranscriptionJobStatusQueued:
296-
// Job is still queued, continue waiting
297-
time.Sleep(pollInterval)
298-
continue
299-
300257
default:
301258
resp.Diagnostics.AddError(
302-
"Unexpected Transcription Job Status",
303-
fmt.Sprintf("Transcription job %s has unexpected status: %s", transcriptionJobName, string(status)),
259+
"Error Waiting for Transcription Job",
260+
fmt.Sprintf("Error while waiting for transcription job %s: %s", transcriptionJobName, err),
304261
)
305-
return
306262
}
263+
return
264+
}
265+
266+
resp.SendProgress(action.InvokeProgressEvent{Message: fmt.Sprintf("Transcription job %s started successfully and is %s", transcriptionJobName, fr.Status)})
267+
logFields := map[string]any{
268+
"transcription_job_name": transcriptionJobName,
269+
"job_status": fr.Status,
270+
}
271+
if fr.Value != nil {
272+
logFields[names.AttrCreationTime] = fr.Value.CreationTime
307273
}
274+
tflog.Info(ctx, "Transcription job started successfully", logFields)
308275
}

0 commit comments

Comments
 (0)