Skip to content

Commit bf91341

Browse files
committed
Use int64 for event ids
1 parent 72abcbe commit bf91341

File tree

6 files changed

+31
-23
lines changed

6 files changed

+31
-23
lines changed

backend/mysql/mysql.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w
200200

201201
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, ignoreDuplicate bool) error {
202202
var parentInstanceID *string
203-
var parentEventID *int
203+
var parentEventID *int64
204204
if wfi.SubWorkflow() {
205205
i := wfi.ParentInstanceID
206206
parentInstanceID = &i
@@ -291,7 +291,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
291291
var id int
292292
var instanceID, executionID string
293293
var parentInstanceID *string
294-
var parentEventID *int
294+
var parentEventID *int64
295295
var stickyUntil *time.Time
296296
if err := row.Scan(&id, &instanceID, &executionID, &parentInstanceID, &parentEventID, &stickyUntil); err != nil {
297297
if err == sql.ErrNoRows {

backend/sqlite/sqlite.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, m history.W
8989

9090
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, ignoreDuplicate bool) error {
9191
var parentInstanceID *string
92-
var parentEventID *int
92+
var parentEventID *int64
9393
if wfi.SubWorkflow() {
9494
i := wfi.ParentInstanceID
9595
parentInstanceID = &i
@@ -257,7 +257,7 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
257257

258258
var instanceID, executionID string
259259
var parentInstanceID *string
260-
var parentEventID *int
260+
var parentEventID *int64
261261
var stickyUntil *time.Time
262262
if err := row.Scan(&instanceID, &executionID, &parentInstanceID, &parentEventID, &stickyUntil); err != nil {
263263
if err == sql.ErrNoRows {

internal/command/command.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ const (
3535
type Command struct {
3636
State CommandState
3737

38-
ID int
38+
ID int64
3939

4040
Type CommandType
4141

@@ -47,7 +47,7 @@ type ScheduleActivityTaskCommandAttr struct {
4747
Inputs []payload.Payload
4848
}
4949

50-
func NewScheduleActivityTaskCommand(id int, name string, inputs []payload.Payload) Command {
50+
func NewScheduleActivityTaskCommand(id int64, name string, inputs []payload.Payload) Command {
5151
return Command{
5252
ID: id,
5353
Type: CommandType_ScheduleActivityTask,
@@ -64,7 +64,7 @@ type ScheduleSubWorkflowCommandAttr struct {
6464
Inputs []payload.Payload
6565
}
6666

67-
func NewScheduleSubWorkflowCommand(id int, instanceID, name string, inputs []payload.Payload) Command {
67+
func NewScheduleSubWorkflowCommand(id int64, instanceID, name string, inputs []payload.Payload) Command {
6868
if instanceID == "" {
6969
instanceID = uuid.New().String()
7070
}
@@ -84,7 +84,7 @@ type ScheduleTimerCommandAttr struct {
8484
At time.Time
8585
}
8686

87-
func NewScheduleTimerCommand(id int, at time.Time) Command {
87+
func NewScheduleTimerCommand(id int64, at time.Time) Command {
8888
return Command{
8989
ID: id,
9090
Type: CommandType_ScheduleTimer,
@@ -98,7 +98,7 @@ type CancelTimerCommandAttr struct {
9898
TimerID int
9999
}
100100

101-
func NewCancelTimerCommand(id, timerID int) Command {
101+
func NewCancelTimerCommand(id int64, timerID int) Command {
102102
return Command{
103103
ID: id,
104104
Type: CommandType_CancelTimer,
@@ -112,7 +112,7 @@ type SideEffectCommandAttr struct {
112112
Result payload.Payload
113113
}
114114

115-
func NewSideEffectCommand(id int, result payload.Payload) Command {
115+
func NewSideEffectCommand(id int64, result payload.Payload) Command {
116116
return Command{
117117
ID: id,
118118
Type: CommandType_SideEffect,
@@ -127,7 +127,7 @@ type CompleteWorkflowCommandAttr struct {
127127
Error string
128128
}
129129

130-
func NewCompleteWorkflowCommand(id int, result payload.Payload, err error) Command {
130+
func NewCompleteWorkflowCommand(id int64, result payload.Payload, err error) Command {
131131
var error string
132132
if err != nil {
133133
error = err.Error()

internal/core/instance.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ type WorkflowInstance struct {
55
ExecutionID string `json:"execution_id,omitempty"`
66

77
ParentInstanceID string `json:"parent_instance,omitempty"`
8-
ParentEventID int `json:"parent_event_id,omitempty"`
8+
ParentEventID int64 `json:"parent_event_id,omitempty"`
99
}
1010

1111
func NewWorkflowInstance(instanceID, executionID string) *WorkflowInstance {
@@ -15,7 +15,7 @@ func NewWorkflowInstance(instanceID, executionID string) *WorkflowInstance {
1515
}
1616
}
1717

18-
func NewSubWorkflowInstance(instanceID, executionID string, parentInstanceID string, parentEventID int) *WorkflowInstance {
18+
func NewSubWorkflowInstance(instanceID, executionID string, parentInstanceID string, parentEventID int64) *WorkflowInstance {
1919
return &WorkflowInstance{
2020
InstanceID: instanceID,
2121
ExecutionID: executionID,

internal/history/history.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,28 +46,34 @@ func (et EventType) String() string {
4646
return "WorkflowExecutionTerminated"
4747
case EventType_WorkflowExecutionCanceled:
4848
return "WorkflowExecutionCanceled"
49+
4950
case EventType_WorkflowTaskStarted:
5051
return "WorkflowTaskStarted"
5152
case EventType_WorkflowTaskFinished:
5253
return "WorkflowTaskFinished"
54+
5355
case EventType_SubWorkflowScheduled:
5456
return "SubWorkflowScheduled"
5557
case EventType_SubWorkflowCompleted:
5658
return "SubWorkflowCompleted"
5759
case EventType_SubWorkflowFailed:
5860
return "SubWorkflowFailed"
61+
5962
case EventType_ActivityScheduled:
6063
return "ActivityScheduled"
6164
case EventType_ActivityCompleted:
6265
return "ActivityCompleted"
6366
case EventType_ActivityFailed:
6467
return "ActivityFailed"
68+
6569
case EventType_TimerScheduled:
6670
return "TimerScheduled"
6771
case EventType_TimerFired:
6872
return "TimerFired"
73+
6974
case EventType_SignalReceived:
7075
return "SignalReceived"
76+
7177
case EventType_SideEffectResult:
7278
return "SideEffectResult"
7379
default:
@@ -83,10 +89,12 @@ type Event struct {
8389

8490
Timestamp time.Time
8591

92+
SequenceID int64
93+
8694
// ScheduleEventID is used to correlate events belonging together
8795
// For example, if an activity is scheduled, ScheduleEventID of the schedule event and the
8896
// completion/failure event are the same.
89-
ScheduleEventID int
97+
ScheduleEventID int64
9098

9199
// Attributes are event type specific attributes
92100
Attributes interface{}
@@ -100,7 +108,7 @@ func (e Event) String() string {
100108

101109
type HistoryEventOption func(e *Event)
102110

103-
func ScheduleEventID(scheduleEventID int) HistoryEventOption {
111+
func ScheduleEventID(scheduleEventID int64) HistoryEventOption {
104112
return func(e *Event) {
105113
e.ScheduleEventID = scheduleEventID
106114
}

internal/workflowstate/workflowstate.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ type signalChannel struct {
3939

4040
type WfState struct {
4141
instance *core.WorkflowInstance
42-
scheduleEventID int
42+
scheduleEventID int64
4343
commands []*command.Command
44-
pendingFutures map[int]DecodingSettable
44+
pendingFutures map[int64]DecodingSettable
4545
replaying bool
4646

4747
pendingSignals map[string][]payload.Payload
@@ -58,7 +58,7 @@ func NewWorkflowState(instance *core.WorkflowInstance, logger log.Logger, clock
5858
instance: instance,
5959
commands: []*command.Command{},
6060
scheduleEventID: 1,
61-
pendingFutures: map[int]DecodingSettable{},
61+
pendingFutures: map[int64]DecodingSettable{},
6262

6363
pendingSignals: map[string][]payload.Payload{},
6464
signalChannels: make(map[string]*signalChannel),
@@ -81,22 +81,22 @@ func WithWorkflowState(ctx sync.Context, wfState *WfState) sync.Context {
8181
return sync.WithValue(ctx, workflowCtxKey, wfState)
8282
}
8383

84-
func (wf *WfState) GetNextScheduleEventID() int {
84+
func (wf *WfState) GetNextScheduleEventID() int64 {
8585
scheduleEventID := wf.scheduleEventID
8686
wf.scheduleEventID++
8787
return scheduleEventID
8888
}
8989

90-
func (wf *WfState) TrackFuture(scheduleEventID int, f DecodingSettable) {
90+
func (wf *WfState) TrackFuture(scheduleEventID int64, f DecodingSettable) {
9191
wf.pendingFutures[scheduleEventID] = f
9292
}
9393

94-
func (wf *WfState) FutureByScheduleEventID(scheduleEventID int) (DecodingSettable, bool) {
94+
func (wf *WfState) FutureByScheduleEventID(scheduleEventID int64) (DecodingSettable, bool) {
9595
f, ok := wf.pendingFutures[scheduleEventID]
9696
return f, ok
9797
}
9898

99-
func (wf *WfState) RemoveFuture(scheduleEventID int) {
99+
func (wf *WfState) RemoveFuture(scheduleEventID int64) {
100100
delete(wf.pendingFutures, scheduleEventID)
101101
}
102102

@@ -108,7 +108,7 @@ func (wf *WfState) AddCommand(cmd *command.Command) {
108108
wf.commands = append(wf.commands, cmd)
109109
}
110110

111-
func (wf *WfState) RemoveCommandByEventID(eventID int) *command.Command {
111+
func (wf *WfState) RemoveCommandByEventID(eventID int64) *command.Command {
112112
for i, c := range wf.commands {
113113
if c.ID == eventID {
114114
wf.commands = append(wf.commands[:i], wf.commands[i+1:]...)

0 commit comments

Comments
 (0)