@@ -2,74 +2,76 @@ package sync
2
2
3
3
import (
4
4
"github.com/cschleiden/go-workflows/internal/converter"
5
- "github.com/pkg/errors"
6
5
)
7
6
8
- type Channel interface {
9
- Send (ctx Context , v interface {} )
7
+ type Channel [ T any ] interface {
8
+ Send (ctx Context , v T )
10
9
11
- SendNonblocking (ctx Context , v interface {} ) (ok bool )
10
+ SendNonblocking (ctx Context , v T ) (ok bool )
12
11
13
- Receive (ctx Context , vptr interface {} ) (more bool )
12
+ Receive (ctx Context ) (v T , ok bool )
14
13
15
- ReceiveNonblocking (ctx Context , vptr interface {} ) (more bool )
14
+ ReceiveNonblocking (ctx Context ) (v T , ok bool )
16
15
17
16
Close ()
18
17
}
19
18
20
- type ChannelInternal interface {
19
+ type ChannelInternal [ T any ] interface {
21
20
Closed () bool
22
21
23
- ReceiveNonBlocking (ctx Context , cb func (v interface {})) ( ok bool )
22
+ ReceiveNonBlocking (ctx Context ) (v T , ok bool )
24
23
25
- AddReceiveCallback (cb func (v interface {} ))
24
+ AddReceiveCallback (cb func (v T , ok bool ))
26
25
}
27
26
28
- func NewChannel () Channel {
29
- return & channel {
30
- c : make ([]interface {}, 0 ),
27
+ // Ensure channel implementation support internal interface
28
+ var _ ChannelInternal [struct {}] = (* channel [struct {}])(nil )
29
+
30
+ func NewChannel [T any ]() Channel [T ] {
31
+ return & channel [T ]{
32
+ c : make ([]T , 0 ),
31
33
converter : converter .DefaultConverter ,
32
34
}
33
35
}
34
36
35
- func NewBufferedChannel (size int ) Channel {
36
- return & channel {
37
- c : make ([]interface {} , 0 , size ),
37
+ func NewBufferedChannel [ T any ] (size int ) Channel [ T ] {
38
+ return & channel [ T ] {
39
+ c : make ([]T , 0 , size ),
38
40
size : size ,
39
41
converter : converter .DefaultConverter ,
40
42
}
41
43
}
42
44
43
- type channel struct {
44
- c []interface {}
45
- receivers []func (interface {} )
46
- senders []func () interface {}
45
+ type channel [ T any ] struct {
46
+ c []T
47
+ receivers []func (value T , ok bool )
48
+ senders []func () T
47
49
closed bool
48
50
size int
49
51
converter converter.Converter
50
52
}
51
53
52
- var _ Channel = (* channel )(nil )
53
- var _ ChannelInternal = (* channel )(nil )
54
-
55
- func (c * channel ) Close () {
54
+ func (c * channel [T ]) Close () {
56
55
c .closed = true
57
56
58
57
// If there are still blocked senders, error
59
58
if len (c .senders ) > 0 {
60
59
panic ("send on closed channel" )
61
60
}
62
61
62
+ // TODO: Drain buffered values
63
63
for len (c .receivers ) > 0 {
64
64
r := c .receivers [0 ]
65
65
c .receivers [0 ] = nil
66
66
c .receivers = c .receivers [1 :]
67
67
68
- r (nil )
68
+ // Send zero value to pending receiver
69
+ var v T
70
+ r (v , false )
69
71
}
70
72
}
71
73
72
- func (c * channel ) Send (ctx Context , v interface {} ) {
74
+ func (c * channel [ T ] ) Send (ctx Context , v T ) {
73
75
cr := getCoState (ctx )
74
76
75
77
addedSender := false
@@ -84,50 +86,48 @@ func (c *channel) Send(ctx Context, v interface{}) {
84
86
if ! addedSender {
85
87
addedSender = true
86
88
87
- cb := func () interface {} {
89
+ cb := func () T {
88
90
sentValue = true
89
91
return v
90
92
}
91
93
92
94
c .senders = append (c .senders , cb )
93
95
}
94
96
97
+ // No waiting receiver, yield
95
98
cr .Yield ()
96
99
100
+ // Was our sender called while we yielded? If so, we can return
97
101
if sentValue {
98
102
cr .MadeProgress ()
99
103
return
100
104
}
101
105
}
102
106
}
103
107
104
- func (c * channel ) SendNonblocking (ctx Context , v interface {} ) bool {
108
+ func (c * channel [ T ] ) SendNonblocking (ctx Context , v T ) bool {
105
109
return c .trySend (v )
106
110
}
107
111
108
- func (c * channel ) Receive (ctx Context , vptr interface {} ) (more bool ) {
112
+ func (c * channel [ T ] ) Receive (ctx Context ) (v T , ok bool ) {
109
113
cr := getCoState (ctx )
110
114
111
115
addedListener := false
112
116
receivedValue := false
113
117
114
118
for {
115
119
// Try to receive from buffered channel or blocked sender
116
- if c .tryReceive (vptr ) {
120
+ if v , ok , rok := c .tryReceive (); rok {
117
121
cr .MadeProgress ()
118
- return ! c . closed
122
+ return v , ok
119
123
}
120
124
121
125
// Register handler to receive value once
122
126
if ! addedListener {
123
- cb := func (v interface {} ) {
127
+ cb := func (rv T , rok bool ) {
124
128
receivedValue = true
125
-
126
- if vptr != nil {
127
- if err := converter .AssignValue (c .converter , v , vptr ); err != nil {
128
- panic (err )
129
- }
130
- }
129
+ v = rv
130
+ ok = rok
131
131
}
132
132
133
133
c .receivers = append (c .receivers , cb )
@@ -139,25 +139,30 @@ func (c *channel) Receive(ctx Context, vptr interface{}) (more bool) {
139
139
// If we received a value via the callback, return
140
140
if receivedValue {
141
141
cr .MadeProgress ()
142
- return ! c . closed
142
+ return v , ok
143
143
}
144
144
}
145
145
}
146
146
147
- func (c * channel ) ReceiveNonblocking (ctx Context , vptr interface {}) (ok bool ) {
148
- return c .tryReceive (vptr )
147
+ func (c * channel [T ]) ReceiveNonblocking (ctx Context ) (T , bool ) {
148
+ if v , ok , rok := c .tryReceive (); rok {
149
+ return v , ok
150
+ }
151
+
152
+ var z T
153
+ return z , false
149
154
}
150
155
151
- func (c * channel ) hasValue () bool {
156
+ func (c * channel [ T ] ) hasValue () bool {
152
157
return len (c .c ) > 0
153
158
}
154
159
155
- func (c * channel ) canReceive () bool {
160
+ func (c * channel [ T ] ) canReceive () bool {
156
161
return c .hasValue () || len (c .senders ) > 0 || c .closed
157
162
}
158
163
159
- func (c * channel ) trySend (v interface {} ) bool {
160
- // If closed, we can't send, exit .
164
+ func (c * channel [ T ] ) trySend (v T ) bool {
165
+ // If closed, we can't send, panic .
161
166
if c .closed {
162
167
panic ("channel closed" )
163
168
}
@@ -168,7 +173,9 @@ func (c *channel) trySend(v interface{}) bool {
168
173
r := c .receivers [0 ]
169
174
c .receivers [0 ] = nil
170
175
c .receivers = c .receivers [1 :]
171
- r (v )
176
+
177
+ r (v , true )
178
+
172
179
return true
173
180
}
174
181
@@ -182,72 +189,52 @@ func (c *channel) trySend(v interface{}) bool {
182
189
return false
183
190
}
184
191
185
- func (c * channel ) tryReceive (vptr interface {}) bool {
192
+ func (c * channel [ T ] ) tryReceive () ( v T , ok bool , rok bool ) {
186
193
// If channel is buffered, return value if available
187
194
if c .hasValue () {
188
- v : = c .c [0 ]
195
+ v = c .c [0 ]
189
196
c .c = c .c [1 :]
190
197
191
- if vptr != nil {
192
- if err := converter .AssignValue (c .converter , v , vptr ); err != nil {
193
- panic (errors .Wrap (err , "could not assign value when receiving from channel" ))
194
- }
195
- }
196
-
197
- return true
198
+ return v , true , true
198
199
}
199
200
200
201
// If channel has been closed and no values in buffer (if buffered) return zero
201
202
// element
202
203
if c .closed {
203
- if vptr != nil {
204
- if err := converter .AssignValue (c .converter , nil , vptr ); err != nil {
205
- panic (err )
206
- }
207
- }
208
-
209
- return true
204
+ var z T
205
+ return z , false , true
210
206
}
211
207
208
+ // Any blocked senders? If so, receive from the first one
212
209
if len (c .senders ) > 0 {
213
210
s := c .senders [0 ]
214
211
c .senders [0 ] = nil
215
212
c .senders = c .senders [1 :]
216
213
217
- v := s ()
218
-
219
- if vptr != nil {
220
- if err := converter .AssignValue (c .converter , v , vptr ); err != nil {
221
- panic (err )
222
- }
223
- }
224
-
225
- return true
214
+ return s (), true , true
226
215
}
227
216
228
- return false
217
+ // Could not receive value
218
+ return v , ok , false
229
219
}
230
220
231
- func (c * channel ) hasCapacity () bool {
221
+ func (c * channel [ T ] ) hasCapacity () bool {
232
222
return len (c .c ) < c .size
233
223
}
234
224
235
- func (c * channel ) AddReceiveCallback (cb func (v interface {} )) {
225
+ func (c * channel [ T ] ) AddReceiveCallback (cb func (v T , ok bool )) {
236
226
c .receivers = append (c .receivers , cb )
237
227
}
238
228
239
- func (c * channel ) ReceiveNonBlocking (ctx Context , cb func (v interface {})) (ok bool ) {
240
- var vptr interface {}
241
- if c .tryReceive (vptr ) {
242
- cb (vptr )
243
- return true
229
+ func (c * channel [T ]) ReceiveNonBlocking (ctx Context ) (T , bool ) {
230
+ if v , ok , rok := c .tryReceive (); rok {
231
+ return v , ok
244
232
}
245
233
246
- c .AddReceiveCallback (cb )
247
-
248
- return false
234
+ var z T
235
+ return z , false
249
236
}
250
237
251
- func (c * channel ) Closed () bool {
238
+ func (c * channel [ T ] ) Closed () bool {
252
239
return c .closed
253
240
}
0 commit comments