diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 1faccc918e..a04fa2a270 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -129,11 +129,6 @@ type request struct { callback func(command *pb.BaseCommand, err error) } -type incomingCmd struct { - cmd *pb.BaseCommand - headersAndPayload Buffer -} - type connection struct { started int32 connectionTimeout time.Duration @@ -160,7 +155,6 @@ type connection struct { incomingRequestsWG sync.WaitGroup incomingRequestsCh chan *request - incomingCmdCh chan *incomingCmd closeCh chan struct{} readyCh chan struct{} writeRequestsCh chan Buffer @@ -209,7 +203,6 @@ func newConnection(opts connectionOptions) *connection { closeCh: make(chan struct{}), readyCh: make(chan struct{}), incomingRequestsCh: make(chan *request, 10), - incomingCmdCh: make(chan *incomingCmd, 10), // This channel is used to pass data from producers to the connection // go routine. It can become contended or blocking if we have multiple @@ -438,9 +431,6 @@ func (c *connection) run() { select { case <-c.closeCh: return - - case cmd := <-c.incomingCmdCh: - c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload) case data := <-c.writeRequestsCh: if data == nil { return @@ -534,10 +524,6 @@ func (c *connection) writeCommand(cmd *pb.BaseCommand) { } func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) { - c.incomingCmdCh <- &incomingCmd{cmd, headersAndPayload} -} - -func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) { c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload) c.setLastDataReceived(time.Now())