Skip to content

Commit e33e154

Browse files
authored
List API (#43)
* List API Adds a new `List` API to the cron service. This API returns the list of current committed active jobs that have a name matching the given prefix. An empty prefix will return all jobs. ```go List(ctx context.Context, prefix string) (*ListResponse, error) ``` A new `ListResponse` proto message has been added which contains a list of named Jobs. As part of this change to clean up the codebase, the cron API definition has been moved into `api/api.go`. A new `queue` runnable has been created which is responsible for managing the underlying job queue based on events from the informer. A new `internal/api` package is responsible for implementing the server CRUD APIs. Uses dapr/kit concurrency/map from dapr/kit#104 Signed-off-by: joshvanl <[email protected]> * Skip cache Signed-off-by: joshvanl <[email protected]> * Decrease number of jobs to delete Signed-off-by: joshvanl <[email protected]> * Updates dapr/kit to origin remote HEAD Signed-off-by: joshvanl <[email protected]> * Adds comments around `List` `more` usage and requested limits. Signed-off-by: joshvanl <[email protected]> * Removes `more` from `ListResponse` proto message as it is not used Signed-off-by: joshvanl <[email protected]> * Fix defer triggered Signed-off-by: joshvanl <[email protected]> * Updates gomod2nix.yaml Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]>
1 parent 5b5215e commit e33e154

File tree

27 files changed

+2218
-1776
lines changed

27 files changed

+2218
-1776
lines changed

.github/workflows/test.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ jobs:
1111
- uses: golangci/[email protected]
1212
with:
1313
version: v1.61.0
14+
skip-cache: true
1415
- run: go test -v --race ./...

Makefile

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ endif
1313

1414
.PHONY: lint
1515
lint:
16-
$(GOLANGCI_LINT) run --timeout=20m
16+
$(GOLANGCI_LINT) run --timeout=20m ./...
1717

1818

1919
################################################################################
@@ -42,7 +42,6 @@ PROTOC_GEN_GO_VERSION = v1.32.0
4242

4343
PROTOC_GEN_GO_GRPC_VERSION = 1.3.0
4444

45-
PROTOS:=$(shell ls proto)
4645
PROTO_PREFIX:=github.com/diagridio/go-etcd-cron
4746

4847
.PHONY: check-proto-version
@@ -56,20 +55,16 @@ check-proto-version: ## Checking the version of proto related tools
5655
@test "$(shell protoc-gen-go --version 2>&1)" = "protoc-gen-go $(PROTOC_GEN_GO_VERSION)" \
5756
|| { echo "please use protoc-gen-go $(PROTOC_GEN_GO_VERSION) to generate proto"; exit 1; }
5857

59-
# Generate archive files for each binary
60-
# $(1): the binary name to be archived
61-
define genProtoc
62-
.PHONY: gen-proto-$(1)
63-
gen-proto-$(1):
64-
$(PROTOC) --go_out=. --go_opt=module=$(PROTO_PREFIX) --go-grpc_out=. --go-grpc_opt=require_unimplemented_servers=false,module=$(PROTO_PREFIX) ./proto/$(1)/*
65-
endef
66-
67-
$(foreach ITEM,$(PROTOS),$(eval $(call genProtoc,$(ITEM))))
68-
69-
GEN_PROTOS:=$(foreach ITEM,$(PROTOS),gen-proto-$(ITEM))
70-
7158
.PHONY: gen-proto
72-
gen-proto: check-proto-version $(GEN_PROTOS) modtidy
59+
gen-proto: check-proto-version _gen-proto modtidy
60+
61+
.PHONY: _gen-proto
62+
_gen-proto:
63+
$(PROTOC) --go_out=. \
64+
--go_opt=module=$(PROTO_PREFIX) \
65+
--go-grpc_out=. \
66+
--go-grpc_opt=require_unimplemented_servers=false,module=$(PROTO_PREFIX) \
67+
./proto/*.proto
7368

7469
test:
7570
go test -count 1 -timeout 300s --race ./...

README.md

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,36 +13,40 @@ Recurring jobs can have a TTL, a delayed start, expiry time, and a maximum numbe
1313
## Getting started
1414

1515
```go
16-
import etcdcron "github.com/diagridio/go-etcd-cron"
17-
import "github.com/diagridio/go-etcd-cron/api"
18-
19-
cron, err := etcdcron.New(Options{
20-
Client: client,
21-
Namespace: "abc",
22-
PartitionID: 0,
23-
PartitionTotal: 1,
24-
TriggerFn: func(context.Context, *api.TriggerRequest) bool {
25-
// Do something with your trigger here.
26-
// Return true if the trigger was successful, false otherwise.
27-
// Note, returning false will cause the job to be retried *immediately*.
28-
return true
29-
},
30-
})
31-
if err != nil {
32-
panic(err)
33-
}
34-
35-
// TODO: Pass proper context and do something with returned error.
36-
go cron.Run(context.Background())
37-
38-
payload, _ := anypb.New(wrapperspb.String("hello"))
39-
meta, _ := anypb.New(wrapperspb.String("world"))
40-
tt := time.Now().Add(time.Second).Format(time.RFC3339)
41-
42-
cron.Add(ctx, "my-job", &api.Job{
43-
DueTime: &tt,
44-
Payload: payload,
45-
Metadata: meta,
16+
import (
17+
"github.com/diagridio/go-etcd-cron/api"
18+
"github.com/diagridio/go-etcd-cron/cron"
19+
)
20+
21+
func main() {
22+
cron, err := cron.New(cron.Options{
23+
Client: client,
24+
Namespace: "abc",
25+
PartitionID: 0,
26+
PartitionTotal: 1,
27+
TriggerFn: func(context.Context, *api.TriggerRequest) bool {
28+
// Do something with your trigger here.
29+
// Return true if the trigger was successful, false otherwise.
30+
// Note, returning false will cause the job to be retried *immediately*.
31+
return true
32+
},
33+
})
34+
if err != nil {
35+
panic(err)
36+
}
37+
38+
// TODO: Pass proper context and do something with returned error.
39+
go cron.Run(context.Background())
40+
41+
payload, _ := anypb.New(wrapperspb.String("hello"))
42+
meta, _ := anypb.New(wrapperspb.String("world"))
43+
tt := time.Now().Add(time.Second).Format(time.RFC3339)
44+
45+
cron.Add(context.TODO(), "my-job", &api.Job{
46+
DueTime: &tt,
47+
Payload: payload,
48+
Metadata: meta,
49+
})
4650
}
4751
```
4852

api.go

Lines changed: 0 additions & 164 deletions
This file was deleted.

api/api.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
Copyright (c) 2024 Diagrid Inc.
3+
Licensed under the MIT License.
4+
*/
5+
6+
package api
7+
8+
import (
9+
"context"
10+
)
11+
12+
// TriggerFunction is the type of the function that is called when a job is
13+
// triggered.
14+
// Returning true will "tick" the job forward to the next scheduled time.
15+
// Returning false will cause the job to be re-enqueued and triggered
16+
// immediately.
17+
type TriggerFunction func(context.Context, *TriggerRequest) bool
18+
19+
// API is the interface for interacting with the cron instance.
20+
type API interface {
21+
// Add adds a job to the cron instance.
22+
Add(ctx context.Context, name string, job *Job) error
23+
24+
// Get gets a job from the cron instance.
25+
Get(ctx context.Context, name string) (*Job, error)
26+
27+
// Delete deletes a job from the cron instance.
28+
Delete(ctx context.Context, name string) error
29+
30+
// DeletePrefixes deletes all jobs with the given prefixes from the cron
31+
// instance.
32+
DeletePrefixes(ctx context.Context, prefixes ...string) error
33+
34+
// List lists all jobs under a given job name prefix.
35+
List(ctx context.Context, prefix string) (*ListResponse, error)
36+
}
37+
38+
// Interface is a cron interface. It schedules and manages job which are stored
39+
// and informed from ETCD. It uses a trigger function to call when a job is
40+
// triggered.
41+
// Jobs may be oneshot or recurring. Recurring jobs are scheduled to run at
42+
// their next scheduled time. Oneshot jobs are scheduled to run once and are
43+
// removed from the schedule after they are triggered.
44+
type Interface interface {
45+
// Run is a blocking function that runs the cron instance. It will return an
46+
// error if the instance is already running.
47+
// Returns when the given context is cancelled, after doing all cleanup.
48+
Run(ctx context.Context) error
49+
50+
// API implements the client API for the cron instance.
51+
API
52+
}

0 commit comments

Comments
 (0)