@@ -60,6 +60,8 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
6060 items : map [T ]* item [T ]{},
6161 queue : btree .NewG (32 , less [T ]),
6262 metrics : newQueueMetrics [T ](opts .MetricProvider , name , clock.RealClock {}),
63+ // Use a buffered channel to try and not block callers
64+ adds : make (chan * itemsWithOpts [T ], 1000 ),
6365 // itemOrWaiterAdded indicates that an item or
6466 // waiter was added. It must be buffered, because
6567 // if we currently process items we can't tell
@@ -88,6 +90,7 @@ type priorityqueue[T comparable] struct {
8890 items map [T ]* item [T ]
8991 queue bTree [* item [T ]]
9092 metrics queueMetrics [T ]
93+ adds chan * itemsWithOpts [T ]
9194
9295 // addedCounter is a counter of elements added, we need it
9396 // because unixNano is not guaranteed to be unique.
@@ -117,54 +120,67 @@ type priorityqueue[T comparable] struct {
117120}
118121
119122func (w * priorityqueue [T ]) AddWithOpts (o AddOpts , items ... T ) {
120- w .lock . Lock ()
121- defer w . lock . Unlock ()
123+ w .adds <- & itemsWithOpts [ T ]{ opts : o , items : items }
124+ }
122125
123- for _ , key := range items {
124- if o .RateLimited {
125- after := w .rateLimiter .When (key )
126- if o .After == 0 || after < o .After {
127- o .After = after
126+ func (w * priorityqueue [T ]) drainAddsLocked (added * itemsWithOpts [T ]) {
127+ for {
128+ for _ , key := range added .items {
129+ if added .opts .RateLimited {
130+ after := w .rateLimiter .When (key )
131+ if added .opts .After == 0 || after < added .opts .After {
132+ added .opts .After = after
133+ }
128134 }
129- }
130135
131- var readyAt * time.Time
132- if o .After > 0 {
133- readyAt = ptr .To (w .now ().Add (o .After ))
134- w .metrics .retry ()
135- }
136- if _ , ok := w .items [key ]; ! ok {
137- item := & item [T ]{
138- key : key ,
139- addedAtUnixNano : w .now ().UnixNano (),
140- addedCounter : w .addedCounter ,
141- priority : o .Priority ,
142- readyAt : readyAt ,
136+ var readyAt * time.Time
137+ if added .opts .After > 0 {
138+ readyAt = ptr .To (w .now ().Add (added .opts .After ))
139+ w .metrics .retry ()
140+ }
141+ if _ , ok := w .items [key ]; ! ok {
142+ item := & item [T ]{
143+ key : key ,
144+ addedAtUnixNano : w .now ().UnixNano (),
145+ addedCounter : w .addedCounter ,
146+ priority : added .opts .Priority ,
147+ readyAt : readyAt ,
148+ }
149+ w .items [key ] = item
150+ w .queue .ReplaceOrInsert (item )
151+ w .metrics .add (key )
152+ w .addedCounter ++
153+ continue
143154 }
144- w .items [key ] = item
145- w .queue .ReplaceOrInsert (item )
146- w .metrics .add (key )
147- w .addedCounter ++
148- continue
149- }
150155
151- // The b-tree de-duplicates based on ordering and any change here
152- // will affect the order - Just delete and re-add.
153- item , _ := w .queue .Delete (w .items [key ])
154- if o .Priority > item .priority {
155- item .priority = o .Priority
156- }
156+ // The b-tree de-duplicates based on ordering and any change here
157+ // will affect the order - Just delete and re-add.
158+ item , _ := w .queue .Delete (w .items [key ])
159+ if added .opts .Priority > item .priority {
160+ item .priority = added .opts .Priority
161+ }
162+
163+ if item .readyAt != nil && (readyAt == nil || readyAt .Before (* item .readyAt )) {
164+ item .readyAt = readyAt
165+ }
157166
158- if item .readyAt != nil && (readyAt == nil || readyAt .Before (* item .readyAt )) {
159- item .readyAt = readyAt
167+ w .queue .ReplaceOrInsert (item )
160168 }
161169
162- w .queue .ReplaceOrInsert (item )
170+ // Drain the remainder of the channel. Has to be at the end,
171+ // because the first item is read in spin() and passed on
172+ // to us.
173+ select {
174+ case added = <- w .adds :
175+ default :
176+ return
177+ }
163178 }
179+ }
164180
165- if len ( items ) > 0 {
166- w . notifyItemOrWaiterAdded ()
167- }
181+ type itemsWithOpts [ T comparable ] struct {
182+ opts AddOpts
183+ items [] T
168184}
169185
170186func (w * priorityqueue [T ]) notifyItemOrWaiterAdded () {
@@ -179,11 +195,13 @@ func (w *priorityqueue[T]) spin() {
179195 var nextReady <- chan time.Time
180196 nextReady = blockForever
181197
198+ var addedItem * itemsWithOpts [T ]
182199 for {
183200 select {
184201 case <- w .done :
185202 return
186203 case <- w .itemOrWaiterAdded :
204+ case addedItem = <- w .adds :
187205 case <- nextReady :
188206 }
189207
@@ -193,6 +211,11 @@ func (w *priorityqueue[T]) spin() {
193211 w .lock .Lock ()
194212 defer w .lock .Unlock ()
195213
214+ if addedItem != nil {
215+ w .drainAddsLocked (addedItem )
216+ }
217+ addedItem = nil
218+
196219 w .lockedLock .Lock ()
197220 defer w .lockedLock .Unlock ()
198221
0 commit comments