Skip to content

Commit 549d213

Browse files
Iachimovschi DanuIachimovschi Danu
authored andcommitted
add avro client register
1 parent 64fc040 commit 549d213

File tree

11 files changed

+216
-67
lines changed

11 files changed

+216
-67
lines changed

.env

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,7 @@ RABBIT_MESSENGER_TRANSPORT_DSN=amqp://guest:guest@rabbitmq:5672/%2f
4141
# DATABASE_URL="mysql://app:!ChangeMe!@127.0.0.1:3306/app?serverVersion=8.0.32&charset=utf8mb4"
4242
DATABASE_URL="postgresql://user:password@db:5432/app_db?serverVersion=16&charset=utf8"
4343
###< doctrine/doctrine-bundle ###
44+
45+
# SCHEMA SCHEMA REGISTRY URL FOR AVRO SCHEMA REGISTRY
46+
# SCHEMA_REGISTRY_URL=http://localhost:8081
47+
SCHEMA_REGISTRY_URL=schema-registry:8081

.env.dev

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,26 @@
1-
21
###> symfony/framework-bundle ###
3-
APP_SECRET=ad170ccb23f26ba4ec3cdd58cbbbbd9f
2+
APP_ENV=dev
3+
APP_DEBUG=true
4+
APP_SECRET=dev-secret-key
45
###< symfony/framework-bundle ###
6+
7+
###> symfony/messenger ###
8+
# In development, you might want to use a local Kafka or disable messaging
9+
MESSENGER_TRANSPORT_DSN=kafka://localhost:9092
10+
KAFKA_BROKERS=localhost:9092
11+
12+
# If you are using RabbitMQ locally for messaging:
13+
RABBIT_MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f
14+
###< symfony/messenger ###
15+
16+
###> doctrine/doctrine-bundle ###
17+
# In development, it's common to use SQLite or PostgreSQL with a dev-specific database
18+
DATABASE_URL="postgresql://user:password@localhost:5432/app_db_dev?serverVersion=16&charset=utf8"
19+
# Alternatively, for SQLite:
20+
# DATABASE_URL="sqlite:///%kernel.project_dir%/var/data_dev.db"
21+
###< doctrine/doctrine-bundle ###
22+
23+
###> Avro Schema Registry ###
24+
# Use a local schema registry for development
25+
SCHEMA_REGISTRY_URL=schema-registry:8081
26+
###< Avro Schema Registry ###
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
parameters:
2+
schema_registry.url: '%env(SCHEMA_REGISTRY_URL)%'
3+
schema_registry.register_missing_schemas: 0
4+
schema_registry.register_missing_subjects: 1

config/services.yaml

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
1-
# This file is the entry point to configure your own services.
2-
# Files in the packages/ subdirectory configure your dependencies.
3-
4-
# Put parameters here that don't need to change on each machine where the app is deployed
5-
# https://symfony.com/doc/current/best_practices.html#use-parameters-for-application-configuration
61
parameters:
72

3+
4+
imports:
5+
- { resource: 'packages/schema_registry.yaml' }
6+
87
services:
9-
# default configuration for services in *this* file
108
_defaults:
11-
autowire: true # Automatically injects dependencies in your services.
12-
autoconfigure: true # Automatically registers your services as commands, event subscribers, etc.
9+
autowire: true
10+
autoconfigure: true
1311

14-
# makes classes in src/ available to be used as services
15-
# this creates a service per class whose id is the fully-qualified class name
1612
App\:
1713
resource: '../src/'
1814
exclude:
1915
- '../src/DependencyInjection/'
2016
- '../src/Entity/'
2117
- '../src/Kernel.php'
2218

