Skip to content

Commit 1a6b71f

Browse files
committed
Merge branch 'master' of github.com:samber/mo
2 parents 1eb9f77 + 1eb2bf5 commit 1a6b71f

File tree

4 files changed

+99
-84
lines changed

4 files changed

+99
-84
lines changed

future.go

Lines changed: 83 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -7,131 +7,148 @@ import (
77
// NewFuture instanciate a new future.
88
func NewFuture[T any](cb func(resolve func(T), reject func(error))) *Future[T] {
99
future := Future[T]{
10-
mu: sync.RWMutex{},
11-
next: nil,
10+
cb: cb,
1211
cancelCb: func() {},
12+
done: make(chan struct{}),
1313
}
1414

15-
go func() {
16-
cb(future.resolve, future.reject)
17-
}()
15+
future.active()
1816

1917
return &future
2018
}
2119

2220
// Future represents a value which may or may not currently be available, but will be
2321
// available at some point, or an exception if that value could not be made available.
2422
type Future[T any] struct {
25-
mu sync.RWMutex
23+
mu sync.Mutex
2624

27-
next func(T, error)
25+
cb func(func(T), func(error))
2826
cancelCb func()
27+
next *Future[T]
28+
done chan struct{}
29+
result Result[T]
30+
}
31+
32+
func (f *Future[T]) active() {
33+
go f.cb(f.resolve, f.reject)
34+
}
35+
36+
func (f *Future[T]) activeSync() {
37+
f.cb(f.resolve, f.reject)
2938
}
3039

3140
func (f *Future[T]) resolve(value T) {
32-
f.mu.RLock()
33-
defer f.mu.RUnlock()
41+
f.mu.Lock()
42+
defer f.mu.Unlock()
3443

44+
f.result = Ok(value)
3545
if f.next != nil {
36-
f.next(value, nil)
46+
f.next.activeSync()
3747
}
48+
close(f.done)
3849
}
3950

4051
func (f *Future[T]) reject(err error) {
41-
f.mu.RLock()
42-
defer f.mu.RUnlock()
52+
f.mu.Lock()
53+
defer f.mu.Unlock()
4354

55+
f.result = Err[T](err)
4456
if f.next != nil {
45-
f.next(empty[T](), err)
57+
f.next.activeSync()
4658
}
59+
close(f.done)
4760
}
4861

49-
// Catch is called when Future is resolved. It returns a new Future.
62+
// Then is called when Future is resolved. It returns a new Future.
5063
func (f *Future[T]) Then(cb func(T) (T, error)) *Future[T] {
5164
f.mu.Lock()
5265
defer f.mu.Unlock()
5366

54-
future := &Future[T]{
55-
mu: sync.RWMutex{},
56-
next: nil,
67+
f.next = &Future[T]{
68+
cb: func(resolve func(T), reject func(error)) {
69+
if f.result.IsError() {
70+
reject(f.result.Error())
71+
return
72+
}
73+
newValue, err := cb(f.result.MustGet())
74+
if err != nil {
75+
reject(err)
76+
return
77+
}
78+
resolve(newValue)
79+
},
5780
cancelCb: func() {
5881
f.Cancel()
5982
},
83+
done: make(chan struct{}),
6084
}
6185

62-
f.next = func(value T, err error) {
63-
if err != nil {
64-
future.reject(err)
65-
return
66-
}
67-
68-
newValue, err := cb(value)
69-
if err != nil {
70-
future.reject(err)
71-
return
72-
}
73-
74-
future.resolve(newValue)
86+
select {
87+
case <-f.done:
88+
f.next.active()
89+
default:
7590
}
76-
77-
return future
91+
return f.next
7892
}
7993

8094
// Catch is called when Future is rejected. It returns a new Future.
8195
func (f *Future[T]) Catch(cb func(error) (T, error)) *Future[T] {
8296
f.mu.Lock()
8397
defer f.mu.Unlock()
8498

85-
future := &Future[T]{
86-
mu: sync.RWMutex{},
87-
next: nil,
99+
f.next = &Future[T]{
100+
cb: func(resolve func(T), reject func(error)) {
101+
if f.result.IsOk() {
102+
resolve(f.result.MustGet())
103+
return
104+
}
105+
newValue, err := cb(f.result.Error())
106+
if err != nil {
107+
reject(err)
108+
return
109+
}
110+
resolve(newValue)
111+
},
88112
cancelCb: func() {
89113
f.Cancel()
90114
},
115+
done: make(chan struct{}),
91116
}
92117

93-
f.next = func(value T, err error) {
94-
if err == nil {
95-
future.resolve(value)
96-
return
97-
}
98-
99-
newValue, err := cb(err)
100-
if err != nil {
101-
future.reject(err)
102-
return
103-
}
104-
105-
future.resolve(newValue)
118+
select {
119+
case <-f.done:
120+
f.next.active()
121+
default:
106122
}
107-
108-
return future
123+
return f.next
109124
}
110125

111126
// Finally is called when Future is processed either resolved or rejected. It returns a new Future.
112127
func (f *Future[T]) Finally(cb func(T, error) (T, error)) *Future[T] {
113128
f.mu.Lock()
114129
defer f.mu.Unlock()
115130

116-
future := &Future[T]{
117-
mu: sync.RWMutex{},
118-
next: nil,
131+
f.next = &Future[T]{
132+
cb: func(resolve func(T), reject func(error)) {
133+
newValue, err := cb(f.result.Get())
134+
if err != nil {
135+
reject(err)
136+
return
137+
}
138+
resolve(newValue)
139+
},
119140
cancelCb: func() {
120141
f.Cancel()
121142
},
143+
done: make(chan struct{}),
122144
}
123145

124-
f.next = func(value T, err error) {
125-
newValue, err := cb(value, err)
126-
if err != nil {
127-
future.reject(err)
128-
return
129-
}
130-
131-
future.resolve(newValue)
146+
select {
147+
case <-f.done:
148+
f.next.active()
149+
default:
132150
}
133-
134-
return future
151+
return f.next
135152
}
136153

137154
// Cancel cancels the Future chain.
@@ -147,31 +164,16 @@ func (f *Future[T]) Cancel() {
147164

148165
// Collect awaits and return result of the Future.
149166
func (f *Future[T]) Collect() (T, error) {
150-
done := make(chan struct{})
151-
152-
var a T
153-
var b error
154-
155-
f.mu.Lock()
156-
f.next = func(value T, err error) {
157-
a = value
158-
b = err
159-
160-
done <- struct{}{}
161-
}
162-
f.mu.Unlock()
163-
164-
<-done
165-
166-
return a, b
167+
<-f.done
168+
return f.result.Get()
167169
}
168170

169171
// Result wraps Collect and returns a Result.
170172
func (f *Future[T]) Result() Result[T] {
171173
return TupleToResult(f.Collect())
172174
}
173175

174-
// Result wraps Collect and returns a Result.
176+
// Either wraps Collect and returns a Either.
175177
func (f *Future[T]) Either() Either[error, T] {
176178
v, err := f.Collect()
177179
if err != nil {

future_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package mo
22

33
import (
4+
"fmt"
45
"sync/atomic"
56
"testing"
67
"time"
@@ -308,3 +309,17 @@ func TestFutureResultEither(t *testing.T) {
308309
is.NotNil(either.Left())
309310
is.Equal(assert.AnError, either.MustLeft())
310311
}
312+
313+
func TestFutureCompleteBeforeThen(t *testing.T) {
314+
completed := make(chan struct{})
315+
fut := NewFuture(func(resolve func(int), reject func(error)) {
316+
resolve(1)
317+
close(completed)
318+
})
319+
320+
<-completed
321+
fut.Then(func(in int) (int, error) {
322+
fmt.Println(in) // will never been print
323+
return in, nil
324+
}).Collect() // deadlock
325+
}

option.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"errors"
99
"fmt"
1010
"reflect"
11-
"time"
1211
)
1312

1413
var optionNoSuchElement = fmt.Errorf("no such element")
@@ -186,7 +185,6 @@ func (o *Option[T]) UnmarshalJSON(b []byte) error {
186185
}
187186

188187
o.isPresent = true
189-
time.Now()
190188
return nil
191189
}
192190

result.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (r Result[T]) Error() error {
6161
return r.err
6262
}
6363

64-
// MustGet returns value and error.
64+
// Get returns value and error.
6565
// Play: https://go.dev/play/p/8KyX3z6TuNo
6666
func (r Result[T]) Get() (T, error) {
6767
if r.isErr {

0 commit comments

Comments
 (0)