-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfuture.go
More file actions
82 lines (72 loc) · 2.44 KB
/
future.go
File metadata and controls
82 lines (72 loc) · 2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Package future implements futures - simple value wrappers that integrate seamlessly with other Go's concurrency primitives, allowing easy retrieval of the results of an asynchronous operation when it becomes available.
package future
import (
"context"
"sync/atomic"
)
// Future is a wrapper that allows return a result of an asynchronous operation at some point in the future.
//
// Futures are similar to channels with capacity of 1, with a notable difference that futures cannot be closed (unlike channels) and they store value, making them easier to use for single value broadcasts.
type Future[T any] struct {
vp atomic.Pointer[T] // value pointer
dp atomic.Pointer[chan struct{}] // done pointer
}
// Resolved creates a new future that is already resolved with the provided value.
func Resolved[T any](v T) *Future[T] {
f := &Future[T]{}
f.vp.Store(&v)
d := make(chan struct{})
close(d)
f.dp.Store(&d)
return f
}
func (f *Future[T]) done() chan struct{} {
if dp := f.dp.Load(); dp != nil {
return *dp
}
d := make(chan struct{})
f.dp.CompareAndSwap(nil, &d)
return *f.dp.Load()
}
// Resolve resolves the future with the provided value. It panics if the future was already resolved.
func (f *Future[T]) Resolve(v T) {
if !f.TryResolve(v) {
panic("future: already resolved")
}
}
// TryResolve attempts to resolve the given future with the provided value. It returns false if the future was already resolved, otherwise it resolves it with the provided value and returns true.
func (f *Future[T]) TryResolve(v T) bool {
if !f.vp.CompareAndSwap(nil, &v) {
return false
}
close(f.done())
return true
}
// Get awaits for the resolvement of the given future and returns its value.
func (f *Future[T]) Get() T {
if vp := f.vp.Load(); vp != nil {
return *vp
}
f.Wait()
return *f.vp.Load()
}
// Wait awaits for the resolvement of the given future.
func (f *Future[T]) Wait() {
<-f.done()
}
// Done returns channel that will be closed when the given future is resolved.
func (f *Future[T]) Done() <-chan struct{} {
return f.done()
}
// Await waits for either the given context to be cancelled - in which case the function returns false - or for all of the supplied struct channels to be closed - in which case the function returns true.
func Await(ctx context.Context, chs ...<-chan struct{}) bool {
if len(chs) == 0 {
return ctx.Err() == nil
}
select {
case <-ctx.Done():
return false
case <-chs[0]:
return Await(ctx, chs[1:]...)
}
}