Skip to content

Commit cc77e8d

Browse files
authored
Add explanatory comments for keys used in example (#296)
* Add explanatory comments for keys used in example * Improve printing and cleanup * Formatting
1 parent f3e3d72 commit cc77e8d

File tree

4 files changed

+79
-67
lines changed

4 files changed

+79
-67
lines changed

go/patterns-use-cases/src/cron/cron.go

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,18 @@ type JobRequest struct {
1313
CronExpression string `json:"cronExpression"` // The cron expression e.g. "0 0 * * *" (every day at midnight)
1414
Service string `json:"service"`
1515
Method string `json:"method"` // Handler to execute with this schedule
16-
Key string `json:"key,omitempty"` // Optional: Virtual Object key to call
16+
Key string `json:"key,omitempty"` // Optional: Virtual Object key of task to call
1717
Payload string `json:"payload,omitempty"` // Optional payload to pass to the handler
1818
}
1919

2020
// JobInfo represents the stored job information
2121
type JobInfo struct {
22-
Req JobRequest `json:"req"`
22+
Request JobRequest `json:"request"`
2323
NextExecutionTime time.Time `json:"next_execution_time"`
2424
NextExecutionID string `json:"next_execution_id"`
2525
}
2626

27-
const JOB_KEY = "job" // Key for storing job information in the Restate object
27+
const JOB_STATE = "job-state" // K/V state key for storing job info in the Restate
2828

2929
// CronJobInitiator service for creating new cron jobs
3030
//
@@ -44,9 +44,11 @@ const JOB_KEY = "job" // Key for storing job information in the Restate object
4444
type CronJobInitiator struct{}
4545

4646
func (CronJobInitiator) Create(ctx restate.Context, req JobRequest) (string, error) {
47+
// Create a new job ID and initiate the cron job object for that ID
48+
// We can then address this job object by its ID
4749
jobID := restate.Rand(ctx).UUID().String()
4850

49-
fmt.Printf("Creating new cron job with ID:", jobID, "for service:", req.Service, "method:", req.Method)
51+
fmt.Printf("Creating new cron job with ID %s for service %s and method %s", jobID, req.Service, req.Method)
5052
job, err := restate.Object[*JobInfo](ctx, "CronJob", jobID, "Initiate").Request(req)
5153
if err != nil {
5254
return "", err
@@ -59,59 +61,59 @@ func (CronJobInitiator) Create(ctx restate.Context, req JobRequest) (string, err
5961
type CronJob struct{}
6062

6163
func (CronJob) Initiate(ctx restate.ObjectContext, req JobRequest) (*JobInfo, error) {
62-
// Check if job already exists
63-
job, err := restate.Get[*JobInfo](ctx, JOB_KEY)
64+
// Check if jobState already exists
65+
jobState, err := restate.Get[*JobInfo](ctx, JOB_STATE)
6466
if err != nil {
6567
return nil, err
6668
}
67-
if job != nil {
68-
return nil, restate.TerminalErrorf("job already exists for this ID", 500)
69+
if jobState != nil {
70+
return nil, restate.TerminalErrorf("jobState already exists for this ID", 500)
6971
}
7072

7173
return scheduleNextExecution(ctx, req)
7274
}
7375

74-
func (CronJob) Execute(ctx restate.ObjectContext, req JobRequest) error {
76+
func (CronJob) Execute(ctx restate.ObjectContext) error {
7577
// Get the job information
76-
job, err := restate.Get[*JobInfo](ctx, JOB_KEY)
78+
jobState, err := restate.Get[*JobInfo](ctx, JOB_STATE)
7779
if err != nil {
7880
return err
7981
}
80-
if job == nil {
82+
if jobState == nil {
8183
return restate.TerminalErrorf("job not found", 500)
8284
}
8385

8486
// Add key if it's a virtual object call
85-
fmt.Printf("Executing job with ID:", restate.Key(ctx), "for service:", req.Service, "method:", req.Method)
87+
req := jobState.Request
88+
fmt.Printf("Executing job with ID: %s for service %s for method %s", restate.Key(ctx), req.Service, req.Method)
8689
if req.Key != "" {
8790
restate.ObjectSend(ctx, req.Service, req.Key, req.Method).Send(req.Payload)
8891
} else {
8992
restate.ServiceSend(ctx, req.Service, req.Method).Send(req.Payload)
9093
}
9194

9295
// Schedule the next execution
93-
_, err = scheduleNextExecution(ctx, job.Req)
96+
_, err = scheduleNextExecution(ctx, req)
9497
return err
9598
}
9699

97100
func (CronJob) Cancel(ctx restate.ObjectContext) error {
98101
// Get the job to cancel the next execution
99-
job, err := restate.Get[*JobInfo](ctx, JOB_KEY)
102+
job, err := restate.Get[*JobInfo](ctx, JOB_STATE)
100103
if err != nil {
101104
return err
102105
}
103106
if job == nil {
104107
return restate.TerminalErrorf("job not found for cancellation", 404)
105108
}
106109
restate.CancelInvocation(ctx, job.NextExecutionID)
107-
restate.ObjectSend(ctx, "CronJob", restate.Key(ctx), "Cleanup").Send(nil)
108110

109111
restate.ClearAll(ctx)
110112
return nil
111113
}
112114

113115
func (CronJob) GetInfo(ctx restate.ObjectSharedContext) (*JobInfo, error) {
114-
return restate.Get[*JobInfo](ctx, JOB_KEY)
116+
return restate.Get[*JobInfo](ctx, JOB_STATE)
115117
}
116118

117119
// scheduleNextExecution calculates and schedules the next execution of the cron job
@@ -132,15 +134,16 @@ func scheduleNextExecution(ctx restate.ObjectContext, req JobRequest) (*JobInfo,
132134
nextTime := schedule.Next(currentTime)
133135
delay := nextTime.Sub(currentTime)
134136

135-
// Schedule the next execution
136-
handle := restate.ObjectSend(ctx, "CronJob", restate.Key(ctx), "Execute").Send(req, restate.WithDelay(delay))
137+
// Schedule next execution for this job
138+
thisJobID := restate.Key(ctx) // This got generated by the CronJobInitiator
139+
handle := restate.ObjectSend(ctx, "CronJob", thisJobID, "Execute").Send(nil, restate.WithDelay(delay))
137140

138141
// Store the job information
139-
job := &JobInfo{
140-
Req: req,
142+
jobState := &JobInfo{
143+
Request: req,
141144
NextExecutionTime: nextTime,
142145
NextExecutionID: handle.GetInvocationId(),
143146
}
144-
restate.Set(ctx, JOB_KEY, job)
145-
return job, nil
147+
restate.Set(ctx, JOB_STATE, jobState)
148+
return jobState, nil
146149
}

go/patterns-use-cases/src/cron/task.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
type TaskService struct{}
1111

1212
func (TaskService) ExecuteTask(ctx restate.Context, payload string) error {
13-
fmt.Printf("Executing task: ", payload)
13+
fmt.Printf("Executing task: %s", payload)
1414
return nil
1515
}
1616

java/patterns-use-cases/src/main/java/my/example/cron/Cron.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,20 @@ public record JobRequest(
3737
String cronExpression, // e.g. "0 0 * * *" (every day at midnight)
3838
String service,
3939
String method, // Handler to execute with this schedule
40-
Optional<String> key, // Optional Virtual Object key to call
40+
Optional<String> key, // Optional Virtual Object key of the task to call
4141
Optional<String> payload) {} // Optional data to pass to the handler
4242

43-
public record JobInfo(JobRequest req, String nextExecutionTime, String nextExecutionId) {}
43+
public record JobInfo(JobRequest request, String nextExecutionTime, String nextExecutionId) {}
4444

4545
@Name("CronJobInitiator")
4646
@Service
4747
public static class JobInitiator {
4848
@Handler
49-
public String create(Context ctx, JobRequest req) {
49+
public String create(Context ctx, JobRequest request) {
50+
// Create a new job ID and initiate the cron job object for that ID
51+
// We can then address this job object by its ID
5052
var jobId = ctx.random().nextUUID().toString();
51-
var cronJob = CronJobClient.fromContext(ctx, jobId).initiate(req).await();
53+
var cronJob = CronJobClient.fromContext(ctx, jobId).initiate(request).await();
5254
return String.format(
5355
"Job created with ID %s and next execution time %s", jobId, cronJob.nextExecutionTime());
5456
}
@@ -58,37 +60,39 @@ public String create(Context ctx, JobRequest req) {
5860
@VirtualObject
5961
public static class Job {
6062

61-
private final StateKey<JobInfo> JOB = StateKey.of("job", JobInfo.class);
63+
private final StateKey<JobInfo> JOB_STATE = StateKey.of("job-state", JobInfo.class);
6264
private final CronParser PARSER =
6365
new CronParser(CronDefinitionBuilder.instanceDefinitionFor(UNIX));
6466

6567
@Handler
66-
public JobInfo initiate(ObjectContext ctx, JobRequest req) {
67-
if (ctx.get(JOB).isPresent()) {
68+
public JobInfo initiate(ObjectContext ctx, JobRequest request) {
69+
if (ctx.get(JOB_STATE).isPresent()) {
6870
throw new TerminalException("Job already exists for this ID");
6971
}
70-
return scheduleNextExecution(ctx, req);
72+
return scheduleNextExecution(ctx, request);
7173
}
7274

7375
@Handler
7476
public void execute(ObjectContext ctx) {
75-
JobRequest req = ctx.get(JOB).orElseThrow(() -> new TerminalException("Job not found")).req;
77+
JobRequest request =
78+
ctx.get(JOB_STATE).orElseThrow(() -> new TerminalException("Job not found")).request;
7679

77-
executeTask(ctx, req);
78-
scheduleNextExecution(ctx, req);
80+
executeTask(ctx, request);
81+
scheduleNextExecution(ctx, request);
7982
}
8083

8184
@Handler
8285
public void cancel(ObjectContext ctx) {
83-
ctx.get(JOB).ifPresent(job -> ctx.invocationHandle(job.nextExecutionId).cancel());
86+
ctx.get(JOB_STATE)
87+
.ifPresent(jobState -> ctx.invocationHandle(jobState.nextExecutionId).cancel());
8488

8589
// Clear the job state
8690
ctx.clearAll();
8791
}
8892

8993
@Shared
9094
public Optional<JobInfo> getInfo(SharedObjectContext ctx) {
91-
return ctx.get(JOB);
95+
return ctx.get(JOB_STATE);
9296
}
9397

9498
private void executeTask(ObjectContext ctx, JobRequest job) {
@@ -104,11 +108,11 @@ private void executeTask(ObjectContext ctx, JobRequest job) {
104108
ctx.send(request);
105109
}
106110

107-
private JobInfo scheduleNextExecution(ObjectContext ctx, JobRequest req) {
111+
private JobInfo scheduleNextExecution(ObjectContext ctx, JobRequest request) {
108112
// Parse cron expression
109113
ExecutionTime executionTime;
110114
try {
111-
executionTime = ExecutionTime.forCron(PARSER.parse(req.cronExpression));
115+
executionTime = ExecutionTime.forCron(PARSER.parse(request.cronExpression));
112116
} catch (IllegalArgumentException e) {
113117
throw new TerminalException("Invalid cron expression: " + e.getMessage());
114118
}
@@ -124,13 +128,14 @@ private JobInfo scheduleNextExecution(ObjectContext ctx, JobRequest req) {
124128
.nextExecution(now)
125129
.orElseThrow(() -> new TerminalException("Cannot determine next execution time"));
126130

127-
// Schedule next execution
128-
var handle = CronJobClient.fromContext(ctx, ctx.key()).send().execute(delay);
131+
// Schedule next execution for this job
132+
String thisJobId = ctx.key(); // This got generated by the CronJobInitiator
133+
var handle = CronJobClient.fromContext(ctx, thisJobId).send().execute(delay);
129134

130135
// Save job state
131-
var job = new JobInfo(req, next.toString(), handle.invocationId());
132-
ctx.set(JOB, job);
133-
return job;
136+
var jobState = new JobInfo(request, next.toString(), handle.invocationId());
137+
ctx.set(JOB_STATE, jobState);
138+
return jobState;
134139
}
135140
}
136141
}

typescript/patterns-use-cases/src/cron/cron_service.ts

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,18 @@ type JobRequest = {
66
cronExpression: string; // The cron expression e.g. "0 0 * * *" (every day at midnight)
77
service: string;
88
method: string; // Handler to execute with this schedule
9-
key?: string; // Optional: Virtual Object key to call
9+
key?: string; // Optional: Virtual Object key of task to call
1010
payload?: string; // Optional payload to pass to the handler
1111
};
1212

1313
type JobInfo = {
14-
req: JobRequest;
14+
request: JobRequest;
1515
next_execution_time: string;
1616
next_execution_id: InvocationId;
1717
};
1818

19-
const JOB = "job"; // Key for storing job information in the Restate object
19+
// Key of the K/V state that we store in Restate
20+
const JOB_STATE = "job-state";
2021

2122
/*
2223
* A distributed cron service built with Restate that schedules tasks based on cron expressions.
@@ -35,9 +36,11 @@ const JOB = "job"; // Key for storing job information in the Restate object
3536
export const cronJobInitiator = restate.service({
3637
name: "CronJobInitiator",
3738
handlers: {
38-
create: async (ctx: restate.Context, req: JobRequest) => {
39+
create: async (ctx: restate.Context, request: JobRequest) => {
40+
// Create a new job ID and initiate the cron job object for that ID
41+
// We can then address this job object by its ID
3942
const jobId = ctx.rand.uuidv4();
40-
const job = await ctx.objectClient(cronJob, jobId).initiate(req);
43+
const job = await ctx.objectClient(cronJob, jobId).initiate(request);
4144
return `Job created with ID ${jobId} and next execution time ${job.next_execution_time}`;
4245
},
4346
},
@@ -46,21 +49,21 @@ export const cronJobInitiator = restate.service({
4649
export const cronJob = restate.object({
4750
name: "CronJob",
4851
handlers: {
49-
initiate: async (ctx: restate.ObjectContext, req: JobRequest): Promise<JobInfo> => {
50-
if (await ctx.get<JobInfo>(JOB)) {
52+
initiate: async (ctx: restate.ObjectContext, request: JobRequest): Promise<JobInfo> => {
53+
if (await ctx.get<JobInfo>(JOB_STATE)) {
5154
throw new TerminalError("Job already exists for this ID.");
5255
}
5356

54-
return await scheduleNextExecution(ctx, req);
57+
return await scheduleNextExecution(ctx, request);
5558
},
5659
execute: async (ctx: restate.ObjectContext) => {
57-
const job = await ctx.get<JobInfo>(JOB);
58-
if (!job) {
60+
const jobState = await ctx.get<JobInfo>(JOB_STATE);
61+
if (!jobState) {
5962
throw new TerminalError("Job not found.");
6063
}
6164

6265
// execute the task
63-
const { service, method, key, payload } = job.req;
66+
const { service, method, key, payload } = jobState.request;
6467
if (payload) {
6568
ctx.genericSend({
6669
service,
@@ -79,48 +82,49 @@ export const cronJob = restate.object({
7982
});
8083
}
8184

82-
await scheduleNextExecution(ctx, job.req);
85+
await scheduleNextExecution(ctx, jobState.request);
8386
},
8487
cancel: async (ctx: restate.ObjectContext) => {
8588
// Cancel the next execution
86-
const job = await ctx.get<JobInfo>(JOB);
87-
if (job) {
88-
ctx.cancel(job.next_execution_id);
89+
const jobState = await ctx.get<JobInfo>(JOB_STATE);
90+
if (jobState) {
91+
ctx.cancel(jobState.next_execution_id);
8992
}
9093

9194
// Clear the job state
9295
ctx.clearAll();
9396
},
94-
getInfo: async (ctx: restate.ObjectSharedContext) => ctx.get<JobInfo>(JOB),
97+
getInfo: async (ctx: restate.ObjectSharedContext) => ctx.get<JobInfo>(JOB_STATE),
9598
},
9699
});
97100

98101
const scheduleNextExecution = async (
99102
ctx: restate.ObjectContext,
100-
req: JobRequest
103+
request: JobRequest
101104
): Promise<JobInfo> => {
102105
// Parse cron expression
103106
// Persist current date in Restate for deterministic replay
104107
const currentDate = await ctx.date.now();
105108
let interval;
106109
try {
107-
interval = CronExpressionParser.parse(req.cronExpression, { currentDate });
110+
interval = CronExpressionParser.parse(request.cronExpression, { currentDate });
108111
} catch (e) {
109112
throw new TerminalError(`Invalid cron expression: ${(e as Error).message}`);
110113
}
111114

112115
const next = interval.next().toDate();
113116
const delay = next.getTime() - currentDate;
114117

115-
// Schedule next execution
116-
const handle = ctx.objectSendClient(cronJob, ctx.key, { delay }).execute();
118+
// Schedule next execution for this job
119+
const thisJobId = ctx.key; // This got generated by the CronJobInitiator
120+
const handle = ctx.objectSendClient(cronJob, thisJobId, { delay }).execute();
117121

118122
// Store the job information
119-
const job = {
120-
req,
123+
const jobState = {
124+
request,
121125
next_execution_time: next.toString(),
122126
next_execution_id: await handle.invocationId,
123127
};
124-
ctx.set<JobInfo>(JOB, job);
125-
return job;
128+
ctx.set<JobInfo>(JOB_STATE, jobState);
129+
return jobState;
126130
};

0 commit comments

Comments
 (0)