Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/client/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Interface interface {
DeletePair(context.Context, string, string) error
PutIfNotExists(context.Context, string, string, ...clientv3.OpOption) (bool, error)

DeleteIfHasRevision(context.Context, string, int64) error
PutIfOtherHasRevision(context.Context, PutIfOtherHasRevisionOpts) (bool, error)
DeleteBothIfOtherHasRevision(context.Context, DeleteBothIfOtherHasRevisionOpts) error

Expand Down
14 changes: 14 additions & 0 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,20 @@ func (c *client) DeleteBothIfOtherHasRevision(ctx context.Context, opts api.Dele
})
}

func (c *client) DeleteIfHasRevision(ctx context.Context, key string, revision int64) error {
return generic(ctx, c.log, c, func(ctx context.Context) error {
_, err := c.kv.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", revision)).
Then(clientv3.OpDelete(key)).
Commit()
if err != nil {
return err
}

return nil
})
}

// DeletePrefixes deletes all keys with the given prefixes.
func (c *client) DeletePrefixes(ctx context.Context, prefixes ...string) error {
return generic(ctx, c.log, c, func(ctx context.Context) error {
Expand Down
14 changes: 14 additions & 0 deletions internal/client/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Fake struct {

putIfNotExistsFn func(context.Context, string, string, ...clientv3.OpOption) (bool, error)
delPairFn func(context.Context, string, string) error
delIfHasRevisionFn func(context.Context, string, int64) error
putIfOtherHasRevisionFn func(context.Context, api.PutIfOtherHasRevisionOpts) (bool, error)
deleteBothIfOtherHasRevionFn func(context.Context, api.DeleteBothIfOtherHasRevisionOpts) error
deletePrefixesFn func(context.Context, ...string) error
Expand Down Expand Up @@ -82,6 +83,11 @@ func (f *Fake) WithDeletePrefixesFn(fn func(context.Context, ...string) error) *
return f
}

func (f *Fake) WithDeleteIfHasRevisionFn(fn func(context.Context, string, int64) error) *Fake {
f.delIfHasRevisionFn = fn
return f
}

func (f *Fake) Put(_ context.Context, k string, b string, _ ...clientv3.OpOption) (*clientv3.PutResponse, error) {
f.calls.Add(1)
if f.putFn != nil {
Expand Down Expand Up @@ -177,3 +183,11 @@ func (f *Fake) DeletePrefixes(_ context.Context, prefixes ...string) error {
}
return f.err
}

func (f *Fake) DeleteIfHasRevision(_ context.Context, k string, r int64) error {
f.calls.Add(1)
if f.delIfHasRevisionFn != nil {
return f.delIfHasRevisionFn(context.Background(), k, r)
}
return f.err
}
25 changes: 21 additions & 4 deletions internal/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"time"

clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

Expand Down Expand Up @@ -74,9 +75,13 @@ func New(ctx context.Context, opts Options) (Interface, bool, error) {
jobKey := opts.Key.JobKey(opts.Name)

// Get the existing counter, if it exists.
res, err := opts.Client.Get(ctx, counterKey)
if err != nil {
return nil, false, err
res := new(clientv3.GetResponse)
if storeCounter(opts.Schedule, opts.Job.GetJob().GetFailurePolicy()) {
var err error
res, err = opts.Client.Get(ctx, counterKey)
if err != nil {
return nil, false, err
}
}

c := &counter{
Expand Down Expand Up @@ -111,7 +116,8 @@ func New(ctx context.Context, opts Options) (Interface, bool, error) {

// If the job partition ID is the same, recover the counter state, else we
// start again.
if count.GetJobPartitionId() != opts.Job.GetPartitionId() {
if storeCounter(opts.Schedule, opts.Job.GetJob().GetFailurePolicy()) &&
count.GetJobPartitionId() != opts.Job.GetPartitionId() {
count = &stored.Counter{JobPartitionId: opts.Job.GetPartitionId()}
if ok, err := c.put(ctx, count); err != nil || !ok {
return nil, ok, err
Expand Down Expand Up @@ -242,6 +248,10 @@ func (c *counter) tickNext() (bool, error) {
return true, nil
}

if !storeCounter(c.schedule, c.job.GetJob().GetFailurePolicy()) {
return false, c.client.DeleteIfHasRevision(context.Background(), c.jobKey, c.modRevision)
}

// Delete the job and counter keys.
// If the Job have been updated, then we leave the job alone to preserve it.
// Always attempt to delete the counter key.
Expand Down Expand Up @@ -299,3 +309,10 @@ func (c *counter) put(ctx context.Context, count *stored.Counter) (bool, error)
OtherRevision: c.modRevision,
})
}

func storeCounter(schedule scheduler.Interface, policy *api.FailurePolicy) bool {
//nolint:staticcheck
return !(schedule.IsOneShot() &&
(policy != nil &&
(policy.GetDrop() != nil || policy.GetConstant().MaxRetries == nil)))
}
4 changes: 4 additions & 0 deletions internal/scheduler/oneshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ func (o *oneshot) Next(count uint32, _ *timestamppb.Timestamp) *time.Time {

return &o.dueTime
}

func (o *oneshot) IsOneShot() bool {
return true
}
4 changes: 4 additions & 0 deletions internal/scheduler/repeats.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,7 @@ func (r *repeats) Next(count uint32, last *timestamppb.Timestamp) *time.Time {

return &next
}

func (r *repeats) IsOneShot() bool {
return false
}
3 changes: 3 additions & 0 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ type Interface interface {
// time.
// Returns nil if the schedule will never trigger again.
Next(count uint32, last *timestamppb.Timestamp) *time.Time

// IsOneShot checks if the schedule is a one-shot schedule.
IsOneShot() bool
}
Loading
Loading