diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
index 311596d03..251d66b94 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
@@ -1,5 +1,7 @@
 package com.datastax.spark.connector.cql
 
+import java.io.IOException
+
 import com.datastax.spark.connector._
 import com.datastax.spark.connector.mapper.{DataFrameColumnMapper, ColumnMapper}
 import org.apache.spark.Logging
@@ -92,7 +94,8 @@ case object RegularColumn extends ColumnRole
 case class ColumnDef(
   columnName: String,
   columnRole: ColumnRole,
-  columnType: ColumnType[_]) extends FieldDef {
+  columnType: ColumnType[_],
+  clusteringOrder: ClusteringOrder = ClusteringOrder.ASC) extends FieldDef {
 
   def ref: ColumnRef = ColumnName(columnName)
   def isStatic = columnRole == StaticColumn
@@ -128,6 +131,15 @@ object ColumnDef {
     val columnType = ColumnType.fromDriverType(column.getType)
     ColumnDef(column.getName, columnRole, columnType)
   }
+
+  def apply(
+      column: ColumnMetadata,
+      columnRole: ColumnRole,
+      clusteringOrder: ClusteringOrder): ColumnDef = {
+
+    val columnType = ColumnType.fromDriverType(column.getType)
+    ColumnDef(column.getName, columnRole, columnType, clusteringOrder)
+  }
 }
 
 /** A Cassandra table metadata that can be serialized. */
@@ -138,7 +150,8 @@ case class TableDef(
     clusteringColumns: Seq[ColumnDef],
     regularColumns: Seq[ColumnDef],
     indexes: Seq[IndexDef] = Seq.empty,
-    isView: Boolean = false) extends StructDef {
+    isView: Boolean = false,
+    tableOptions: Map[String, String] = Map.empty) extends StructDef {
 
   require(partitionKey.forall(_.isPartitionKeyColumn), "All partition key columns must have role PartitionKeyColumn")
   require(clusteringColumns.forall(_.isClusteringColumn), "All clustering columns must have role ClusteringColumn")
@@ -185,10 +198,24 @@ case class TableDef(
     val clusteringColumnNames = clusteringColumns.map(_.columnName).map(quote)
     val primaryKeyClause = (partitionKeyClause +: clusteringColumnNames).mkString(", ")
 
-    s"""CREATE TABLE ${quote(keyspaceName)}.${quote(tableName)} (
+    val stmt = s"""CREATE TABLE ${quote(keyspaceName)}.${quote(tableName)} (
        |  $columnList,
        |  PRIMARY KEY ($primaryKeyClause)
        |)""".stripMargin
+
+    val clusteringOrderingClause = clusteringColumns.map(col => s"${quote(col.columnName)} ${col.clusteringOrder}")
+      .mkString("CLUSTERING ORDER BY (", ", ", ")")
+
+    val tableOptionsString: Seq[String] = tableOptions.map { case (key, value) => s"$key = $value" }.toSeq
+
+    val tableOptionsClause: Seq[String] = if (clusteringColumns.nonEmpty)
+      clusteringOrderingClause +: tableOptionsString
+    else tableOptionsString
+
+    if (tableOptionsClause.nonEmpty)
+      s"""$stmt${Properties.lineSeparator}WITH ${tableOptionsClause.mkString(s"${Properties.lineSeparator}  AND ")}"""
+    else
+      stmt
   }
 
   type ValueRepr = CassandraRow
diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
index 01c289d6a..f644392d2 100644
--- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
+++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
@@ -1,8 +1,9 @@
 package com.datastax.spark.connector.cql
 
-import com.datastax.spark.connector.{PartitionKeyColumns, TTL, SomeColumns, AllColumns}
+import com.datastax.driver.core.ClusteringOrder
+import com.datastax.spark.connector.{AllColumns, PartitionKeyColumns, SomeColumns, TTL}
 import com.datastax.spark.connector.types._
-import org.scalatest.{WordSpec, Matchers}
+import org.scalatest.{Matchers, WordSpec}
 
 class TableDefSpec extends WordSpec with Matchers {
 
@@ -32,7 +33,28 @@ class TableDefSpec extends WordSpec with Matchers {
             |  "c2" varchar,
             |  "c3" varchar,
             |  PRIMARY KEY (("c1"), "c2")
-            |)""".stripMargin
+            |)
+            |WITH CLUSTERING ORDER BY ("c2" ASC)""".stripMargin
         )
       }
+
+      "it contains clustering columns with order" in {
+        val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
+        val column2 = ColumnDef("c2", PartitionKeyColumn, VarCharType)
+        val column3 = ColumnDef("c3", ClusteringColumn(0), VarCharType, ClusteringOrder.DESC)
+        val column4 = ColumnDef("c4", ClusteringColumn(1), VarCharType)
+        val column5 = ColumnDef("c5", RegularColumn, VarCharType)
+        val tableDef = TableDef("keyspace", "table", Seq(column1, column2), Seq(column3, column4), Seq(column5))
+        tableDef.cql should be(
+          """CREATE TABLE "keyspace"."table" (
+            |  "c1" int,
+            |  "c2" varchar,
+            |  "c3" varchar,
+            |  "c4" varchar,
+            |  "c5" varchar,
+            |  PRIMARY KEY (("c1", "c2"), "c3", "c4")
+            |)
+            |WITH CLUSTERING ORDER BY ("c3" DESC, "c4" ASC)""".stripMargin
+        )
+      }
 
@@ -51,7 +73,8 @@ class TableDefSpec extends WordSpec with Matchers {
             |  "c4" varchar,
             |  "c5" varchar,
             |  PRIMARY KEY (("c1", "c2"), "c3", "c4")
-            |)""".stripMargin
+            |)
+            |WITH CLUSTERING ORDER BY ("c3" ASC, "c4" ASC)""".stripMargin
         )
       }
 
@@ -67,6 +90,40 @@ class TableDefSpec extends WordSpec with Matchers {
             |)""".stripMargin
         )
       }
+
+      "it contains options" in {
+        val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
+        val column2 = ColumnDef("c2", RegularColumn, VarCharType)
+        val column3 = ColumnDef("c3", RegularColumn, VarCharType)
+        val tableDef = TableDef("keyspace", "table", Seq(column1), Seq.empty, Seq(column2, column3),
+          tableOptions = Map("bloom_filter_fp_chance" -> "0.01"))
+        tableDef.cql should be(
+          """CREATE TABLE "keyspace"."table" (
+            |  "c1" int,
+            |  "c2" varchar,
+            |  "c3" varchar,
+            |  PRIMARY KEY (("c1"))
+            |)
+            |WITH bloom_filter_fp_chance = 0.01""".stripMargin
+        )
+      }
+
+      "it contains clustering column and options" in {
+        val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
+        val column2 = ColumnDef("c2", ClusteringColumn(0), VarCharType)
+        val column3 = ColumnDef("c3", RegularColumn, VarCharType)
+        val tableDef = TableDef("keyspace", "table", Seq(column1), Seq(column2), Seq(column3), tableOptions = Map("bloom_filter_fp_chance" -> "0.01"))
+        tableDef.cql should be(
+          """CREATE TABLE "keyspace"."table" (
+            |  "c1" int,
+            |  "c2" varchar,
+            |  "c3" varchar,
+            |  PRIMARY KEY (("c1"), "c2")
+            |)
+            |WITH CLUSTERING ORDER BY ("c2" ASC)
+            |  AND bloom_filter_fp_chance = 0.01""".stripMargin
+        )
+      }
     }
   }
 }
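For reviewers, a minimal sketch of how the new clusteringOrder and tableOptions parameters compose, mirroring the specs above. The demo/events keyspace and table, the column names, and the option value are illustrative only, not part of this patch:

import com.datastax.driver.core.ClusteringOrder
import com.datastax.spark.connector.cql._
import com.datastax.spark.connector.types.{IntType, TimestampType, VarCharType}

// Hypothetical time-series table whose rows should read newest-first.
val id   = ColumnDef("id", PartitionKeyColumn, IntType)
val ts   = ColumnDef("ts", ClusteringColumn(0), TimestampType, ClusteringOrder.DESC)
val data = ColumnDef("data", RegularColumn, VarCharType)

val table = TableDef(
  "demo", "events",
  partitionKey = Seq(id),
  clusteringColumns = Seq(ts),
  regularColumns = Seq(data),
  tableOptions = Map("bloom_filter_fp_chance" -> "0.01"))

println(table.cql)
// CREATE TABLE "demo"."events" (
//   "id" int,
//   "ts" timestamp,
//   "data" varchar,
//   PRIMARY KEY (("id"), "ts")
// )
// WITH CLUSTERING ORDER BY ("ts" DESC)
//   AND bloom_filter_fp_chance = 0.01

Because the clustering-order clause is prepended to tableOptionsClause whenever clustering columns exist, a table with any clustering column always emits CLUSTERING ORDER BY, and user-supplied options follow as additional AND clauses.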