Skip to content

Commit f9ea2d7

Browse files
Iachimovschi DanuIachimovschi Danu
authored andcommitted
example of avro serialize desirialize message
1 parent f4678e2 commit f9ea2d7

File tree

2 files changed

+86
-6
lines changed

2 files changed

+86
-6
lines changed

src/Infrastructure/Command/UserConsumerCommand.php

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,13 @@
55
namespace App\Infrastructure\Command;
66

77
use App\Domain\user\Entity\User;
8+
use AvroSchema;
89
use Doctrine\ORM\EntityManagerInterface;
10+
use FlixTech\AvroSerializer\Objects\RecordSerializer;
11+
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
12+
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
13+
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
14+
use GuzzleHttp\Client;
915
use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
1016
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
1117
use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
@@ -40,6 +46,22 @@ protected function execute(InputInterface $input, OutputInterface $output): int
4046
{
4147
$io = new SymfonyStyle($input, $output);
4248

49+
50+
$schemaRegistryClient = new CachedRegistry(
51+
new PromisingRegistry(
52+
new Client(['base_uri' => 'schema-registry:8081'])
53+
),
54+
new AvroObjectCacheAdapter()
55+
);
56+
57+
$recordSerializer = new RecordSerializer(
58+
$schemaRegistryClient,
59+
[
60+
RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => false,
61+
RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => true,
62+
]
63+
);
64+
4365
$consumer = KafkaConsumerBuilder::create()
4466
->withAdditionalConfig(
4567
[
@@ -53,16 +75,32 @@ protected function execute(InputInterface $input, OutputInterface $output): int
5375

5476
$consumer->subscribe();
5577

78+
$schema = <<<'JSON'
79+
{
80+
"type": "record",
81+
"name": "User",
82+
"fields": [
83+
{"name": "name", "type": "string"},
84+
{"name": "surname", "type": "string"},
85+
{"name": "email", "type": "string"}
86+
]
87+
}
88+
JSON;
89+
90+
$avroSchema = AvroSchema::parse($schema);
91+
5692
while (true) {
5793
try {
5894
$message = $consumer->consume();
59-
$userData = json_decode($message->getBody(), true);
60-
$io->success('Message received: '. $userData['Name']);
95+
96+
$userData = $recordSerializer->decodeMessage($message->getBody(), $avroSchema);
97+
98+
$io->success('Message received: '. $userData['name']);
6199

62100
$user = new User();
63-
$user->setName($userData['Name']);
64-
$user->setSurname($userData['Surname']);
65-
$user->setEmail($userData['Email']);
101+
$user->setName($userData['name']);
102+
$user->setSurname($userData['surname']);
103+
$user->setEmail($userData['email']);
66104

67105
$this->entityManager->persist($user);
68106
$this->entityManager->flush();

src/Infrastructure/Command/UserProducerCommand.php

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44

55
namespace App\Infrastructure\Command;
66

7+
use AvroSchema;
78
use Exception;
9+
use FlixTech\AvroSerializer\Objects\RecordSerializer;
10+
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
11+
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
12+
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
13+
use GuzzleHttp\Client;
814
use Jobcloud\Kafka\Message\KafkaProducerMessage;
915
use Jobcloud\Kafka\Producer\KafkaProducerBuilder;
1016
use League\Csv\Reader;
@@ -24,6 +30,36 @@ protected function execute(InputInterface $input, OutputInterface $output): int
2430
{
2531
$io = new SymfonyStyle($input, $output);
2632

33+
$schemaRegistryClient = new CachedRegistry(
34+
new PromisingRegistry(
35+
new Client(['base_uri' => 'schema-registry:8081'])
36+
),
37+
new AvroObjectCacheAdapter()
38+
);
39+
40+
$recordSerializer = new RecordSerializer(
41+
$schemaRegistryClient,
42+
[
43+
RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => false,
44+
RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => true,
45+
]
46+
);
47+
48+
// Define Avro Schema
49+
$schema = <<<'JSON'
50+
{
51+
"type": "record",
52+
"name": "User",
53+
"fields": [
54+
{"name": "name", "type": "string"},
55+
{"name": "surname", "type": "string"},
56+
{"name": "email", "type": "string"}
57+
]
58+
}
59+
JSON;
60+
61+
$avroSchema = AvroSchema::parse($schema);
62+
2763
$producer = KafkaProducerBuilder::create()
2864
->withAdditionalBroker('kafka:9092')
2965
->build();
@@ -35,8 +71,14 @@ protected function execute(InputInterface $input, OutputInterface $output): int
3571

3672
foreach ($records as $record) {
3773
try {
74+
$avroData = $recordSerializer->encodeRecord('User', $avroSchema, [
75+
'name' => $record['Name'],
76+
'surname' => $record['Surname'],
77+
'email' => $record['Email']
78+
]);
79+
3880
$message = KafkaProducerMessage::create('users', RD_KAFKA_PARTITION_UA)
39-
->withBody(json_encode($record, JSON_HEX_TAG));
81+
->withBody($avroData);
4082

4183
$producer->produce($message);
4284
$producer->flush(20000);

0 commit comments

Comments
 (0)