We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent 48818d8 commit e88d02aCopy full SHA for e88d02a
pulsar/internal/connection.go
@@ -396,6 +396,18 @@ func (c *connection) failLeftRequestsWhenClose() {
396
}
397
398
399
+func (c *connection) drainWriteRequests() {
400
+ for {
401
+ select {
402
+ case req := <-c.writeRequestsCh:
403
+ c.bufferPool.Put(req.data)
404
+
405
+ default:
406
+ return
407
+ }
408
409
+}
410
411
func (c *connection) run() {
412
pingSendTicker := time.NewTicker(c.keepAliveInterval)
413
pingCheckTicker := time.NewTicker(c.keepAliveInterval)
@@ -423,6 +435,7 @@ func (c *connection) run() {
423
435
select {
424
436
case <-c.closeCh:
425
437
c.failLeftRequestsWhenClose()
438
+ c.drainWriteRequests()
426
439
return
427
440
case req := <-c.incomingRequestsCh:
428
441
if req == nil {
0 commit comments