@@ -23,9 +23,15 @@ type monoprocessBackend struct {
23
23
logger * slog.Logger
24
24
}
25
25
26
+ // NewMonoprocessBackend wraps an existing backend and improves its responsiveness
27
+ // in case the backend and worker are running in the same process. This backend
28
+ // uses channels to notify the worker every time there is a new task ready to be
29
+ // worked on. Note that only one worker will be notified.
30
+ // IMPORTANT: Only use this backend if the backend and worker are running in the
31
+ // same process.
26
32
func NewMonoprocessBackend (b backend.Backend , signalBufferSize int , signalTimeout time.Duration ) * monoprocessBackend {
27
33
if signalTimeout <= 0 {
28
- signalTimeout = time .Second
34
+ signalTimeout = time .Second // default
29
35
}
30
36
mb := & monoprocessBackend {
31
37
Backend : b ,
@@ -41,12 +47,12 @@ func (b *monoprocessBackend) GetWorkflowTask(ctx context.Context) (*task.Workflo
41
47
if w , err := b .Backend .GetWorkflowTask (ctx ); w != nil || err != nil {
42
48
return w , err
43
49
}
44
- b .logger .Debug ( "worker waiting for workflow task signal" )
50
+ b .logger .DebugContext ( ctx , "worker waiting for workflow task signal" )
45
51
select {
46
52
case <- ctx .Done ():
47
53
return nil , ctx .Err ()
48
54
case <- b .workflowSignal :
49
- b .logger .Debug ( "worker got a workflow task signal" )
55
+ b .logger .DebugContext ( ctx , "worker got a workflow task signal" )
50
56
return b .GetWorkflowTask (ctx )
51
57
}
52
58
}
@@ -55,12 +61,12 @@ func (b *monoprocessBackend) GetActivityTask(ctx context.Context) (*task.Activit
55
61
if a , err := b .Backend .GetActivityTask (ctx ); a != nil || err != nil {
56
62
return a , err
57
63
}
58
- b .logger .Debug ( "worker waiting for activity task signal" )
64
+ b .logger .DebugContext ( ctx , "worker waiting for activity task signal" )
59
65
select {
60
66
case <- ctx .Done ():
61
67
return nil , ctx .Err ()
62
68
case <- b .activitySignal :
63
- b .logger .Debug ( "worker got an activity task signal" )
69
+ b .logger .DebugContext ( ctx , "worker got an activity task signal" )
64
70
return b .GetActivityTask (ctx )
65
71
}
66
72
}
@@ -89,7 +95,7 @@ func (b *monoprocessBackend) CompleteWorkflowTask(
89
95
continue
90
96
}
91
97
if ! b .notifyActivityWorker (ctx ) {
92
- break // no reason to notify more
98
+ break // no reason to notify more, queue is full
93
99
}
94
100
}
95
101
for _ , e := range timerEvents {
@@ -98,7 +104,7 @@ func (b *monoprocessBackend) CompleteWorkflowTask(
98
104
b .logger .Warn ("unknown attributes type in timer event" , "type" , reflect .TypeOf (e .Attributes ).String ())
99
105
continue
100
106
}
101
- b .logger .Debug ( "scheduling timer to notify workflow worker" )
107
+ b .logger .DebugContext ( ctx , "scheduling timer to notify workflow worker" )
102
108
time .AfterFunc (attr .At .Sub (time .Now ()), func () { b .notifyWorkflowWorker (ctx ) }) // TODO: cancel timer if the event gets cancelled
103
109
}
104
110
for _ , e := range workflowEvents {
@@ -108,7 +114,7 @@ func (b *monoprocessBackend) CompleteWorkflowTask(
108
114
continue
109
115
}
110
116
if ! b .notifyWorkflowWorker (ctx ) {
111
- break // no reason to notify more
117
+ break // no reason to notify more, queue is full
112
118
}
113
119
}
114
120
return nil
@@ -139,35 +145,31 @@ func (b *monoprocessBackend) SignalWorkflow(ctx context.Context, instanceID stri
139
145
}
140
146
141
147
func (b * monoprocessBackend ) notifyActivityWorker (ctx context.Context ) bool {
148
+ ctx , cancel := context .WithTimeout (ctx , b .signalTimeout )
149
+ defer cancel ()
142
150
select {
143
151
case <- ctx .Done ():
144
152
// we didn't manage to notify the worker that there is a new task, it
145
153
// will pick it up after the poll timeout
146
- b .logger .Debug ("failed to signal activity task to worker, context cancelled" )
147
- case <- time .After (b .signalTimeout ):
148
- // we didn't manage to notify the worker that there is a new task, it
149
- // will pick it up after the poll timeout
150
- b .logger .Debug ("failed to signal activity task to worker, timeout" )
154
+ b .logger .DebugContext (ctx , "failed to signal activity task to worker" , "reason" , ctx .Err ())
155
+ return false
151
156
case b .activitySignal <- struct {}{}:
152
- b .logger .Debug ( "signalled a new activity task to worker" )
157
+ b .logger .DebugContext ( ctx , "signalled a new activity task to worker" )
153
158
return true
154
159
}
155
- return false
156
160
}
157
161
158
162
func (b * monoprocessBackend ) notifyWorkflowWorker (ctx context.Context ) bool {
163
+ ctx , cancel := context .WithTimeout (ctx , b .signalTimeout )
164
+ defer cancel ()
159
165
select {
160
166
case <- ctx .Done ():
161
167
// we didn't manage to notify the worker that there is a new task, it
162
168
// will pick it up after the poll timeout
163
- b .logger .Debug ("failed to signal workflow task to worker, context cancelled" )
164
- case <- time .After (b .signalTimeout ):
165
- // we didn't manage to notify the worker that there is a new task, it
166
- // will pick it up after the poll timeout
167
- b .logger .Debug ("failed to signal workflow task to worker, timeout" )
169
+ b .logger .DebugContext (ctx , "failed to signal workflow task to worker" , "reason" , ctx .Err ())
170
+ return false
168
171
case b .workflowSignal <- struct {}{}:
169
- b .logger .Debug ( "signalled a new workflow task to worker" )
172
+ b .logger .DebugContext ( ctx , "signalled a new workflow task to worker" )
170
173
return true
171
174
}
172
- return false
173
175
}
0 commit comments