Commit 06d7c48

Provided some clarifications in the astra db sink doc
1 parent 58971bc commit 06d7c48

1 file changed: +66 -24 lines changed

modules/pulsar-io/pages/connectors/sinks/astra-db.adoc

Lines changed: 66 additions & 24 deletions
@@ -5,9 +5,18 @@
55
:page-tag: astra-db,cdc,sink-connector
66
:page-aliases: docs@astra-streaming::connectors/sinks/astream-astradb-sink.adoc
77

8-
= Astra DB
8+
= Astra DB (Cassandra Enhanced)
99

10-
DataStax Astra DB Sink Connector is an enhanced version of the open-source Cassandra sink connector for Apache Pulsar.
10+
DataStax Astra DB Sink Connector is based on the open-source https://docs.datastax.com/en/pulsar-connector/docs/index.html[Cassandra sink connector for Apache Pulsar^]. Depending on how you deploy the connector, it can sink topic messages to a table in Astra DB or to a table in a Cassandra cluster outside of Astra DB.
11+
12+
The Astra Streaming portal provides a simple way to connect this sink to a table in Astra DB using only a token. Using pulsar-admin or the REST API, you can instead configure the sink's Cassandra connection manually.
13+
14+
This reference assumes you are manually connecting to a Cassandra table.
15+
16+
[TIP]
17+
====
18+
If you would like to see the code, refer to the https://github.com/datastax/pulsar-sink[open source here^]{external-link-icon}.
19+
====
1120

1221
== Get Started
1322

@@ -25,17 +34,19 @@ include::partial$connectors/sinks/monitoring.adoc[]
2534

2635
There are two sets of parameters that support sink connectors.
2736

28-
=== Astra Streaming
37+
=== Pulsar sink connector parameters
2938

3039
[%header,format=csv,cols="2,1,1,3"]
3140
|===
3241
include::example$connectors/sinks/astra.csv[]
3342
|===
3443

35-
=== Astra DB (configs)
44+
=== Cassandra Connection
3645

3746
These values are provided in the *Configs* area.
3847

48+
The "cloud.secureConnectBundle" value can be either a path to your bundle zip or the base64-encoded contents of the zip, provided in the format: "base64:<b64 string>".
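As a rough illustration of the "base64:<b64 string>" format described above, the following Python sketch encodes a bundle zip into a single config value. The helper names `encode_bundle` and `encode_bundle_file` are hypothetical, and the use of the standard (not URL-safe) base64 alphabet is an assumption rather than something this page states.

```python
import base64
from pathlib import Path


def encode_bundle(zip_bytes: bytes) -> str:
    """Return bundle bytes as a 'base64:<b64 string>' config value.

    Assumption: the standard base64 alphabet is expected.
    """
    return "base64:" + base64.b64encode(zip_bytes).decode("ascii")


def encode_bundle_file(path: str) -> str:
    """Read a secure connect bundle zip from disk and encode it."""
    return encode_bundle(Path(path).read_bytes())


if __name__ == "__main__":
    # Placeholder bytes stand in for a real secure connect bundle zip.
    print(encode_bundle(b"PK\x03\x04 placeholder"))
```

The resulting string would be used as the value of "cloud.secureConnectBundle" in place of a file path.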
49+
3950
[%header,format=csv,cols="2,1,1,3"]
4051
|===
4152
include::example$connectors/sinks/{connectorType}/config.csv[]
@@ -44,34 +55,65 @@ include::example$connectors/sinks/{connectorType}/config.csv[]
4455
// TODO: Need descriptions of every param
4556
=== Auth Properties
4657

58+
These values are provided in the *auth* area of the Cassandra connection parameters above.
59+
4760
[%header,format=csv,cols="2,1,1,3"]
4861
|===
4962
include::example$connectors/sinks/{connectorType}/auth.csv[]
5063
|===
5164

52-
// TODO: This not really helpful. Need explanations, especially about mapping
5365
=== Topic Properties
54-
[source,json,subs="attributes+"]
66+
67+
These values are provided in the *topic* area of the Cassandra connection parameters above.
68+
69+
Refer to the official documentation for a https://docs.datastax.com/en/pulsar-connector/docs/cfgRefPulsarDseConnection.html[connection properties reference^].
70+
71+
=== Mapping topic data to table columns
72+
73+
[TIP]
74+
====
75+
The "https://docs.datastax.com/en/pulsar-connector/docs/cfgPulsarMapTopicTable.html[Mapping pulsar topics to database tables^]" area of the official documentation contains many examples.
76+
====
77+
78+
An essential part of using this sink connector is mapping message values to table columns. Many factors influence how this is done and what is possible. The 'mapping' string is a simple comma-separated list of column names and message value fields.
79+
80+
While the getting started examples above show how to configure the connector in one large command, it is easier to manage the configuration as a separate file. The following example shows how to configure the connector using a configuration file in JSON format. The "https://docs.datastax.com/en/pulsar-connector/docs/pulsarQuickStart.html[Pulsar Connector single instance quick start^]" guide provides a good example of this. Below are the minimum requirements.
81+
82+
Create a file named configs.json using the following structure:
83+
84+
[source]
5585
----
56-
"<TABLE_NAME>": {
57-
"<KEYSPACE>": {
58-
"<TABLE_NAME>": {
59-
"consistencyLevel": "LOCAL_QUORUM",
60-
"deletesEnabled": true,
61-
"mapping": "<Each field must be prefixed with either 'key' or 'value' or use the now() function. Example: 'part=value.name, id=value.id, num=value.number, fact=value.isfact, added=now()'>",
62-
"nullToUnset": true,
63-
"timestampTimeUnit": "MICROSECONDS",
64-
"ttl": -1,
65-
"ttlTimeUnit": "SECONDS"
86+
"archive": "builtin://cassandra-enhanced",
87+
"tenant": "<your tenant>",
88+
"namespace": "<your namespace>",
89+
"name": "<a really great name>",
90+
"inputs": ["<your topic name>"],
91+
"configs": {
92+
"topics": <the topic name (just the name, not the full address)>,
93+
"cloud.secureConnectBundle": </path/to/secure-connect-database_name.zip OR base64 encode of the zip>,
94+
"topic": {
95+
"<the topic name (usually the same as above)>": {
96+
"<the table keyspace name>": {
97+
"<the table name>": {
98+
"<all the connection properties>",
99+
...
100+
"mapping": <field-to-column mapping>
66101
}
67-
},
68-
"codec": {
69-
"date": "ISO_LOCAL_DATE",
70-
"locale": "en_US",
71-
"time": "ISO_LOCAL_TIME",
72-
"timeZone": "UTC",
73-
"timestamp": "CQL_TIMESTAMP",
74-
"unit": "MILLISECONDS"
102+
}
75103
}
104+
}
76105
}
77106
----
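One way to avoid hand-editing the nested structure is to generate configs.json from a small script. The sketch below is illustrative only: the function `build_sink_config` and all example values (tenant, topic, keyspace, and table names) are hypothetical, and it fills in only the keys shown in the skeleton above, leaving out auth and other connection properties.

```python
import json


def build_sink_config(tenant, namespace, name, input_topic, topic_name,
                      keyspace, table, bundle, mapping):
    """Assemble the minimum sink configuration as a Python dict."""
    # Per-table settings; other connection properties would be added here.
    table_settings = {"mapping": mapping}
    return {
        "archive": "builtin://cassandra-enhanced",
        "tenant": tenant,
        "namespace": namespace,
        "name": name,
        "inputs": [input_topic],
        "configs": {
            # Just the topic name, not the full address.
            "topics": topic_name,
            # A path to the bundle zip or a "base64:<b64 string>" value.
            "cloud.secureConnectBundle": bundle,
            "topic": {topic_name: {keyspace: {table: table_settings}}},
        },
    }


if __name__ == "__main__":
    # All values below are hypothetical placeholders.
    config = build_sink_config(
        tenant="my-tenant",
        namespace="default",
        name="my-sink",
        input_topic="stocks",
        topic_name="stocks",
        keyspace="my_keyspace",
        table="stocks_table",
        bundle="/path/to/secure-connect-database_name.zip",
        mapping="symbol=value.symbol, name=key",
    )
    print(json.dumps(config, indent=2))
```

Redirecting the printed output to configs.json gives a well-formed file with the outer braces and quoting in place.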
107+
108+
Use the pulsar-admin CLI to create the connector:
109+
110+
[source,shell]
111+
----
112+
./bin/pulsar-admin sinks create \
113+
--name dse-sink-kv \
114+
--classname com.datastax.oss.sink.pulsar.StringCassandraSinkTask \
115+
--sink-config-file configs.json \
116+
--sink-type cassandra-enhanced
117+
----
118+
119+
To create the value for the mapping parameter, specify how each column is filled from the message key or the message value fields. An example string is: `symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value`.
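
A mapping string like the one above can also be assembled programmatically, which helps when the column list is long. This is a minimal sketch: `build_mapping` is a hypothetical helper, and its validation is only a loose check based on the rule stated in the previous version of this page (each field is prefixed with 'key' or 'value', or uses the now() function).

```python
from typing import Mapping


def build_mapping(columns: Mapping[str, str]) -> str:
    """Join column-to-field pairs into a 'col=field, col=field' string."""
    parts = []
    for column, source in columns.items():
        # Loose check: the field starts with 'key' or 'value', or is now().
        if not (source.startswith(("key", "value")) or source == "now()"):
            raise ValueError(f"unsupported field for column {column!r}: {source!r}")
        parts.append(f"{column}={source}")
    return ", ".join(parts)


if __name__ == "__main__":
    print(build_mapping({
        "symbol": "value.symbol",
        "ts": "value.ts",
        "name": "key",
        "value": "value.value",
    }))
```

The returned string would be used as the value of the "mapping" property for the table.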
