Skip to content

Commit e3792d5

Browse files
committed
Rename event id
1 parent 312c6b5 commit e3792d5

File tree

21 files changed

+158
-144
lines changed

21 files changed

+158
-144
lines changed

internal/log/log.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package log
2+
3+
type Logger interface {
4+
Debug(args ...interface{})
5+
Info(args ...interface{})
6+
Warning(args ...interface{})
7+
Error(args ...interface{})
8+
}

internal/tester/tester.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,6 @@ func (wt *workflowTester) SignalWorkflowInstance(wfi core.WorkflowInstance, name
337337
e := history.NewHistoryEvent(
338338
wt.clock.Now(),
339339
history.EventType_SignalReceived,
340-
-1,
341340
&history.SignalReceivedAttributes{
342341
Name: name,
343342
Arg: arg,
@@ -444,19 +443,19 @@ func (wt *workflowTester) scheduleActivity(wfi core.WorkflowInstance, event hist
444443
ne = history.NewHistoryEvent(
445444
wt.clock.Now(),
446445
history.EventType_ActivityFailed,
447-
event.EventID,
448446
&history.ActivityFailedAttributes{
449447
Reason: activityErr.Error(),
450448
},
449+
history.ScheduleEventID(event.ScheduleEventID),
451450
)
452451
} else {
453452
ne = history.NewHistoryEvent(
454453
wt.clock.Now(),
455454
history.EventType_ActivityCompleted,
456-
event.EventID,
457455
&history.ActivityCompletedAttributes{
458456
Result: activityResult,
459457
},
458+
history.ScheduleEventID(event.ScheduleEventID),
460459
)
461460
}
462461

@@ -551,19 +550,19 @@ func (wt *workflowTester) scheduleSubWorkflow(event core.WorkflowEvent) {
551550
he = history.NewHistoryEvent(
552551
wt.clock.Now(),
553552
history.EventType_SubWorkflowFailed,
554-
event.WorkflowInstance.ParentEventID(),
555553
&history.SubWorkflowFailedAttributes{
556554
Error: workflowErr.Error(),
557555
},
556+
history.ScheduleEventID(event.WorkflowInstance.ParentEventID()),
558557
)
559558
} else {
560559
he = history.NewHistoryEvent(
561560
wt.clock.Now(),
562561
history.EventType_SubWorkflowCompleted,
563-
event.WorkflowInstance.ParentEventID(),
564562
&history.SubWorkflowCompletedAttributes{
565563
Result: workflowResult,
566564
},
565+
history.ScheduleEventID(event.WorkflowInstance.ParentEventID()),
567566
)
568567
}
569568

@@ -585,7 +584,6 @@ func (wt *workflowTester) getInitialEvent(wf workflow.Workflow, args []interface
585584
return history.NewHistoryEvent(
586585
wt.clock.Now(),
587586
history.EventType_WorkflowExecutionStarted,
588-
-1,
589587
&history.ExecutionStartedAttributes{
590588
Name: name,
591589
Inputs: inputs,

internal/worker/activity.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,18 +143,19 @@ func (aw *activityWorker) handleTask(ctx context.Context, task *task.Activity) {
143143
event = history.NewHistoryEvent(
144144
aw.clock.Now(),
145145
history.EventType_ActivityFailed,
146-
task.Event.EventID,
147146
&history.ActivityFailedAttributes{
148147
Reason: err.Error(),
149-
})
148+
},
149+
history.ScheduleEventID(task.Event.ScheduleEventID),
150+
)
150151
} else {
151152
event = history.NewHistoryEvent(
152153
aw.clock.Now(),
153154
history.EventType_ActivityCompleted,
154-
task.Event.EventID,
155155
&history.ActivityCompletedAttributes{
156156
Result: result,
157-
})
157+
},
158+
history.ScheduleEventID(task.Event.ScheduleEventID))
158159
}
159160

160161
if err := aw.backend.CompleteActivityTask(ctx, task.WorkflowInstance, task.ID, event); err != nil {

internal/workflow/activity.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,13 @@ func executeActivity(ctx sync.Context, options ActivityOptions, activity Activit
3636
}
3737

3838
wfState := getWfState(ctx)
39-
eventID := wfState.eventID
40-
wfState.eventID++
39+
scheduleEventID := wfState.getNextScheduleEventID()
4140

4241
name := fn.Name(activity)
43-
cmd := command.NewScheduleActivityTaskCommand(eventID, name, inputs)
42+
cmd := command.NewScheduleActivityTaskCommand(scheduleEventID, name, inputs)
4443
wfState.addCommand(&cmd)
4544

46-
wfState.pendingFutures[eventID] = f
45+
wfState.pendingFutures[scheduleEventID] = f
4746

4847
// Handle cancellation
4948
if d := ctx.Done(); d != nil {
@@ -57,7 +56,7 @@ func executeActivity(ctx sync.Context, options ActivityOptions, activity Activit
5756
}
5857

5958
wfState.removeCommand(cmd)
60-
delete(wfState.pendingFutures, eventID)
59+
delete(wfState.pendingFutures, scheduleEventID)
6160
f.Set(nil, sync.Canceled)
6261
})
6362
}

internal/workflow/executor.go

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) ([]history
7272
}
7373

