Skip to content

Commit 08aaeac

Browse files
committed
Failure Policy: Adds FailurePolicy API field to Job
Based of dapr/proposals#66 Adds a `FailurePolicy` option to the `Job` API to allow re-triggering job which are marked as failed by the caller. Adds two types of policy; `Drop` and `Constant`. `Drop` has no retry policy, `Constant` will constantly retry the job trigger for a configurable delay, up to a configurable maximum number of retries (which could be infinite). Note that the failure policy retry cadence has no effect on the actual Job schedule, meaning if a job was to be retired and eventually succeeded, the Job would continue to trigger at the origin configured schedule. By default, all Jobs will have a `Constant` policy with a delay of 1s. Signed-off-by: joshvanl <[email protected]>
1 parent 05f7c29 commit 08aaeac

File tree

2 files changed

+176
-1
lines changed

2 files changed

+176
-1
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ A Job itself is made up of the following fields:
7373
Optional.
7474
- `FailurePolicy` Controls whether the Job should be retired if the trigger
7575
function returns false. `Drop` doesn't retry the job, `Constant `Constant` will
76-
constantly retry the job trigger for a configurable internal, up to a configurable
76+
constantly retry the job trigger for a configurable interval, up to a configurable
7777
maximum number of retries (which could be infinite). By default, Jobs have a
7878
`Constant` policy, with a 1s interval and 3 maximum retries.
7979

cron/cron_test.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,181 @@ func Test_FailurePolicy(t *testing.T) {
710710
})
711711
}
712712