23-
# add more service definitions when explicit configuration is needed
24-
# please note that last definitions always *replace* previous ones
19+
App\Infrastructure\Avro\SchemaRegistryClientInterface: '@App\Infrastructure\Avro\ConfluenceSchemaRegistryClient'
20+
App\Infrastructure\Avro\ConfluenceSchemaRegistryClient:
21+
arguments:
22+
$schemaRegistryService: '@App\Infrastructure\Avro\SchemaRegistryService'
23+
public: true
24+
shared: true
25+
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace App\Infrastructure\Avro;
6+
7+
use App\Infrastructure\Avro\Interfaces\SchemaRegistryClientInterface;
8+
use FlixTech\AvroSerializer\Objects\RecordSerializer;
9+
use FlixTech\SchemaRegistryApi\Registry;
10+
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
11+
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
12+
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
13+
use GuzzleHttp\Client;
14+
use Jobcloud\Kafka\Message\Decoder\AvroDecoder;
15+
use Jobcloud\Kafka\Message\Decoder\AvroDecoderInterface;
16+
use Jobcloud\Kafka\Message\Encoder\AvroEncoder;
17+
use Jobcloud\Kafka\Message\Encoder\AvroEncoderInterface;
18+
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry;
19+
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistryInterface;
20+
21+
class ConfluenceSchemaRegistryClient implements SchemaRegistryClientInterface
22+
{
23+
private readonly Registry $cachedRegistry;
24+
25+
private readonly AvroSchemaRegistryInterface $schemaRegistryClient;
26+
27+
private readonly RecordSerializer $recordSerializer;
28+
29+
public function __construct(SchemaRegistryService $schemaRegistryService)
30+
{
31+
$this->cachedRegistry = new CachedRegistry(
32+
new PromisingRegistry(
33+
new Client(['base_uri' => $schemaRegistryService->getUrl()])
34+
),
35+
new AvroObjectCacheAdapter()
36+
);
37+
$this->schemaRegistryClient = new AvroSchemaRegistry($this->cachedRegistry);
38+
39+
$this->recordSerializer = new RecordSerializer(
40+
$this->cachedRegistry,
41+
[
42+
RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => $schemaRegistryService->getRegisterMissingSchemas(),
43+
RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => $schemaRegistryService->getRegisterMissingSubjects(),
44+
]
45+
);
46+
}
47+
48+
public function getRegistry(): Registry
49+
{
50+
return $this->cachedRegistry;
51+
}
52+
53+
public function getSchemaRegistryClient(): AvroSchemaRegistryInterface
54+
{
55+
return $this->schemaRegistryClient;
56+
}
57+
58+
public function getRecordSerializer(): RecordSerializer
59+
{
60+
return $this->recordSerializer;
61+
}
62+
63+
public function getEncoder(): AvroEncoderInterface
64+
{
65+
return new AvroEncoder($this->schemaRegistryClient, $this->recordSerializer);
66+
}
67+
68+
public function getDecoder(): AvroDecoderInterface
69+
{
70+
return new AvroDecoder($this->schemaRegistryClient, $this->recordSerializer);
71+
}
72+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace App\Infrastructure\Avro\Interfaces;
6+
7+
interface SchemaRegistryClientInterface extends SchemaSerializerInterface, SchemaRegistryInterface
8+
{
9+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace App\Infrastructure\Avro\Interfaces;
6+
7+
use FlixTech\SchemaRegistryApi\Registry;
8+
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistryInterface;
9+
10+
interface SchemaRegistryInterface
11+
{
12+
public function getRegistry(): Registry;
13+
14+
public function getSchemaRegistryClient(): AvroSchemaRegistryInterface;
15+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace App\Infrastructure\Avro\Interfaces;
6+
7+
use FlixTech\AvroSerializer\Objects\RecordSerializer;
8+
use Jobcloud\Kafka\Message\Decoder\AvroDecoderInterface;
9+
use Jobcloud\Kafka\Message\Encoder\AvroEncoderInterface;
10+
11+
interface SchemaSerializerInterface
12+
{
13+
public function getRecordSerializer(): RecordSerializer;
14+
15+
public function getEncoder(): AvroEncoderInterface;
16+
17+
public function getDecoder(): AvroDecoderInterface;
18+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace App\Infrastructure\Avro;
6+
7+
use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface;
8+
9+
readonly class SchemaRegistryService
10+
{
11+
private string $url;
12+
13+
private bool $register_missing_schemas;
14+
15+
private bool $register_missing_subjects;
16+
17+
public function __construct(ParameterBagInterface $params)
18+
{
19+
$this->url = $params->get('schema_registry.url');
20+
$this->register_missing_schemas = (bool) $params->get('schema_registry.register_missing_schemas');
21+
$this->register_missing_subjects = (bool) $params->get('schema_registry.register_missing_subjects');
22+
}
23+
24+
public function getUrl(): string
25+
{
26+
return $this->url;
27+
}
28+
29+
public function getRegisterMissingSchemas(): bool
30+
{
31+
return $this->register_missing_schemas;
32+
}
33+
34+
public function getRegisterMissingSubjects(): bool
35+
{
36+
return $this->register_missing_subjects;
37+
}
38+
}

src/Infrastructure/Command/UserConsumerCommand.php

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,15 @@
55
namespace App\Infrastructure\Command;
66

77
use App\Domain\user\Entity\User;
8+
use App\Infrastructure\Avro\Interfaces\SchemaRegistryClientInterface;
89
use AvroSchema;
910
use Doctrine\ORM\EntityManagerInterface;
10-
use FlixTech\AvroSerializer\Objects\RecordSerializer;
11-
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
12-
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
13-
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
14-
use GuzzleHttp\Client;
1511
use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
1612
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
1713
use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
1814
use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;
1915
use Jobcloud\Kafka\Message\Decoder\AvroDecoder;
20-
use Jobcloud\Kafka\Message\Encoder\AvroEncoder;
21-
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry;
16+
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
2217
use Symfony\Component\Console\Attribute\AsCommand;
2318
use Symfony\Component\Console\Command\Command;
2419
use Symfony\Component\Console\Input\InputArgument;
@@ -32,10 +27,11 @@
3227
)]
3328
class UserConsumerCommand extends Command
3429
{
35-
private EntityManagerInterface $entityManager;
36-
37-
public function __construct(EntityManagerInterface $entityManager, string $name = null)
38-
{
30+
public function __construct(
31+
private EntityManagerInterface $entityManager,
32+
protected readonly SchemaRegistryClientInterface $schemaRegistryClient,
33+
string $name = null
34+
) {
3935
$this->entityManager = $entityManager;
4036
parent::__construct($name);
4137
}
@@ -49,23 +45,6 @@ protected function execute(InputInterface $input, OutputInterface $output): int
4945
{
5046
$io = new SymfonyStyle($input, $output);
5147

52-
$schemaRegistryClient = new CachedRegistry(
53-
new PromisingRegistry(
54-
new Client(['base_uri' => 'schema-registry:8081'])
55-
),
56-
new AvroObjectCacheAdapter()
57-
);
58-
59-
$registry = new AvroSchemaRegistry($schemaRegistryClient);
60-
61-
$recordSerializer = new RecordSerializer(
62-
$schemaRegistryClient,
63-
[
64-
RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => false,
65-
RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => true,
66-
]
67-
);
68-
6948
$schema = <<<'JSON'
7049
{
7150
"type": "record",
@@ -80,15 +59,13 @@ protected function execute(InputInterface $input, OutputInterface $output): int
8059

8160
$avroSchema = AvroSchema::parse($schema);
8261

83-
$decoder = new AvroDecoder($registry, $recordSerializer);
84-
8562
$consumer = KafkaConsumerBuilder::create()
8663
->withAdditionalConfig(
8764
[
8865
'enable.auto.commit' => false,
8966
]
9067
)
91-
->withDecoder($decoder)
68+
->withDecoder($this->schemaRegistryClient->getDecoder())
9269
->withAdditionalBroker('kafka:9092')
9370
->withConsumerGroup('testGroup')
9471
->withAdditionalSubscription('users')
@@ -100,7 +77,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
10077
try {
10178
$message = $consumer->consume();
10279

103-
$userData = $recordSerializer->decodeMessage($message->getBody(), $avroSchema);
80+
$userData = $this->schemaRegistryClient->getRecordSerializer()->decodeMessage($message->getBody(), $avroSchema);
10481

10582
$io->success('Message received: '. $userData['name']);
10683

0 commit comments

Comments
 (0)