Skip to content

Commit 64fc040

Browse files
Iachimovschi DanuIachimovschi Danu
authored andcommitted
add decoder encoder to kafka
1 parent f9ea2d7 commit 64fc040

File tree

2 files changed

+35
-14
lines changed

2 files changed

+35
-14
lines changed

src/Infrastructure/Command/UserConsumerCommand.php

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
1717
use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
1818
use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;
19+
use Jobcloud\Kafka\Message\Decoder\AvroDecoder;
20+
use Jobcloud\Kafka\Message\Encoder\AvroEncoder;
21+
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry;
1922
use Symfony\Component\Console\Attribute\AsCommand;
2023
use Symfony\Component\Console\Command\Command;
2124
use Symfony\Component\Console\Input\InputArgument;
@@ -46,14 +49,15 @@ protected function execute(InputInterface $input, OutputInterface $output): int
4649
{
4750
$io = new SymfonyStyle($input, $output);
4851

49-
5052
$schemaRegistryClient = new CachedRegistry(
5153
new PromisingRegistry(
5254
new Client(['base_uri' => 'schema-registry:8081'])
5355
),
5456
new AvroObjectCacheAdapter()
5557
);
5658

59+
$registry = new AvroSchemaRegistry($schemaRegistryClient);
60+
5761
$recordSerializer = new RecordSerializer(
5862
$schemaRegistryClient,
5963
[
@@ -62,19 +66,6 @@ protected function execute(InputInterface $input, OutputInterface $output): int
6266
]
6367
);
6468

65-
$consumer = KafkaConsumerBuilder::create()
66-
->withAdditionalConfig(
67-
[
68-
'enable.auto.commit' => false
69-
]
70-
)
71-
->withAdditionalBroker('kafka:9092')
72-
->withConsumerGroup('testGroup')
73-
->withAdditionalSubscription('users')
74-
->build();
75-
76-
$consumer->subscribe();
77-
7869
$schema = <<<'JSON'
7970
{
8071
"type": "record",
@@ -89,6 +80,22 @@ protected function execute(InputInterface $input, OutputInterface $output): int
8980

9081
$avroSchema = AvroSchema::parse($schema);
9182

83+
$decoder = new AvroDecoder($registry, $recordSerializer);
84+
85+
$consumer = KafkaConsumerBuilder::create()
86+
->withAdditionalConfig(
87+
[
88+
'enable.auto.commit' => false,
89+
]
90+
)
91+
->withDecoder($decoder)
92+
->withAdditionalBroker('kafka:9092')
93+
->withConsumerGroup('testGroup')
94+
->withAdditionalSubscription('users')
95+
->build();
96+
97+
$consumer->subscribe();
98+
9299
while (true) {
93100
try {
94101
$message = $consumer->consume();

src/Infrastructure/Command/UserProducerCommand.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@
1111
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
1212
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
1313
use GuzzleHttp\Client;
14+
use Jobcloud\Kafka\Message\Decoder\AvroDecoder;
15+
use Jobcloud\Kafka\Message\Encoder\AvroEncoder;
1416
use Jobcloud\Kafka\Message\KafkaProducerMessage;
17+
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry;
1518
use Jobcloud\Kafka\Producer\KafkaProducerBuilder;
1619
use League\Csv\Reader;
1720
use Symfony\Component\Console\Attribute\AsCommand;
@@ -37,6 +40,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
3740
new AvroObjectCacheAdapter()
3841
);
3942

43+
$registry = new AvroSchemaRegistry($schemaRegistryClient);
44+
4045
$recordSerializer = new RecordSerializer(
4146
$schemaRegistryClient,
4247
[
@@ -60,7 +65,16 @@ protected function execute(InputInterface $input, OutputInterface $output): int
6065

6166
$avroSchema = AvroSchema::parse($schema);
6267

68+
$encoder = new AvroEncoder($registry, $recordSerializer);
69+
6370
$producer = KafkaProducerBuilder::create()
71+
->withAdditionalConfig(
72+
[
73+
'compression.codec' => 'lz4',
74+
'auto.commit.interval.ms' => 500
75+
]
76+
)
77+
->withEncoder($encoder)
6478
->withAdditionalBroker('kafka:9092')
6579
->build();
6680

0 commit comments

Comments
 (0)