Skip to content

Commit b7c9be9

Browse files
authored
Merge pull request #14 from RaduBerinde/sema-allow-large
fifo: allow large semaphore requests
2 parents 4a90b18 + 42239c6 commit b7c9be9

File tree

2 files changed

+33
-25
lines changed

2 files changed

+33
-25
lines changed

fifo/semaphore.go

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package fifo
1616

1717
import (
1818
"context"
19-
"errors"
2019
"fmt"
2120
"sync"
2221

@@ -35,7 +34,8 @@ type Semaphore struct {
3534
sync.Mutex
3635

3736
capacity int64
38-
// outstanding can exceed capacity if the capacity is dynamically decreased.
37+
// outstanding can exceed capacity if the capacity is dynamically decreased
38+
// or if a single request exceeds the capacity.
3939
outstanding int64
4040

4141
waiters Queue[semaWaiter]
@@ -64,44 +64,44 @@ func NewSemaphore(capacity int64) *Semaphore {
6464

6565
var semaQueuePool = MakeQueueBackingPool[semaWaiter]()
6666

67-
// ErrRequestExceedsCapacity is returned when an Acquire requests more than the
68-
// current capacity of the semaphore.
69-
var ErrRequestExceedsCapacity = errors.New("request exceeds semaphore capacity")
70-
7167
// TryAcquire attempts to acquire n units from the semaphore without waiting. On
7268
// success, returns true and the caller must later Release the units.
7369
func (s *Semaphore) TryAcquire(n int64) bool {
7470
s.mu.Lock()
7571
defer s.mu.Unlock()
7672

77-
if s.numWaitersLocked() == 0 && s.mu.outstanding+n <= s.mu.capacity {
73+
if s.numWaitersLocked() == 0 && s.canAcquireLocked(n) {
7874
s.mu.outstanding += n
7975
return true
8076
}
8177

8278
return false
8379
}
8480

81+
func (s *Semaphore) canAcquireLocked(n int64) bool {
82+
// We allow a request larger than the capacity as long as there are no
83+
// outstanding units.
84+
return s.mu.outstanding+n <= s.mu.capacity || s.mu.outstanding == 0
85+
}
86+
8587
// Acquire n units from the semaphore, waiting if necessary.
8688
//
8789
// If the context is canceled while we are waiting, returns the context error.
88-
// If n exceeds the current capacity, returns ErrRequestExceedsCapacity.
90+
//
91+
// If n exceeds the current capacity, the request will be allowed when there are
92+
// no other acquisitions (similar to n being equal to the capacity).
93+
//
8994
// On success, the caller must later Release the units.
9095
func (s *Semaphore) Acquire(ctx context.Context, n int64) error {
9196
s.mu.Lock()
9297

9398
// Fast path.
94-
if s.numWaitersLocked() == 0 && s.mu.outstanding+n <= s.mu.capacity {
99+
if s.numWaitersLocked() == 0 && s.canAcquireLocked(n) {
95100
s.mu.outstanding += n
96101
s.mu.Unlock()
97102
return nil
98103
}
99104

100-
if n > s.mu.capacity {
101-
s.mu.Unlock()
102-
return ErrRequestExceedsCapacity
103-
}
104-
105105
c := chanSyncPool.Get().(chan error)
106106
defer chanSyncPool.Put(c)
107107
w := s.mu.waiters.PushBack(semaWaiter{n: n, c: c})
@@ -177,7 +177,8 @@ type SemaphoreStats struct {
177177
// Capacity is the current capacity of the semaphore.
178178
Capacity int64
179179
// Outstanding is the number of units that have been acquired. Note that this
180-
// can exceed Capacity if the capacity was recently decreased.
180+
// can exceed Capacity if the capacity was recently decreased or if a single
181+
// request exceeded the capacity.
181182
Outstanding int64
182183
// NumHadToWait is the total number of Acquire calls (since the semaphore was
183184
// created) that had to wait because the semaphore was exhausted. Useful for
@@ -219,16 +220,11 @@ func (s *Semaphore) processWaitersLocked() {
219220
panic("negative numCanceled")
220221
}
221222

222-
case s.mu.outstanding+w.n <= s.mu.capacity:
223+
case s.canAcquireLocked(w.n):
223224
// Request can be fulfilled.
224225
s.mu.outstanding += w.n
225226
w.c <- nil
226227

227-
case w.n > s.mu.capacity:
228-
// Request must be failed. This can happen if the capacity was decreased
229-
// while the element was queued.
230-
w.c <- ErrRequestExceedsCapacity
231-
232228
default:
233229
// Head of the queue needs to wait some more.
234230
return

fifo/semaphore_test.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"errors"
2020
"math/rand"
2121
"runtime"
22+
"strings"
2223
"sync"
2324
"testing"
2425
"time"
@@ -30,7 +31,6 @@ func TestSemaphoreAPI(t *testing.T) {
3031
s := NewSemaphore(10)
3132
require.Equal(t, s.TryAcquire(5), true)
3233
require.Equal(t, s.TryAcquire(10), false)
33-
require.Equal(t, s.Acquire(context.Background(), 20), ErrRequestExceedsCapacity)
3434
require.Equal(t, "capacity: 10, outstanding: 5, num-had-to-wait: 0", s.Stats().String())
3535

3636
ch := make(chan struct{}, 10)
@@ -58,6 +58,7 @@ func TestSemaphoreAPI(t *testing.T) {
5858
s.Release(8)
5959
require.Recv(t, ch)
6060

61+
require.True(t, strings.Contains(s.Stats().String(), "capacity: 10, outstanding: 5"))
6162
// Test UpdateCapacity.
6263
go func() {
6364
if err := s.Acquire(context.Background(), 8); err != nil {
@@ -68,15 +69,26 @@ func TestSemaphoreAPI(t *testing.T) {
6869
t.Error(err)
6970
}
7071
ch <- struct{}{}
71-
require.Equal(t, s.Acquire(context.Background(), 5), ErrRequestExceedsCapacity)
72-
ch <- struct{}{}
7372
}()
7473
require.NoRecv(t, ch)
7574
s.UpdateCapacity(15)
7675
require.Recv(t, ch)
7776
require.Recv(t, ch)
78-
require.NoRecv(t, ch)
7977
s.UpdateCapacity(2)
78+
go func() {
79+
// Request more than the capacity.
80+
if err := s.Acquire(context.Background(), 5); err != nil {
81+
t.Error(err)
82+
}
83+
ch <- struct{}{}
84+
}()
85+
require.NoRecv(t, ch)
86+
s.Release(5)
87+
require.NoRecv(t, ch)
88+
s.Release(8)
89+
require.NoRecv(t, ch)
90+
s.Release(1)
91+
// Last request should now be allowed, despite being larger than the capacity.
8092
require.Recv(t, ch)
8193
}
8294

0 commit comments

Comments
 (0)