@@ -13,37 +13,18 @@ class KafkaPubSubAdapter implements PubSubAdapterInterface
1313 protected $ producer ;
1414
1515 /**
16- * @var \RdKafka\Consumer
16+ * @var \RdKafka\KafkaConsumer
1717 */
1818 protected $ consumer ;
1919
20- /**
21- * @var \RdKafka\TopicConf
22- */
23- protected $ topicConfig ;
24-
25- /**
26- * @var mixed
27- */
28- protected $ consumerOffset ;
29-
3020 /**
3121 * @param \RdKafka\Producer $producer
32- * @param \RdKafka\Consumer $consumer
33- * @param \RdKafka\TopicConf $topicConfig
34- * @param mixed $consumerOffset The offset at which to start consumption
35- * (RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED)
22+ * @param \RdKafka\KafkaConsumer $consumer
3623 */
37- public function __construct (
38- \RdKafka \Producer $ producer ,
39- \RdKafka \Consumer $ consumer ,
40- \RdKafka \TopicConf $ topicConfig ,
41- $ consumerOffset = RD_KAFKA_OFFSET_END
42- ) {
24+ public function __construct (\RdKafka \Producer $ producer , \RdKafka \KafkaConsumer $ consumer )
25+ {
4326 $ this ->producer = $ producer ;
4427 $ this ->consumer = $ consumer ;
45- $ this ->topicConfig = $ topicConfig ;
46- $ this ->consumerOffset = $ consumerOffset ;
4728 }
4829
4930 /**
@@ -59,45 +40,13 @@ public function getProducer()
5940 /**
6041 * Return the Kafka consumer.
6142 *
62- * @return \RdKafka\Consumer
43+ * @return \RdKafka\KafkaConsumer
6344 */
6445 public function getConsumer ()
6546 {
6647 return $ this ->consumer ;
6748 }
6849
69- /**
70- * Return the Kafka TopicConfig.
71- *
72- * @return \RdKafka\TopicConf
73- */
74- public function getTopicConfig ()
75- {
76- return $ this ->topicConfig ;
77- }
78-
79- /**
80- * Return the Kafka consumer offset at which `subscribe()` calls begin consumption.
81- *
82- * @return mixed
83- */
84- public function getConsumerOffset ()
85- {
86- return $ this ->consumerOffset ;
87- }
88-
89- /**
90- * Set the Kafka consumer offset at which `subscribe()` calls begin consumption.
91- *
92- * This can be one of `RD_KAFKA_OFFSET_BEGINNING`, `RD_KAFKA_OFFSET_END` or `RD_KAFKA_OFFSET_STORED`
93- *
94- * @param mixed $consumerOffset
95- */
96- public function setConsumerOffset ($ consumerOffset )
97- {
98- $ this ->consumerOffset = $ consumerOffset ;
99- }
100-
10150 /**
10251 * Subscribe a handler to a channel.
10352 *
@@ -107,14 +56,12 @@ public function setConsumerOffset($consumerOffset)
10756 */
10857 public function subscribe ($ channel , callable $ handler )
10958 {
110- $ topic = $ this ->consumer ->newTopic ($ channel , $ this ->topicConfig );
111-
112- $ topic ->consumeStart (0 , $ this ->consumerOffset );
59+ $ this ->consumer ->subscribe ([$ channel ]);
11360
11461 $ isSubscriptionLoopActive = true ;
11562
11663 while ($ isSubscriptionLoopActive ) {
117- $ message = $ topic -> consume (0 , 1000 );
64+ $ message = $ this -> consumer -> consume (300 );
11865
11966 if ($ message === null ) {
12067 continue ;
@@ -126,10 +73,12 @@ public function subscribe($channel, callable $handler)
12673
12774 if ($ payload === 'unsubscribe ' ) {
12875 $ isSubscriptionLoopActive = false ;
129- break ;
76+ } else {
77+ call_user_func ($ handler , $ payload );
13078 }
13179
132- call_user_func ($ handler , $ payload );
80+ $ this ->consumer ->commitAsync ($ message );
81+
13382 break ;
13483 case RD_KAFKA_RESP_ERR__PARTITION_EOF :
13584 case RD_KAFKA_RESP_ERR__TIMED_OUT :
@@ -148,7 +97,7 @@ public function subscribe($channel, callable $handler)
14897 */
14998 public function publish ($ channel , $ message )
15099 {
151- $ topic = $ this ->producer ->newTopic ($ channel, $ this -> topicConfig );
100+ $ topic = $ this ->producer ->newTopic ($ channel );
152101 $ topic ->produce (RD_KAFKA_PARTITION_UA , 0 , Utils::serializeMessage ($ message ));
153102 }
154103}
0 commit comments