This tool transforms Cassandra SSTables to Parquet or Avro files. SSTables can be stored locally, or they can be streamed remotely directly from a cluster. Spark integration is implemented as well.
The source code compiles to Java 11.
This will create a "bundled" JAR artifact with all dependencies.
./mvnw clean install
This will leave out all dependencies which are already in Spark so we do not need to bundle them.
./mvnw clean install -Pspark
This will build RPM and Debian packages for the bundled artifact.
./mvnw clean install -Pdefault,deb,rpm
When the packages are installed, there will be a /usr/local/bin/sstable-transformer script which invokes the tool.
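For example, assuming the installed script simply forwards its arguments to the tool, you can invoke it directly:

/usr/local/bin/sstable-transformer help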
The tool has two subcommands for now, called transform and partitions.
$ java -jar target/sstable-transformer-1.0.0-bundled.jar help
Usage: transformer [-hV] [COMMAND]
Transforms Cassandra SSTable to Parquet or Avro file
  -h, --help      Show this help message and exit.
  -V, --version   Print version information and exit.
Commands:
  transform   Transform SSTables to Parquet or Avro files.
  partitions  Realize how many Spark partitions your Cassandra ring consists of.
  help        Display help information about the specified command.
This is best explained with an example.
Imagine you have a directory with SSTables from Cassandra. Let’s say that you have two SSTables.
You can look at the problem of "transformation of SSTables" in two ways:
- You want to transform each SSTable into one Parquet file.
- You want to transform all SSTables into one Parquet file.
If you transform each SSTable into a Parquet file, you will end up with two Parquet files. On the other hand, if you transform all SSTables into one Parquet file, the two SSTables will be reduced into a single Parquet file.
There are consequences for each transformation strategy.
Imagine we have a table like this:
CREATE TABLE spark_test.test3 (
    id int PRIMARY KEY,
    col1 int,
    col2 int
)
and you insert data in this fashion:
cqlsh> INSERT INTO spark_test.test3 (id, col1) VALUES (1, 10);
shell> nodetool flush
cqlsh> INSERT INTO spark_test.test3 (id, col2) VALUES (1, 20);
shell> nodetool flush
Because we flushed twice, we end up with two SSTables on disk. Due to the nature of the transformation (explained later on), if we transform these two SSTables into one Parquet file, the data in the Parquet file will be effectively compacted; it will be presented to you the same way as if you selected the data in Cassandra:
id|col1|col2
 1|  10|  20
On the other hand, if you transform each SSTable to a Parquet file, each one is just transformed as it is. There will be two Parquet files with this content:
the first Parquet file:
id|col1|col2
 1|  10|null
the second Parquet file:
id|col1|col2
 1|null|  20
The transformation strategy is specified via the --strategy option, and it can have two values:
- ONE_FILE_ALL_SSTABLES
- ONE_FILE_PER_SSTABLE
The default is ONE_FILE_PER_SSTABLE, so when no strategy is specified you will have as many Parquet files as there are input SSTables.
For compression of Parquet files, UNCOMPRESSED, SNAPPY and ZSTD are supported.
This option, --compression, is just pushed down to the Parquet writer and the data are compressed accordingly. The same compression algorithms are available for the Avro output format.
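For instance, a local transformation producing ZSTD-compressed Avro files could look like this (the input and output paths are illustrative only):

java -jar target/sstable-transformer-1.0.0-bundled.jar transform \
    --create-table-statement='CREATE TABLE spark_test.test3 (id int PRIMARY KEY, col1 int, col2 int)' \
    --input=/tmp/input-dir-with-sstables \
    --output=/tmp/output-dir \
    --output-format=AVRO \
    --compression=ZSTD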
An exhaustive enumeration of all parameters is located at the end of this document.
We can process data which are co-located with this tool - locally on disk, or we can process data directly from Cassandra.
In the former case, we expect SSTables to be in a directory, or we can enumerate them one by one. We expect whole SSTables to be present even though we check only for the presence of data files (the -Data.db component).
SSTables, or directories containing them, are specified with the --input flag.
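As a sketch, a directory and an individual data file can be mixed on the command line (the paths below are purely illustrative):

java -jar target/sstable-transformer-1.0.0-bundled.jar transform \
    --create-table-statement='CREATE TABLE spark_test.test3 (id int PRIMARY KEY, col1 int, col2 int)' \
    --input=/tmp/sstables-dir \
    --input=/tmp/other-sstables/nb-1-big-Data.db \
    --output=/tmp/output-dir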
Data is transformed remotely when --input is not present and the --sidecar option is specified (it can be repeated). By doing so, we point this tool to Apache Cassandra Sidecar, which might in practice run on a different host. Sidecar instance(s) will read data from the disk of the node Cassandra runs on, and they will stream the data via HTTP to Spark Cassandra Analytics, where we iterate over them (over Spark's InternalRow) and transform them to Parquet data. This iteration also occurs in the case of local processing, as SparkRowIterator from Cassandra Analytics is agnostic to which DataLayer implementation is used with it.
For remote transformation, the Spark partitions to process are specified via the --partitions flag. By default, all partitions are processed. If data needs to be processed in a more granular manner, a user needs to specify particular partitions:
--partitions=0       - this will process only partition 0
--partitions=0,1,2,3 - this will process partitions 0, 1, 2 and 3
--partitions=0..10   - this will process partitions from 0 to 10 (both included)
It is important to realize that partitions are internally mapped to ranges. Each range is owned by a respective Cassandra node. Hence, for a successful transformation of remote data, we need Sidecars which can read the local data of each Cassandra node.
For example, if we have a cluster consisting of three nodes, each on a physically distinct machine and each owning 16 tokens (48 in total), this maps to 48 Spark partitions. Each partition is then mapped to a specific token range. A token range is owned by a specific Cassandra node (as the primary replica), so streaming occurs from that node's Sidecar, while backup replicas are used in case of the node's unavailability.
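As a sketch, processing only a subset of such a cluster's partitions might look like this (the partition range and hostnames are illustrative; which ranges belong to which node depends on your ring):

java -jar target/sstable-transformer-1.0.0-bundled.jar transform \
    --keyspace=spark_test \
    --table=test3 \
    --output=/tmp/output-dir \
    --partitions=0..15 \
    --sidecar=sidecar-node-1:9043 \
    --sidecar=sidecar-node-2:9043 \
    --sidecar=sidecar-node-3:9043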
If there is a need to create Parquet files which contain at most a certain number of rows, this can be specified with the --max-rows-per-file option.
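A minimal local sketch capping each output file at 1,000,000 rows (paths are illustrative):

java -jar target/sstable-transformer-1.0.0-bundled.jar transform \
    --create-table-statement='CREATE TABLE spark_test.test3 (id int PRIMARY KEY, col1 int, col2 int)' \
    --max-rows-per-file=1000000 \
    --input=/tmp/input-dir-with-sstables \
    --output=/tmp/output-dir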
This utility uses the Apache Cassandra Analytics subproject and its DataLayer abstraction.
For the purposes of local transformation, we use LocalDataLayer, which looks at the local disk when processing SSTables. For remote transformation, we use CassandraDataLayer. Next, we pass either data layer implementation to Analytics' SparkRowIterator, which transparently reads data from the supplied SSTables, and we iterate over it one row at a time.
We can use both transformation strategies with LocalDataLayer because it has direct access to the disk. However, we can use only ONE_FILE_ALL_SSTABLES with CassandraDataLayer (remote processing) because we cannot remotely point Sidecar to one SSTable only; this is all hidden behind SparkRowIterator. You can further specify the maximum number of rows in one file, even with the ONE_FILE_ALL_SSTABLES option, so the data are split into multiple files, each holding at most that number of rows.
SparkRowIterator encapsulates all the complexity of compaction and reading from multiple SSTables, so it appears as if we are just getting one continuous stream of rows which are already compacted internally.
Then we create a ParquetWriter. ParquetWriter needs an Avro schema. This schema is obtained by calling Spark's SchemaConverters.toAvroType, whose first argument, a StructType, is internally constructed by DataLayer from the --create-table-statement we supplied to the tool.
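As an illustration only (not the tool's internal code), the Spark API in question can be exercised like this, assuming a hand-written StructType that mirrors spark_test.test3:

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// a StructType mirroring spark_test.test3 (id int PRIMARY KEY, col1 int, col2 int);
// the real one is built internally by the DataLayer from --create-table-statement
val struct = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("col1", IntegerType, nullable = true),
  StructField("col2", IntegerType, nullable = true)))

// Avro schema which a ParquetWriter can then be configured with
val avroSchema = SchemaConverters.toAvroType(struct, nullable = false, recordName = "test3")
println(avroSchema.toString(true))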
For now, only simple / primitive data types are supported.
Note: please use the tool with the below JDK options for the java command. They are not shown in the examples for brevity; you can also see how they are used in the ./run.sh script.
-DSKIP_STARTUP_VALIDATIONS=true
-Dfile.encoding=UTF-8
-Djdk.attach.allowAttachSelf=true
--add-exports java.base/jdk.internal.misc=ALL-UNNAMED
--add-exports java.base/jdk.internal.ref=ALL-UNNAMED
--add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED
--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED
--add-exports java.rmi/sun.rmi.server=ALL-UNNAMED
--add-exports java.sql/java.sql=ALL-UNNAMED
--add-opens java.base/java.lang.module=ALL-UNNAMED
--add-opens java.base/jdk.internal.loader=ALL-UNNAMED
--add-opens java.base/jdk.internal.ref=ALL-UNNAMED
--add-opens java.base/jdk.internal.reflect=ALL-UNNAMED
--add-opens java.base/jdk.internal.math=ALL-UNNAMED
--add-opens java.base/jdk.internal.module=ALL-UNNAMED
--add-opens java.base/jdk.internal.util.jar=ALL-UNNAMED
--add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED
Transforming SSTables located remotely on Cassandra nodes, streamed by Sidecars:
java -jar target/sstable-transformer-1.0.0-bundled.jar transform \
    --keyspace=spark_test \
    --table=test3 \
    --output="/tmp/output-dir" \
    --sidecar sidecar-node-1:9043 \
    --sidecar sidecar-node-2:9043 \
    --sidecar sidecar-node-3:9043
java -jar target/sstable-transformer.jar transform \
    --create-table-statement='CREATE TABLE spark_test.test3 (id int PRIMARY KEY, col1 int, col2 int)' \
    --strategy=ONE_FILE_PER_SSTABLE \
    --input=/tmp/input-dir-with-sstables \
    --output=/tmp/output-dir
When specifying a directory for --input, each SSTable in it will be transformed to one Parquet file under a random name, e.g. /tmp/output-dir/4882f391-ddfb-45db-94e2-4e9499212ace.parquet.
When --output=/tmp/my-transformation.parquet is used, all SSTables will be transformed into one Parquet file with that name.
java -jar target/sstable-transformer.jar transform \
    --create-table-statement='CREATE TABLE spark_test.test3 (id int PRIMARY KEY, col1 int, col2 int)' \
    --strategy=ONE_FILE_ALL_SSTABLES \
    --input=/tmp/input-tables \
    --output=/tmp/my-transformation.parquet
It is possible to use this tool together with Spark. First, you need to build the tool with the spark profile.
Next, we need to find out how many partitions there are in your Cassandra cluster. You do this either with the following code:
val partitions = CassandraPartitionsResolver.partitions("dc1", "spark_test", "spark-master-1", 9043).toSeq
or you can invoke this tool like this:
$ java -jar target/sstable-transformer-1.0.0-bundled.jar partitions \
    --dc=dc1 \
    --keyspace=spark_test \
    --sidecar=sidecar-1:9043
If you know all your partitions, you can start to build the transformer options. You can re-use this builder and pass a partition to it in map. The end result is that Spark will parallelize our partitions and run one transformation per partition. That will invoke the Transformer on a Spark worker, processing the particular partition it was assigned to.
This will effectively invoke a remote transformation on a Spark worker by reading data from the cluster for a given partition, and it will store the transformed Parquet files in the /data/transformed directory (where all Spark workers will be storing their output Parquet files as well).
val options = new PartitionResolverOptions
options.sidecar = "sidecar-1:9043"
options.dc = "dc1"
options.keyspace = "ks"
options.rf = 3

val partitions = new CassandraPartitionsResolver(options).getPartitions.toSeq

val builder = new TransformerOptions.Builder()
  .keyspace("ks")
  .table("test")
  .maxRowsPerFile(100000)
  .output("/data/transformed")
  .outputFormat(TransformerOptions.OutputFormat.PARQUET)
  .sidecar("sidecar-1:9043")
  .sidecar("sidecar-2:9043")
  .sidecar("sidecar-3:9043")

// we parallelize partitions, each one will transform respective data to Parquet files
// "files" will contain all Parquet files with transformed data
val files = sc.parallelize(partitions, 6).map(p => {
  new SSTableTransformer(builder.partition(p).build())
    .runTransformation()
    .asScala.toList.map(t => t.getPath)
}).collect().flatten
You can produce deb and rpm packages with the -Pdeb and -Prpm profiles respectively.
$ java -jar target/sstable-transformer-1.0.0-bundled.jar transform help
Usage: transformer transform [--bloom-filter] [--keep-snapshot] [--sorted]
                             [--compression=<compression>]
                             [--create-table-statement=<createTableStmt>]
                             [--keyspace=<keyspace>]
                             [--max-rows-per-file=<maxRowsPerFile>]
                             --output=<output> [--output-format=<outputFormat>]
                             [--parallelism=<parallelism>]
                             [--partitions=<partitions>]
                             [--strategy=<transformationStrategy>]
                             [--table=<table>] [--input=<input>...]...
                             [--sidecar=<sidecar>...]... [COMMAND]
Transform SSTables to Parquet or Avro files.
      --bloom-filter        Flag for telling whether bloom filter should be
                              used upon writing of a Parquet file.
      --compression=<compression>
                            Use compression for output files, it can be
                              UNCOMPRESSED, SNAPPY, ZSTD.
      --create-table-statement=<createTableStmt>
                            CQL statement as for table creation. You do not
                              need to specify it for remote data layer.
      --input=<input>...    List of directories or individual files to
                              transform. Directories can be mixed with files.
                              You do not need to specify it if you specify
                              --sidecar
      --keep-snapshot       Flag for telling whether we should keep snapshot
                              used for remote transformation.
      --keyspace=<keyspace> Cassandra keyspace name. You do not need to
                              specify it for local data layers.
      --max-rows-per-file=<maxRowsPerFile>
                            Maximal number of rows per file. Has to be bigger
                              than 0. Defaults to undefined which will put all
                              rows to one file.
      --output=<output>     Output file or destination
      --output-format=<outputFormat>
                            Output format of data, either AVRO or PARQUET
      --parallelism=<parallelism>
                            Number of transformation tasks to run
                              simultaneously. Defaults to number of processors.
      --partitions=<partitions>
                            Spark partitions to process. Can be a number, a
                              range (n..m), or enumeration (1,2,3...).
                              Defaults to all partitions.
      --sidecar=<sidecar>...
                            List of sidecar hostnames with ports.
      --sorted              Flag for telling whether rows in each file should
                              be sorted or not. Use with caution as sorting
                              will happen in memory and all Spark rows will be
                              held in memory until sorting is done. For large
                              datasets, use this flag together with
                              --max-rows-per-file so sorting will be limited
                              to number of rows per that option only.
      --strategy=<transformationStrategy>
                            Whether to convert all SSTables into one file or
                              there will be one output file per SSTable. Can
                              be one of ONE_FILE_PER_SSTABLE,
                              ONE_FILE_ALL_SSTABLES. Defaults to
                              ONE_FILE_PER_SSTABLE - can not be used when
                              --sidecar is specified.
      --table=<table>       Cassandra table name. You do not need to specify
                              it for local data layer.
Commands:
  help  Display help information about the specified command.
java -jar target/sstable-transformer-1.0.0-bundled.jar help partitions
Usage: transformer partitions --dc=<dc> --keyspace=<keyspace> --rf=<rf>
                              --sidecar=<sidecar> [COMMAND]
Realize how many Spark partitions your Cassandra ring consists of.
      --dc=<dc>
      --keyspace=<keyspace>
      --rf=<rf>
      --sidecar=<sidecar>
Commands:
  help  Display help information about the specified command.