713+
func Test_FailurePolicy(t *testing.T) {
714+
t.Parallel()
715+
716+
t.Run("default policy should retry 3 times with a 1sec delay", func(t *testing.T) {
717+
t.Parallel()
718+
719+
gotCh := make(chan *api.TriggerRequest, 1)
720+
var got atomic.Uint32
721+
cron := testCronWithOptions(t, testCronOptions{
722+
total: 1,
723+
client: tests.EmbeddedETCDBareClient(t),
724+
triggerFn: func(*api.TriggerRequest) bool {
725+
assert.GreaterOrEqual(t, uint32(8), got.Add(1))
726+
return false
727+
},
728+
gotCh: gotCh,
729+
})
730+
731+
require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{
732+
DueTime: ptr.Of(time.Now().Format(time.RFC3339)),
733+
Schedule: ptr.Of("@every 1s"),
734+
Repeats: ptr.Of(uint32(2)),
735+
}))
736+
737+
for range 8 {
738+
resp, err := cron.api.Get(context.Background(), "test")
739+
require.NoError(t, err)
740+
assert.NotNil(t, resp)
741+
select {
742+
case <-gotCh:
743+
case <-time.After(time.Second * 3):
744+
assert.Fail(t, "timeout waiting for trigger")
745+
}
746+
}
747+
748+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
749+
resp, err := cron.api.Get(context.Background(), "test")
750+
assert.NoError(c, err)
751+
assert.Nil(c, resp)
752+
}, time.Second*5, time.Millisecond*10)
753+
})
754+
755+
t.Run("drop policy should not retry triggering", func(t *testing.T) {
756+
t.Parallel()
757+
758+
gotCh := make(chan *api.TriggerRequest, 1)
759+
var got atomic.Uint32
760+
cron := testCronWithOptions(t, testCronOptions{
761+
total: 1,
762+
client: tests.EmbeddedETCDBareClient(t),
763+
triggerFn: func(*api.TriggerRequest) bool {
764+
assert.GreaterOrEqual(t, uint32(2), got.Add(1))
765+
return false
766+
},
767+
gotCh: gotCh,
768+
})
769+
770+
require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{
771+
DueTime: ptr.Of(time.Now().Format(time.RFC3339)),
772+
Schedule: ptr.Of("@every 1s"),
773+
Repeats: ptr.Of(uint32(2)),
774+
FailurePolicy: &api.FailurePolicy{
775+
Policy: new(api.FailurePolicy_Drop),
776+
},
777+
}))
778+
779+
for range 2 {
780+
resp, err := cron.api.Get(context.Background(), "test")
781+
require.NoError(t, err)
782+
assert.NotNil(t, resp)
783+
select {
784+
case <-gotCh:
785+
case <-time.After(time.Second * 3):
786+
assert.Fail(t, "timeout waiting for trigger")
787+
}
788+
}
789+
790+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
791+
resp, err := cron.api.Get(context.Background(), "test")
792+
assert.NoError(c, err)
793+
assert.Nil(c, resp)
794+
}, time.Second*5, time.Millisecond*10)
795+
})
796+
797+
t.Run("constant policy should only retry when it fails ", func(t *testing.T) {
798+
t.Parallel()
799+
800+
gotCh := make(chan *api.TriggerRequest, 1)
801+
var got atomic.Uint32
802+
cron := testCronWithOptions(t, testCronOptions{
803+
total: 1,
804+
client: tests.EmbeddedETCDBareClient(t),
805+
triggerFn: func(*api.TriggerRequest) bool {
806+
assert.GreaterOrEqual(t, uint32(5), got.Add(1))
807+
return got.Load() == 3
808+
},
809+
gotCh: gotCh,
810+
})
811+
812+
require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{
813+
DueTime: ptr.Of(time.Now().Format(time.RFC3339)),
814+
Schedule: ptr.Of("@every 1s"),
815+
Repeats: ptr.Of(uint32(3)),
816+
FailurePolicy: &api.FailurePolicy{
817+
Policy: &api.FailurePolicy_Constant{
818+
Constant: &api.FailurePolicyConstant{
819+
Delay: durationpb.New(time.Millisecond), MaxRetries: ptr.Of(uint32(1)),
820+
},
821+
},
822+
},
823+
}))
824+
825+
for range 5 {
826+
resp, err := cron.api.Get(context.Background(), "test")
827+
require.NoError(t, err)
828+
assert.NotNil(t, resp)
829+
select {
830+
case <-gotCh:
831+
case <-time.After(time.Second * 3):
832+
assert.Fail(t, "timeout waiting for trigger")
833+
}
834+
}
835+
836+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
837+
resp, err := cron.api.Get(context.Background(), "test")
838+
assert.NoError(c, err)
839+
assert.Nil(c, resp)
840+
}, time.Second*5, time.Millisecond*10)
841+
})
842+
843+
t.Run("constant policy can retry forever until it succeeds", func(t *testing.T) {
844+
t.Parallel()
845+
846+
gotCh := make(chan *api.TriggerRequest, 1)
847+
var got atomic.Uint32
848+
cron := testCronWithOptions(t, testCronOptions{
849+
total: 1,
850+
client: tests.EmbeddedETCDBareClient(t),
851+
triggerFn: func(*api.TriggerRequest) bool {
852+
assert.GreaterOrEqual(t, uint32(100), got.Add(1))
853+
return got.Load() == 100
854+
},
855+
gotCh: gotCh,
856+
})
857+
858+
require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{
859+
DueTime: ptr.Of(time.Now().Format(time.RFC3339)),
860+
FailurePolicy: &api.FailurePolicy{
861+
Policy: &api.FailurePolicy_Constant{
862+
Constant: &api.FailurePolicyConstant{
863+
Delay: durationpb.New(time.Millisecond),
864+
},
865+
},
866+
},
867+
}))
868+
869+
for range 100 {
870+
resp, err := cron.api.Get(context.Background(), "test")
871+
require.NoError(t, err)
872+
assert.NotNil(t, resp)
873+
select {
874+
case <-gotCh:
875+
case <-time.After(time.Second * 3):
876+
assert.Fail(t, "timeout waiting for trigger")
877+
}
878+
}
879+
880+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
881+
resp, err := cron.api.Get(context.Background(), "test")
882+
assert.NoError(c, err)
883+
assert.Nil(c, resp)
884+
}, time.Second*5, time.Millisecond*10)
885+
})
886+
}
887+
713888
type testCronOptions struct {
714889
total uint32
715890
gotCh chan *api.TriggerRequest

0 commit comments

Comments
 (0)