Skip to content

Commit 691337c

Browse files
committed
first release
1 parent dfd0a75 commit 691337c

File tree

9 files changed

+467
-0
lines changed

9 files changed

+467
-0
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
/.idea
2+
/vendor
3+
composer.lock

README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,24 @@
11
# kafka-queue
22
Laravel Kafka Queue connection with Horizon support
3+
4+
## Installation
5+
6+
### 1. Composer
7+
```shell
8+
composer require laravel-tool/kafka-queue
9+
```
10+
11+
### 2. Add to config/queue.php
12+
```php
13+
'kafka' => [
14+
'driver' => 'kafka',
15+
'broker_list' => env('KAFKA_BROKER_LIST', 'kafka:9092'),
16+
'queue' => env('KAFKA_QUEUE', 'default'),
17+
'heartbeat_ms' => env('KAFKA_HEARTBEAT', 5000),
18+
'group_name' => env('KAFKA_QUEUE_GROUP', 'default'),
19+
'producer_timeout_ms' => 1000,
20+
'consumer_timeout_ms' => 3000,
21+
'horizon' => true,
22+
'after_commit' => false,
23+
],
24+
```

composer.json

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
{
2+
"name": "laravel-tool/kafka-queue",
3+
"description": "Laravel Kafka Queue driver with Horizon support",
4+
"keywords": [
5+
"php",
6+
"laravel",
7+
"kafka",
8+
"queue",
9+
"horizon"
10+
],
11+
"type": "laravel-plugin",
12+
"license": "MIT",
13+
"version": "1.0.0",
14+
"authors": [
15+
{
16+
"name": "Yuriy Potemkin",
17+
"email": "ysoft2000@mail.ru"
18+
}
19+
],
20+
"minimum-stability": "dev",
21+
"require": {
22+
"php": "^8.3",
23+
"ext-rdkafka": "*",
24+
"ext-json": "*",
25+
"laravel/framework": ">=10"
26+
},
27+
"autoload": {
28+
"psr-4": {
29+
"LaravelTool\\KafkaQueue\\": "src/"
30+
}
31+
},
32+
"extra": {
33+
"laravel": {
34+
"providers": [
35+
"LaravelTool\\KafkaQueue\\ServiceProvider"
36+
]
37+
}
38+
}
39+
}

src/Connector.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
namespace LaravelTool\KafkaQueue;
4+
5+
use Illuminate\Contracts\Queue\Queue as QueueInterface;
6+
use Illuminate\Queue\Connectors\ConnectorInterface;
7+
use LaravelTool\KafkaQueue\Kafka\Consumer;
8+
use LaravelTool\KafkaQueue\Kafka\Producer;
9+
10+
class Connector implements ConnectorInterface
11+
{
12+
13+
public function connect(array $config): QueueInterface
14+
{
15+
return new Queue(
16+
new Producer($config),
17+
new Consumer($config),
18+
defaultQueue: $config['queue'] ?? 'default',
19+
horizon: $config['horizon'] ?? false,
20+
);
21+
}
22+
23+
}

src/Job.php

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
<?php
2+
3+
namespace LaravelTool\KafkaQueue;
4+
5+
use Illuminate\Container\Container;
6+
use Illuminate\Queue\Jobs\Job as BaseJob;
7+
use Illuminate\Contracts\Queue\Job as JobInterface;
8+
use RdKafka\Message;
9+
10+
class Job extends BaseJob implements JobInterface
11+
{
12+
protected array $decoded;
13+
14+
public function __construct(
15+
Container $container,
16+
protected Queue $kafkaQueue,
17+
protected string $job,
18+
protected Message $message,
19+
) {
20+
$this->container = $container;
21+
$this->decoded = $this->payload();
22+
$this->connectionName = $this->kafkaQueue->getConnectionName();
23+
$this->queue = $this->message->topic_name;
24+
}
25+
26+
public function getJobId()
27+
{
28+
return $this->decoded['id'] ?? null;
29+
}
30+
31+
public function getRawBody(): string
32+
{
33+
return $this->job;
34+
}
35+
36+
public function attempts()
37+
{
38+
return ($this->decoded['attempts'] ?? null) + 1;
39+
}
40+
41+
public function release($delay = 0): void
42+
{
43+
parent::release($delay);
44+
45+
$this->delete();
46+
$this->kafkaQueue->release($delay, $this);
47+
}
48+
49+
public function delete(): void
50+
{
51+
parent::delete();
52+
53+
$this->kafkaQueue->deleteReserved($this->queue, $this);
54+
}
55+
56+
public function getMessageTimestamp(): int
57+
{
58+
return (int) ($this->message->timestamp / 1000);
59+
}
60+
}

