= CDC Backfill CLI

When CDC is enabled on a table, the data topic doesn't contain any data from before CDC was enabled.
The backfill CLI solves this problem by exporting the table's primary keys to a comma-separated values (CSV) file on disk and then sending those primary keys to the event topic.
The Cassandra Source Connector reads the primary keys from the event topic and populates the data topic with historical data.
The backfill CLI is powered by the https://docs.datastax.com/en/dsbulk/docs/reference/dsbulkCmd.html[DataStax Bulk Loader] (DSBulk), a battle-tested data loading tool.
This means the CLI takes full advantage of DSBulk's optimizations when exporting data from the table to disk.

Developers can also use the backfill CLI to trigger change events for downstream applications without having to insert new data.
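
For example, for the `ks1.table1` table used in the test below (a single primary key column `id` with values `1`, `2`, and `3`), the exported file contains only the primary key column. The exact file name and layout are determined by DSBulk, so the following is an illustrative sketch only:

[source,bash]
----
# Hypothetical file name and contents; DSBulk controls the actual layout
cat target/export/ks1/table1/output-000001.csv
id
1
2
3
----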

[#install]
== Installation

The CDC backfill CLI is distributed both as a JAR file and as a Pulsar-admin extension NAR file.
The Pulsar-admin extension is packaged with the DataStax Luna Streaming distribution in the /cliextensions folder, so you don't need to build from source unless you want to make changes to the code.

Both artifacts are built with Gradle.
To build the CLI, run the following commands:

[tabs]
====
Gradle::
+
--
[source,bash]
----
git clone git@github.com:datastax/cdc-apache-cassandra.git
cd cdc-apache-cassandra
./gradlew backfill-cli:assemble
----
--

Result::
+
--
[source,bash]
----
BUILD SUCCESSFUL in 37s
17 actionable tasks: 15 executed, 2 up-to-date
----
--
====

Gradle generates two main artifacts:

* An uber JAR file containing the CLI and all its dependencies: `backfill-cli/build/libs/backfill-cli-<version>-all.jar`
* A NAR archive that wraps the CLI as a Pulsar-admin extension: `backfill-cli/build/libs/pulsar-cassandra-admin-<version>-nar.nar`

Once the artifacts are generated, you can run the backfill CLI tool as either a standalone Java application or as a Pulsar-admin extension.

[tabs]
====
Java standalone::
+
--
[source,shell,subs="attributes+"]
----
include::example$java-start.sh[]
----
--

Pulsar-admin extension::
+
--
include::partial$extension.adoc[]
+
----
include::example$extension-start.sh[]
----
--
====

== Test

This quick test confirms that your CDC backfill setup is working correctly.

*Prerequisites:*

* A running https://docs.datastax.com/en/installing/docs/installTARdse.html[DSE Cassandra cluster]
* A running Pulsar cluster (https://pulsar.apache.org/docs/getting-started-standalone/[standalone] is fine)
* The backfill CLI built with Gradle (see <<install>>)

. Start DSE Cassandra from the https://docs.datastax.com/en/installing/docs/installTARdse.html[installation directory]:
+
[source,bash]
----
./bin/dse cassandra -f
----

. Confirm that a Cassandra node is running:
+
[source,bash]
----
./bin/nodetool status
Datacenter: Cassandra
=====================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving/Stopped
--  Address    Load        Owns (effective)  Host ID                               Token                 Rack
UN  127.0.0.1  169.82 KiB  100.0%            d4a8181f-b248-431f-a218-47651a30ef4d  -5094113265891965089  rack1
----

. Create a CDC-enabled table and add values to the table with CQL shell:
+
[source,cql]
----
./bin/cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 6.8.0 | DSE 6.8.33 | CQL spec 3.4.5 | DSE protocol v2]
cqlsh> CREATE KEYSPACE ks1 WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks1.table1 (id text primary key, val text) WITH cdc=true;
cqlsh> INSERT INTO ks1.table1 (id,val) VALUES ('1', 'val1');
cqlsh> INSERT INTO ks1.table1 (id,val) VALUES ('2', 'val2');
cqlsh> INSERT INTO ks1.table1 (id,val) VALUES ('3', 'val3');
----

. Start Pulsar standalone:
+
[source,bash]
----
./bin/pulsar standalone
----

. From the Pulsar (Luna Streaming) installation directory, create a Cassandra source connector:
+
[source,bash]
----
./bin/pulsar-admin sources create \
  --name csc --classname com.datastax.oss.pulsar.source.CassandraSource \
  --archive connectors/pulsar-cassandra-source-2.2.2.nar \
  --tenant public \
  --namespace default \
  --destination-topic-name "persistent://public/default/data-ks1.table1" \
  --parallelism 1 \
  --source-config '{
    "events.topic": "persistent://public/default/events-ks1.table1",
    "keyspace": "ks1",
    "table": "table1",
    "contactPoints": "localhost",
    "port": "9042",
    "loadBalancing.localDc": "Cassandra"
  }'

Created successfully
----

. Create a consumer subscription on the CDC data topic:
+
[source,bash]
----
./bin/pulsar-client consume -s subsc -n 0 -st auto_consume -p Earliest persistent://public/default/data-ks1.table1
----

. Now everything is set up: you have a Cassandra table with pre-existing data, a Pulsar topic with a CDC connector, and a Pulsar consumer subscription.
+
*The moment of truth!*
Run the backfill CLI to hydrate the data topic with the existing data in the Cassandra table:
+
[source,bash]
----
./bin/pulsar-admin cassandra-cdc backfill --data-dir target/export --export-host 127.0.0.1:9042 \
  --export-username cassandra --export-password cassandra --keyspace ks1 --table table1
----

. You will get a lot of output in the console, but the last line should look like this:
+
[source,bash]
----
2023-04-14T11:38:53,421-0400 [main] INFO com.datastax.oss.cdc.backfill.importer.PulsarImporter - Pulsar Importer Summary: Import status=STATUS_OK, Read mutations from disk=3, Sent mutations=3, Failed mutations=0
----
+
Success!
Your data topic is now populated with the existing data from the Cassandra table.

. Check your Pulsar subscription as well to ensure Pulsar received the change events.
+
.Pulsar consumer subscription output
[%collapsible]
====
[source,plain]
----
----- got message -----
2023-04-14T11:47:48,652-0400 [main] INFO org.apache.pulsar.client.impl.schema.AutoConsumeSchema - Configure topic schema \x00\x00\x00\x00\x00\x00\x00\x00 for topic persistent://public/default/data-ks1.table1 : {"key":{"name":"table1","schema":{"type":"record","name":"table1","namespace":"ks1","doc":"Table ks1.table1","fields":[{"name":"id","type":"string"}]},"type":"AVRO","timestamp":0,"properties":{}},"value":{"name":"table1","schema":{"type":"record","name":"table1","namespace":"ks1","doc":"Table ks1.table1","fields":[{"name":"val","type":["null","string"]}]},"type":"AVRO","timestamp":0,"properties":{}}}
2023-04-14T11:47:48,654-0400 [main] INFO org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericAvroReader - Load schema reader for version(0), schema is : {
  "name": "table1",
  "schema": {
    "type": "record",
    "name": "table1",
    "namespace": "ks1",
    "doc": "Table ks1.table1",
    "fields": [
      {
        "name": "id",
        "type": "string"
      }
    ]
  },
  "type": "AVRO",
  "timestamp": 0,
  "properties": {}
}
2023-04-14T11:47:48,674-0400 [main] INFO org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericAvroReader - Load schema reader for version(0), schema is : {
  "name": "table1",
  "schema": {
    "type": "record",
    "name": "table1",
    "namespace": "ks1",
    "doc": "Table ks1.table1",
    "fields": [
      {
        "name": "val",
        "type": [
          "null",
          "string"
        ]
      }
    ]
  },
  "type": "AVRO",
  "timestamp": 0,
  "properties": {}
}
key:[AjI=], properties:[writetime=1681487266389000], content:{key={id=2}, value={val=val2}}
----- got message -----
key:[AjM=], properties:[writetime=1681487267244000], content:{key={id=3}, value={val=val3}}
----- got message -----
key:[AjE=], properties:[writetime=1681487267246000], content:{key={id=1}, value={val=val1}}
2023-04-14T11:48:18,905-0400 [pulsar-timer-6-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [persistent://public/default/data-ks1.table1] [subsc] [5759a] Prefetched messages: 0 --- Consume throughput received: 0.05 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.05 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
----
====

Now that you've confirmed the backfill CLI is working, run it manually whenever you want to backfill data on a CDC-enabled table or need to generate change events for existing data for downstream applications.

== Parameters reference

When running the backfill CLI as a Pulsar-admin extension, all `--pulsar-*` parameters are loaded from the `client.conf` file.
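
For reference, here is a minimal sketch of the `client.conf` entries that roughly correspond to the `--pulsar-*` options. The URLs and token are placeholders; adjust them for your cluster:

[source,bash]
----
# Placeholder values for illustration only
cat conf/client.conf
webServiceUrl=http://localhost:8080
brokerServiceUrl=pulsar://localhost:6650
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
authParams=token:<your-token>
----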

The `--dsbulk-log-dir` parameter is only available when running the backfill CLI as a standalone Java application.

The `--export-dsbulk-option` parameter passes extra options through to DSBulk.
The relevant DSBulk settings configure the CSV connector; see the https://github.com/datastax/dsbulk/blob/1.10.x/manual/settings.md#connector.csv[DSBulk CSV connector settings^] for the full list.
Shortened option names are not supported.
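
For example, the following invocation (a variation of the test command above, shown for illustration) overrides the CSV delimiter that DSBulk uses during export; any other long-form DSBulk option can be passed the same way:

[source,bash]
----
# Illustrative only: passes one extra DSBulk CSV connector setting through to the export
./bin/pulsar-admin cassandra-cdc backfill --data-dir target/export --export-host 127.0.0.1:9042 \
  --export-username cassandra --export-password cassandra --keyspace ks1 --table table1 \
  --export-dsbulk-option='--connector.csv.delimiter=|'
----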

In both the Pulsar-admin extension and the standalone Java application, Cassandra-related configurations are passed exclusively as command line arguments.

.CDC backfill CLI parameters
[cols=2*,options="header"]
|===
|Parameter
|Description

|--data-dir=PATH
|The directory where data will be exported to and imported from. The
default is a 'data' subdirectory in the current working directory.
The data directory will be created if it does not exist. Tables will
be exported in subdirectories of the data directory specified here;
there will be one subdirectory per keyspace inside the data
directory, then one subdirectory per table inside each keyspace
directory.
|--help, -h
|Displays this help message.
|--dsbulk-log-dir=PATH, -l
|The directory where DSBulk should store its logs. The default is a
'logs' subdirectory in the current working directory. This
subdirectory will be created if it does not exist. Each DSBulk
operation will create a subdirectory inside the log directory
specified here. This option is not available in the Pulsar-admin extension.
|--export-bundle=PATH
|The path to a secure connect bundle to connect to the Cassandra
cluster, if that cluster is a DataStax Astra cluster. Options
--export-host and --export-bundle are mutually exclusive.
|--export-consistency=CONSISTENCY
|The consistency level to use when exporting data. The default is
LOCAL_QUORUM.
|--export-max-concurrent-files=NUM\|AUTO
|The maximum number of concurrent files to write to. Must be a positive
number or the special value AUTO. The default is AUTO.
|--export-max-concurrent-queries=NUM\|AUTO
|The maximum number of concurrent queries to execute. Must be a
positive number or the special value AUTO. The default is AUTO.
|--export-splits=NUM\|NC
|The maximum number of token range queries to generate. Use the NC
syntax to specify a multiple of the number of available cores, e.g.
8C = 8 times the number of available cores. The default is 8C. This
is an advanced setting; you should rarely need to modify the default
value.
|--export-dsbulk-option=OPT=VALUE
|An extra DSBulk option to use when exporting. Any valid DSBulk option
can be specified here, and it will be passed as-is to the DSBulk
process. DSBulk options, including driver options, must be passed as
'--long.option.name=<value>'. Short options are not supported. For more DSBulk options, see the https://docs.datastax.com/en/dsbulk/docs/reference/commonOptions.html[DSBulk common options].
|--export-host=HOST[:PORT]
|The host name or IP and, optionally, the port of a node from the
Cassandra cluster. If the port is not specified, it will default to
9042. This option can be specified multiple times. Options
--export-host and --export-bundle are mutually exclusive.
|--export-password
|The password to use to authenticate against the origin cluster.
Options --export-username and --export-password must be provided
together, or not at all. Omit the parameter value to be prompted for
the password interactively.
|--export-protocol-version=VERSION
|The protocol version to use to connect to the Cassandra cluster, e.g.
'V4'. If not specified, the driver will negotiate the highest
version supported by both the client and the server.
|--export-username=STRING
|The username to use to authenticate against the origin cluster.
Options --export-username and --export-password must be provided
together, or not at all.
|--keyspace=<keyspace>, -k
|The name of the keyspace where the table to be exported exists.
|--max-rows-per-second=NUM
|The maximum number of rows per second to read from the Cassandra
table. Setting this option to any negative value or zero will
disable it. The default is -1.
|--table=<table>, -t
|The name of the table to export data from for CDC backfilling.
|--version, -v
|Displays version info.
|===

== Pulsar connectivity parameters

Pulsar connectivity parameters are auto-populated from the `client.conf` file available to the CLI when used as a Pulsar-admin extension.
When running the standalone Java application, these parameters must be passed as command line arguments.
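
As an illustration only, a standalone invocation could pass the Pulsar service URL explicitly. The actual entry point is the one shown in the Java standalone example above; the sketch below assumes the uber JAR is run with `java -jar` and accepts the same `backfill` options as the Pulsar-admin extension:

[source,bash]
----
# Hypothetical invocation; verify the entry point against example$java-start.sh
java -jar backfill-cli-<version>-all.jar backfill \
  --pulsar-url pulsar://localhost:6650 \
  --export-host 127.0.0.1:9042 --export-username cassandra --export-password cassandra \
  --keyspace ks1 --table table1
----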

.Pulsar connectivity parameters
[cols=2*,options="header"]
|===
|Parameter
|Description
|--events-topic-prefix=<topicPrefix>
|The event topic name prefix. The `<keyspace_name>.<table_name>` string is appended to this prefix to build the topic name.
|--pulsar-auth-params=<pulsarAuthParams>
|The Pulsar authentication parameters.
|--pulsar-auth-plugin-class-name=<pulsarAuthPluginClassName>
|The Pulsar authentication plugin class name.
|--pulsar-url=<pulsarServiceUrl>
|The Pulsar broker service URL.
|--pulsar-ssl-provider=<sslProvider>
|The SSL/TLS provider to use.
|--pulsar-ssl-truststore-path=<sslTruststorePath>
|The path to the SSL/TLS truststore file.
|--pulsar-ssl-truststore-password=<sslTruststorePassword>
|The password for the SSL/TLS truststore.
|--pulsar-ssl-truststore-type=<sslTruststoreType>
|The type of the SSL/TLS truststore.
|--pulsar-ssl-keystore-path=<sslKeystorePath>
|The path to the SSL/TLS keystore file.
|--pulsar-ssl-keystore-password=<sslKeystorePassword>
|The password for the SSL/TLS keystore.
|--pulsar-ssl-cipher-suites=<sslCipherSuites>
|One or more cipher suites to use for negotiating the SSL/TLS connection.
|--pulsar-ssl-enabled-protocols=<sslEnabledProtocols>
|The enabled SSL/TLS protocols.
|--pulsar-ssl-allow-insecure-connections
|Allows insecure connections to servers whose certificate has not been signed by an approved CA. Always disable this option in production environments.
|--pulsar-ssl-enable-hostname-verification
|Enables server hostname verification.
|--pulsar-ssl-tls-trust-certs-path=<tlsTrustCertsFilePath>
|The path to the trusted TLS certificate file.
|--pulsar-ssl-use-key-store-tls
|If TLS is enabled, specifies whether to use the KeyStore type as the TLS configuration parameter.
|===

== What's next?

* xref:index.adoc[CDC Home]
* https://docs.datastax.com/en/dsbulk/docs/reference/dsbulkCmd.html[DataStax Bulk Loader]