Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/locks/priority_semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import "context"
type (
PrioritySemaphore interface {
Acquire(ctx context.Context, priority Priority, n int) error
TryAcquire(n int) bool
TryAcquire(priority Priority, n int) bool
Release(n int)
}
)
12 changes: 6 additions & 6 deletions common/locks/priority_semaphore_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *PrioritySemaphoreImpl) Acquire(ctx context.Context, priority Priority,
default:
}
// Check if acquisition can proceed without waiting
if s.size-s.cur >= n && s.noWaiters() {
if s.size-s.cur >= n && s.noWaiters(priority) {
// Since we hold s.mu and haven't synchronized since checking done, if
// ctx becomes done before we return here, it becoming done must have
// "happened concurrently" with this call - it cannot "happen before"
Expand Down Expand Up @@ -156,10 +156,10 @@ func (s *PrioritySemaphoreImpl) Acquire(ctx context.Context, priority Priority,

// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *PrioritySemaphoreImpl) TryAcquire(n int) bool {
func (s *PrioritySemaphoreImpl) TryAcquire(priority Priority, n int) bool {
s.mu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is good to validate the value of priority like we do in Acquire() here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah makes sense. will do!

defer s.mu.Unlock()
if s.size-s.cur >= n && s.noWaiters() {
if s.size-s.cur >= n && s.noWaiters(priority) {
s.cur += n
return true
}
Expand Down Expand Up @@ -211,9 +211,9 @@ func (s *PrioritySemaphoreImpl) notifyWaiters() {
}
}

// noWaiters return true if all waitLists are empty, and false otherwise.
func (s *PrioritySemaphoreImpl) noWaiters() bool {
for _, l := range s.waitLists {
// noWaiters returns if there is no waiter that has priority higher or equal to lowestPriority.
func (s *PrioritySemaphoreImpl) noWaiters(lowestPriority Priority) bool {
for _, l := range s.waitLists[:lowestPriority+1] {
if l.Len() > 0 {
return false
}
Expand Down
24 changes: 12 additions & 12 deletions common/locks/priority_semaphore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,40 +52,40 @@ func (s *prioritySemaphoreSuite) SetupSuite() {

func (s *prioritySemaphoreSuite) TestTryAcquire() {
semaphore := NewPrioritySemaphore(2)
s.True(semaphore.TryAcquire(1))
s.True(semaphore.TryAcquire(1))
s.False(semaphore.TryAcquire(1))
s.False(semaphore.TryAcquire(1))
s.True(semaphore.TryAcquire(PriorityHigh, 1))
s.True(semaphore.TryAcquire(PriorityHigh, 1))
s.False(semaphore.TryAcquire(PriorityHigh, 1))
s.False(semaphore.TryAcquire(PriorityHigh, 1))
semaphore.Release(2)
s.True(semaphore.TryAcquire(1))
s.True(semaphore.TryAcquire(PriorityHigh, 1))
}

func (s *prioritySemaphoreSuite) TestAcquire_High_Success() {
semaphore := NewPrioritySemaphore(1)
ctx := context.Background()
err := semaphore.Acquire(ctx, PriorityHigh, 1)
s.NoError(err)
s.False(semaphore.TryAcquire(1))
s.False(semaphore.TryAcquire(PriorityHigh, 1))
semaphore.Release(1)
s.True(semaphore.TryAcquire(1))
s.True(semaphore.TryAcquire(PriorityHigh, 1))
}

func (s *prioritySemaphoreSuite) TestAcquire_Low_Success() {
semaphore := NewPrioritySemaphore(1)
ctx := context.Background()
err := semaphore.Acquire(ctx, PriorityLow, 1)
s.NoError(err)
s.False(semaphore.TryAcquire(1))
s.False(semaphore.TryAcquire(PriorityHigh, 1))
semaphore.Release(1)
s.True(semaphore.TryAcquire(1))
s.True(semaphore.TryAcquire(PriorityHigh, 1))
}

func (s *prioritySemaphoreSuite) TestTryAcquire_HighAfterWaiting() {
semaphore := NewPrioritySemaphore(1)
cLock := make(chan struct{})
go func() {
// Acquire the function to make the next call blocking.
s.True(semaphore.TryAcquire(1))
s.True(semaphore.TryAcquire(PriorityHigh, 1))
// Let the other thread start which will block on this semaphore.
cLock <- struct{}{}
// Wait for other thread to block on this semaphore.
Expand All @@ -102,7 +102,7 @@ func (s *prioritySemaphoreSuite) TestTryAcquire_LowAfterWaiting() {
cLock := make(chan struct{})
go func() {
// Acquire the function to make the next call blocking.
s.True(semaphore.TryAcquire(1))
s.True(semaphore.TryAcquire(PriorityHigh, 1))
// Let the other thread start which will block on this semaphore.
cLock <- struct{}{}
// Wait for other thread to block on this semaphore.
Expand All @@ -117,7 +117,7 @@ func (s *prioritySemaphoreSuite) TestTryAcquire_LowAfterWaiting() {
func (s *prioritySemaphoreSuite) TestTryAcquire_HighAllowedBeforeLow() {
semaphore := NewPrioritySemaphore(1)
wg := sync.WaitGroup{}
s.True(semaphore.TryAcquire(1))
s.True(semaphore.TryAcquire(PriorityHigh, 1))
wg.Add(1)
go func() {
s.waitUntilBlockedInSemaphore(2)
Expand Down
Loading