Quarkus Debezium Extension

The Quarkus Debezium Extension should provide a simple way to receive data-source change events inside a Quarkus native application and apply custom logic to them (see image), using the Debezium engine together with an appropriate connector.

Currently, to receive those events from the data source, you have to import the Debezium embedded engine and a connector, for example:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>

This approach does not work out of the box when we want to build a native image of the application.

Module Organization (Debezium Quarkus engine with Quarkus connector)

The proposed module bundles the engine and the connector into a single dependency:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-quarkus-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>

With this solution, the configuration property that defines the connector class is no longer exposed to users, because it is already defined inside the extension:

connector.class=io.debezium.connector.mysql.MySqlConnector
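As a minimal sketch of what "already defined inside the extension" could mean, assuming the extension merges user properties with its own defaults before building the engine, a connector-specific module could pin connector.class and reject conflicting values. The names MySqlExtensionDefaults and withFixedConnector are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: how a connector-specific extension could pin connector.class.
final class MySqlExtensionDefaults {
    static final String CONNECTOR_CLASS_KEY = "connector.class";
    static final String CONNECTOR_CLASS = "io.debezium.connector.mysql.MySqlConnector";

    // Returns a copy of the user configuration with connector.class forced to the bundled connector.
    static Map<String, String> withFixedConnector(Map<String, String> userConfig) {
        String requested = userConfig.get(CONNECTOR_CLASS_KEY);
        if (requested != null && !requested.equals(CONNECTOR_CLASS)) {
            // A different value would contradict the connector packaged in this module.
            throw new IllegalArgumentException(
                    "connector.class is defined by the extension and cannot be set to " + requested);
        }
        Map<String, String> merged = new HashMap<>(userConfig);
        merged.put(CONNECTOR_CLASS_KEY, CONNECTOR_CLASS);
        return merged;
    }
}
```

Failing fast on a conflicting value, rather than silently overwriting it, makes a misconfiguration visible at startup.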

Quarkus Debezium Extension configuration

The extension must be configurable through application properties or YAML, like any Quarkus application. The configuration properties of the Debezium engine must be available under the quarkus.debezium.configuration. prefix, for example:

quarkus.debezium.configuration.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
quarkus.debezium.configuration.database.hostname=localhost
quarkus.debezium.configuration.database.port=5432
quarkus.debezium.configuration.database.user=postgresuser
quarkus.debezium.configuration.database.password=postgrespw
quarkus.debezium.configuration.database.dbname=postgresuser
quarkus.debezium.configuration.snapshot.mode=never
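The prefix mapping can be sketched in plain Java, assuming the extension reads the Quarkus configuration as a flat key/value map: every key under quarkus.debezium.configuration. is copied, without the prefix, into a java.util.Properties object, which is the form the embedded engine builder accepts through DebeziumEngine.create(...).using(Properties). EnginePropertiesMapper is a hypothetical name.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical mapper from Quarkus-style keys to plain Debezium engine properties.
final class EnginePropertiesMapper {
    static final String PREFIX = "quarkus.debezium.configuration.";

    // Copies every quarkus.debezium.configuration.* entry into engine Properties, stripping the prefix.
    static Properties toEngineProperties(Map<String, String> quarkusConfig) {
        Properties engine = new Properties();
        quarkusConfig.forEach((key, value) -> {
            if (key.startsWith(PREFIX)) {
                engine.setProperty(key.substring(PREFIX.length()), value);
            }
        });
        return engine;
    }
}
```

Keys outside the prefix, such as quarkus.http.port, are ignored, so the engine never sees unrelated Quarkus settings.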

Apart from the usual configuration properties, such as data-source addresses, Debezium traditionally follows a configuration-over-code approach: certain behavioral aspects are defined in external configuration files interpreted at runtime. The Quarkus extension changes this and favors code over configuration, or more specifically, annotations over configuration. In this model, some features of the Debezium engine are exposed through annotations, making Debezium instrumentation more expressive and developer-friendly.

Quarkus Debezium Extension DI

Internally, Debezium uses a ServiceRegistry to inject objects and manage their lifecycle, relying on the Java ServiceLoader mechanism. Quarkus includes a lightweight CDI implementation called ArC, which can be used to manage the classes that would otherwise be loaded through the ServiceLoader.
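The difference can be sketched with a hypothetical resolver that first asks the container for a bean and falls back to java.util.ServiceLoader only when none is registered. A plain map of suppliers stands in for ArC, and OffsetStore is a made-up service interface; the motivation for delegating to ArC is that its beans are discovered at build time instead of by scanning META-INF/services at runtime.

```java
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.function.Supplier;

// Hypothetical service interface used only for this sketch.
interface OffsetStore {
    String name();
}

// Hypothetical resolver: container-managed beans win, ServiceLoader is the fallback.
final class HybridServiceResolver {
    private final Map<Class<?>, Supplier<?>> containerBeans;

    HybridServiceResolver(Map<Class<?>, Supplier<?>> containerBeans) {
        this.containerBeans = containerBeans;
    }

    <T> Optional<T> resolve(Class<T> type) {
        Supplier<?> bean = containerBeans.get(type);
        if (bean != null) {
            return Optional.of(type.cast(bean.get()));
        }
        // Classic discovery through META-INF/services/<fully qualified interface name>.
        return ServiceLoader.load(type).findFirst();
    }
}
```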

Quarkus Debezium Extension Additional Features

The extension makes it possible to address some use cases already supported by Debezium, but in a Quarkus way:

Quarkus Debezium Engine Instance

The proposal introduces an interface, Debezium, as an abstraction that exposes the engine's signaler, its configuration, and its status information (the manifest).

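import io.debezium.engine.DebeziumEngine.Signaler;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

/**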
 * The Debezium engine abstraction in the Quarkus CDI
 * <p>
 * The Engine is submitted to an {@link Executor} or {@link ExecutorService} for execution by a single thread
 */
public interface Debezium {

    /**
     * @return engine's signaler, if it supports signaling
     * @throws UnsupportedOperationException if signaling is not supported by this engine
     */
    Signaler signaler();

    /**
     * @return engine's configuration
     */
    Map<String, String> configuration();

    /**
     * @return engine's status information
     */
    DebeziumManifest manifest();
}
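The javadoc states that the engine is submitted to an Executor or ExecutorService and run by a single thread. Because DebeziumEngine is itself a Runnable (and Closeable), this can be sketched with the JDK alone. FakeEngine is a stand-in that only records which thread ran it, whereas a real engine's run() blocks until the engine is closed; EngineRunner is likewise hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for DebeziumEngine: it only records the name of the thread that runs it.
final class FakeEngine implements Runnable {
    final AtomicReference<String> runningThread = new AtomicReference<>();

    @Override
    public void run() {
        runningThread.set(Thread.currentThread().getName());
    }
}

final class EngineRunner {
    // Runs the engine on a dedicated single-thread executor and waits for it to finish.
    static String runOnSingleThread(FakeEngine engine) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(task -> new Thread(task, "debezium-engine"));
        executor.submit(engine);
        executor.shutdown(); // already-submitted tasks still run
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("engine did not stop in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return engine.runningThread.get();
    }
}
```

With a real engine, the extension would call close() on application shutdown so that run() returns and the executor can terminate.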

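Example usage:

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

@Path("/api/debezium")
public class DebeziumEndpoint {

    // package-private field injection is the style recommended by Quarkus
    @Inject
    Debezium debezium;

    @GET
    @Path("manifest")
    public DebeziumManifest getManifest() {
        return debezium.manifest();
    }
}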

Quarkus Debezium Lifecycle Events

The lifecycle of a Debezium embedded engine can be summarized in the following phases:

| Phase | Description | Trigger |
|---|---|---|
| initialization | the configuration is built and the engine is created, but not yet running | after the configuration is validated |
| startup | connectors are initialized and the database connection is established | right after engine.run() is invoked |
| shutdown | the engine terminates (gracefully or with an error) | observable via CompletionCallback |

The Quarkus Debezium extension notifies you of the engine's state changes through CDI events that can be observed with the @Observes annotation:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

// DebeziumConfigured, DebeziumStarted and DebeziumShutdown are event types proposed by the extension

@ApplicationScoped
class DebeziumEngineLifeCycle {

    public void init(@Observes DebeziumConfigured event) {
        // some logic to apply
    }

    public void startup(@Observes DebeziumStarted event) {
        // some logic to apply
    }

    public void shutdown(@Observes DebeziumShutdown event) {
        // some logic to apply
    }
}
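One way the extension could produce these events is to adapt the engine's callbacks. The sketch assumes that DebeziumStarted is fired once the connector starts and that DebeziumShutdown carries the same three values as DebeziumEngine.CompletionCallback#handle(boolean success, String message, Throwable error). The record shapes and LifecycleBridge are hypothetical, and a Consumer<Object> plays the role of CDI's Event#fire so that the code runs without a container.

```java
import java.util.function.Consumer;

// Hypothetical shapes for two of the proposed lifecycle events.
record DebeziumStarted() {}

record DebeziumShutdown(boolean success, String message, Throwable error) {}

// Same shape as DebeziumEngine.CompletionCallback#handle, redeclared so no Debezium jar is needed.
interface CompletionCallbackLike {
    void handle(boolean success, String message, Throwable error);
}

// Hypothetical adapter from engine callbacks to events; `fire` stands in for CDI's Event#fire.
final class LifecycleBridge implements CompletionCallbackLike {
    private final Consumer<Object> fire;

    LifecycleBridge(Consumer<Object> fire) {
        this.fire = fire;
    }

    // Would be invoked once the connector has started, e.g. from a connector callback.
    void connectorStarted() {
        fire.accept(new DebeziumStarted());
    }

    // Invoked when the engine terminates, gracefully or with an error.
    @Override
    public void handle(boolean success, String message, Throwable error) {
        fire.accept(new DebeziumShutdown(success, message, error));
    }
}
```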

Quarkus Debezium Heartbeat Events

In Debezium, heartbeat events are lightweight, periodic messages emitted when no database changes occur. They confirm that the connector is alive and still connected to the source, which helps to detect liveness. With the Quarkus Debezium Extension, heartbeat events can be observed as follows:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;


@ApplicationScoped  
class HeartbeatListener {
    
    public void heartbeat(@Observes DebeziumHeartbeat event) {  
        /// some logic to apply 
    }  
}
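Heartbeats are emitted only when the connector's heartbeat.interval.ms property is set to a positive value (for example quarkus.debezium.configuration.heartbeat.interval.ms=10000 with the prefix shown earlier). As a minimal sketch, assuming the observer method simply records each DebeziumHeartbeat, a liveness check could compare the time since the last heartbeat with a tolerated silence. HeartbeatWatchdog is hypothetical; its clock is a Supplier<Instant> so that it can be tested deterministically.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical liveness tracker fed by the heartbeat observer.
final class HeartbeatWatchdog {
    private final Supplier<Instant> clock;
    private final Duration maxSilence;
    private final AtomicReference<Instant> lastHeartbeat = new AtomicReference<>();

    HeartbeatWatchdog(Supplier<Instant> clock, Duration maxSilence) {
        this.clock = clock;
        this.maxSilence = maxSilence;
    }

    // Call this from the method that observes DebeziumHeartbeat.
    void onHeartbeat() {
        lastHeartbeat.set(clock.get());
    }

    // Alive only if a heartbeat arrived within the last maxSilence.
    boolean isAlive() {
        Instant last = lastHeartbeat.get();
        return last != null && Duration.between(last, clock.get()).compareTo(maxSilence) <= 0;
    }
}
```

In production the clock would simply be Instant::now, and isAlive() could back a health check.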

Quarkus Debezium Listener

Using a Debezium listener, a Quarkus developer can intercept change events (INSERT, UPDATE, DELETE, ...) from a table such as order with a simple annotation:

import io.debezium.engine.ChangeEvent;
import jakarta.enterprise.context.ApplicationScoped;  


@ApplicationScoped  
class OrderListener {
  
    @Capturing("order")  
    public void capture(ChangeEvent<String, String> event) {  
        /// some logic to apply 
    }  
}
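How @Capturing("order") might route events can be sketched with plain reflection. The sketch assumes that the table name is the last segment of the event destination (Debezium topic names usually follow <topic.prefix>.<schema or database>.<table>), and it defines local stand-ins for the annotation and for ChangeEvent; CapturingDispatcher is hypothetical. A real Quarkus extension would more likely resolve these bindings at build time, which also avoids registering methods for reflection in native images.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Local stand-in for the proposed @Capturing annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Capturing {
    String value();
}

// Minimal stand-in for ChangeEvent: only the destination and the serialized value.
record SimpleChangeEvent(String destination, String value) {}

final class CapturingDispatcher {
    private record Route(String table, Object bean, Method method) {}

    private final List<Route> routes = new ArrayList<>();

    // Collects every @Capturing method declared by the listener bean.
    void register(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Capturing capturing = method.getAnnotation(Capturing.class);
            if (capturing != null) {
                method.setAccessible(true);
                routes.add(new Route(capturing.value(), bean, method));
            }
        }
    }

    // Invokes the listeners whose table matches the last segment of the destination.
    void dispatch(SimpleChangeEvent event) {
        String destination = event.destination();
        String table = destination.substring(destination.lastIndexOf('.') + 1);
        for (Route route : routes) {
            if (route.table().equals(table)) {
                try {
                    route.method().invoke(route.bean(), event);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    }
}
```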

or in batches:

import io.debezium.engine.ChangeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.List;


@ApplicationScoped  
class OrderListener {
  
    @Capturings("order")  
    public void capture(List<ChangeEvent<String, String>> events) {  
        /// some logic to apply
    }  
}

It is also possible to listen only to a certain type of event, similar to what the skipped.operations configuration property achieves today:

import static io.debezium.engine.quarkus.Operation.INSERT;

import io.debezium.engine.InsertEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.List;

@ApplicationScoped
class OrderListener {

    // the attribute name "operation" is illustrative: annotations need named attributes beyond value
    @Capturings(value = "order", operation = INSERT)
    public void capture(List<InsertEvent<String, String>> events) {
        // some logic to apply
    }
}
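The operation filter maps naturally onto the op field of Debezium's change-event envelope, which uses c (create), u (update), d (delete), r (read, i.e. snapshot) and t (truncate). A hypothetical Operation enum and filter could look like this; OpEvent is a stand-in for a parsed event that keeps only the table and the op code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical Operation enum mapped to the "op" codes of Debezium's change-event envelope.
enum Operation {
    INSERT("c"), UPDATE("u"), DELETE("d"), READ("r"), TRUNCATE("t");

    private final String code;

    Operation(String code) {
        this.code = code;
    }

    static Operation fromCode(String code) {
        return Arrays.stream(values())
                .filter(op -> op.code.equals(code))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown op: " + code));
    }
}

// Stand-in for a parsed change event: only the fields this sketch needs.
record OpEvent(String table, String op) {}

final class OperationFilter {
    // Keeps only the events of the requested operation, as an operation-filtered listener might.
    static List<OpEvent> only(Operation wanted, List<OpEvent> batch) {
        return batch.stream()
                .filter(event -> Operation.fromCode(event.op()) == wanted)
                .toList();
    }
}
```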

SourceRecord objects (org.apache.kafka.connect.source.SourceRecord) will also be supported.

Custom Debezium Data Converter

It should be possible to receive events mapped to data classes, such as:

// Order.java
public record Order(long id, String name, int price) {}

// OrderListener.java
import io.debezium.engine.InsertEvent;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
class OrderListener {

    @Capturing("order")
    public void listener(InsertEvent<String, Order> event) {
        // some logic to apply
    }
}

using a deserializer similar to the one provided by the Quarkus Kafka extension:

package com.acme.order.jackson;

import io.quarkus.debezium.client.serialization.ObjectMapperDeserializer;

public class OrderDeserializer extends ObjectMapperDeserializer<Order> {
    public OrderDeserializer() {
        super(Order.class);
    }
}

The deserializer is then registered in the configuration:

quarkus.debezium.deserializer=com.acme.order.jackson.OrderDeserializer
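Whatever library performs the mapping, a deserializer for this use case has to unwrap Debezium's change-event envelope, whose value contains before, after, source, op and ts_ms fields, and build the record from the after state. The sketch below shows that step without Jackson: a Map stands in for the parsed JSON, and EnvelopeDeserializer and OrderEnvelopeDeserializer are hypothetical names, not the proposed ObjectMapperDeserializer.

```java
import java.util.Map;

record Order(long id, String name, int price) {}

// Hypothetical contract for turning a parsed envelope into a data class.
interface EnvelopeDeserializer<T> {
    T deserialize(Map<String, Object> envelope);
}

// Maps the "after" section of a Debezium change-event envelope to an Order.
final class OrderEnvelopeDeserializer implements EnvelopeDeserializer<Order> {
    @Override
    @SuppressWarnings("unchecked")
    public Order deserialize(Map<String, Object> envelope) {
        Map<String, Object> after = (Map<String, Object>) envelope.get("after");
        if (after == null) {
            return null; // e.g. a delete event, which carries only the "before" state
        }
        return new Order(
                ((Number) after.get("id")).longValue(),
                (String) after.get("name"),
                ((Number) after.get("price")).intValue());
    }
}
```

Returning null for events without an after section is one possible choice; an alternative would be to expose the before state for deletes.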

Quarkus Debezium SchemaChange Listener

Debezium automatically detects and captures schema changes in the source database, such as adding or removing columns, modifying data types, or altering primary keys. Depending on the connector, these changes are parsed from the database's DDL statements and used to update Debezium's internal schema history, ensuring that change events reflect the current table structure. The Quarkus extension can expose a DebeziumSchemaChange event that can be observed:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped  
class SchemaChangeListener {  
    
    public void listener(@Observes DebeziumSchemaChange event) {  
        /// some logic to apply
    }  
}

Quarkus Debezium Notification Handler

Debezium notifications provide a mechanism to obtain status information about the connector, such as the progress of a snapshot. It should be possible to receive notifications in the Quarkus extension as observable events:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped  
class SnapshotListener {  
    
    public void handler(@Observes DebeziumNotification notification) {
        /// some logic to apply
    }
    
}
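Notifications have to be enabled on the connector, for example through the notification.enabled.channels property. Each Debezium notification carries an id, an aggregate type, a type, additional data and a timestamp. As a minimal sketch, assuming DebeziumNotification exposes those fields, a listener could track the end of the initial snapshot. The literal values "Initial Snapshot" and "COMPLETED" follow Debezium's notification documentation but should be checked against the Debezium version in use; SnapshotTracker is hypothetical.

```java
import java.util.Map;

// Stand-in mirroring the main fields of a Debezium notification (id and timestamp omitted).
record DebeziumNotification(String aggregateType, String type, Map<String, String> additionalData) {}

// Hypothetical observer logic: remembers whether the initial snapshot has completed.
final class SnapshotTracker {
    private volatile boolean initialSnapshotCompleted;

    void handle(DebeziumNotification notification) {
        if ("Initial Snapshot".equals(notification.aggregateType())
                && "COMPLETED".equals(notification.type())) {
            initialSnapshotCompleted = true;
        }
    }

    boolean initialSnapshotCompleted() {
        return initialSnapshotCompleted;
    }
}
```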

Quarkus Debezium PostProcessor

In the Debezium embedded engine, a PostProcessor is an internal hook used to modify or transform events right before they are emitted to the application's .notifying() callback. In the Quarkus extension, it can be defined through an annotation:

import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.data.Struct;

@ApplicationScoped
class CustomPostProcessor {

    @DebeziumPostProcessor
    public void process(Object key, Struct value) {
        // some logic to apply
    }
}
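Because process returns void, a post-processor works by altering the value in place before the event reaches the listeners. The sketch below masks sensitive string columns in the after section; a mutable Map stands in for Kafka Connect's Struct so that the code runs on the JDK alone, and MaskingPostProcessor and the field names are hypothetical.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical post-processor; a mutable Map stands in for Kafka Connect's Struct.
final class MaskingPostProcessor {
    private final Set<String> sensitiveFields;

    MaskingPostProcessor(Set<String> sensitiveFields) {
        this.sensitiveFields = sensitiveFields;
    }

    // Same (key, value) shape as the proposed @DebeziumPostProcessor method: mutates, returns nothing.
    @SuppressWarnings("unchecked")
    void process(Object key, Map<String, Object> value) {
        if (value.get("after") instanceof Map<?, ?> row) {
            Map<String, Object> fields = (Map<String, Object>) row;
            for (String field : sensitiveFields) {
                if (fields.containsKey(field)) {
                    fields.put(field, "****"); // assumes the masked columns are strings
                }
            }
        }
    }
}
```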

Quarkus Debezium Custom Converter

A custom converter in Debezium is a way to transform specific column values as they are read from the source database, before they are emitted in the change event.

Quarkus Debezium Relational Converter

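import io.debezium.spi.converter.RelationalColumn;
import jakarta.enterprise.context.ApplicationScoped;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.connect.data.SchemaBuilder;

@ApplicationScoped
class RawToStringConverter {

    @DebeziumRelationalConverter(typeName = "RAW")
    public ConverterDefinition<SchemaBuilder> bind(RelationalColumn column) {
        // byte[].toString() only prints an identity string, so decode the raw bytes (UTF-8 assumed)
        return new ConverterDefinition<>(SchemaBuilder.string(),
                value -> value instanceof byte[] bytes ? new String(bytes, StandardCharsets.UTF_8) : value);
    }
}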

*Example based on a simplified RawToStringConverter.
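The binding by typeName can be sketched as a registry keyed by column type name and applied to each column value. The sketch assumes that RAW values arrive as byte[] and decodes them as UTF-8, because calling toString() on a byte[] would only print "[B@" followed by a hash code. ColumnConverterRegistry is hypothetical and ignores the schema side, which ConverterDefinition covers in the proposal.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

// Hypothetical registry mirroring @DebeziumRelationalConverter(typeName = "...").
final class ColumnConverterRegistry {
    private final Map<String, Function<Object, Object>> byTypeName = new HashMap<>();

    void register(String typeName, Function<Object, Object> converter) {
        byTypeName.put(typeName.toUpperCase(Locale.ROOT), converter);
    }

    // Applies the converter registered for the column's type name, or returns the value unchanged.
    Object convert(String columnTypeName, Object value) {
        Function<Object, Object> converter = byTypeName.get(columnTypeName.toUpperCase(Locale.ROOT));
        return converter == null ? value : converter.apply(value);
    }

    // Assumes RAW values arrive as byte[]; other values (including null) pass through.
    static Object rawToString(Object value) {
        return value instanceof byte[] bytes ? new String(bytes, StandardCharsets.UTF_8) : value;
    }
}
```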

Considerations

This Kafka-inspired approach could also be useful for the development of Debezium Server.