Commit 3654225

Merge pull request #28 from datastax/telling-the-cdc-story

Added CDC area to learning

2 parents 2215079 + 5fb9d33

10 files changed: +315 -1 lines changed

modules/use-cases-architectures/nav.adoc

Lines changed: 6 additions & 1 deletion
@@ -4,4 +4,9 @@
 * xref:starlight/index.adoc[Pulsar extensions]
 ** xref:starlight/kafka/index.adoc[Starlight for Kafka]
 ** xref:starlight/rabbitmq/index.adoc[Starlight for RabbitMQ]
-** xref:starlight/jms/index.adoc[Starlight for JMS]
+** xref:starlight/jms/index.adoc[Starlight for JMS]
+
+* xref:change-data-capture/index.adoc[Change data capture]
+** xref:change-data-capture/table-schema-evolution.adoc[Table schema evolution]
+** xref:change-data-capture/consuming-change-data.adoc[Consuming change data]
+** xref:change-data-capture/questions-and-patterns.adoc[Questions and patterns]
Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
= Consuming change data with Apache Pulsar
David Dieruf <david[email protected]>
1.0, March 13, 2023: Consuming change data with Apache Pulsar
:description: This article describes how to consume change data with Apache Pulsar.
:title: Consuming change data with Apache Pulsar
:navtitle: Consuming change data with Apache Pulsar

NOTE: This article is a continuation of the "xref:change-data-capture/index.adoc[]" article. Read that article first to understand the fundamentals of the resources being used.
== Pulsar clients

Each client handles message consumption a little differently, but there is one overall pattern to follow. As we learned in the previous sections, a CDC message arrives as an Avro GenericRecord of type KeyValue. Typically, the first step is to separate the key and value portions of the message. The Cassandra table's key fields are in the key portion of the record, and the change data is in the value portion; both are Avro records themselves. From there, deserialize each Avro record and extract the interesting information.
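In outline, the pattern looks like this. This is a runtime-agnostic sketch, not taken from the linked examples: plain Python dicts stand in for the deserialized Avro key and value records, and the table and field names are hypothetical.

```python
# Sketch of the general CDC consumption pattern (runtime-agnostic).
# Your Pulsar client hands you a KeyValue GenericRecord; here plain
# dicts stand in for the already-deserialized Avro key and value records.

def handle_cdc_record(key_record: dict, value_record: dict) -> dict:
    """Merge the table's key fields with the changed columns.

    key_record   -- the Cassandra table's primary-key fields
    value_record -- the mutated (changed) column values
    """
    row = dict(key_record)    # start with the primary-key coordinates
    row.update(value_record)  # add the change data
    return row

# Hypothetical mutation on a "users" table keyed by user_id:
key = {"user_id": "a1b2"}
value = {"email": "[email protected]", "status": "active"}
print(handle_cdc_record(key, value))
# → {'user_id': 'a1b2', 'email': '[email protected]', 'status': 'active'}
```

Whatever the runtime, the real work is the Avro deserialization step that produces the two records; the linked examples show how each client library does that.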
Below are example implementations for each runtime consuming messages from the CDC data topic.
[NOTE]
While these examples are in the "astra-streaming-examples" repository, they are not Astra specific. You can use them to consume CDC data topics in your own Cassandra/Pulsar clusters.
[cols="^1,^1,^1,^1,^1", grid=none,frame=none]
|===
| https://github.com/datastax/astra-streaming-examples/blob/master/csharp/astra-cdc/Program.cs[image:csharp-icon.png[]^]

https://github.com/datastax/astra-streaming-examples/blob/master/csharp/astra-cdc/Program.cs[C#^]{external-link-icon}
| https://github.com/datastax/astra-streaming-examples/blob/master/go/astra-cdc/main/main.go[image:golang-icon.png[]^]

https://github.com/datastax/astra-streaming-examples/blob/master/go/astra-cdc/main/main.go[Golang^]{external-link-icon}
| https://github.com/datastax/astra-streaming-examples/blob/master/java/astra-cdc/javaexamples/consumers/CDCConsumer.java[image:java-icon.png[]^]

https://github.com/datastax/astra-streaming-examples/blob/master/java/astra-cdc/javaexamples/consumers/CDCConsumer.java[Java^]{external-link-icon}
| https://github.com/datastax/astra-streaming-examples/blob/master/nodejs/astra-cdc/consumer.js[image:node-icon.png[]^]

https://github.com/datastax/astra-streaming-examples/blob/master/nodejs/astra-cdc/consumer.js[Node.js^]{external-link-icon}
| https://github.com/datastax/astra-streaming-examples/blob/master/python/astra-cdc/cdc_consumer.py[image:python-icon.png[]^]

https://github.com/datastax/astra-streaming-examples/blob/master/python/astra-cdc/cdc_consumer.py[Python^]{external-link-icon}
|===
== Pulsar functions

It is very common to have a function consuming the CDC data. Functions usually do added processing on the data and continue to another topic. Like a client consumer, a function needs to deserialize the message data. Below are examples of different functions consuming messages from the CDC data topic.
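As a minimal sketch of the idea (not one of the linked examples), a "native" Python function only needs to define a `process` method; Pulsar invokes it once per message on the input topic and publishes the return value to the function's output topic. The JSON payload format and field names below are assumptions for illustration.

```python
# Sketch of a native Python Pulsar function that post-processes CDC data.
# Once deployed (e.g. with pulsar-admin functions create), Pulsar calls
# process() for each message on the input topic and publishes the return
# value to the configured output topic.

import json

def process(input):
    # With a native (non-SDK) Python function the payload arrives as the
    # message content; assume the upstream step emitted JSON text.
    change = json.loads(input)
    # Example processing: keep only the columns downstream consumers need.
    trimmed = {"id": change.get("id"), "status": change.get("status")}
    return json.dumps(trimmed)
```

A real CDC consumer would first deserialize the Avro KeyValue record as described above; the JSON here simply keeps the sketch self-contained.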
[NOTE]
While these examples are in the "astra-streaming-examples" repository, they are not Astra specific. You can use them to consume CDC data topics in your own Cassandra/Pulsar clusters.
[cols="^1,^1,^1", grid=none,frame=none]
|===
| https://github.com/datastax/astra-streaming-examples/blob/master/go/astra-cdc/main/main.go[image:golang-icon.png[]^]

https://github.com/datastax/astra-streaming-examples/blob/master/go/astra-cdc/main/main.go[Golang^]{external-link-icon}
| https://github.com/datastax/astra-streaming-examples/blob/master/java/astra-cdc/javaexamples/functions/CDCFunction.java[image:java-icon.png[]^]

https://github.com/datastax/astra-streaming-examples/blob/master/java/astra-cdc/javaexamples/functions/CDCFunction.java[Java^]{external-link-icon}
| https://github.com/datastax/astra-streaming-examples/blob/master/python/cdc-in-pulsar-function/deschemaer.py[image:python-icon.png[]^]

https://github.com/datastax/astra-streaming-examples/blob/master/python/cdc-in-pulsar-function/deschemaer.py[Python^]{external-link-icon}
|===
== Next

You're ready to tackle CDC like a pro! Use our "xref:use-cases-architectures:change-data-capture/questions-and-patterns.adoc[]" as a reference as you near production.
Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
= Change Data Capture (CDC) pattern with Apache Cassandra and Apache Pulsar
David Dieruf <david[email protected]>
1.0, March 13, 2023: About change data capture
:description: This article describes how to capture changes in an Apache Cassandra database and publish them to Apache Pulsar as events.
:title: CDC with Cassandra and Pulsar
:navtitle: CDC with Cassandra and Pulsar

Change Data Capture (CDC) is a design pattern used in software development to capture and propagate changes made to data in a system. The CDC pattern is commonly used in real-time data streaming applications to enable near-real-time processing of data changes. In a typical CDC implementation, a change to a row of data (insert, update, delete) is detected and recorded. The change (or mutation) is made available to downstream systems as an event for further processing. This allows applications to react quickly to changes in the data without adding unneeded load on the data store, enabling real-time data processing and analytics.
Before we get into the specifics of CDC, let's first understand the resources needed to complete the flow.
== Pulsar source connectors

Source connectors in Apache Pulsar are responsible for ingesting data from external sources into the Pulsar system. They can collect data from a variety of sources, including databases, message queues, and file systems. When the source connector "sees" data, it streams it to a Pulsar topic. This enables users to easily integrate data from disparate sources into their Pulsar-based applications. Source connectors make it easy to ingest, process, and analyze large volumes of data from a variety of sources.

Pulsar offers extensible APIs where one can use a defined interface to develop their own connector. The interface takes much of the boilerplate burden away from developers and gets them right to the purpose of the connector. Creating a connector means adding the know-how to work with data from the source and adapting it to produce a compliant message with the Pulsar client.

As you'll learn in the next section, the Cassandra source connector is a very important part of the process needed to capture change data. To run a source connector, you provide configuration about what data will be selected, how to connect to the upstream system, and the destination topic for the new messages. The source connector takes care of producing the message. Pulsar source connectors run as Pulsar functions within the cluster, so many of the features of functions (like the number of instances to run and how to configure the running environment) apply. Metrics and logs for a source connector automatically become part of the cluster.
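For illustration, a source connector is typically created from a config file like the sketch below. The top-level keys are standard Pulsar connector settings; the entries under `configs`, and all names and topic paths, are illustrative assumptions only, so verify the exact option names against the DataStax CDC for Cassandra documentation before use.

```yaml
# Illustrative sketch only -- verify option names against the
# DataStax CDC for Cassandra documentation.
tenant: "public"
namespace: "default"
name: "cassandra-source-users"
topicName: "persistent://public/default/data-ks1.users"   # destination (data) topic
archive: "cassandra-source-connector.nar"
parallelism: 1
configs:
  events.topic: "persistent://public/default/events-ks1.users"  # the agent's event topic
  keyspace: "ks1"
  table: "users"
  contactPoints: "cassandra-node-1"
  port: 9042
```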
[discrete]
=== Monitoring source connectors

Monitoring a source connector covers two areas: health and performance. Every connector in Pulsar emits basic metrics about its health, including stats like the number of records received from the source and the number of messages written to the destination topic, as well as debugging metrics like the number of exceptions thrown by the source. Refer to the https://pulsar.apache.org/docs/reference-metrics/#connectors[connectors area of Pulsar metrics^]{external-link-icon} for a complete list and explanation of metrics.

Performance metrics include health metrics as well as specific knowledge about the source. The Cassandra source connector includes quite a few performance metrics. Refer to the "https://docs.datastax.com/en/cdc-for-cassandra/docs/2.2.2/monitor.html[Monitoring CDC for Cassandra]" reference.

[discrete]
=== Source connector logs

Most Pulsar source connectors emit logs that show lifecycle events as well as custom events specific to the connector type. All logs are handled the same way core cluster logs are handled: by default, they are written to the console and collected by log4j destinations. If you are using function workers, you can access log files on their disks. Refer to Pulsar's https://pulsar.apache.org/docs/io-debug/[connector debugging guide^]{external-link-icon} for more information.
== Pulsar schemas and the schema registry

The Apache Pulsar schema registry is a feature of a Pulsar cluster that enables users to manage the schemas of messages sent and received on Pulsar topics. In Pulsar, messages are stored as bytes. Schemas provide a way to serialize and deserialize messages with a particular structure or type, allowing for interoperability between different systems.

The schema registry stores and manages the schema definitions for all message types sent and received in Pulsar. It can enforce schema compatibility rules, such as requiring a producer to send messages that conform to a certain schema, or rejecting messages that don't match the schema.

Schemas follow a primitive or complex type. Primitive schemas are simple data types like bool, int, string, and float. Because Pulsar is written in Java, its primitives are based on Java types; when a different client runtime is used, a conversion may need to be done. Refer to the https://pulsar.apache.org/docs/schema-understand/#primitive-type[Pulsar primitive types table^]{external-link-icon} for a full reference.

Complex schemas introduce a more structured way of messaging. The two types of complex messages are KeyValue and Struct. KeyValue is JSON-formatted text that offers a separation of custom labels and their values. Struct is a custom class definition set as Avro, JSON, or Protobuf.

KeyValue offers an interesting way to encode a message called "separated". This option separates the message key from the message payload, which makes it possible to store message key information as a different data type than the message payload. It also offers special compression capabilities. CDC takes advantage of separated KeyValue messages when it produces to both the event and data topics. Learn more in the "https://docs.datastax.com/en/cdc-for-cassandra/docs/2.2.2/cdc-cassandra-events.html[CDC for Cassandra Events]" reference.
[discrete]
=== Namespace schema configurations

In the context of CDC there are a few schema configurations of note. All of these are specific to the namespace where the event and data topics are logically located.

- *schema-compatibility-strategy*: this setting is for producers; it directs the broker how to handle new schemas introduced to existing topics. In CDC this is relevant when you change a table's design. For example, if a new column is added, the schema is changed to include that new value, and this configuration decides whether the namespace will allow it. CDC uses a strategy of "BACKWARD_TRANSITIVE", which means adding new optional fields and removing fields are both allowed. Learn more about the different strategies in the https://pulsar.apache.org/docs/next/schema-understand/#schema-compatibility-check-strategy[Pulsar docs^]{external-link-icon}.

- *allow-auto-update-schema*: given the compatibility strategy, this flag decides whether an update to the schema is allowed at all. CDC sets this to "true", so changes in a table's design can automatically propagate to the topic.

- *schema-autoupdate-strategy*: when auto update is enabled (true), this setting is for consumers. It directs the broker how to ensure consumers of a topic are able to process messages. If a consumer attempts to connect with a schema that does not match the current schema, this strategy decides whether it is allowed to receive messages. CDC sets this to "BACKWARD_TRANSITIVE", which means the old schema is allowed if optional table columns have been added or a column has been removed.

- *schema-validation-enforce*: this flag can limit how producers and consumers are configured. When enabled (true), producer and consumer clients must have a schema set before sending a message. When disabled (false), Pulsar allows producers and consumers without a set schema to send or receive messages. CDC disables this option (false), so producers and consumers do not have to know the message schema ahead of time.
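As a toy illustration of what BACKWARD_TRANSITIVE permits (this is not Pulsar's actual compatibility checker), consider a schema as a map of field name to whether the field is required:

```python
# Toy illustration of the BACKWARD_TRANSITIVE rule described above:
# adding optional fields and removing fields are allowed, but adding
# a *required* field breaks old readers and is rejected.

def backward_compatible(old_fields: dict, new_fields: dict) -> bool:
    """old_fields/new_fields map field name -> required? (True/False)."""
    added = set(new_fields) - set(old_fields)
    # Removals are always fine; additions are only fine when optional.
    return all(not new_fields[f] for f in added)

v1 = {"id": True, "email": True}
v2 = {"id": True, "email": True, "nickname": False}  # added optional column
v3 = {"id": True, "email": True, "age": True}        # added required column

print(backward_compatible(v1, v2))  # True
print(backward_compatible(v1, v3))  # False
```

In a real Avro schema, "optional" corresponds to a field with a default value; Pulsar's broker performs this check against every prior schema version under a transitive strategy.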
== Cassandra change data capture (CDC) agent

The Cassandra CDC agent is a process running on each node in a Cassandra cluster, watching for data changes on tables that have the CDC feature enabled. Using Cassandra's https://cassandra.apache.org/doc/4.0/cassandra/configuration/cass_yaml_file.html#commitlog_sync[commitlog_sync option^]{external-link-icon}, the agent periodically syncs a separate log in a special "cdc_raw" directory. Each log entry is a CDC event. The agent creates a new event message, containing the row coordinates of the changed data, and produces it to a downstream Pulsar cluster. For more information about the agent, how to include its configuration in cassandra.yaml, and event data specifics, read the "https://docs.datastax.com/en/cdc-for-cassandra/docs/2.2.2/index.html[DataStax CDC for Apache Cassandra® Documentation]".
== Cassandra Source Connector for Apache Pulsar

Each table that has CDC enabled also has a corresponding source connector in Pulsar. This is unlike the CDC agent, where one process runs on each Cassandra node and keeps a log of all table changes. Each table-specific source connector subscribes to the events topic the agent is producing messages to. When the connector "sees" a message for its table, it uses the row coordinates within the message to retrieve the mutated data from Cassandra and create a new message with the specifics. That new message is written to a data topic where others can subscribe and receive CDC messages. For more information about the Cassandra source connector, its configuration, and how to create it, read the "https://docs.datastax.com/en/cdc-for-cassandra/docs/2.2.2/index.html[DataStax CDC for Apache Cassandra® Documentation]".
[discrete]
=== Event deduplication

A particular advantage of the source connector is its deduplication feature. You might have read about Pulsar's built-in https://pulsar.apache.org/docs/2.11.x/concepts-messaging/#message-deduplication[deduplication capabilities^]{external-link-icon}. These are not used in this message flow because CDC needs finer-grained control to detect duplicates. As the CDC agent discovers a new commit log entry, a deterministic identifier is created using the MD5 hash algorithm, and that key identifier is added to the event message.

When message consumers like the source connector connect to the event topic, they establish a subscription type. Pulsar has four types: exclusive, shared, failover, and key_shared. In a typical CDC flow the source connector has multiple instances running in parallel. When multiple consumers are part of a key_shared subscription, Pulsar delivers messages with the same hash key to the same consumer, no matter how many times they're sent.

When a Cassandra cluster has multiple hosts (with multiple commit logs), and they all use the same mutation to calculate the same hash key, the same consumer will always receive it. Each source connector keeps a cache of hashes it has seen and ensures duplicates are dropped before producing the data message.
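The dedup idea can be sketched as follows. This is an assumed simplification of the connector's behavior, not its actual implementation, and the mutation identity string is hypothetical:

```python
# Sketch of commit-log deduplication: hash each mutation's identifying
# bytes with MD5 and drop any mutation whose digest was already seen.

import hashlib

class MutationDeduplicator:
    def __init__(self):
        self._seen = set()  # a real implementation would bound this cache

    def should_process(self, mutation_bytes: bytes) -> bool:
        digest = hashlib.md5(mutation_bytes).hexdigest()
        if digest in self._seen:
            return False    # duplicate from another node's commit log
        self._seen.add(digest)
        return True

dedup = MutationDeduplicator()
m = b"ks1.users:pk=a1b2:ts=1678700000"  # hypothetical mutation identity
print(dedup.should_process(m))  # True  (first sighting)
print(dedup.should_process(m))  # False (duplicate dropped)
```

Because every Cassandra replica derives the same digest from the same mutation, key_shared routing delivers all copies to one consumer, and that consumer's cache drops all but the first.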
Learn more about Pulsar's key_shared subscription type in the https://pulsar.apache.org/docs/2.11.x/concepts-messaging/#key_shared[Pulsar documentation^]{external-link-icon}.
== Putting together the CDC flow

Now that you understand the different resources used in this CDC pattern, let's follow the flow to see how a CDC message is produced.

. Create a Pulsar tenant to hold CDC messages.
.. Create a namespace (or use the "default").
.. Create a topic for event messages.
.. Create a topic for data messages.
. Start the CDC source connector in Pulsar by setting the destination topic (aka the data topic), the event topic, and the Cassandra connection info (along with other settings).
. Configure the Cassandra change agent with a working directory and the Pulsar service URL (along with other settings) on the Cassandra node (a restart is required).
. Create a Cassandra table and enable CDC.
. Insert a row of data into the table.
.. The change agent will detect a mutation to the table and write a log entry.
.. The log entry will be converted to an event message and written to the events topic.
.. The source connector will complete the flow by producing a final change message to the data topic.
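The "create a Cassandra table and enable CDC" step in the flow above is done with CQL; a table can enable CDC at creation time or afterwards (the keyspace, table, and column names below are hypothetical):

```sql
-- Enable CDC when creating the table
CREATE TABLE ks1.users (
    user_id uuid PRIMARY KEY,
    email text,
    status text
) WITH cdc = true;

-- Or enable it on an existing table
ALTER TABLE ks1.users WITH cdc = true;
```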
== Next

With a solid understanding of the resources and flow used within the CDC pattern, let's move on to the next section to learn about xref:use-cases-architectures:change-data-capture/table-schema-evolution.adoc[].
