Skip to content

Commit 55bb665

Browse files
committed
prevents adding useless wait channels
1 parent a3c75ce commit 55bb665

File tree

1 file changed

+31
-20
lines changed

1 file changed

+31
-20
lines changed

fifo_queue.go

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -126,32 +126,43 @@ func (st *FIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (interfa
126126
// enqueue a watcher into the watchForNextElementChannel to wait for the next element
127127
case st.waitForNextElementChan <- waitChan:
128128

129-
// re-checks every i milliseconds (top: 10 times) ... the following verifies if an item was enqueued
130-
// around the same time DequeueOrWaitForNextElementContext was invoked, meaning the waitChan wasn't yet sent over
131-
// st.waitForNextElementChan
132-
for i := 0; i < dequeueOrWaitForNextElementInvokeGapTime; i++ {
133-
select {
134-
case <-ctx.Done():
135-
return nil, ctx.Err()
136-
case dequeuedItem := <-waitChan:
137-
return dequeuedItem, nil
138-
case <-time.After(time.Millisecond * time.Duration(i)):
139-
if dequeuedItem, err := st.Dequeue(); err == nil {
129+
// n
130+
for {
131+
// re-checks every i milliseconds (top: 10 times) ... the following verifies if an item was enqueued
132+
// around the same time DequeueOrWaitForNextElementContext was invoked, meaning the waitChan wasn't yet sent over
133+
// st.waitForNextElementChan
134+
for i := 0; i < dequeueOrWaitForNextElementInvokeGapTime; i++ {
135+
select {
136+
case <-ctx.Done():
137+
return nil, ctx.Err()
138+
case dequeuedItem := <-waitChan:
140139
return dequeuedItem, nil
140+
case <-time.After(time.Millisecond * time.Duration(i)):
141+
if dequeuedItem, err := st.Dequeue(); err == nil {
142+
return dequeuedItem, nil
143+
}
141144
}
142145
}
143-
}
144146

145-
// return the next enqueued element, if any
146-
select {
147-
case <-st.unlockDequeueOrWaitForNextElementChan:
147+
// return the next enqueued element, if any
148+
select {
148149
// new enqueued element, no need to keep waiting
149-
break
150+
case <-st.unlockDequeueOrWaitForNextElementChan:
151+
// check if we got a new element just after we got <-st.unlockDequeueOrWaitForNextElementChan
152+
select {
153+
case item := <-waitChan:
154+
return item, nil
155+
default:
156+
}
157+
// go back to: for loop
158+
continue
150159

151-
case item := <-waitChan:
152-
return item, nil
153-
case <-ctx.Done():
154-
return nil, ctx.Err()
160+
case item := <-waitChan:
161+
return item, nil
162+
case <-ctx.Done():
163+
return nil, ctx.Err()
164+
}
165+
// n
155166
}
156167
default:
157168
// too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements

0 commit comments

Comments
 (0)