File tree Expand file tree Collapse file tree 4 files changed +41
-1
lines changed
Expand file tree Collapse file tree 4 files changed +41
-1
lines changed Original file line number Diff line number Diff line change @@ -13,6 +13,13 @@ composer require laravel-tool/kafka-queue
1313'kafka' => [
1414 'driver' => 'kafka',
1515 'broker_list' => env('KAFKA_BROKER_LIST', 'kafka:9092'),
16+ 'auth' => [
17+ 'enable' => env('KAFKA_AUTH_ENABLE', false),
18+ 'mechanism' => env('KAFKA_AUTH_MECHANISM', 'PLAIN'),
19+ 'username' => env('KAFKA_AUTH_USERNAME'),
20+ 'password' => env('KAFKA_AUTH_PASSWORD'),
21+ 'ssl_ca_location' => env('KAFKA_AUTH_SSL_CA_LOCATION'),
22+ ],
1623 'queue' => env('KAFKA_QUEUE', 'default'),
1724 'heartbeat_ms' => env('KAFKA_HEARTBEAT', 5000),
1825 'group_name' => env('KAFKA_QUEUE_GROUP', 'default'),
Original file line number Diff line number Diff line change 1+ <?php
2+
3+ namespace LaravelTool \KafkaQueue \Kafka ;
4+
5+ use RdKafka \Conf as KafkaConfig ;
6+
7+ trait AuthConfigTrait
8+ {
9+ public function authConfig (KafkaConfig &$ kafkaConfig , array $ config ): void
10+ {
11+ if (!empty ($ config ['auth ' ]) && $ config ['auth ' ]['enable ' ]) {
12+ $ kafkaConfig ->set ('sasl.mechanisms ' , $ config ['auth ' ]['mechanism ' ]);
13+ $ kafkaConfig ->set ('sasl.username ' , $ config ['auth ' ]['username ' ]);
14+ $ kafkaConfig ->set ('sasl.password ' , $ config ['auth ' ]['password ' ]);
15+ if ($ config ['auth ' ]['ssl_ca_location ' ]) {
16+ $ kafkaConfig ->set ('ssl.ca.location ' , $ config ['auth ' ]['ssl_ca_location ' ]);
17+ }
18+ }
19+ }
20+ }
Original file line number Diff line number Diff line change 1212
1313class Consumer
1414{
15+ use AuthConfigTrait;
16+
1517 private KafkaConsumer $ consumer ;
1618 private Metadata $ metadata ;
1719 private array $ topics ;
@@ -24,7 +26,9 @@ public function __construct(
2426 ) {
2527 $ this ->consumer = new KafkaConsumer ($ this ->generateConfig ($ this ->config ));
2628
27- $ this ->metadata ();
29+ if (!$ this ->config ['queue_disable_length ' ]) {
30+ $ this ->metadata ();
31+ }
2832 }
2933
3034 public function consume (string $ topic ): ?Message
@@ -54,6 +58,9 @@ public function commit(): void
5458
5559 public function size (string $ topicName ): int
5660 {
61+ if ($ this ->config ['queue_disable_length ' ]) {
62+ return 0 ;
63+ }
5764
5865 try {
5966 if ($ topic = $ this ->getTopic ($ topicName )) {
@@ -115,6 +122,8 @@ private function generateConfig(array $config): KafkaConfig
115122 $ kafkaConfig ->set ('heartbeat.interval.ms ' , $ config ['heartbeat_ms ' ]);
116123 $ kafkaConfig ->set ('auto.offset.reset ' , 'earliest ' );
117124
125+ $ this ->authConfig ($ kafkaConfig , $ config );
126+
118127 return $ kafkaConfig ;
119128 }
120129
Original file line number Diff line number Diff line change 99
1010class Producer
1111{
12+ use AuthConfigTrait;
13+
1214 private KafkaProducer $ producer ;
1315
1416 public function __construct (
@@ -49,6 +51,8 @@ private function generateConfig(array $config): KafkaConfig
4951 $ kafkaConfig = new KafkaConfig ();
5052 $ kafkaConfig ->set ('metadata.broker.list ' , $ config ['broker_list ' ]);
5153
54+ $ this ->authConfig ($ kafkaConfig , $ config );
55+
5256 return $ kafkaConfig ;
5357 }
5458}
You can’t perform that action at this time.
0 commit comments