7474
// Always pad the received events with WorkflowTaskStarted/Finished events to indicate the execution
75-
events := []history.Event{history.NewHistoryEvent(e.clock.Now(), history.EventType_WorkflowTaskStarted, -1, &history.WorkflowTaskStartedAttributes{})}
75+
events := []history.Event{history.NewHistoryEvent(e.clock.Now(), history.EventType_WorkflowTaskStarted, &history.WorkflowTaskStartedAttributes{})}
7676
events = append(events, t.NewEvents...)
7777

7878
// Execute new events received from the backend
@@ -87,7 +87,7 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) ([]history
8787
events = append(events, newCommandEvents...)
8888

8989
// Execution of this task is finished, add event to history
90-
events = append(events, history.NewHistoryEvent(e.clock.Now(), history.EventType_WorkflowTaskFinished, -1, &history.WorkflowTaskFinishedAttributes{}))
90+
events = append(events, history.NewHistoryEvent(e.clock.Now(), history.EventType_WorkflowTaskFinished, &history.WorkflowTaskFinishedAttributes{}))
9191

9292
e.lastEventID = events[len(events)-1].ID
9393

@@ -204,7 +204,7 @@ func (e *executor) handleWorkflowTaskStarted(event history.Event, a *history.Wor
204204
}
205205

206206
func (e *executor) handleActivityScheduled(event history.Event, a *history.ActivityScheduledAttributes) error {
207-
c := e.workflowState.removeCommandByEventID(event.EventID)
207+
c := e.workflowState.removeCommandByEventID(event.ScheduleEventID)
208208
if c != nil {
209209
// Ensure the same activity is scheduled again
210210
ca := c.Attr.(*command.ScheduleActivityTaskCommandAttr)
@@ -217,52 +217,52 @@ func (e *executor) handleActivityScheduled(event history.Event, a *history.Activ
217217
}
218218

219219
func (e *executor) handleActivityCompleted(event history.Event, a *history.ActivityCompletedAttributes) error {
220-
f, ok := e.workflowState.pendingFutures[event.EventID]
220+
f, ok := e.workflowState.pendingFutures[event.ScheduleEventID]
221221
if !ok {
222222
return nil
223223
}
224224

225-
e.workflowState.removeCommandByEventID(event.EventID)
225+
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
226226
f.Set(a.Result, nil)
227227

228228
return e.workflow.Continue(e.workflowCtx)
229229
}
230230

231231
func (e *executor) handleActivityFailed(event history.Event, a *history.ActivityFailedAttributes) error {
232-
f, ok := e.workflowState.pendingFutures[event.EventID]
232+
f, ok := e.workflowState.pendingFutures[event.ScheduleEventID]
233233
if !ok {
234234
return errors.New("no pending future found for activity failed event")
235235
}
236236

237-
e.workflowState.removeCommandByEventID(event.EventID)
237+
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
238238

239239
f.Set(nil, errors.New(a.Reason))
240240

241241
return e.workflow.Continue(e.workflowCtx)
242242
}
243243

244244
func (e *executor) handleTimerScheduled(event history.Event, a *history.TimerScheduledAttributes) error {
245-
e.workflowState.removeCommandByEventID(event.EventID)
245+
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
246246

247247
return nil
248248
}
249249

250250
func (e *executor) handleTimerFired(event history.Event, a *history.TimerFiredAttributes) error {
251-
f, ok := e.workflowState.pendingFutures[event.EventID]
251+
f, ok := e.workflowState.pendingFutures[event.ScheduleEventID]
252252
if !ok {
253253
// Timer already canceled ignore
254254
return nil
255255
}
256256

257-
e.workflowState.removeCommandByEventID(event.EventID)
257+
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
258258

259259
f.Set(nil, nil)
260260

261261
return e.workflow.Continue(e.workflowCtx)
262262
}
263263

264264
func (e *executor) handleSubWorkflowScheduled(event history.Event, a *history.SubWorkflowScheduledAttributes) error {
265-
c := e.workflowState.removeCommandByEventID(event.EventID)
265+
c := e.workflowState.removeCommandByEventID(event.ScheduleEventID)
266266
if c != nil {
267267
ca := c.Attr.(*command.ScheduleSubWorkflowCommandAttr)
268268
if a.Name != ca.Name {
@@ -274,25 +274,25 @@ func (e *executor) handleSubWorkflowScheduled(event history.Event, a *history.Su
274274
}
275275

276276
func (e *executor) handleSubWorkflowFailed(event history.Event, a *history.SubWorkflowFailedAttributes) error {
277-
f, ok := e.workflowState.pendingFutures[event.EventID]
277+
f, ok := e.workflowState.pendingFutures[event.ScheduleEventID]
278278
if !ok {
279279
return errors.New("no pending future found for sub workflow failed event")
280280
}
281281

282-
e.workflowState.removeCommandByEventID(event.EventID)
282+
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
283283

284284
f.Set(nil, errors.New(a.Error))
285285

286286
return e.workflow.Continue(e.workflowCtx)
287287
}
288288

289289
func (e *executor) handleSubWorkflowCompleted(event history.Event, a *history.SubWorkflowCompletedAttributes) error {
290-
f, ok := e.workflowState.pendingFutures[event.EventID]
290+
f, ok := e.workflowState.pendingFutures[event.ScheduleEventID]
291291
if !ok {
292292
return errors.New("no pending future found for sub workflow completed event")
293293
}
294294

295-
e.workflowState.removeCommandByEventID(event.EventID)
295+
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
296296

297297
f.Set(a.Result, nil)
298298

@@ -304,13 +304,13 @@ func (e *executor) handleSignalReceived(event history.Event, a *history.SignalRe
304304
sc := e.workflowState.getSignalChannel(a.Name)
305305
sc.SendNonblocking(e.workflowCtx, a.Arg)
306306

307-
e.workflowState.removeCommandByEventID(event.EventID)
307+
e.workflowState.removeCommandByEventID(event.ScheduleEventID)
308308

309309
return e.workflow.Continue(e.workflowCtx)
310310
}
311311

312312
func (e *executor) handleSideEffectResult(event history.Event, a *history.SideEffectResultAttributes) error {
313-
f, ok := e.workflowState.pendingFutures[event.EventID]
313+
f, ok := e.workflowState.pendingFutures[event.ScheduleEventID]
314314
if !ok {
315315
return errors.New("no pending future found for side effect result event")
316316
}
@@ -321,8 +321,8 @@ func (e *executor) handleSideEffectResult(event history.Event, a *history.SideEf
321321
}
322322

323323
func (e *executor) workflowCompleted(result payload.Payload, err error) error {
324-
eventId := e.workflowState.eventID
325-
e.workflowState.eventID++
324+
eventId := e.workflowState.scheduleEventID
325+
e.workflowState.scheduleEventID++
326326

327327
cmd := command.NewCompleteWorkflowCommand(eventId, result, err)
328328
e.workflowState.addCommand(&cmd)
@@ -349,11 +349,11 @@ func (e *executor) processCommands(ctx context.Context, t *task.Workflow) ([]his
349349
newEvents = append(newEvents, history.NewHistoryEvent(
350350
e.clock.Now(),
351351
history.EventType_ActivityScheduled,
352-
c.ID,
353352
&history.ActivityScheduledAttributes{
354353
Name: a.Name,
355354
Inputs: a.Inputs,
356355
},
356+
history.ScheduleEventID(c.ID),
357357
))
358358

359359
case command.CommandType_ScheduleSubWorkflow:
@@ -364,12 +364,12 @@ func (e *executor) processCommands(ctx context.Context, t *task.Workflow) ([]his
364364
newEvents = append(newEvents, history.NewHistoryEvent(
365365
e.clock.Now(),
366366
history.EventType_SubWorkflowScheduled,
367-
c.ID,
368367
&history.SubWorkflowScheduledAttributes{
369368
InstanceID: subWorkflowInstance.GetInstanceID(),
370369
Name: a.Name,
371370
Inputs: a.Inputs,
372371
},
372+
history.ScheduleEventID(c.ID),
373373
))
374374

375375
// Send message to new workflow instance
@@ -378,11 +378,11 @@ func (e *executor) processCommands(ctx context.Context, t *task.Workflow) ([]his
378378
HistoryEvent: history.NewHistoryEvent(
379379
e.clock.Now(),
380380
history.EventType_WorkflowExecutionStarted,
381-
c.ID,
382381
&history.ExecutionStartedAttributes{
383382
Name: a.Name,
384383
Inputs: a.Inputs,
385384
},
385+
history.ScheduleEventID(c.ID),
386386
),
387387
})
388388

@@ -391,10 +391,10 @@ func (e *executor) processCommands(ctx context.Context, t *task.Workflow) ([]his
391391
newEvents = append(newEvents, history.NewHistoryEvent(
392392
e.clock.Now(),
393393
history.EventType_SideEffectResult,
394-
c.ID,
395394
&history.SideEffectResultAttributes{
396395
Result: a.Result,
397396
},
397+
history.ScheduleEventID(c.ID),
398398
))
399399

400400
case command.CommandType_ScheduleTimer:
@@ -403,23 +403,23 @@ func (e *executor) processCommands(ctx context.Context, t *task.Workflow) ([]his
403403
newEvents = append(newEvents, history.NewHistoryEvent(
404404
e.clock.Now(),
405405
history.EventType_TimerScheduled,
406-
c.ID,
407406
&history.TimerScheduledAttributes{
408407
At: a.At,
409408
},
409+
history.ScheduleEventID(c.ID),
410410
))
411411

412412
// Create timer_fired event which will become visible in the future
413413
workflowEvents = append(workflowEvents, core.WorkflowEvent{
414414
WorkflowInstance: instance,
415-
HistoryEvent: history.NewFutureHistoryEvent(
415+
HistoryEvent: history.NewHistoryEvent(
416416
e.clock.Now(),
417417
history.EventType_TimerFired,
418-
c.ID,
419418
&history.TimerFiredAttributes{
420419
At: a.At,
421420
},
422-
a.At,
421+
history.ScheduleEventID(c.ID),
422+
history.VisibleAt(a.At),
423423
)},
424424
)
425425

@@ -429,11 +429,11 @@ func (e *executor) processCommands(ctx context.Context, t *task.Workflow) ([]his
429429
newEvents = append(newEvents, history.NewHistoryEvent(
430430
e.clock.Now(),
431431
history.EventType_WorkflowExecutionFinished,
432-
c.ID,
433432
&history.ExecutionCompletedAttributes{
434433
Result: a.Result,
435434
Error: a.Error,
436435
},
436+
history.ScheduleEventID(c.ID),
437437
))
438438

439439
if instance.SubWorkflow() {
@@ -445,19 +445,21 @@ func (e *executor) processCommands(ctx context.Context, t *task.Workflow) ([]his
445445
historyEvent = history.NewHistoryEvent(
446446
e.clock.Now(),
447447
history.EventType_SubWorkflowFailed,
448-
instance.ParentEventID(), // Ensure the message gets sent back to the parent workflow with the right eventID
449448
&history.SubWorkflowFailedAttributes{
450449
Error: a.Error,
451450
},
451+
// Ensure the message gets sent back to the parent workflow with the right eventID
452+
history.ScheduleEventID(instance.ParentEventID()),
452453
)
453454
} else {
454455
historyEvent = history.NewHistoryEvent(
455456
e.clock.Now(),
456457
history.EventType_SubWorkflowCompleted,
457-
instance.ParentEventID(), // Ensure the message gets sent back to the parent workflow with the right eventID
458458
&history.SubWorkflowCompletedAttributes{
459459
Result: a.Result,
460460
},
461+
// Ensure the message gets sent back to the parent workflow with the right eventID
462+
history.ScheduleEventID(instance.ParentEventID()),
461463
)
462464
}
463465

0 commit comments

Comments
 (0)