@@ -257,7 +257,7 @@ func (p *availablePermits) flowIfNeed() {
257257 availablePermits := current
258258 requestedPermits := current
259259 // check if permits changed
260- if ! p .permits .CAS (current , 0 ) {
260+ if ! p .permits .CompareAndSwap (current , 0 ) {
261261 return
262262 }
263263
@@ -2084,13 +2084,13 @@ func (pc *partitionConsumer) expectMoreIncomingMessages() {
20842084 if ! pc .options .autoReceiverQueueSize {
20852085 return
20862086 }
2087- if pc .scaleReceiverQueueHint .CAS (true , false ) {
2087+ if pc .scaleReceiverQueueHint .CompareAndSwap (true , false ) {
20882088 oldSize := pc .currentQueueSize .Load ()
20892089 maxSize := int32 (pc .options .receiverQueueSize )
20902090 newSize := int32 (math .Min (float64 (maxSize ), float64 (oldSize * 2 )))
20912091 usagePercent := pc .client .memLimit .CurrentUsagePercent ()
20922092 if usagePercent < receiverQueueExpansionMemThreshold && newSize > oldSize {
2093- pc .currentQueueSize .CAS (oldSize , newSize )
2093+ pc .currentQueueSize .CompareAndSwap (oldSize , newSize )
20942094 pc .availablePermits .add (newSize - oldSize )
20952095 pc .log .Debugf ("update currentQueueSize from %d -> %d" , oldSize , newSize )
20962096 }
@@ -2116,7 +2116,7 @@ func (pc *partitionConsumer) shrinkReceiverQueueSize() {
21162116 minSize := int32 (math .Min (float64 (initialReceiverQueueSize ), float64 (pc .options .receiverQueueSize )))
21172117 newSize := int32 (math .Max (float64 (minSize ), float64 (oldSize / 2 )))
21182118 if newSize < oldSize {
2119- pc .currentQueueSize .CAS (oldSize , newSize )
2119+ pc .currentQueueSize .CompareAndSwap (oldSize , newSize )
21202120 pc .availablePermits .add (newSize - oldSize )
21212121 pc .log .Debugf ("update currentQueueSize from %d -> %d" , oldSize , newSize )
21222122 }
0 commit comments