@@ -55,7 +55,7 @@ func (m *Manager) FetchJobMessagesFirstPage(ctx context.Context, job IAsynchrono
5555 return
5656}
5757
58- func ( m * Manager ) waitForJobToStart ( ctx context.Context , logger messages.IMessageLogger , job IAsynchronousJob ) (err error ) {
58+ func waitForJobState ( ctx context.Context , logger messages.IMessageLogger , job IAsynchronousJob , jobState string , checkStateFunc func (context. Context , IAsynchronousJob ) ( bool , error ) ) (err error ) {
5959 err = parallelisation .DetermineContextError (ctx )
6060 if err != nil {
6161 return
@@ -65,20 +65,28 @@ func (m *Manager) waitForJobToStart(ctx context.Context, logger messages.IMessag
6565 if err != nil {
6666 return
6767 }
68- notStartedError := fmt .Errorf ("%w: job [%v] has not started " , commonerrors .ErrCondition , jobName )
68+ notStartedError := fmt .Errorf ("%w: job [%v] has not reached the expected state [%v] " , commonerrors .ErrCondition , jobName , jobState )
6969 err = retry .RetryOnError (ctx , logs .NewPlainLogrLoggerFromLoggers (logger ), retry .DefaultExponentialBackoffRetryPolicyConfiguration (), func () error {
70- started , subErr := m . HasJobStarted (ctx , job )
70+ inState , subErr := checkStateFunc (ctx , job )
7171 if subErr != nil {
7272 return subErr
7373 }
74- if started {
74+ if inState {
7575 return nil
7676 }
7777 return notStartedError
78- }, fmt .Sprintf ("Waiting for job [%v] to start ..." , jobName ), notStartedError )
78+ }, fmt .Sprintf ("Waiting for job [%v] to %v ..." , jobName , jobState ), notStartedError )
7979 return
8080}
8181
82+ func (m * Manager ) waitForJobToStart (ctx context.Context , logger messages.IMessageLogger , job IAsynchronousJob ) error {
83+ return waitForJobState (ctx , logger , job , "start" , m .HasJobStarted )
84+ }
85+
86+ func (m * Manager ) waitForJobToHaveMessagesAvailable (ctx context.Context , logger messages.IMessageLogger , job IAsynchronousJob ) error {
87+ return waitForJobState (ctx , logger , job , "have messages" , m .areThereMessages )
88+ }
89+
8290func (m * Manager ) createMessagePaginator (ctx context.Context , job IAsynchronousJob ) (paginator pagination.IStreamPaginatorAndPageFetcher , err error ) {
8391 paginator , err = m .messagesPaginatorFactory .Create (ctx , func (subCtx context.Context ) (pagination.IStaticPageStream , error ) {
8492 return m .FetchJobMessagesFirstPage (subCtx , job )
@@ -104,6 +112,10 @@ func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob
104112 if err != nil {
105113 return
106114 }
115+ err = m .waitForJobToHaveMessagesAvailable (ctx , messageLogger , job )
116+ if err != nil {
117+ return
118+ }
107119 messagePaginator , err := m .createMessagePaginator (ctx , job )
108120 if err != nil {
109121 return
@@ -149,6 +161,37 @@ func (m *Manager) checkForMessageStreamExhaustion(ctx context.Context, paginator
149161 }
150162}
151163
164+ func (m * Manager ) areThereMessages (ctx context.Context , job IAsynchronousJob ) (hasMessages bool , err error ) {
165+ err = parallelisation .DetermineContextError (ctx )
166+ if err != nil {
167+ return
168+ }
169+ if job == nil {
170+ err = fmt .Errorf ("%w: missing job" , commonerrors .ErrUndefined )
171+ return
172+ }
173+ if job .HasMessages () {
174+ hasMessages = true
175+ return
176+ }
177+
178+ jobName , err := job .FetchName ()
179+ if err != nil {
180+ return
181+ }
182+ jobType := job .FetchType ()
183+ jobStatus , resp , apierr := m .fetchJobStatusFunc (ctx , jobName )
184+ if resp != nil {
185+ _ = resp .Body .Close ()
186+ }
187+ err = api .CheckAPICallSuccess (ctx , fmt .Sprintf ("could not fetch %v [%v]'s status" , jobType , jobName ), resp , apierr )
188+ if err != nil {
189+ return
190+ }
191+ hasMessages = jobStatus .HasMessages ()
192+ return
193+ }
194+
152195func (m * Manager ) HasJobStarted (ctx context.Context , job IAsynchronousJob ) (started bool , err error ) {
153196 err = parallelisation .DetermineContextError (ctx )
154197 if err != nil {
@@ -158,6 +201,15 @@ func (m *Manager) HasJobStarted(ctx context.Context, job IAsynchronousJob) (star
158201 err = fmt .Errorf ("%w: missing job" , commonerrors .ErrUndefined )
159202 return
160203 }
204+ if job .GetDone () {
205+ started = true
206+ return
207+ }
208+ if ! job .GetQueued () {
209+ started = true
210+ return
211+ }
212+
161213 jobName , err := job .FetchName ()
162214 if err != nil {
163215 return
0 commit comments