@@ -46,92 +46,229 @@ var (
46
46
)
47
47
48
48
type (
49
- // Channel must be used instead of native go channel by workflow code.
50
- // Use workflow.NewChannel(ctx) method to create Channel instance.
49
+ // Channel must be used in workflows instead of a native Go chan.
50
+ //
51
+ // Use workflow.NewChannel(ctx) to create an unbuffered Channel instance,
52
+ // workflow.NewBufferedChannel(ctx, size) to create a Channel which has a buffer,
53
+ // or workflow.GetSignalChannel(ctx, "name") to get a Channel that can contain encoded data sent from other systems.
54
+ //
55
+ // workflow.GetSignalChannel is named differently because you are not "creating" a new channel. Signal channels
56
+ // are conceptually singletons that exist at all times, and they do not have to be "created" before a signal can be
57
+ // sent to a workflow. The workflow will just have no way to know that the data exists until it inspects the
58
+ // appropriate signal channel.
59
+ //
60
+ // Both NewChannel and NewBufferedChannel have "Named" constructors as well.
61
+ // These names will be visible in stack-trace queries, so they can help with debugging, but they do not otherwise
62
+ // impact behavior at all, and are not recorded anywhere (so you can change them without versioning your code).
51
63
Channel interface {
52
64
// Receive blocks until it receives a value, and then assigns the received value to the provided pointer.
53
- // Returns false when Channel is closed.
54
- // Parameter valuePtr is a pointer to the expected data structure to be received. For example:
55
- // var v string
56
- // c.Receive(ctx, &v)
65
+ // It returns false when Channel is closed and all data has already been consumed from the channel, in the same
66
+ // way as Go channel reads work.
67
+ //
68
+ // This is equivalent to `v, more := <- aChannel`.
69
+ //
70
+ // valuePtr must be assignable, and will be used to assign (for in-memory data in regular channels) or decode
71
+ // (for signal channels) the data in the channel.
72
+ //
73
+ // If decoding or assigning fails:
74
+ // - an error will be logged
75
+ // - the value will be dropped from the channel
76
+ // - Receive will automatically try again
77
+ // - This will continue until a successful value is found, or the channel is emptied and it resumes blocking.
78
+ // Closed channels with no values will always succeed, but they will not change valuePtr.
79
+ //
80
+ // Go would normally prevent incorrect-type failures like this at compile time, but the same cannot be done
81
+ // here. If you need to "try" to assign to multiple things, similar to a Future you can use:
82
+ // - for signal channels, a []byte pointer. This will give you the raw data that Cadence received, and no
83
+ // decoding will be attempted, so you can try it yourself.
84
+ // - for other channels, an interface{} pointer. All values are interfaces, so this will never fail, and you
85
+ // can inspect the type with reflection or type assertions.
57
86
Receive (ctx Context , valuePtr interface {}) (more bool )
58
87
59
- // ReceiveAsync try to receive from Channel without blocking. If there is data available from the Channel, it
60
- // assign the data to valuePtr and returns true. Otherwise, it returns false immediately.
88
+ // ReceiveAsync tries to receive from Channel without blocking.
89
+ // If there is data available from the Channel, it assigns the data to valuePtr and returns true.
90
+ // Otherwise, it returns false immediately.
91
+ //
92
+ // This is equivalent to:
93
+ // select {
94
+ // case v := <- aChannel: ok = true
95
+ // default: ok = false
96
+ // }
97
+ //
98
+ // Decoding or assigning failures are handled like Receive.
61
99
ReceiveAsync (valuePtr interface {}) (ok bool )
62
100
63
- // ReceiveAsyncWithMoreFlag is same as ReceiveAsync with extra return value more to indicate if there could be
64
- // more value from the Channel. The more is false when Channel is closed.
101
+ // ReceiveAsyncWithMoreFlag is the same as ReceiveAsync, with an extra return to indicate if there could be
102
+ // more value from the Channel. more is false when Channel is closed.
103
+ //
104
+ // This is equivalent to:
105
+ // select {
106
+ // case v, more := <- aChannel: ok = true
107
+ // default: ok = false
108
+ // }
109
+ //
110
+ // Decoding or assigning failures are handled like Receive.
65
111
ReceiveAsyncWithMoreFlag (valuePtr interface {}) (ok bool , more bool )
66
112
67
113
// Send blocks until the data is sent.
114
+ //
115
+ // This is equivalent to `aChannel <- v`
68
116
Send (ctx Context , v interface {})
69
117
70
- // SendAsync try to send without blocking. It returns true if the data was sent, otherwise it returns false.
118
+ // SendAsync will try to send without blocking.
119
+ // It returns true if the data was sent (i.e. there was room in the buffer, or a reader was waiting to receive
120
+ // it), otherwise it returns false.
121
+ //
122
+ // This is equivalent to:
123
+ // select {
124
+ // case aChannel <- v: ok = true
125
+ // default: ok = false
126
+ // }
71
127
SendAsync (v interface {}) (ok bool )
72
128
73
- // Close close the Channel, and prohibit subsequent sends.
129
+ // Close closes the Channel, and prohibits subsequent sends.
130
+ // As with a normal Go channel that has been closed, sending to a closed channel will panic.
74
131
Close ()
75
132
}
76
133
77
- // Selector must be used instead of native go select by workflow code for determinism.
78
- // Use workflow.NewSelector(ctx) method to create a Selector instance.
79
- // The interface is to simulate Golang's Select statement.
80
- // For example, the logic of Golang code like below
81
- // chA := make(chan int)
82
- // chB := make(chan int)
83
- // counter := 0
84
- // for {
85
- // select {
86
- // case i, ok := <- chA:
87
- // if ok{
88
- // counter += i
89
- // }
90
- // case i, ok := <- chB:
91
- // if ok{
92
- // counter += i
93
- // }
94
- // }
95
- // }
96
- // should be written as
97
- // s := workflow.NewSelector(ctx)
98
- // counter := 0
99
- // s.AddReceive(workflow.GetSignalChannel(ctx, "channelA"), func(c workflow.Channel, ok bool) {
100
- // if ok{
101
- // var i int
102
- // c.Receive(ctx, &i)
103
- // counter += i
104
- // }
105
- // })
106
- // s.AddReceive(workflow.GetSignalChannel(ctx, "channelB"), func(c workflow.Channel, ok bool) {
107
- // if ok{
108
- // var i int
109
- // c.Receive(ctx, &i)
110
- // counter += i
111
- // }
112
- // })
134
+ // Selector must be used in workflows instead of a native Go select statement.
135
+ //
136
+ // Use workflow.NewSelector(ctx) to create a Selector instance, and then add cases to it with its methods.
137
+ // The interface is intended to simulate Go's select statement, and any Go select can be fairly trivially rewritten
138
+ // for a Selector with effectively identical behavior.
139
+ //
140
+ // For example, normal Go code like below:
141
+ // chA := make(chan int)
142
+ // chB := make(chan int)
143
+ // counter := 0
144
+ // for {
145
+ // select {
146
+ // case i, more := <- chA:
147
+ // if more {
148
+ // counter += i
149
+ // }
150
+ // case i, more := <- chB:
151
+ // if more {
152
+ // counter += i
153
+ // }
154
+ // case <- time.After(time.Hour):
155
+ // break
156
+ // }
157
+ // }
158
+ // can be written as:
159
+ // chA := workflow.NewChannel(ctx)
160
+ // chB := workflow.NewChannel(ctx)
161
+ // counter := 0
162
+ // for {
163
+ // timedout := false
164
+ // s := workflow.NewSelector(ctx)
165
+ // s.AddReceive(chA, func(c workflow.Channel, more bool) {
166
+ // if more {
167
+ // var i int
168
+ // c.Receive(ctx, &i)
169
+ // counter += i
170
+ // }
171
+ // })
172
+ // s.AddReceive(chB, func(c workflow.Channel, more bool) {
173
+ // if more {
174
+ // var i int
175
+ // c.Receive(ctx, &i)
176
+ // counter += i
177
+ // }
178
+ // })
179
+ // s.AddFuture(workflow.NewTimer(ctx, time.Hour), func(f workflow.Future) {
180
+ // timedout = true
181
+ // })
182
+ // s.Select(ctx)
183
+ // if timedout {
184
+ // break
185
+ // }
186
+ // }
187
+ //
188
+ // You can create a new Selector as needed or mutate one and call Select multiple times, but note that:
189
+ //
190
+ // 1. AddFuture will not behave the same across both patterns. Read AddFuture for more details.
113
191
//
114
- // for {
115
- // s.Select(ctx)
116
- // }
192
+ // 2. There is no way to remove a case from a Selector, so you must make a new Selector to "remove" them.
193
+ //
194
+ // Finally, note that Select will not return until a condition's needs are met, like a Go selector - canceling the
195
+ // Context used to construct the Selector, or the Context used to Select, will not (directly) unblock a Select call.
196
+ // Read Select for more details.
117
197
Selector interface {
118
- // AddReceive adds a ReceiveChannel to the selector. f is invoked when the channel has data or closed.
119
- // ok == false indicates the channel is closed
120
- AddReceive (c Channel , f func (c Channel , ok bool )) Selector
121
- // AddSend adds a SendChannel to the selector. f is invoke when the channel is available to send
198
+ // AddReceive waits to until a value can be received from a channel.
199
+ // f is invoked when the channel has data or is closed.
200
+ //
201
+ // This is equivalent to `case v, more := <- aChannel`, and `more` will only
202
+ // be false when the channel is both closed and no data was received.
203
+ //
204
+ // When f is invoked, the data (or closed state) remains untouched in the channel, so
205
+ // you need to `c.Receive(ctx, &out)` (or `c.ReceiveAsync(&out)`) to remove and decode the value.
206
+ // Failure to do this is not an error - the value will simply remain in the channel until a future
207
+ // Receive retrieves it.
208
+ AddReceive (c Channel , f func (c Channel , more bool )) Selector
209
+ // AddSend waits to send a value to a channel.
210
+ // f is invoked when the value was successfully sent to the channel.
211
+ //
212
+ // This is equivalent to `case aChannel <- value`.
213
+ //
214
+ // Unlike AddReceive, the value has already been sent on the channel when f is invoked.
122
215
AddSend (c Channel , v interface {}, f func ()) Selector
123
- // AddFuture adds a Future to the selector f is invoked when future is ready
216
+ // AddFuture invokes f after a Future is ready.
217
+ // If the Future is ready before Select is called, it is eligible to be invoked immediately.
218
+ //
219
+ // There is no direct equivalent in a native Go select statement.
220
+ // It was added because Futures are common in Cadence code, and some patterns are much simpler with it.
221
+ //
222
+ // Each call to AddFuture will invoke its f at most one time, regardless of how many times Select is called.
223
+ // This means, for a Future that is (or will be) ready:
224
+ // - Adding the Future once, then calling Select twice, will invoke the callback once with the first Select
225
+ // call, and then wait for other Selector conditions in the second Select call (or block forever if there are
226
+ // no other eligible conditions).
227
+ // - Adding the same Future twice, then calling Select twice, will invoke each callback once.
228
+ // - Adding the same Future to two different Selectors, then calling Select once on each Selector, will invoke
229
+ // each Selector's callback once.
230
+ //
231
+ // Therefore, with a Future "f" that is or will become ready, this is an infinite loop that will consume as much
232
+ // CPU as possible:
233
+ // for {
234
+ // workflow.NewSelector(ctx).AddFuture(f, func(f workflow.Future){}).Select(ctx)
235
+ // }
236
+ // While this will loop once, and then wait idle forever:
237
+ // s := workflow.NewSelector(ctx).AddFuture(f, func(f workflow.Future){})
238
+ // for {
239
+ // s.Select(ctx)
240
+ // }
124
241
AddFuture (future Future , f func (f Future )) Selector
125
242
// AddDefault adds a default branch to the selector.
126
- // f is invoked when non of the other conditions(ReceiveChannel, SendChannel and Future) is met for one call of Select
243
+ // f is invoked immediately when none of the other conditions (AddReceive, AddSend, AddFuture) are met for a
244
+ // Select call.
245
+ //
246
+ // This is equivalent to a `default:` case.
247
+ //
248
+ // Note that this applies to each Select call. If you create a Selector with only one AddDefault, and then call
249
+ // Select on it twice, f will be invoked twice.
127
250
AddDefault (f func ())
128
- // Select waits for one of the added conditions to be met and invoke the callback as described above.
129
- // When none of the added condition is met:
130
- // if there is no Default(added by AddDefault) and , then it will block the current goroutine
131
- // if Default(added by AddDefault) is used, when Default callback will be executed without blocking
132
- // When more than one of added conditions are met, only one of them will be invoked.
133
- // Usually it's recommended to use a for loop to drain all of them, and use AddDefault to break out the
134
- // loop properly(e.g. not missing any received data in channels)
251
+ // Select waits for one of the added conditions to be met and invokes the callback as described above.
252
+ // If no condition is met, Select will block until one or more are available, then one callback will be invoked.
253
+ // If no condition is ever met, Select will block forever.
254
+ //
255
+ // Note that Select does not return an error, and does not stop waiting if its Context is canceled.
256
+ // This mimics a native Go select statement, which has no way to be interrupted except for its listed cases.
257
+ //
258
+ // If you wish to stop Selecting when the Context is canceled, use AddReceive with the Context's Done() channel,
259
+ // in the same way as you would use a `case <- ctx.Done():` in a Go select statement. E.g.:
260
+ // cancelled := false
261
+ // s := workflow.NewSelector(ctx)
262
+ // s.AddFuture(f, func(f workflow.Future) {}) // assume this is never ready
263
+ // s.AddReceive(ctx.Done(), func(c workflow.Channel, more bool) {
264
+ // // this will be invoked when the Context is cancelled for any reason,
265
+ // // and more will be false.
266
+ // cancelled = true
267
+ // })
268
+ // s.Select(ctx)
269
+ // if cancelled {
270
+ // // this will be executed
271
+ // }
135
272
Select (ctx Context )
136
273
}
137
274
@@ -146,20 +283,37 @@ type (
146
283
147
284
// Future represents the result of an asynchronous computation.
148
285
Future interface {
149
- // Get blocks until the future is ready. When ready it either returns non nil error or assigns result value to
150
- // the provided pointer.
151
- // Example:
152
- // var v string
153
- // if err := f.Get(ctx, &v); err != nil {
154
- // return err
155
- // }
286
+ // Get blocks until the future is ready.
287
+ // When ready it either returns the Future's contained error, or assigns the contained value to the output var.
288
+ // Failures to assign or decode the value will panic.
289
+ //
290
+ // Two common patterns to retrieve data are:
291
+ // var out string
292
+ // // this will assign the string value, which may be "", or an error and leave out as "".
293
+ // err := f.Get(ctx, &out)
294
+ // and
295
+ // var out *string
296
+ // // this will assign the string value, which may be "" or nil, or an error and leave out as nil.
297
+ // err := f.Get(ctx, &out)
298
+ //
299
+ // The valuePtr parameter can be nil when the encoded result value is not needed:
300
+ // err := f.Get(ctx, nil)
301
+ //
302
+ // Futures with values set in-memory via a call to their Settable's methods can be retrieved without knowing the
303
+ // type with an interface, i.e. this will not ever panic:
304
+ // var out interface{}
305
+ // // this will assign the same value that was set,
306
+ // // and you can check its type with reflection or type assertions.
307
+ // err := f.Get(ctx, &out)
156
308
//
157
- // The valuePtr parameter can be nil when the encoded result value is not needed.
158
- // Example:
159
- // err = f.Get(ctx, nil)
309
+ // Futures with encoded data from e.g. activities or child workflows can bypass decoding with a byte slice, and
310
+ // similarly this will not ever panic:
311
+ // var out []byte
312
+ // // out will contain the raw bytes given to Cadence's servers, you should decode it however is necessary
313
+ // err := f.Get(ctx, &out) // err can only be the Future's contained error
160
314
Get (ctx Context , valuePtr interface {}) error
161
315
162
- // When true Get is guaranteed to not block
316
+ // IsReady will return true Get is guaranteed to not block.
163
317
IsReady () bool
164
318
}
165
319
0 commit comments