Skip to content

Commit 2a1c674

Browse files
authored
API: Adds DeletePrefixes (#37)
Adds a new DeletePrefixes API which deletes all jobs (as well as counters) whose names match the given prefixes. Note the given strings to this API are prefixes of the _job names_, not the underlying ETCD keys. Prefixes will be serialized into the actual ETCD keys. This is useful for Dapr to add a feature whereby jobs (of all of types) in a particular namespace (some job prefix) are deleted when a Kubernetes Namespace is deleted. Moves `/internal/tests` to `/tests` to expose cron ETCD helper funcs for library consumers. Expands package with a `cron` sub-package to introduce single and clustered cron server setup helpers. Exposes the `CounterGarbageCollectionInterval` time duration pointer option to enable testing time interval triggered counter garbage collection. This is set to a low value during some tests, which has a side effect of soft testing this doesn't prematurely delete counters. Slight refactor of `api_test.go` to reside in a different package to prevent a circular import from `/tests`. Adds `/fake` for exposing a fake cron that library consumers can use for testing. Signed-off-by: joshvanl <[email protected]>
1 parent 009578b commit 2a1c674

File tree

16 files changed

+844
-263
lines changed

16 files changed

+844
-263
lines changed

api.go

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@ import (
99
"context"
1010
"errors"
1111
"fmt"
12-
"strings"
1312

13+
clientv3 "go.etcd.io/etcd/client/v3"
1414
"google.golang.org/protobuf/proto"
15-
"k8s.io/apimachinery/pkg/util/validation"
1615

1716
"github.com/diagridio/go-etcd-cron/api"
1817
)
@@ -103,8 +102,6 @@ func (c *cron) Delete(ctx context.Context, name string) error {
103102
if _, err := c.client.Delete(ctx, jobKey); err != nil {
104103
return err
105104
}
106-
counterKey := c.key.CounterKey(name)
107-
c.collector.Push(counterKey)
108105

109106
if _, ok := c.queueCache.Load(jobKey); !ok {
110107
return nil
@@ -114,17 +111,54 @@ func (c *cron) Delete(ctx context.Context, name string) error {
114111
return c.queue.Dequeue(jobKey)
115112
}
116113

117-
// validateName validates the name of a job.
118-
func (c *cron) validateName(name string) error {
119-
if len(name) == 0 {
120-
return errors.New("job name cannot be empty")
114+
// DeletePrefixes deletes cron jobs with the given prefixes from the cron
115+
// instance.
116+
func (c *cron) DeletePrefixes(ctx context.Context, prefixes ...string) error {
117+
select {
118+
case <-c.readyCh:
119+
case <-c.closeCh:
120+
return errors.New("cron is closed")
121+
case <-ctx.Done():
122+
return ctx.Err()
121123
}
122124

123-
for _, segment := range strings.Split(strings.ToLower(c.validateNameReplacer.Replace(name)), "||") {
124-
if errs := validation.IsDNS1123Subdomain(segment); len(errs) > 0 {
125-
return fmt.Errorf("job name is invalid %q: %s", name, strings.Join(errs, ", "))
125+
for _, prefix := range prefixes {
126+
if len(prefix) == 0 {
127+
continue
128+
}
129+
130+
if err := c.validateName(prefix); err != nil {
131+
return err
132+
}
133+
}
134+
135+
var errs []error
136+
removeFromCache := func(jobKey string) {
137+
c.queueLock.Lock(jobKey)
138+
defer c.queueLock.DeleteUnlock(jobKey)
139+
140+
if _, ok := c.queueCache.Load(jobKey); ok {
141+
return
142+
}
143+
144+
c.queueCache.Delete(jobKey)
145+
errs = append(errs, c.queue.Dequeue(jobKey))
146+
}
147+
148+
for _, prefix := range prefixes {
149+
keyPrefix := c.key.JobKey(prefix)
150+
c.log.V(3).Info("deleting jobs with prefix", "prefix", keyPrefix)
151+
152+
resp, err := c.client.Delete(ctx, keyPrefix, clientv3.WithPrefix(), clientv3.WithPrevKV())
153+
if err != nil {
154+
errs = append(errs, fmt.Errorf("failed to delete jobs with prefix %q: %w", prefix, err))
155+
continue
156+
}
157+
158+
for _, kv := range resp.PrevKvs {
159+
removeFromCache(string(kv.Key))
126160
}
127161
}
128162

129-
return nil
163+
return errors.Join(errs...)
130164
}

0 commit comments

Comments
 (0)