Skip to content

Commit 5021035

Browse files
authored
Trigger repeating job on due_time if specified (#26)
Updates scheduler to execute a repeating job at the due time, rather than waiting for the next tick from this time when specified. Setting duetime=0s and schedule=1h will execute the job immediately, then every hour. Updates job name validation to now also allow ":" characters. Signed-off-by: joshvanl <[email protected]>
1 parent c1fa1f1 commit 5021035

File tree

11 files changed

+359
-130
lines changed

11 files changed

+359
-130
lines changed

api.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func (c *cron) Add(ctx context.Context, name string, job *api.Job) error {
2727
return ctx.Err()
2828
}
2929

30-
if err := validateName(name); err != nil {
30+
if err := c.validateName(name); err != nil {
3131
return err
3232
}
3333

@@ -59,7 +59,7 @@ func (c *cron) Get(ctx context.Context, name string) (*api.Job, error) {
5959
return nil, ctx.Err()
6060
}
6161

62-
if err := validateName(name); err != nil {
62+
if err := c.validateName(name); err != nil {
6363
return nil, err
6464
}
6565

@@ -91,7 +91,7 @@ func (c *cron) Delete(ctx context.Context, name string) error {
9191
return ctx.Err()
9292
}
9393

94-
if err := validateName(name); err != nil {
94+
if err := c.validateName(name); err != nil {
9595
return err
9696
}
9797

@@ -103,13 +103,12 @@ func (c *cron) Delete(ctx context.Context, name string) error {
103103
}
104104

105105
// validateName validates the name of a job.
106-
func validateName(name string) error {
106+
func (c *cron) validateName(name string) error {
107107
if len(name) == 0 {
108108
return errors.New("job name cannot be empty")
109109
}
110110

111-
trimmed := strings.TrimRight(strings.ReplaceAll(strings.ToLower(name), "_", "-"), "-")
112-
for _, segment := range strings.Split(trimmed, "||") {
111+
for _, segment := range strings.Split(strings.ToLower(c.validateNameReplacer.Replace(name)), "||") {
113112
if errs := validation.IsDNS1123Subdomain(segment); len(errs) > 0 {
114113
return fmt.Errorf("job name is invalid %q: %s", name, strings.Join(errs, ", "))
115114
}

api/job.pb.go

Lines changed: 92 additions & 45 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api_test.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,13 +488,25 @@ func Test_validateName(t *testing.T) {
488488
name: "actorreminder||dapr-tests||dapr.internal.dapr-tests.perf-workflowsapp.workflow||24b3fbad-0db5-4e81-a272-71f6018a66a6||start-4NYDFil-",
489489
expErr: false,
490490
},
491+
{
492+
name: "aABVCD||dapr-::123:123||dapr.internal.dapr-tests.perf-workflowsapp.workflow||24b3fbad-0db5-4e81-a272-71f6018a66a6||start-4NYDFil-",
493+
expErr: false,
494+
},
491495
}
492496

493497
for _, test := range tests {
494498
test := test
495499
t.Run(test.name, func(t *testing.T) {
496500
t.Parallel()
497-
err := validateName(test.name)
501+
c, err := New(Options{
502+
Log: logr.Discard(),
503+
Namespace: "",
504+
PartitionID: 0,
505+
PartitionTotal: 1,
506+
TriggerFn: func(context.Context, *api.TriggerRequest) bool { return true },
507+
})
508+
require.NoError(t, err)
509+
err = c.(*cron).validateName(test.name)
498510
assert.Equal(t, test.expErr, err != nil, "%v", err)
499511
})
500512
}

cron.go

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"errors"
1111
"fmt"
12+
"strings"
1213
"sync"
1314
"sync/atomic"
1415

@@ -87,15 +88,16 @@ type cron struct {
8788
log logr.Logger
8889
triggerFn TriggerFunction
8990

90-
client client.Interface
91-
part partitioner.Interface
92-
key *key.Key
93-
informer *informer.Informer
94-
yard *grave.Yard
95-
queue *queue.Processor[string, *counter.Counter]
96-
schedBuilder *scheduler.Builder
97-
leadership *leadership.Leadership
98-
collector garbage.Interface
91+
client client.Interface
92+
part partitioner.Interface
93+
key *key.Key
94+
informer *informer.Informer
95+
yard *grave.Yard
96+
queue *queue.Processor[string, *counter.Counter]
97+
schedBuilder *scheduler.Builder
98+
leadership *leadership.Leadership
99+
collector garbage.Interface
100+
validateNameReplacer *strings.Replacer
99101

100102
clock clock.Clock
101103
running atomic.Bool
@@ -161,20 +163,21 @@ func New(opts Options) (Interface, error) {
161163
})
162164

163165
return &cron{
164-
log: log,
165-
client: client,
166-
triggerFn: opts.TriggerFn,
167-
key: key,
168-
leadership: leadership,
169-
yard: yard,
170-
informer: informer,
171-
collector: collector,
172-
part: part,
173-
schedBuilder: scheduler.NewBuilder(),
174-
clock: clock.RealClock{},
175-
readyCh: make(chan struct{}),
176-
closeCh: make(chan struct{}),
177-
errCh: make(chan error),
166+
log: log,
167+
client: client,
168+
triggerFn: opts.TriggerFn,
169+
key: key,
170+
leadership: leadership,
171+
yard: yard,
172+
informer: informer,
173+
collector: collector,
174+
part: part,
175+
schedBuilder: scheduler.NewBuilder(),
176+
validateNameReplacer: strings.NewReplacer("_", "", ":", "", "-", ""),
177+
clock: clock.RealClock{},
178+
readyCh: make(chan struct{}),
179+
closeCh: make(chan struct{}),
180+
errCh: make(chan error),
178181
}, nil
179182
}
180183

0 commit comments

Comments
 (0)