@@ -189,6 +189,352 @@ func TestProcessTask_failures(t *testing.T) {
189189 })
190190}
191191
192+ func TestActivityTaskPoller_PollTask (t * testing.T ) {
193+ tests := []struct {
194+ name string
195+ setupPoller func (t * testing.T ) (* activityTaskPoller , * workflowservicetest.MockClient )
196+ setupMocks func (* workflowservicetest.MockClient )
197+ expectedResult interface {}
198+ expectedError error
199+ validateResult func (t * testing.T , result interface {})
200+ }{
201+ {
202+ name : "success with valid activity task" ,
203+ setupPoller : func (t * testing.T ) (* activityTaskPoller , * workflowservicetest.MockClient ) {
204+ return buildActivityTaskPoller (t , false )
205+ },
206+ setupMocks : func (mockService * workflowservicetest.MockClient ) {
207+ mockService .EXPECT ().PollForActivityTask (
208+ gomock .Any (),
209+ & s.PollForActivityTaskRequest {
210+ Domain : common .StringPtr (_testDomainName ),
211+ TaskList : common .TaskListPtr (s.TaskList {Name : common .StringPtr (_testTaskList )}),
212+ Identity : common .StringPtr (_testIdentity ),
213+ TaskListMetadata : & s.TaskListMetadata {MaxTasksPerSecond : common .Float64Ptr (0.0 )},
214+ },
215+ gomock .Any (),
216+ ).Return (& s.PollForActivityTaskResponse {
217+ TaskToken : []byte ("test-task-token" ),
218+ WorkflowExecution : & s.WorkflowExecution {WorkflowId : common .StringPtr ("test-workflow" )},
219+ ActivityId : common .StringPtr ("test-activity" ),
220+ ActivityType : & s.ActivityType {Name : common .StringPtr ("TestActivity" )},
221+ WorkflowType : & s.WorkflowType {Name : common .StringPtr ("TestWorkflow" )},
222+ ScheduledTimestampOfThisAttempt : common .Int64Ptr (time .Now ().UnixNano ()),
223+ StartedTimestamp : common .Int64Ptr (time .Now ().UnixNano ()),
224+ AutoConfigHint : & s.AutoConfigHint {PollerWaitTimeInMs : common .Int64Ptr (1000 )},
225+ }, nil )
226+ },
227+ expectedError : nil ,
228+ validateResult : func (t * testing.T , result interface {}) {
229+ assert .NotNil (t , result )
230+ activityTask , ok := result .(* activityTask )
231+ assert .True (t , ok , "result should be *activityTask" )
232+ assert .NotNil (t , activityTask .task )
233+ assert .Equal (t , []byte ("test-task-token" ), activityTask .task .TaskToken )
234+ assert .NotNil (t , activityTask .autoConfigHint )
235+ assert .Equal (t , & s.AutoConfigHint {PollerWaitTimeInMs : common .Int64Ptr (1000 )}, activityTask .autoConfigHint )
236+ },
237+ },
238+ {
239+ name : "success with empty task (no work available)" ,
240+ setupPoller : func (t * testing.T ) (* activityTaskPoller , * workflowservicetest.MockClient ) {
241+ return buildActivityTaskPoller (t , false )
242+ },
243+ setupMocks : func (mockService * workflowservicetest.MockClient ) {
244+ mockService .EXPECT ().PollForActivityTask (
245+ gomock .Any (),
246+ gomock .Any (),
247+ gomock .Any (),
248+ ).Return (& s.PollForActivityTaskResponse {
249+ TaskToken : []byte {}, // Empty task token indicates no work
250+ AutoConfigHint : & s.AutoConfigHint {PollerWaitTimeInMs : common .Int64Ptr (1000 )},
251+ }, nil )
252+ },
253+ expectedError : nil ,
254+ validateResult : func (t * testing.T , result interface {}) {
255+ assert .NotNil (t , result )
256+ activityTask , ok := result .(* activityTask )
257+ assert .True (t , ok , "result should be *activityTask" )
258+ assert .Nil (t , activityTask .task )
259+ assert .NotNil (t , activityTask .autoConfigHint )
260+ assert .Equal (t , & s.AutoConfigHint {PollerWaitTimeInMs : common .Int64Ptr (1000 )}, activityTask .autoConfigHint )
261+ },
262+ },
263+ {
264+ name : "service error during poll" ,
265+ setupPoller : func (t * testing.T ) (* activityTaskPoller , * workflowservicetest.MockClient ) {
266+ return buildActivityTaskPoller (t , false )
267+ },
268+ setupMocks : func (mockService * workflowservicetest.MockClient ) {
269+ mockService .EXPECT ().PollForActivityTask (
270+ gomock .Any (),
271+ gomock .Any (),
272+ gomock .Any (),
273+ ).Return (nil , & s.InternalServiceError {Message : "service unavailable" })
274+ },
275+ expectedError : & s.InternalServiceError {Message : "service unavailable" },
276+ validateResult : func (t * testing.T , result interface {}) {
277+ assert .Nil (t , result )
278+ },
279+ },
280+ {
281+ name : "service busy error during poll" ,
282+ setupPoller : func (t * testing.T ) (* activityTaskPoller , * workflowservicetest.MockClient ) {
283+ return buildActivityTaskPoller (t , false )
284+ },
285+ setupMocks : func (mockService * workflowservicetest.MockClient ) {
286+ mockService .EXPECT ().PollForActivityTask (
287+ gomock .Any (),
288+ gomock .Any (),
289+ gomock .Any (),
290+ ).Return (nil , & s.ServiceBusyError {Message : "service busy" })
291+ },
292+ expectedError : & s.ServiceBusyError {Message : "service busy" },
293+ validateResult : func (t * testing.T , result interface {}) {
294+ assert .Nil (t , result )
295+ },
296+ },
297+ {
298+ name : "poller shutting down" ,
299+ setupPoller : func (t * testing.T ) (* activityTaskPoller , * workflowservicetest.MockClient ) {
300+ return buildActivityTaskPoller (t , true ) // shutdown = true
301+ },
302+ setupMocks : func (mockService * workflowservicetest.MockClient ) {
303+ // No mock setup needed as doPoll should return early due to shutdown
304+ },
305+ expectedError : errShutdown ,
306+ validateResult : func (t * testing.T , result interface {}) {
307+ assert .Nil (t , result )
308+ },
309+ },
310+ {
311+ name : "context timeout during poll" ,
312+ setupPoller : func (t * testing.T ) (* activityTaskPoller , * workflowservicetest.MockClient ) {
313+ return buildActivityTaskPoller (t , false )
314+ },
315+ setupMocks : func (mockService * workflowservicetest.MockClient ) {
316+ mockService .EXPECT ().PollForActivityTask (
317+ gomock .Any (),
318+ gomock .Any (),
319+ gomock .Any (),
320+ ).Return (nil , context .DeadlineExceeded )
321+ },
322+ expectedError : context .DeadlineExceeded ,
323+ validateResult : func (t * testing.T , result interface {}) {
324+ assert .Nil (t , result )
325+ },
326+ },
327+ }
328+
329+ for _ , tt := range tests {
330+ t .Run (tt .name , func (t * testing.T ) {
331+ poller , mockService := tt .setupPoller (t )
332+ tt .setupMocks (mockService )
333+
334+ result , err := poller .PollTask ()
335+
336+ if tt .expectedError != nil {
337+ assert .Error (t , err )
338+ assert .Equal (t , tt .expectedError , err )
339+ } else {
340+ assert .NoError (t , err )
341+ }
342+
343+ tt .validateResult (t , result )
344+ })
345+ }
346+ }
347+
348+ func buildActivityTaskPoller (t * testing.T , shutdown bool ) (* activityTaskPoller , * workflowservicetest.MockClient ) {
349+ ctrl := gomock .NewController (t )
350+ mockService := workflowservicetest .NewMockClient (ctrl )
351+
352+ var shutdownC <- chan struct {}
353+ if shutdown {
354+ ch := make (chan struct {})
355+ close (ch )
356+ shutdownC = ch
357+ } else {
358+ shutdownC = make (<- chan struct {})
359+ }
360+
361+ return & activityTaskPoller {
362+ basePoller : basePoller {
363+ shutdownC : shutdownC ,
364+ },
365+ domain : _testDomainName ,
366+ taskListName : _testTaskList ,
367+ identity : _testIdentity ,
368+ service : mockService ,
369+ metricsScope : & metrics.TaggedScope {Scope : tally .NewTestScope ("test" , nil )},
370+ logger : testlogger .NewZap (t ),
371+ activitiesPerSecond : 0.0 ,
372+ featureFlags : FeatureFlags {},
373+ }, mockService
374+ }
375+
376+ func TestWorkflowTaskPoller_PollTask (t * testing.T ) {
377+ tests := []struct {
378+ name string
379+ setupPoller func (t * testing.T ) (* workflowTaskPoller , * workflowservicetest.MockClient )
380+ setupMocks func (* workflowservicetest.MockClient )
381+ expectedResult interface {}
382+ expectedError error
383+ validateResult func (t * testing.T , result interface {})
384+ }{
385+ {
386+ name : "success with valid workflow task" ,
387+ setupPoller : func (t * testing.T ) (* workflowTaskPoller , * workflowservicetest.MockClient ) {
388+ wP , mockService , _ , _ := buildWorkflowTaskPoller (t )
389+ return wP , mockService
390+ },
391+ setupMocks : func (mockService * workflowservicetest.MockClient ) {
392+ mockService .EXPECT ().PollForDecisionTask (
393+ gomock .Any (),
394+ gomock .Any (),
395+ gomock .Any (),
396+ ).Return (& s.PollForDecisionTaskResponse {
397+ TaskToken : []byte ("test-task-token" ),
398+ WorkflowExecution : & s.WorkflowExecution {WorkflowId : common .StringPtr ("test-workflow" ), RunId : common .StringPtr ("test-run" )},
399+ WorkflowType : & s.WorkflowType {Name : common .StringPtr ("TestWorkflow" )},
400+ StartedEventId : common .Int64Ptr (1 ),
401+ NextEventId : common .Int64Ptr (2 ),
402+ ScheduledTimestamp : common .Int64Ptr (time .Now ().UnixNano ()),
403+ StartedTimestamp : common .Int64Ptr (time .Now ().UnixNano ()),
404+ History : & s.History {Events : []* s.HistoryEvent {{EventId : common .Int64Ptr (1 )}}},
405+ AutoConfigHint : & s.AutoConfigHint {PollerWaitTimeInMs : common .Int64Ptr (1000 )},
406+ }, nil )
407+ },
408+ expectedError : nil ,
409+ validateResult : func (t * testing.T , result interface {}) {
410+ assert .NotNil (t , result )
411+ workflowTask , ok := result .(* workflowTask )
412+ assert .True (t , ok , "result should be *workflowTask" )
413+ assert .NotNil (t , workflowTask .task )
414+ assert .Equal (t , []byte ("test-task-token" ), workflowTask .task .TaskToken )
415+ assert .NotNil (t , workflowTask .historyIterator )
416+ assert .Equal (t , & s.AutoConfigHint {PollerWaitTimeInMs : common .Int64Ptr (1000 )}, workflowTask .autoConfigHint )
417+ },
418+ },
419+ {
420+ name : "success with empty task (no work available)" ,
421+ setupPoller : func (t * testing.T ) (* workflowTaskPoller , * workflowservicetest.MockClient ) {
422+ wP , mockService , _ , _ := buildWorkflowTaskPoller (t )
423+ return wP , mockService
424+ },
425+ setupMocks : func (mockService * workflowservicetest.MockClient ) {
426+ mockService .EXPECT ().PollForDecisionTask (
427+ gomock .Any (),
428+ gomock .Any (),
429+ gomock .Any (),
430+ ).Return (& s.PollForDecisionTaskResponse {
431+ TaskToken : []byte {}, // Empty task token indicates no work
432+ AutoConfigHint : & s.AutoConfigHint {PollerWaitTimeInMs : common .Int64Ptr (1000 )},
433+ }, nil )
434+ },
435+ expectedError : nil ,
436+ validateResult : func (t * testing.T , result interface {}) {
437+ assert .NotNil (t , result )
438+ workflowTask , ok := result .(* workflowTask )
439+ assert .True (t , ok , "result should be *workflowTask" )
440+ assert .Nil (t , workflowTask .task )
441+ assert .NotNil (t , workflowTask .autoConfigHint )
442+ assert .Equal (t , & s.AutoConfigHint {PollerWaitTimeInMs : common .Int64Ptr (1000 )}, workflowTask .autoConfigHint )
443+ },
444+ },
445+ {
446+ name : "service error during poll" ,
447+ setupPoller : func (t * testing.T ) (* workflowTaskPoller , * workflowservicetest.MockClient ) {
448+ wP , mockService , _ , _ := buildWorkflowTaskPoller (t )
449+ return wP , mockService
450+ },
451+ setupMocks : func (mockService * workflowservicetest.MockClient ) {
452+ mockService .EXPECT ().PollForDecisionTask (
453+ gomock .Any (),
454+ gomock .Any (),
455+ gomock .Any (),
456+ ).Return (nil , & s.InternalServiceError {Message : "service unavailable" })
457+ },
458+ expectedError : & s.InternalServiceError {Message : "service unavailable" },
459+ validateResult : func (t * testing.T , result interface {}) {
460+ assert .Nil (t , result )
461+ },
462+ },
463+ {
464+ name : "service busy error during poll" ,
465+ setupPoller : func (t * testing.T ) (* workflowTaskPoller , * workflowservicetest.MockClient ) {
466+ wP , mockService , _ , _ := buildWorkflowTaskPoller (t )
467+ return wP , mockService
468+ },
469+ setupMocks : func (mockService * workflowservicetest.MockClient ) {
470+ mockService .EXPECT ().PollForDecisionTask (
471+ gomock .Any (),
472+ gomock .Any (),
473+ gomock .Any (),
474+ ).Return (nil , & s.ServiceBusyError {Message : "service busy" })
475+ },
476+ expectedError : & s.ServiceBusyError {Message : "service busy" },
477+ validateResult : func (t * testing.T , result interface {}) {
478+ assert .Nil (t , result )
479+ },
480+ },
481+ {
482+ name : "context timeout during poll" ,
483+ setupPoller : func (t * testing.T ) (* workflowTaskPoller , * workflowservicetest.MockClient ) {
484+ wP , mockService , _ , _ := buildWorkflowTaskPoller (t )
485+ return wP , mockService
486+ },
487+ setupMocks : func (mockService * workflowservicetest.MockClient ) {
488+ mockService .EXPECT ().PollForDecisionTask (
489+ gomock .Any (),
490+ gomock .Any (),
491+ gomock .Any (),
492+ ).Return (nil , context .DeadlineExceeded )
493+ },
494+ expectedError : context .DeadlineExceeded ,
495+ validateResult : func (t * testing.T , result interface {}) {
496+ assert .Nil (t , result )
497+ },
498+ },
499+ {
500+ name : "domain not exists error during poll" ,
501+ setupPoller : func (t * testing.T ) (* workflowTaskPoller , * workflowservicetest.MockClient ) {
502+ wP , mockService , _ , _ := buildWorkflowTaskPoller (t )
503+ return wP , mockService
504+ },
505+ setupMocks : func (mockService * workflowservicetest.MockClient ) {
506+ mockService .EXPECT ().PollForDecisionTask (
507+ gomock .Any (),
508+ gomock .Any (),
509+ gomock .Any (),
510+ ).Return (nil , & s.EntityNotExistsError {Message : "domain does not exist" })
511+ },
512+ expectedError : & s.EntityNotExistsError {Message : "domain does not exist" },
513+ validateResult : func (t * testing.T , result interface {}) {
514+ assert .Nil (t , result )
515+ },
516+ },
517+ }
518+
519+ for _ , tt := range tests {
520+ t .Run (tt .name , func (t * testing.T ) {
521+ poller , mockService := tt .setupPoller (t )
522+ tt .setupMocks (mockService )
523+
524+ result , err := poller .PollTask ()
525+
526+ if tt .expectedError != nil {
527+ assert .Error (t , err )
528+ assert .Equal (t , tt .expectedError , err )
529+ } else {
530+ assert .NoError (t , err )
531+ }
532+
533+ tt .validateResult (t , result )
534+ })
535+ }
536+ }
537+
192538func buildWorkflowTaskPoller (t * testing.T ) (* workflowTaskPoller , * workflowservicetest.MockClient , * MockWorkflowTaskHandler , * mockLocalDispatcher ) {
193539 ctrl := gomock .NewController (t )
194540 mockService := workflowservicetest .NewMockClient (ctrl )
@@ -207,7 +553,7 @@ func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservic
207553 ldaTunnel : lda ,
208554 metricsScope : & metrics.TaggedScope {Scope : tally .NewTestScope ("test" , nil )},
209555 logger : testlogger .NewZap (t ),
210- stickyUUID : "" ,
556+ stickyUUID : "sticky-uuid " ,
211557 disableStickyExecution : false ,
212558 StickyScheduleToStartTimeout : time .Millisecond ,
213559 featureFlags : FeatureFlags {},
0 commit comments