src/Kafka/Consumer.php

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
namespace LaravelTool\KafkaQueue\Kafka;
4+
5+
use RdKafka\Conf as KafkaConfig;
6+
use RdKafka\Exception as KafkaException;
7+
use RdKafka\KafkaConsumer;
8+
use RdKafka\Message;
9+
use RuntimeException;
10+
11+
class Consumer
12+
{
13+
private KafkaConsumer $consumer;
14+
15+
public function __construct(
16+
protected array $config
17+
) {
18+
$this->consumer = new KafkaConsumer($this->generateConfig($config));
19+
}
20+
21+
public function consume(string $topic): ?Message
22+
{
23+
try {
24+
$this->checkSubscription($topic);
25+
26+
$message = $this->consumer->consume($this->config['consumer_timeout_ms']);
27+
} catch (KafkaException) {
28+
return null;
29+
}
30+
31+
return match ($message->err) {
32+
RD_KAFKA_RESP_ERR_NO_ERROR => $message,
33+
RD_KAFKA_RESP_ERR__PARTITION_EOF, RD_KAFKA_RESP_ERR__TIMED_OUT, RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC, RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART => null,
34+
default => throw new RuntimeException($message->errstr(), $message->err),
35+
};
36+
}
37+
38+
/**
39+
* @throws KafkaException
40+
*/
41+
public function commit():void
42+
{
43+
$this->consumer->commit();
44+
}
45+
46+
private function generateConfig(array $config): KafkaConfig
47+
{
48+
$kafkaConfig = new KafkaConfig();
49+
$kafkaConfig->set('metadata.broker.list', $config['broker_list']);
50+
$kafkaConfig->set('group.id', $config['group_name']);
51+
$kafkaConfig->set('heartbeat.interval.ms', $config['heartbeat_ms']);
52+
$kafkaConfig->set('auto.offset.reset', 'earliest');
53+
54+
return $kafkaConfig;
55+
}
56+
57+
/**
58+
* @throws KafkaException
59+
*/
60+
private function checkSubscription(string $topic): void
61+
{
62+
if (!in_array($topic, $this->consumer->getSubscription())) {
63+
$this->consumer->subscribe([$topic]);
64+
}
65+
}
66+
}

src/Kafka/Producer.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?php
2+
3+
namespace LaravelTool\KafkaQueue\Kafka;
4+
5+
use Closure;
6+
use Illuminate\Support\Str;
7+
use RdKafka\Conf as KafkaConfig;
8+
use RdKafka\Producer as KafkaProducer;
9+
10+
class Producer
11+
{
12+
private KafkaProducer $producer;
13+
14+
public function __construct(
15+
protected array $config
16+
) {
17+
$this->producer = new KafkaProducer($this->generateConfig($config));
18+
}
19+
20+
public function produce(
21+
string $topic,
22+
string $payload,
23+
?int $delay = null,
24+
callable $callback = null,
25+
): void {
26+
$this->producer->newTopic($topic)->producev(
27+
partition: RD_KAFKA_PARTITION_UA,
28+
msgflags: 0,
29+
payload: $payload,
30+
key: $this->getMessageKey(),
31+
timestamp_ms: ($delay ?? time()) * 1000,
32+
);
33+
34+
if (is_callable($callback)) {
35+
$callback();
36+
}
37+
38+
$this->producer->poll(0);
39+
$this->producer->flush($this->config['producer_timeout_ms']);
40+
}
41+
42+
private function getMessageKey(): string
43+
{
44+
return Str::orderedUuid()->toString();
45+
}
46+
47+
private function generateConfig(array $config): KafkaConfig
48+
{
49+
$kafkaConfig = new KafkaConfig();
50+
$kafkaConfig->set('metadata.broker.list', $config['broker_list']);
51+
52+
return $kafkaConfig;
53+
}
54+
}

0 commit comments

Comments
 (0)