Skip to content

Commit 6cb2867

Browse files
authored
opt: delete redundant channel connection.incomingCmdCh (#1343)
* opt: delete redundant channel * fix lint error * merge internalReceivedCommand and receivedCommand
1 parent 84f3677 commit 6cb2867

File tree

1 file changed

+0
-14
lines changed

1 file changed

+0
-14
lines changed

pulsar/internal/connection.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,6 @@ type request struct {
129129
callback func(command *pb.BaseCommand, err error)
130130
}
131131

132-
type incomingCmd struct {
133-
cmd *pb.BaseCommand
134-
headersAndPayload Buffer
135-
}
136-
137132
type connection struct {
138133
started int32
139134
connectionTimeout time.Duration
@@ -160,7 +155,6 @@ type connection struct {
160155

161156
incomingRequestsWG sync.WaitGroup
162157
incomingRequestsCh chan *request
163-
incomingCmdCh chan *incomingCmd
164158
closeCh chan struct{}
165159
readyCh chan struct{}
166160
writeRequestsCh chan Buffer
@@ -209,7 +203,6 @@ func newConnection(opts connectionOptions) *connection {
209203
closeCh: make(chan struct{}),
210204
readyCh: make(chan struct{}),
211205
incomingRequestsCh: make(chan *request, 10),
212-
incomingCmdCh: make(chan *incomingCmd, 10),
213206

214207
// This channel is used to pass data from producers to the connection
215208
// go routine. It can become contended or blocking if we have multiple
@@ -438,9 +431,6 @@ func (c *connection) run() {
438431
select {
439432
case <-c.closeCh:
440433
return
441-
442-
case cmd := <-c.incomingCmdCh:
443-
c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)
444434
case data := <-c.writeRequestsCh:
445435
if data == nil {
446436
return
@@ -534,10 +524,6 @@ func (c *connection) writeCommand(cmd *pb.BaseCommand) {
534524
}
535525

536526
func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
537-
c.incomingCmdCh <- &incomingCmd{cmd, headersAndPayload}
538-
}
539-
540-
func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
541527
c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
542528
c.setLastDataReceived(time.Now())
543529

0 commit comments

Comments
 (0)