-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsemaphore.go
More file actions
96 lines (78 loc) · 1.8 KB
/
semaphore.go
File metadata and controls
96 lines (78 loc) · 1.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package semaphore
import (
"errors"
"fmt"
"sync"
)
// Resource is the handle returned by Semaphore.Acquire. The holder gives it
// back by calling Release.
type Resource interface {
// Release gives the resource back. The implementation in this file frees one
// in-flight slot and returns an error if the resource was already released.
Release() error
}
// Semaphore hands out Resources through Acquire.
type Semaphore interface {
// Acquire returns a Resource, or an error when one cannot be obtained. The
// implementation built by New returns an error when its pending queue is
// already full.
Acquire() (Resource, error)
}
// request is a signal-only channel standing for one Acquire call. The
// scheduling loop in testableAcquire closes it after moving the request from
// the pending queue to the in-flight queue.
type request chan struct{}
// resource is the Resource implementation handed out by semaphore.
type resource struct {
// inflightRequests is the owning semaphore's in-flight queue; Release
// receives one entry from it.
inflightRequests chan request
// Embedded mutex; markReleased holds it while reading and setting released.
sync.Mutex
// released is set to true by the first successful markReleased call.
released bool
}
// semaphore is the Semaphore implementation built by New.
type semaphore struct {
// inflightRequests holds requests that have been scheduled; New buffers it
// with capacity maxInflight.
inflightRequests chan request
// pendingRequests queues requests that are waiting to be scheduled; New
// buffers it with capacity maxPending.
pendingRequests chan request
// maxPending is the pending-queue capacity, reported in the error returned
// when the queue is full.
maxPending int
// schedulerLock is a one-slot channel used as a lock: sending into it
// acquires the lock and receiving from it releases the lock.
schedulerLock chan struct{}
}
// New builds a Semaphore whose in-flight queue is buffered to maxInflight
// entries and whose pending queue is buffered to maxPending entries.
func New(maxInflight, maxPending int) Semaphore {
	sem := new(semaphore)
	sem.inflightRequests = make(chan request, maxInflight)
	sem.pendingRequests = make(chan request, maxPending)
	sem.maxPending = maxPending
	// Capacity 1 lets the channel act as a lock for the scheduling loop.
	sem.schedulerLock = make(chan struct{}, 1)
	return sem
}
// Acquire queues a request and returns a Resource once that request has been
// scheduled, or an error if the request could not be queued. It delegates to
// testableAcquire with a hook that does nothing.
func (s *semaphore) Acquire() (Resource, error) {
	noHook := func() {}
	return s.testableAcquire(noHook)
}
// testableAcquire implements Acquire. It queues a new request, then either
// waits for another caller to schedule it or takes the scheduler lock and
// moves pending requests into the in-flight queue itself until its own
// request has been scheduled.
//
// beforeScheduleLock is called after the request has been queued and before
// the scheduler lock is contended; Acquire passes a no-op.
//
// It returns an error without blocking when the pending queue is full.
func (s *semaphore) testableAcquire(beforeScheduleLock func()) (Resource, error) {
	newRequest := make(request)

	// Non-blocking enqueue: a full pending queue is reported immediately.
	select {
	case s.pendingRequests <- newRequest:
	default:
		return nil, fmt.Errorf("Cannot queue request, maxPending reached: %d", s.maxPending)
	}

	beforeScheduleLock()

	select {
	case s.schedulerLock <- struct{}{}:
	case <-newRequest: // Prevent deadlock when request is scheduled by concurrent call
		return &resource{inflightRequests: s.inflightRequests}, nil
	}
	defer func() {
		<-s.schedulerLock
	}()

	for {
		// Check our own request first. A select with several ready cases picks
		// one at random, so without this check we could dequeue another
		// pending request and block on a full in-flight queue while already
		// owning a slot that our caller can never release.
		select {
		case <-newRequest:
			return &resource{inflightRequests: s.inflightRequests}, nil
		default:
		}

		select {
		case nextRequest := <-s.pendingRequests:
			// Blocks until the in-flight queue has a free slot.
			s.inflightRequests <- nextRequest
			// Closing the request wakes the caller waiting on it.
			close(nextRequest)
		case <-newRequest:
			return &resource{inflightRequests: s.inflightRequests}, nil
		}
	}
}
// Release marks the resource as released and frees one slot in the in-flight
// queue. It returns the error from markReleased, leaving the queue untouched,
// when the resource had already been released.
func (r *resource) Release() error {
	if err := r.markReleased(); err != nil {
		return err
	}

	// Receiving one entry makes room for the next scheduled request.
	<-r.inflightRequests
	return nil
}
// markReleased flips the released flag under the resource's mutex. It returns
// an error if the flag was already set, and nil on the first call.
func (r *resource) markReleased() error {
	r.Lock()
	defer r.Unlock()

	alreadyReleased := r.released
	r.released = true
	if alreadyReleased {
		return errors.New("Resource has already been released")
	}
	return nil
}