diff --git a/kafka/consumer.go b/kafka/consumer.go index efc54d00e..0e02a4e3b 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -1015,7 +1015,7 @@ func cEventToRebalanceEvent(rkev *C.rd_kafka_event_t) Event { // In the polling case (not channel based consumer) the rebalance event // is returned in retval, else nil is returned. -func (c *Consumer) handleRebalanceEvent(channel chan Event, rkev *C.rd_kafka_event_t) (retval Event) { +func (c *Consumer) handleRebalanceEvent(channel chan<- Event, rkev *C.rd_kafka_event_t) (retval Event) { var ev Event diff --git a/kafka/event.go b/kafka/event.go index aefa87bea..13a80d954 100644 --- a/kafka/event.go +++ b/kafka/event.go @@ -150,7 +150,7 @@ func (o OAuthBearerTokenRefresh) String() string { // to indicate that `channel` is being terminated. // returns (event Event, terminate Bool) tuple, where Terminate indicates // if termChan received a termination event. -func (h *handle) eventPoll(channel chan Event, timeoutMs int, maxEvents int, termChan chan bool) (Event, bool) { +func (h *handle) eventPoll(channel chan<- Event, timeoutMs int, maxEvents int, termChan chan bool) (Event, bool) { var prevRkev *C.rd_kafka_event_t term := false @@ -228,7 +228,7 @@ out: for _, rkmessage := range rkmessages[:cnt] { msg := h.newMessageFromC(rkmessage) - var ch *chan Event + var ch *chan<- Event if rkmessage._private != nil { // Find cgoif by id diff --git a/kafka/handle.go b/kafka/handle.go index 643d80bc6..f8ccc382f 100644 --- a/kafka/handle.go +++ b/kafka/handle.go @@ -247,7 +247,7 @@ type cgoif interface{} // delivery report cgoif container type cgoDr struct { - deliveryChan chan Event + deliveryChan chan<- Event opaque interface{} } diff --git a/kafka/producer.go b/kafka/producer.go index 47bc2ee94..f96b89d53 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -172,7 +172,7 @@ func (p *Producer) gethandle() *handle { return &p.handle } -func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error { +func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan<- Event) error { if msg == nil || msg.TopicPartition.Topic == nil || len(*msg.TopicPartition.Topic) == 0 { return newErrorFromString(ErrInvalidArg, "") }