diff --git a/README.md b/README.md index 66ddcdf22..c735ffd46 100644 --- a/README.md +++ b/README.md @@ -125,4 +125,6 @@ The netty transport code work is based on [Apache Storm](http://storm.apache.org The cgroup code work is based on [JStorm](https://github.com/alibaba/jstorm). Thanks JStorm contributors. +The cassandra integration code work is based on [spark-cassandra-connector](https://github.com/datastax/spark-cassandra-connector). Thanks spark-cassandra-connector contributors. + Thanks to Jetbrains for providing a [IntelliJ IDEA Free Open Source License](https://www.jetbrains.com/buy/opensource/?product=idea). diff --git a/experiments/cassandra-examples/src/main/scala/org/apache/gearpump/experiments/cassandra/CassandraTransform.scala b/experiments/cassandra-examples/src/main/scala/org/apache/gearpump/experiments/cassandra/CassandraTransform.scala new file mode 100644 index 000000000..2d86d7058 --- /dev/null +++ b/experiments/cassandra-examples/src/main/scala/org/apache/gearpump/experiments/cassandra/CassandraTransform.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.experiments.cassandra + +import java.net.InetAddress +import java.util.Date + +import akka.actor.ActorSystem +import com.twitter.bijection.Bijection +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import org.apache.gearpump.experiments.cassandra.lib.BoundStatementBuilder.BoundStatementBuilder +import org.apache.gearpump.experiments.cassandra.lib.RowExtractor.RowExtractor +import org.apache.gearpump.experiments.cassandra.lib.TimeStampExtractor.TimeStampExtractor +import org.apache.gearpump.experiments.cassandra.lib._ +import org.apache.gearpump.experiments.cassandra.lib.connector.{CassandraConnectorConf, CqlWhereClause} +import org.apache.gearpump.streaming.StreamApplication +import org.apache.gearpump.streaming.sink.DataSinkProcessor +import org.apache.gearpump.streaming.source.DataSourceProcessor +import org.apache.gearpump.util.Graph._ +import org.apache.gearpump.util.{AkkaApp, Graph} + +object CassandraTransform extends AkkaApp with ArgumentsParser { + + // CREATE KEYSPACE example + // WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } + + // CREATE TABLE example.sensor_data( + // id text, + // inserted timestamp, + // temperature int, + // location text, + // PRIMARY KEY(id, inserted) + // ) + + // CREATE TABLE example.sensor_data_by_location( + // id text, + // inserted timestamp, + // temperature int, + // location text, + // PRIMARY KEY(location, inserted) + // ) + + // INSERT INTO example.sensor_data(id, inserted, temperature, location) + // VALUES('1', '2016-07-01 15:00:00', 26, 'New York') + + // INSERT INTO example.sensor_data(id, inserted, temperature, location) + // VALUES('2', '2016-07-01 15:00:00', 28, 'New York') + + // INSERT INTO example.sensor_data(id, inserted, temperature, location) + // VALUES('2', '2016-07-02 15:00:00', 25, 'New York') + + // INSERT INTO example.sensor_data(id, inserted, temperature, location) + // VALUES('3', '2016-07-01 15:00:00', 14, 'London') + + private[this] val query = + """ + |SELECT * FROM example.sensor_data + """.stripMargin + + private[this] val insert = + """ + |INSERT INTO example.sensor_data_by_location(id, inserted, temperature, location) + |VALUES(?, ?, ?, ?) 
+ """.stripMargin + + override val options: Array[(String, CLIOption[Any])] = Array( + "host" -> CLIOption[String]("", required = false, + defaultValue = Some("127.0.0.1")), + "port" -> CLIOption[Int]("", required = false, + defaultValue = Some(9042)), + "source" -> CLIOption[Int]("", required = false, + defaultValue = Some(1)), + "sink" -> CLIOption[Int]("", required = false, + defaultValue = Some(1)) + ) + + def application(config: ParseResult, system: ActorSystem): StreamApplication = { + implicit val actorSystem = system + val sourceNum = config.getInt("source") + val sinkNum = config.getInt("sink") + val cassandraHost = InetAddress.getByName(config.getString("host")) + val cassandraPort = config.getInt("port") + + val appConfig = UserConfig.empty + + val connectorConf = CassandraConnectorConf(hosts = Set(cassandraHost), port = cassandraPort) + + case class SensorData(id: String, inserted: Date, temperature: Int, location: String) + + implicit val statementBuilder: BoundStatementBuilder[Long] = value => Seq() + + implicit val timeStampExtractor: TimeStampExtractor = row => + row.getTimestamp("inserted").getTime + + implicit val rowExtractor: RowExtractor[SensorData] = row => + SensorData( + row.getString("id"), + row.getTimestamp("inserted"), + row.getInt("temperature"), + row.getString("location")) + + val source = new CassandraSource[SensorData]( + connectorConf, + ReadConf(), + "example", + "sensor_data", + Seq("id", "inserted", "temperature", "location"), + Seq("id"), + Seq("inserted"), + CqlWhereClause.empty) + + val sourceProcessor = DataSourceProcessor(source, sourceNum) + + implicit val statementBuilder2: BoundStatementBuilder[SensorData] = value => + Seq( + value.id, + value.inserted, + Bijection[Int, java.lang.Integer](value.temperature), + value.location) + + val sink = new CassandraSink[SensorData](connectorConf, WriteConf(), insert) + val sinkProcessor = DataSinkProcessor(sink, sinkNum) + + val computation = sourceProcessor ~> sinkProcessor + val app = StreamApplication("CassandraTransform", Graph(computation), appConfig) + app + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + val context = ClientContext(akkaConf) + val appId = context.submit(application(config, context.system)) + context.close() + } +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/CassandraSink.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/CassandraSink.scala new file mode 100644 index 000000000..119ced343 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/CassandraSink.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.experiments.cassandra + +import com.datastax.driver.core.Session +import org.apache.gearpump.Message +import org.apache.gearpump.experiments.cassandra.lib.BoundStatementBuilder.BoundStatementBuilder +import org.apache.gearpump.experiments.cassandra.lib._ +import org.apache.gearpump.experiments.cassandra.lib.connector.CassandraConnectorConf +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.task.TaskContext + +// TODO: Analyse query, compute token ranges, automatically convert types, batch, ... +class CassandraSink[T: BoundStatementBuilder] ( + connectorConf: CassandraConnectorConf, + conf: WriteConf, + writeCql: String) + extends DataSink + with Logging { + + private[this] var connector: CassandraConnector = _ + private[this] var session: Session = _ + + private[this] var writer: Option[TableWriter[T]] = None + + // TODO: Non blocking + def open(context: TaskContext): Unit = { + connector = new CassandraConnector(connectorConf) + session = connector.openSession() + + writer = Some(new TableWriter[T](connector, session.prepare(writeCql), conf)) + } + + def write(message: Message): Unit = writer.foreach(_.write(message.msg.asInstanceOf[T])) + + def close(): Unit = connector.evictCache() +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/CassandraSource.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/CassandraSource.scala new file mode 100644 index 000000000..8f109d538 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/CassandraSource.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra + +import java.time.Instant + +import com.datastax.driver.core.{Row, Session, Statement} +import org.apache.gearpump.Message +import org.apache.gearpump.experiments.cassandra.lib.RowExtractor.RowExtractor +import org.apache.gearpump.experiments.cassandra.lib.TimeStampExtractor.TimeStampExtractor +import org.apache.gearpump.experiments.cassandra.lib._ +import org.apache.gearpump.experiments.cassandra.lib.connector._ +import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.{CassandraPartitionGenerator, CqlTokenRange} +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.TaskContext + +// TODO: Analyse query, automatically convert types, ... 
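+//
+// Note: a CassandraSource[T] is driven by two implicits: a RowExtractor[T]
+// mapping a driver Row to T, and a TimeStampExtractor assigning each row an
+// event time. A minimal sketch, reusing the SensorData case class and the
+// example.sensor_data schema from the CassandraTransform example:
+//
+//   implicit val timeStampExtractor: TimeStampExtractor = row =>
+//     row.getTimestamp("inserted").getTime
+//   implicit val rowExtractor: RowExtractor[SensorData] = row =>
+//     SensorData(row.getString("id"), row.getTimestamp("inserted"),
+//       row.getInt("temperature"), row.getString("location"))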
+// TODO: Make a TimeReplayableSource
+class CassandraSource[T: RowExtractor](
+    connectorConf: CassandraConnectorConf,
+    conf: ReadConf,
+    keyspace: String,
+    table: String,
+    columns: Seq[String],
+    partitionKeyColumns: Seq[String],
+    clusteringKeyColumns: Seq[String],
+    where: CqlWhereClause,
+    clusteringOrder: Option[ClusteringOrder] = None,
+    limit: Option[Long] = None
+  )(implicit timeStampExtractor: TimeStampExtractor)
+  extends DataSource
+  with Logging {
+
+  private[this] var iterator: Option[Iterator[Row]] = None
+  private[this] var connector: CassandraConnector = _
+  private[this] var session: Session = _
+
+  private[this] var watermark: Instant = Instant.EPOCH
+
+  protected val rowExtractor = implicitly[RowExtractor[T]]
+
+  private def tokenRangeToCqlQuery(range: CqlTokenRange[_, _]): (String, Seq[Any]) = {
+    val (cql, values) = if (where.containsPartitionKey) {
+      ("", Seq.empty)
+    } else {
+      range.cql(partitionKeyColumns.mkString(","))
+    }
+    val filter = (cql +: where.predicates).filter(_.nonEmpty).mkString(" AND ")
+    val limitClause = limit.map(limit => s"LIMIT $limit").getOrElse("")
+    val orderBy = clusteringOrder.map(_.toCql(clusteringKeyColumns)).getOrElse("")
+    val selectColumns = columns.mkString(",")
+    val queryTemplate =
+      s"SELECT $selectColumns " +
+        s"FROM $keyspace.$table " +
+        s"WHERE $filter $orderBy $limitClause ALLOW FILTERING"
+    val queryParamValues = values ++ where.values
+    (queryTemplate, queryParamValues)
+  }
+
+  private def createStatement(session: Session, cql: String, values: Any*): Statement = {
+    val stmt = session.prepare(cql)
+    stmt.setConsistencyLevel(conf.consistencyLevel)
+    val bstm = stmt.bind(values.map(_.asInstanceOf[AnyRef]): _*)
+    bstm.setFetchSize(conf.fetchSizeInRows)
+    bstm
+  }
+
+  private def fetchTokenRange(
+      session: Session,
+      range: CqlTokenRange[_, _]
+  ): Iterator[Row] = {
+
+    val (cql, values) = tokenRangeToCqlQuery(range)
+    val stmt = createStatement(session, cql, values: _*)
+    val rs = session.execute(stmt)
+    new PrefetchingResultSetIterator(rs, conf.fetchSizeInRows)
+  }
+
+  // TODO: Non blocking
+  override def open(context: TaskContext, startTime: Instant): Unit = {
+    connector = new CassandraConnector(connectorConf)
+    session = connector.openSession()
+
+    val partitioner = if (where.containsPartitionKey) {
+      CassandraPartitionGenerator(connector, keyspace, table, Some(1), conf.splitSizeInMB)
+    } else {
+      CassandraPartitionGenerator(connector, keyspace, table, conf.splitCount, conf.splitSizeInMB)
+    }
+
+    val assignedTokenRanges =
+      new DefaultPartitionGrouper()
+        .group(context.parallelism, context.taskId.index, partitioner.partitions)
+
+    this.watermark = startTime
+
+    iterator =
+      Some(
+        assignedTokenRanges
+          .iterator
+          .flatMap(fetchTokenRange(session, _: CqlTokenRange[_, _])))
+  }
+
+  override def read(): Message =
+    iterator.map { i =>
+      if (i.hasNext) {
+        val row = i.next()
+        val timeStamp = timeStampExtractor(row)
+
+        Message(rowExtractor(row), timeStamp)
+      } else {
+        null
+      }
+    }.orNull
+
+  override def close(): Unit = connector.evictCache()
+
+  // TODO: Watermark only set at open.
Make this a TimeReplayableSource + override def getWatermark: Instant = watermark +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/CassandraStore.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/CassandraStore.scala new file mode 100644 index 000000000..3465d5569 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/CassandraStore.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra + +import java.nio.ByteBuffer + +import scala.collection.JavaConverters._ + +import com.twitter.bijection.Bijection +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.experiments.cassandra.AbstractCassandraStoreFactory._ +import org.apache.gearpump.experiments.cassandra.CassandraStore._ +import org.apache.gearpump.experiments.cassandra.lib.connector.CassandraConnectorConf +import org.apache.gearpump.experiments.cassandra.lib.{CassandraConnector, Logging, StoreConf} +import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory} + +object AbstractCassandraStoreFactory { + private def createKeyspace(keyspaceName: String, replicationCql: String): String = + s""" + |CREATE KEYSPACE IF NOT EXISTS $keyspaceName + |WITH REPLICATION = $replicationCql + """.stripMargin + + private def createTable(keyspaceName: String, tableName: String, compactionCql: String): String = + s""" + |CREATE TABLE IF NOT EXISTS $keyspaceName.$tableName( + | name text, + | timestamp bigint, + | checkpoint blob, + | PRIMARY KEY (name, timestamp)) + |WITH compaction = $compactionCql + """.stripMargin +} + +class AbstractCassandraStoreFactory( + connectorConf: CassandraConnectorConf, + storeConf: StoreConf) + extends CheckpointStoreFactory { + + // TODO: Non blocking + override def getCheckpointStore(name: String): CheckpointStore = { + val connector = new CassandraConnector(connectorConf) + val session = connector.openSession() + session.execute(createKeyspace(storeConf.keyspaceName, storeConf.replicationStrategyCql)) + session.execute( + createTable(storeConf.keyspaceName, storeConf.tableName, storeConf.compactionStrategyCql)) + + connector.evictCache() + new CassandraStore(name, connectorConf, storeConf) + } +} + +object CassandraStore { + private def writeCheckpoint(keyspaceName: String, tableName: String): String = + s""" + |INSERT INTO $keyspaceName.$tableName (name, timestamp, checkpoint) + |VALUES (?, ?, ?) + """.stripMargin + + private def readCheckpoint(keyspaceName: String, tableName: String): String = + s""" + |SELECT * FROM $keyspaceName.$tableName + |WHERE name = ? + |AND timestamp = ? 
+ """.stripMargin +} + +class CassandraStore private[cassandra] ( + name: String, + connectorConf: CassandraConnectorConf, + storeConf: StoreConf) + extends CheckpointStore + with Logging { + + private[this] val connector = new CassandraConnector(connectorConf) + private[this] val session = connector.openSession() + + private[this] val preparedRead = + session.prepare(readCheckpoint(storeConf.keyspaceName, storeConf.tableName)) + private[this] val preparedWrite = + session.prepare(writeCheckpoint(storeConf.keyspaceName, storeConf.tableName)) + + // TODO: Non blocking + override def persist(timeStamp: TimeStamp, checkpoint: Array[Byte]): Unit = + session.execute( + preparedWrite + .bind(name, Bijection[Long, java.lang.Long](timeStamp), ByteBuffer.wrap(checkpoint)) + .setConsistencyLevel(storeConf.persistConsistencyLevel)) + + // TODO: Non blocking + override def recover(timestamp: TimeStamp): Option[Array[Byte]] = { + session + .execute( + preparedRead + .bind(name, Bijection[Long, java.lang.Long](timestamp)) + .setConsistencyLevel(storeConf.recoverConsistencyLevel)) + .all() + .asScala + .headOption.map(_.getBytes("checkpoint").array()) + } + + override def close(): Unit = connector.evictCache() +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/dsl/CassandraDSLSink.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/dsl/CassandraDSLSink.scala new file mode 100644 index 000000000..e6fde3be0 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/dsl/CassandraDSLSink.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.experiments.cassandra.dsl + +import scala.concurrent.ExecutionContext + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.experiments.cassandra.CassandraSink +import org.apache.gearpump.experiments.cassandra.lib.BoundStatementBuilder.BoundStatementBuilder +import org.apache.gearpump.experiments.cassandra.lib.WriteConf +import org.apache.gearpump.experiments.cassandra.lib.connector.CassandraConnectorConf +import org.apache.gearpump.streaming.dsl + +class CassandraDSLSink[T: BoundStatementBuilder](stream: dsl.Stream[T]) { + + def writeToCassandra( + connectorConf: CassandraConnectorConf, + conf: WriteConf, + query: String, + parallelism: Int, + description: String + )(implicit ec: ExecutionContext): dsl.Stream[T] = + stream.sink( + new CassandraSink[T](connectorConf, conf, query), + parallelism, + UserConfig.empty, + description) +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/BoundStatementBuilder.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/BoundStatementBuilder.scala new file mode 100644 index 000000000..041c574d2 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/BoundStatementBuilder.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra.lib + +object BoundStatementBuilder { + type BoundStatementBuilder[A] = A => Seq[Object] +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/CassandraConnector.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/CassandraConnector.scala new file mode 100644 index 000000000..7db13c007 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/CassandraConnector.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.gearpump.experiments.cassandra.lib

+import com.datastax.driver.core.Session
+import org.apache.gearpump.experiments.cassandra.lib.connector.CassandraConnectorConf
+
+// TODO: Proper cluster, session and statement management
+class CassandraConnector(conf: CassandraConnectorConf) extends Serializable {
+
+  private[this] var session: Session = _
+  private[this] var counter = 0
+
+  private def openSessionInternal() = synchronized {
+    if (counter == 0) {
+      val cluster = conf.connectionFactory.createCluster(conf)
+      session = cluster.connect()
+    }
+
+    counter = counter + 1
+    session
+  }
+
+  def openSession(): Session =
+    openSessionInternal()
+
+  def evictCache(): Unit = synchronized {
+    if (counter > 0) {
+      counter = 0
+      val cluster = session.getCluster
+      session.close()
+      cluster.close()
+    }
+  }
+
+  def close(session: Session): Unit = synchronized {
+    if (counter > 0) {
+      counter = counter - 1
+
+      if (counter == 0) {
+        val cluster = session.getCluster
+        session.close()
+        cluster.close()
+      }
+    }
+  }
+
+  // TODO: Ensure the caller cannot close the session
+  def withSession[T](block: Session => T): T = {
+    val session = openSession()
+    // Release the session even if the block throws, so the reference count
+    // stays balanced and the underlying cluster can eventually be closed.
+    try block(session) finally close(session)
+  }
+}
diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/DefaultPartitionGrouper.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/DefaultPartitionGrouper.scala
new file mode 100644
index 000000000..4d9e0ac6d
--- /dev/null
+++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/DefaultPartitionGrouper.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.cassandra.lib
+
+import scala.math.ceil
+
+import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht.Token
+import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.{CassandraPartition, CqlTokenRange}
+
+// TODO: Group based on locality
+class DefaultPartitionGrouper {
+  def group[V, T <: Token[V]](
+      taskNum: Int,
+      taskIndex: Int,
+      cassandraPartitions: Seq[CassandraPartition[V, T]]
+  ): Seq[CqlTokenRange[V, T]] = {
+
+    val tokenRanges = cassandraPartitions.flatMap(_.tokenRanges)
+    val groups =
+      tokenRanges.grouped(ceil(tokenRanges.size.toDouble / taskNum.toDouble).toInt).toArray
+    // grouped() yields fewer groups than taskNum when tasks outnumber token
+    // ranges, so give surplus tasks an empty assignment instead of failing.
+    if (taskIndex < groups.length) groups(taskIndex) else Seq.empty
+  }
+}
diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/Logging.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/Logging.scala
new file mode 100644
index 000000000..0ac5e317c
--- /dev/null
+++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/Logging.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.cassandra.lib
+
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+private[cassandra] trait Logging {
+  protected val LOG: Logger = LogUtil.getLogger(getClass)
+}
diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/ReadConf.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/ReadConf.scala
new file mode 100644
index 000000000..923091d00
--- /dev/null
+++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/ReadConf.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.gearpump.experiments.cassandra.lib + +import com.datastax.driver.core.ConsistencyLevel + +case class ReadConf( + splitCount: Option[Int] = None, + splitSizeInMB: Int = 64, + fetchSizeInRows: Int = 1000, + consistencyLevel: ConsistencyLevel = ConsistencyLevel.LOCAL_ONE) diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/RowExtractor.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/RowExtractor.scala new file mode 100644 index 000000000..ef83317cb --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/RowExtractor.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra.lib + +import com.datastax.driver.core.Row + +object RowExtractor { + type RowExtractor[A] = Row => A +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/StoreConf.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/StoreConf.scala new file mode 100644 index 000000000..7900a567d --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/StoreConf.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.experiments.cassandra.lib + +import com.datastax.driver.core.ConsistencyLevel + +case class StoreConf( + keyspaceName: String = "gearpump", + tableName: String = "checkpoints", + replicationStrategyCql: String = "{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }", + persistConsistencyLevel: ConsistencyLevel = ConsistencyLevel.LOCAL_ONE, + recoverConsistencyLevel: ConsistencyLevel = ConsistencyLevel.LOCAL_ONE, + compactionStrategyCql: String = "{'class' : 'SizeTieredCompactionStrategy' }") diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/TableWriter.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/TableWriter.scala new file mode 100644 index 000000000..8323e8bbf --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/TableWriter.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra.lib + +import java.io.IOException + +import com.datastax.driver.core.PreparedStatement +import org.apache.gearpump.experiments.cassandra.lib.BoundStatementBuilder.BoundStatementBuilder +import org.apache.gearpump.experiments.cassandra.lib.connector.QueryExecutor + +class TableWriter[T: BoundStatementBuilder] ( + connector: CassandraConnector, + statement: PreparedStatement, + writeConf: WriteConf) { + + def write(data: T) { + val session = connector.openSession() + val queryExecutor = new QueryExecutor(session, writeConf.parallelismLevel, None, None) + + // TODO: Batch writes? + val stmtToWrite = statement + .setConsistencyLevel(writeConf.consistencyLevel) + .bind(implicitly[BoundStatementBuilder[T]].apply(data): _*) + + queryExecutor.executeAsync(stmtToWrite) + queryExecutor.waitForCurrentlyExecutingTasks() + + if (!queryExecutor.successful) { + throw new IOException(s"Failed to write statements to $statement.") + } + } +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/TimeStampExtractor.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/TimeStampExtractor.scala new file mode 100644 index 000000000..ebd46aa40 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/TimeStampExtractor.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra.lib + +import com.datastax.driver.core.Row +import org.apache.gearpump.TimeStamp + +object TimeStampExtractor { + type TimeStampExtractor = Row => TimeStamp +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/WriteConf.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/WriteConf.scala new file mode 100644 index 000000000..060c69baa --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/WriteConf.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra.lib + +import com.datastax.driver.core.ConsistencyLevel + +case class WriteConf( + consistencyLevel: ConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM, + parallelismLevel: Int = 5) diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/AsyncExecutor.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/AsyncExecutor.scala new file mode 100644 index 000000000..d7a28ae0f --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/AsyncExecutor.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.experiments.cassandra.lib.connector + +import java.util.concurrent.Semaphore + +import scala.collection.concurrent.TrieMap +import scala.util.Try + +import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture, SettableFuture} +import org.apache.gearpump.experiments.cassandra.lib.connector.AsyncExecutor.Handler + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +class AsyncExecutor[T, R]( + asyncAction: T => ListenableFuture[R], + maxConcurrentTasks: Int, + successHandler: Option[Handler[T]] = None, + failureHandler: Option[Handler[T]]) { + + @volatile private var _successful = true + + private val semaphore = new Semaphore(maxConcurrentTasks) + private val pendingFutures = new TrieMap[ListenableFuture[R], Boolean] + + def executeAsync(task: T): ListenableFuture[R] = { + val submissionTimestamp = System.nanoTime() + semaphore.acquire() + + val settable = SettableFuture.create[R]() + pendingFutures.put(settable, true) + + val executionTimestamp = System.nanoTime() + val future = asyncAction(task) + + Futures.addCallback(future, new FutureCallback[R] { + def release() { + semaphore.release() + pendingFutures.remove(settable) + } + def onSuccess(result: R) { + release() + settable.set(result) + successHandler.foreach(_(task, submissionTimestamp, executionTimestamp)) + } + def onFailure(throwable: Throwable) { + if (_successful) _successful = false + release() + settable.setException(throwable) + failureHandler.foreach(_(task, submissionTimestamp, executionTimestamp)) + } + }) + + settable + } + + def waitForCurrentlyExecutingTasks() { + for ((future, _) <- pendingFutures.snapshot()) + Try(future.get()) + } + + def successful: Boolean = _successful +} + +object AsyncExecutor { + type Handler[T] = (T, Long, Long) => Unit +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/AuthConf.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/AuthConf.scala new file mode 100644 index 000000000..4a69722ed --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/AuthConf.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.experiments.cassandra.lib.connector + +import com.datastax.driver.core.{AuthProvider, PlainTextAuthProvider} + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +trait AuthConf extends Serializable { + def authProvider: AuthProvider +} + +case object NoAuthConf extends AuthConf { + override def authProvider: AuthProvider = AuthProvider.NONE +} + +case class PasswordAuthConf(user: String, password: String) extends AuthConf { + override def authProvider: AuthProvider = + new PlainTextAuthProvider(user, password) +} + diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/CassandraConnectionFactory.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/CassandraConnectionFactory.scala new file mode 100644 index 000000000..015a2540e --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/CassandraConnectionFactory.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.experiments.cassandra.lib.connector + +import java.io.FileInputStream +import java.security.{KeyStore, SecureRandom} +import javax.net.ssl.{SSLContext, TrustManagerFactory} + +import com.datastax.driver.core.policies.{ExponentialReconnectionPolicy, RoundRobinPolicy} +import com.datastax.driver.core.{Cluster, JdkSSLOptions, SSLOptions, SocketOptions} +import CassandraConnectorConf.CassandraSSLConf + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +trait CassandraConnectionFactory extends Serializable { + def createCluster(conf: CassandraConnectorConf): Cluster + def properties: Set[String] = Set.empty +} + +object DefaultConnectionFactory extends CassandraConnectionFactory { + + def clusterBuilder(conf: CassandraConnectorConf): Cluster.Builder = { + val options = new SocketOptions() + .setConnectTimeoutMillis(conf.connectTimeoutMillis) + .setReadTimeoutMillis(conf.readTimeoutMillis) + + val builder = Cluster.builder() + .addContactPoints(conf.hosts.toSeq: _*) + .withPort(conf.port) + .withRetryPolicy( + new MultipleRetryPolicy(conf.queryRetryCount, conf.queryRetryDelay)) + .withReconnectionPolicy( + new ExponentialReconnectionPolicy( + conf.minReconnectionDelayMillis, + conf.maxReconnectionDelayMillis)) + .withLoadBalancingPolicy(new RoundRobinPolicy()) + .withAuthProvider(conf.authConf.authProvider) + .withSocketOptions(options) + .withCompression(conf.compression) + + if (conf.cassandraSSLConf.enabled) { + maybeCreateSSLOptions(conf.cassandraSSLConf) match { + case Some(sslOptions) => builder.withSSL(sslOptions) + case None => builder.withSSL() + } + } else { + builder + } + } + + private def maybeCreateSSLOptions(conf: CassandraSSLConf): Option[SSLOptions] = { + conf.trustStorePath map { + case path => + + val trustStoreFile = new FileInputStream(path) + val tmf = try { + val keyStore = KeyStore.getInstance(conf.trustStoreType) + conf.trustStorePassword match { + case None => keyStore.load(trustStoreFile, null) + case Some(password) => keyStore.load(trustStoreFile, password.toCharArray) + } + val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + tmf.init(keyStore) + tmf + } finally { + trustStoreFile.close() + } + + val context = SSLContext.getInstance(conf.protocol) + context.init(null, tmf.getTrustManagers, new SecureRandom) + JdkSSLOptions.builder() + .withSSLContext(context) + .withCipherSuites(conf.enabledAlgorithms.toArray) + .build() + } + } + + override def createCluster(conf: CassandraConnectorConf): Cluster = { + clusterBuilder(conf).build() + } +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/CassandraConnectorConf.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/CassandraConnectorConf.scala new file mode 100644 index 000000000..3dbf75900 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/CassandraConnectorConf.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra.lib.connector + +import java.net.InetAddress + +import scala.concurrent.duration.{Duration, _} + +import com.datastax.driver.core.ProtocolOptions +import org.apache.gearpump.experiments.cassandra.lib.connector.CassandraConnectorConf.{CassandraSSLConf, RetryDelayConf} + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +case class CassandraConnectorConf( + hosts: Set[InetAddress] = Set(InetAddress.getLocalHost), + port: Int = 9042, + authConf: AuthConf = NoAuthConf, + minReconnectionDelayMillis: Int = 1000, + maxReconnectionDelayMillis: Int = 6000, + compression: ProtocolOptions.Compression = ProtocolOptions.Compression.NONE, + queryRetryCount: Int = 10, + connectTimeoutMillis: Int = 5000, + readTimeoutMillis: Int = 120000, + connectionFactory: CassandraConnectionFactory = DefaultConnectionFactory, + cassandraSSLConf: CassandraConnectorConf.CassandraSSLConf = CassandraSSLConf(), + queryRetryDelay: CassandraConnectorConf.RetryDelayConf = + RetryDelayConf.ExponentialDelay(4.seconds, 1.5d)) + +object CassandraConnectorConf { + + case class CassandraSSLConf( + enabled: Boolean = false, + trustStorePath: Option[String] = None, + trustStorePassword: Option[String] = None, + trustStoreType: String = "JKS", + protocol: String = "TLS", + enabledAlgorithms: Set[String] = + Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA")) + + trait RetryDelayConf { + def forRetry(retryNumber: Int): Duration + } + + object RetryDelayConf extends Serializable { + + case class ConstantDelay(delay: Duration) extends RetryDelayConf { + require(delay.length >= 0, "Delay must not be negative") + + override def forRetry(nbRetry: Int): Duration = delay + override def toString: String = s"${delay.length}" + } + + case class LinearDelay(initialDelay: Duration, increaseBy: Duration) extends RetryDelayConf { + require(initialDelay.length >= 0, "Initial delay must not be negative") + require(increaseBy.length > 0, "Delay increase must be greater than 0") + + override def forRetry(nbRetry: Int): Duration = + initialDelay + (increaseBy * (nbRetry - 1).max(0)) + override def toString: String = s"${initialDelay.length} + $increaseBy" + } + + case class ExponentialDelay(initialDelay: Duration, increaseBy: Double) extends RetryDelayConf { + require(initialDelay.length >= 0, "Initial delay must not be negative") + require(increaseBy > 0, "Delay increase must be greater than 0") + + override def forRetry(nbRetry: Int): Duration = + (initialDelay.toMillis * math.pow(increaseBy, (nbRetry - 1).max(0))).toLong.milliseconds + override def toString: String = s"${initialDelay.length} * $increaseBy" + } + } +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/ClusteringOrder.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/ClusteringOrder.scala new file mode 100644 index 000000000..2d5c3d944 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/ClusteringOrder.scala 
@@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra.lib.connector + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +sealed trait ClusteringOrder extends Serializable { + private[cassandra] def toCql(clusteringColumns: Seq[String]): String +} + +object ClusteringOrder { + private[cassandra] def cqlClause(clusteringColumns: Seq[String], order: String) = + clusteringColumns.headOption.map(cc => s"""ORDER BY "$cc" $order""") + .getOrElse( + throw new IllegalArgumentException( + "Order by can be specified only if there are some clustering columns")) + + case object Ascending extends ClusteringOrder { + override private[cassandra] def toCql( + clusteringColumns: Seq[String] + ): String = cqlClause(clusteringColumns, "ASC") + } + + case object Descending extends ClusteringOrder { + override private[cassandra] def toCql( + clusteringColumns: Seq[String] + ): String = cqlClause(clusteringColumns, "DESC") + } +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/CqlWhereClause.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/CqlWhereClause.scala new file mode 100644 index 000000000..72735bc69 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/CqlWhereClause.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.experiments.cassandra.lib.connector + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +case class CqlWhereClause( + predicates: Seq[String], + values: Seq[Any], + containsPartitionKey: Boolean) + +object CqlWhereClause { + val empty = new CqlWhereClause(Nil, Nil, false) +} \ No newline at end of file diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/MultipleRetryPolicy.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/MultipleRetryPolicy.scala new file mode 100644 index 000000000..92c3bb219 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/MultipleRetryPolicy.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra.lib.connector + +import com.datastax.driver.core.exceptions.DriverException +import com.datastax.driver.core.policies.RetryPolicy +import com.datastax.driver.core.policies.RetryPolicy.RetryDecision +import com.datastax.driver.core.{ConsistencyLevel, Statement, WriteType} + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +class MultipleRetryPolicy(maxRetryCount: Int, retryDelay: CassandraConnectorConf.RetryDelayConf) + extends RetryPolicy { + + private def retryOrThrow(cl: ConsistencyLevel, nbRetry: Int): RetryDecision = { + if (nbRetry < maxRetryCount) { + if (nbRetry > 0) { + val delay = retryDelay.forRetry(nbRetry).toMillis + if (delay > 0) Thread.sleep(delay) + } + RetryDecision.retry(cl) + } else { + RetryDecision.rethrow() + } + } + + override def init(cluster: com.datastax.driver.core.Cluster): Unit = {} + override def close(): Unit = { } + + override def onReadTimeout( + stmt: Statement, + cl: ConsistencyLevel, + requiredResponses: Int, + receivedResponses: Int, + dataRetrieved: Boolean, + nbRetry: Int): RetryDecision = retryOrThrow(cl, nbRetry) + + override def onUnavailable( + stmt: Statement, + cl: ConsistencyLevel, + requiredReplica: Int, + aliveReplica: Int, + nbRetry: Int): RetryDecision = retryOrThrow(cl, nbRetry) + + override def onWriteTimeout( + stmt: Statement, + cl: ConsistencyLevel, + writeType: WriteType, + requiredAcks: Int, + receivedAcks: Int, + nbRetry: Int): RetryDecision = retryOrThrow(cl, nbRetry) + + override def onRequestError( + stmt: Statement, + cl: ConsistencyLevel, + ex: DriverException, + nbRetry: Int): RetryDecision = retryOrThrow(cl, nbRetry) +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/PrefetchingResultSetIterator.scala 
b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/PrefetchingResultSetIterator.scala new file mode 100644 index 000000000..986ea1de7 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/PrefetchingResultSetIterator.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra.lib.connector + +import com.datastax.driver.core.{ResultSet, Row} + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +class PrefetchingResultSetIterator( + resultSet: ResultSet, + prefetchWindowSize: Int + ) extends Iterator[Row] { + + private[this] val iterator = resultSet.iterator() + + override def hasNext: Boolean = iterator.hasNext + + private[this] def maybePrefetch(): Unit = { + if (!resultSet.isFullyFetched && resultSet.getAvailableWithoutFetching < prefetchWindowSize) { + resultSet.fetchMoreResults() + } + } + + override def next(): Row = { + maybePrefetch() + iterator.next() + } +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/QueryExecutor.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/QueryExecutor.scala new file mode 100644 index 000000000..296052bf1 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/QueryExecutor.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
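For context, a minimal sketch of how `PrefetchingResultSetIterator` might be driven (the open `Session`, the query, and the window size are all assumptions for illustration):

```scala
import com.datastax.driver.core.{Row, Session}
import org.apache.gearpump.experiments.cassandra.lib.connector.PrefetchingResultSetIterator

// assumes `session` is an open com.datastax.driver.core.Session
def streamRows(session: Session): Iterator[Row] = {
  val resultSet = session.execute("SELECT * FROM example.sensor_data")
  // keeps at least 100 rows buffered: fetchMoreResults() fires before the
  // current page drains, so the next page downloads while rows are consumed
  new PrefetchingResultSetIterator(resultSet, prefetchWindowSize = 100)
}
```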
+ */
+package org.apache.gearpump.experiments.cassandra.lib.connector
+
+import com.datastax.driver.core._
+import org.apache.gearpump.experiments.cassandra.lib.connector.AsyncExecutor.Handler
+
+/**
+ * The original file (spark-cassandra-connector 1.6.0) was modified
+ */
+class QueryExecutor(
+    session: Session,
+    maxConcurrentQueries: Int,
+    successHandler: Option[Handler[Statement]],
+    failureHandler: Option[Handler[Statement]])
+  extends AsyncExecutor[Statement, ResultSet](
+    stmt => session.executeAsync(stmt),
+    maxConcurrentQueries,
+    successHandler,
+    failureHandler)
diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/BucketingRangeIndex.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/BucketingRangeIndex.scala
new file mode 100644
index 000000000..837acc7ec
--- /dev/null
+++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/BucketingRangeIndex.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
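`QueryExecutor` only wires `session.executeAsync` into `AsyncExecutor` (added elsewhere in this patch). A construction-only sketch using just the signature shown above; the pool size and the absent callbacks are arbitrary:

```scala
import com.datastax.driver.core.Session
import org.apache.gearpump.experiments.cassandra.lib.connector.QueryExecutor

// assumes `session` is an open Session; no success/failure callbacks
def newExecutor(session: Session): QueryExecutor =
  new QueryExecutor(
    session,
    maxConcurrentQueries = 64,
    successHandler = None,
    failureHandler = None)
```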
+ */ +package org.apache.gearpump.experiments.cassandra.lib.connector.partitioner + +import scala.collection.mutable.ArrayBuffer + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +trait MonotonicBucketing[-T] { + def bucket(n: Int): T => Int +} + +object MonotonicBucketing { + + val lnOf2: Double = scala.math.log(2) + def log2(x: Double): Double = scala.math.log(x) / lnOf2 + + implicit object IntBucketing extends MonotonicBucketing[Int] { + override def bucket(n: Int): Int => Int = { + val shift = 31 - log2(n).toInt + x => (x / 2 - Int.MinValue / 2) >> shift + } + } + + implicit object LongBucketing extends MonotonicBucketing[Long] { + override def bucket(n: Int): Long => Int = { + val shift = 63 - log2(n).toInt + x => ((x / 2 - Long.MinValue / 2) >> shift).toInt + } + } +} + +trait RangeBounds[-R, T] { + def start(range: R): T + def end(range: R): T + def contains(range: R, point: T): Boolean + def isFull(range: R): Boolean +} + +class BucketingRangeIndex[R, T]( + ranges: Seq[R] + )(implicit bounds: RangeBounds[R, T], + ordering: Ordering[T], bucketing: MonotonicBucketing[T]) { + + private val sizeLog = MonotonicBucketing.log2(ranges.size).toInt + 1 + private val size = math.pow(2, sizeLog).toInt + private val table = Array.fill(size)(new ArrayBuffer[R]) + private val bucket: T => Int = bucketing.bucket(size) + + private def add(r: R, startHash: Int, endHash: Int): Unit = { + var i = startHash + while (i <= endHash) { + table(i) += r + i += 1 + } + } + + for (r <- ranges) { + val start = bounds.start(r) + val end = bounds.end(r) + val startBucket = bucket(start) + val endBucket = bucket(end) + if (bounds.isFull(r)) { + add(r, 0, size - 1) + } else if (ordering.lt(end, start)) { + add(r, startBucket, size - 1) + add(r, 0, endBucket) + } + else { + add(r, startBucket, endBucket) + } + } + + def rangesContaining(point: T): IndexedSeq[R] = + table(bucket(point)).filter(bounds.contains(_, point)) +} + diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/CassandraPartition.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/CassandraPartition.scala new file mode 100644 index 000000000..345309258 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/CassandraPartition.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
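To see `BucketingRangeIndex` in action outside of token ranges, here is a self-contained sketch with a toy range type (`SimpleRange` and its `RangeBounds` instance are invented for illustration; `MonotonicBucketing[Int]` and `Ordering[Int]` are resolved implicitly):

```scala
import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.{
  BucketingRangeIndex, RangeBounds}

// a toy range, open at the start and closed at the end: (start, end]
case class SimpleRange(start: Int, end: Int)

object SimpleRange {
  implicit object Bounds extends RangeBounds[SimpleRange, Int] {
    def start(r: SimpleRange): Int = r.start
    def end(r: SimpleRange): Int = r.end
    def contains(r: SimpleRange, p: Int): Boolean = p > r.start && p <= r.end
    def isFull(r: SimpleRange): Boolean = false
  }
}

val index = new BucketingRangeIndex[SimpleRange, Int](
  Seq(SimpleRange(0, 100), SimpleRange(100, 200)))

index.rangesContaining(150) // -> ArrayBuffer(SimpleRange(100,200))
```

Because bucketing is monotonic, every bucket between a range's start and end buckets holds a reference to that range, so a point lookup only scans one bucket.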
+ */ +package org.apache.gearpump.experiments.cassandra.lib.connector.partitioner + +import java.net.InetAddress + +import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht.{TokenRange, TokenFactory, Token} + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +case class CqlTokenRange[V, T <: Token[V]]( + range: TokenRange[V, T] + )(implicit tf: TokenFactory[V, T]) { + + require(!range.isWrappedAround) + + def cql(pk: String): (String, Seq[Any]) = + if (range.start == tf.minToken && range.end == tf.minToken) { + (s"token($pk) >= ?", Seq(range.start.value)) + } else if (range.start == tf.minToken) { + (s"token($pk) <= ?", Seq(range.end.value)) + } else if (range.end == tf.minToken) { + (s"token($pk) > ?", Seq(range.start.value)) + } else { + (s"token($pk) > ? AND token($pk) <= ?", Seq(range.start.value, range.end.value)) + } +} + +trait EndpointPartition { + def endpoints: Iterable[InetAddress] +} + +case class CassandraPartition[V, T <: Token[V]]( + index: Int, + endpoints: Iterable[InetAddress], + tokenRanges: Iterable[CqlTokenRange[V, T]], + dataSize: Long) + extends EndpointPartition + diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/CassandraPartitionGenerator.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/CassandraPartitionGenerator.scala new file mode 100644 index 000000000..69990c591 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/CassandraPartitionGenerator.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
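A small sketch of the CQL predicates `CqlTokenRange` produces (the token values and column name are arbitrary; `TokenRange` and `TokenFactory` appear later in this patch):

```scala
import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.CqlTokenRange
import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht.{
  LongToken, TokenFactory, TokenRange}

implicit val tf = TokenFactory.Murmur3TokenFactory

val range = TokenRange[Long, LongToken](
  LongToken(-1000L), LongToken(1000L), Set.empty, dataSize = 0L)

val (cql, values) = CqlTokenRange(range).cql("partitioning_key")
// cql:    token(partitioning_key) > ? AND token(partitioning_key) <= ?
// values: Seq(-1000, 1000)
```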
+ */ +package org.apache.gearpump.experiments.cassandra.lib.connector.partitioner + +import scala.collection.JavaConverters._ +import scala.collection.parallel.ForkJoinTaskSupport +import scala.concurrent.forkjoin.ForkJoinPool +import scala.language.existentials + +import com.datastax.driver.core.{Metadata, TokenRange => DriverTokenRange} +import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht.{Token, TokenFactory} +import org.apache.gearpump.experiments.cassandra.lib.{CassandraConnector, Logging} + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +private[cassandra] class CassandraPartitionGenerator[V, T <: Token[V]] private ( + connector: CassandraConnector, + keyspaceName: String, + tableName: String, + splitCount: Option[Int], + splitSize: Long + )(implicit tokenFactory: TokenFactory[V, T]) extends Logging { + + type Token = dht.Token[T] + type TokenRange = dht.TokenRange[V, T] + + private val totalDataSize: Long = { + splitCount match { + case Some(c) => c * splitSize + case None => new DataSizeEstimates(connector, keyspaceName, tableName).dataSizeInBytes + } + } + + private def tokenRange(range: DriverTokenRange, metadata: Metadata): TokenRange = { + val startToken = tokenFactory.tokenFromString(range.getStart.getValue.toString) + val endToken = tokenFactory.tokenFromString(range.getEnd.getValue.toString) + val replicas = + metadata.getReplicas(Metadata.quote(keyspaceName), range).asScala.map(_.getAddress).toSet + val dataSize = (tokenFactory.ringFraction(startToken, endToken) * totalDataSize).toLong + new TokenRange(startToken, endToken, replicas, dataSize) + } + + private def describeRing: Seq[TokenRange] = { + val session = connector.openSession() + val cluster = session.getCluster + + val metadata = cluster.getMetadata + val ranges = for (tr <- metadata.getTokenRanges.asScala.toSeq) yield tokenRange(tr, metadata) + + if (splitCount.contains(1)) { + Seq(ranges.head.copy[V, T](tokenFactory.minToken, tokenFactory.maxToken)) + } else { + ranges + } + } + + private def splitsOf( + tokenRanges: Iterable[TokenRange], + splitter: TokenRangeSplitter[V, T] + ): Iterable[TokenRange] = { + + val parTokenRanges = tokenRanges.par + parTokenRanges.tasksupport = new ForkJoinTaskSupport(CassandraPartitionGenerator.pool) + + (for { + tokenRange <- parTokenRanges + split <- splitter.split(tokenRange, splitSize) + } yield split).seq + } + + private def createTokenRangeSplitter: TokenRangeSplitter[V, T] = { + tokenFactory.asInstanceOf[TokenFactory[_, _]] match { + case TokenFactory.RandomPartitionerTokenFactory => + new RandomPartitionerTokenRangeSplitter(totalDataSize) + .asInstanceOf[TokenRangeSplitter[V, T]] + case TokenFactory.Murmur3TokenFactory => + new Murmur3PartitionerTokenRangeSplitter(totalDataSize) + .asInstanceOf[TokenRangeSplitter[V, T]] + case _ => + throw new UnsupportedOperationException(s"Unsupported TokenFactory $tokenFactory") + } + } + + private def rangeToCql(range: TokenRange): Seq[CqlTokenRange[V, T]] = + range.unwrap.map(CqlTokenRange(_)) + + def partitions: Seq[CassandraPartition[V, T]] = { + val tokenRanges = describeRing + val endpointCount = tokenRanges.map(_.replicas).reduce(_ ++ _).size + val splitter = createTokenRangeSplitter + val splits = splitsOf(tokenRanges, splitter).toSeq + val maxGroupSize = tokenRanges.size / endpointCount + val clusterer = new TokenRangeClusterer[V, T](splitSize, maxGroupSize) + val tokenRangeGroups = clusterer.group(splits).toArray + val partitions = for (group <- tokenRangeGroups) yield 
{ + val replicas = group.map(_.replicas).reduce(_ intersect _) + val rowCount = group.map(_.dataSize).sum + val cqlRanges = group.flatMap(rangeToCql) + CassandraPartition(0, replicas, cqlRanges, rowCount) + } + + partitions + .sortBy(p => (p.endpoints.size, -p.dataSize)) + .zipWithIndex + .map { case (p, index) => p.copy(index = index) } + } +} + +object CassandraPartitionGenerator { + + val MaxParallelism = 16 + val TokenRangeSampleSize = 16 + + private val pool: ForkJoinPool = new ForkJoinPool(MaxParallelism) + + // scalastyle:off + type V = t forSome { type t } + type T = t forSome { type t <: Token[V] } + // scalastyle:on + + def apply( + conn: CassandraConnector, + keyspaceName: String, + tableName: String, + splitCount: Option[Int], + splitSize: Int + ): CassandraPartitionGenerator[V, T] = { + + val tokenFactory = getTokenFactory(conn) + new CassandraPartitionGenerator( + conn, + keyspaceName, + tableName, + splitCount, + splitSize)(tokenFactory) + } + + def getTokenFactory(conn: CassandraConnector) : TokenFactory[V, T] = { + val session = conn.openSession() + val partitionerName = + session.execute("SELECT partitioner FROM system.local").one().getString(0) + + TokenFactory.forCassandraPartitioner(partitionerName) + } +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/DataSizeEstimates.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/DataSizeEstimates.scala new file mode 100644 index 000000000..2955e52f9 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/DataSizeEstimates.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
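A usage sketch for the generator, assuming `connector` points at a reachable cluster and the `example.sensor_data` table from the example job exists; the 64 MB split size is arbitrary:

```scala
import org.apache.gearpump.experiments.cassandra.lib.CassandraConnector
import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.CassandraPartitionGenerator

def plan(connector: CassandraConnector) = {
  val generator = CassandraPartitionGenerator(
    connector, "example", "sensor_data",
    splitCount = None,            // let system.size_estimates drive sizing
    splitSize = 64 * 1024 * 1024) // ~64 MB of estimated data per split
  generator.partitions            // sorted, re-indexed Seq[CassandraPartition]
}
```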
+ */
+package org.apache.gearpump.experiments.cassandra.lib.connector.partitioner
+
+import scala.collection.JavaConverters._
+
+import com.datastax.driver.core.exceptions.InvalidQueryException
+import org.apache.gearpump.experiments.cassandra.lib.{CassandraConnector, Logging}
+import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht.{Token, TokenFactory}
+
+/**
+ * The original file (spark-cassandra-connector 1.6.0) was modified
+ */
+class DataSizeEstimates[V, T <: Token[V]](
+    conn: CassandraConnector,
+    keyspaceName: String,
+    tableName: String)(
+    implicit tokenFactory: TokenFactory[V, T])
+  extends Logging {
+
+  private case class TokenRangeSizeEstimate(
+      rangeStart: T,
+      rangeEnd: T,
+      partitionsCount: Long,
+      meanPartitionSize: Long) {
+
+    def ringFraction: Double =
+      tokenFactory.ringFraction(rangeStart, rangeEnd)
+
+    def totalSizeInBytes: Long =
+      partitionsCount * meanPartitionSize
+  }
+
+  private lazy val tokenRanges: Seq[TokenRangeSizeEstimate] = {
+    val session = conn.openSession()
+
+    try {
+      val rs = session.execute(
+        "SELECT range_start, range_end, partitions_count, mean_partition_size " +
+        "FROM system.size_estimates " +
+        "WHERE keyspace_name = ? AND table_name = ?", keyspaceName, tableName)
+
+      for (row <- rs.all().asScala) yield TokenRangeSizeEstimate(
+        rangeStart = tokenFactory.tokenFromString(row.getString("range_start")),
+        rangeEnd = tokenFactory.tokenFromString(row.getString("range_end")),
+        partitionsCount = row.getLong("partitions_count"),
+        meanPartitionSize = row.getLong("mean_partition_size")
+      )
+    }
+    catch {
+      case e: InvalidQueryException =>
+        LOG.error(
+          s"Failed to fetch size estimates for $keyspaceName.$tableName from " +
+          "system.size_estimates table. The number of created partitions " +
+          "may be inaccurate. Please make sure you use Cassandra 2.1.5 or newer.", e)
+        Seq.empty
+    }
+  }
+
+  private lazy val ringFraction =
+    tokenRanges.map(_.ringFraction).sum
+
+  lazy val partitionCount: Long = {
+    val partitionsCount = tokenRanges.map(_.partitionsCount).sum
+    val normalizedCount = (partitionsCount / ringFraction).toLong
+    LOG.debug(s"Estimated partition count of $keyspaceName.$tableName is $normalizedCount")
+    normalizedCount
+  }
+
+  lazy val dataSizeInBytes: Long = {
+    val tokenRangeSizeInBytes = (totalDataSizeInBytes / ringFraction).toLong
+    LOG.debug(s"Estimated size of $keyspaceName.$tableName is $tokenRangeSizeInBytes bytes")
+    tokenRangeSizeInBytes
+  }
+
+  lazy val totalDataSizeInBytes: Long = {
+    tokenRanges.map(_.totalSizeInBytes).sum
+  }
+}
diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/Murmur3PartitionerTokenRangeSplitter.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/Murmur3PartitionerTokenRangeSplitter.scala
new file mode 100644
index 000000000..411f07b3e
--- /dev/null
+++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/Murmur3PartitionerTokenRangeSplitter.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
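`DataSizeEstimates` can also be queried on its own; a sketch assuming a Murmur3-partitioned cluster behind `connector` (keyspace and table names are from the example job):

```scala
import org.apache.gearpump.experiments.cassandra.lib.CassandraConnector
import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.DataSizeEstimates
import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht.{
  LongToken, TokenFactory}

def estimatedBytes(connector: CassandraConnector): Long = {
  implicit val tf = TokenFactory.Murmur3TokenFactory
  // yields 0 when system.size_estimates has no rows for the table
  new DataSizeEstimates[Long, LongToken](connector, "example", "sensor_data")
    .dataSizeInBytes
}
```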
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra.lib.connector.partitioner + +import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht.{LongToken, TokenFactory, TokenRange} + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +class Murmur3PartitionerTokenRangeSplitter(dataSize: Long) + extends TokenRangeSplitter[Long, LongToken] { + + private val tokenFactory = + TokenFactory.Murmur3TokenFactory + + private type TR = TokenRange[Long, LongToken] + + def split(range: TR, splitSize: Long): Seq[TR] = { + val rangeSize = range.dataSize + val rangeTokenCount = tokenFactory.distance(range.start, range.end) + val n = math.max(1, math.round(rangeSize.toDouble / splitSize).toInt) + + val left = range.start.value + val right = range.end.value + val splitPoints = + (for (i <- 0 until n) yield left + (rangeTokenCount * i / n).toLong) :+ right + + for (Seq(l, r) <- splitPoints.sliding(2).toSeq) yield + new TokenRange[Long, LongToken]( + new LongToken(l), + new LongToken(r), + range.replicas, + rangeSize / n) + } +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/RandomPartitionerTokenRangeSplitter.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/RandomPartitionerTokenRangeSplitter.scala new file mode 100644 index 000000000..e509ca50a --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/RandomPartitionerTokenRangeSplitter.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
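A worked example of the splitting arithmetic, with made-up token values and sizes:

```scala
import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.Murmur3PartitionerTokenRangeSplitter
import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht.{LongToken, TokenRange}

val splitter = new Murmur3PartitionerTokenRangeSplitter(dataSize = 1L << 30)

val whole = TokenRange[Long, LongToken](
  LongToken(0L), LongToken(1L << 40), Set.empty, dataSize = 1L << 30)

// rangeSize / splitSize = 4, so four equal sub-ranges come back,
// with start tokens 0, 2^38, 2^39 and 3 * 2^38
val splits = splitter.split(whole, splitSize = 1L << 28)
```

The BigInt-based `RandomPartitionerTokenRangeSplitter` below follows the same scheme, additionally wrapping split points around the `0..2^127` ring.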
+ */ +package org.apache.gearpump.experiments.cassandra.lib.connector.partitioner + +import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht.{BigIntToken, TokenFactory, TokenRange} + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +class RandomPartitionerTokenRangeSplitter(dataSize: Long) + extends TokenRangeSplitter[BigInt, BigIntToken] { + + private val tokenFactory = + TokenFactory.RandomPartitionerTokenFactory + + private def wrap(token: BigInt): BigInt = { + val max = tokenFactory.maxToken.value + if (token <= max) token else token - max + } + + private type TR = TokenRange[BigInt, BigIntToken] + + def split(range: TR, splitSize: Long): Seq[TR] = { + val rangeSize = range.dataSize + val rangeTokenCount = tokenFactory.distance(range.start, range.end) + val n = math.max(1, math.round(rangeSize.toDouble / splitSize)).toInt + + val left = range.start.value + val right = range.end.value + val splitPoints = + (for (i <- 0 until n) yield wrap(left + (rangeTokenCount * i / n))) :+ right + + for (Seq(l, r) <- splitPoints.sliding(2).toSeq) yield + new TokenRange[BigInt, BigIntToken]( + new BigIntToken(l.bigInteger), + new BigIntToken(r.bigInteger), + range.replicas, + rangeSize / n) + } +} \ No newline at end of file diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/TokenRangeClusterer.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/TokenRangeClusterer.scala new file mode 100644 index 000000000..57d4d08c4 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/TokenRangeClusterer.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.experiments.cassandra.lib.connector.partitioner + +import java.net.InetAddress + +import scala.Ordering.Implicits._ +import scala.annotation.tailrec + +import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht.{Token, TokenRange} + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +class TokenRangeClusterer[V, T <: Token[V]]( + maxRowCountPerGroup: Long, + maxGroupSize: Int = Int.MaxValue) { + + private implicit object InetAddressOrdering extends Ordering[InetAddress] { + override def compare( + x: InetAddress, + y: InetAddress): Int = + x.getHostAddress.compareTo(y.getHostAddress) + } + + @tailrec + private def group( + tokenRanges: Stream[TokenRange[V, T]], + result: Vector[Seq[TokenRange[V, T]]] + ): Iterable[Seq[TokenRange[V, T]]] = { + + tokenRanges match { + case Stream.Empty => result + case head #:: rest => + val firstEndpoint = head.replicas.min + val rowCounts = tokenRanges.map(_.dataSize) + val cumulativeRowCounts = rowCounts.scanLeft(0L)(_ + _).tail + val rowLimit = math.max(maxRowCountPerGroup, head.dataSize) + val cluster = tokenRanges + .take(math.max(1, maxGroupSize)) + .zip(cumulativeRowCounts) + .takeWhile { case (tr, count) => count <= rowLimit && tr.replicas.min == firstEndpoint } + .map(_._1) + .toVector + val remainingTokenRanges = tokenRanges.drop(cluster.length) + group(remainingTokenRanges, result :+ cluster) + } + } + + def group(tokenRanges: Seq[TokenRange[V, T]]): Iterable[Seq[TokenRange[V, T]]] = { + val sortedRanges = tokenRanges.sortBy(_.replicas.toSeq.sorted) + group(sortedRanges.toStream, Vector.empty) + } +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/TokenRangeSplitter.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/TokenRangeSplitter.scala new file mode 100644 index 000000000..3fc3bc2bc --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/TokenRangeSplitter.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
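A small sketch of the grouping behaviour (the localhost replica and the data sizes are invented):

```scala
import java.net.InetAddress

import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.TokenRangeClusterer
import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht.{LongToken, TokenRange}

val local = Set(InetAddress.getByName("127.0.0.1"))
val ranges = Seq(
  TokenRange[Long, LongToken](LongToken(0L), LongToken(100L), local, dataSize = 600L),
  TokenRange[Long, LongToken](LongToken(100L), LongToken(200L), local, dataSize = 600L))

// 600 + 600 exceeds the limit, so the cumulative-count cut-off puts each
// range into its own group even though the endpoints match
val clusterer = new TokenRangeClusterer[Long, LongToken](maxRowCountPerGroup = 1000L)
clusterer.group(ranges) // -> two single-range groups
```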
+ */ +package org.apache.gearpump.experiments.cassandra.lib.connector.partitioner + +import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht.{Token, TokenRange} + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +trait TokenRangeSplitter[V, T <: Token[V]] { + def split(range: TokenRange[V, T], splitSize: Long): Seq[TokenRange[V, T]] +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/dht/Token.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/dht/Token.scala new file mode 100644 index 000000000..58f18f02a --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/dht/Token.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht + +import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.MonotonicBucketing +import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.MonotonicBucketing.LongBucketing + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +trait Token[T] extends Ordered[Token[T]] { + def ord: Ordering[T] + def value: T +} + +case class LongToken(value: Long) extends Token[Long] { + override def compare(that: Token[Long]): Int = value.compareTo(that.value) + override def toString: String = value.toString + override def ord: Ordering[Long] = implicitly[Ordering[Long]] +} + +object LongToken { + + implicit object LongTokenBucketing extends MonotonicBucketing[Token[Long]] { + override def bucket(n: Int): Token[Long] => Int = { + val longBucket = LongBucketing.bucket(n) + x => longBucket(x.value) + } + } + + implicit val LongTokenOrdering: Ordering[LongToken] = + Ordering.by(_.value) +} + +case class BigIntToken(value: BigInt) extends Token[BigInt] { + override def compare(that: Token[BigInt]): Int = value.compare(that.value) + override def toString: String = value.toString() + + override def ord: Ordering[BigInt] = implicitly[Ordering[BigInt]] +} + +object BigIntToken { + + implicit object BigIntTokenBucketing extends MonotonicBucketing[Token[BigInt]] { + override def bucket(n: Int): Token[BigInt] => Int = { + val shift = 127 - MonotonicBucketing.log2(n).toInt + def clamp(x: BigInt): BigInt = if (x == BigInt(-1)) BigInt(0) else x + x => (clamp(x.value) >> shift).toInt + } + } + + implicit val BigIntTokenOrdering: Ordering[BigIntToken] = + Ordering.by(_.value) +} + diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/dht/TokenFactory.scala 
b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/dht/TokenFactory.scala new file mode 100644 index 000000000..1f078e3b1 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/dht/TokenFactory.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht + +import scala.language.existentials + +import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.MonotonicBucketing + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +trait TokenFactory[V, T <: Token[V]] extends Serializable { + def minToken: T + def maxToken: T + + def totalTokenCount: BigInt + + def distance(token1: T, token2: T): BigInt + + def ringFraction(token1: T, token2: T): Double = + distance(token1, token2).toDouble / totalTokenCount.toDouble + + def tokenFromString(string: String): T + + def tokenToString(token: T): String + + implicit def tokenOrdering: Ordering[T] + + implicit def tokenBucketing: MonotonicBucketing[T] +} + +object TokenFactory { + + // scalastyle:off + type V = t forSome { type t } + type T = t forSome { type t <: Token[V] } + // scalastyle:on + + implicit object Murmur3TokenFactory extends TokenFactory[Long, LongToken] { + override val minToken = LongToken(Long.MinValue) + override val maxToken = LongToken(Long.MaxValue) + override val totalTokenCount = BigInt(maxToken.value) - BigInt(minToken.value) + override def tokenFromString(string: String): LongToken = LongToken(string.toLong) + override def tokenToString(token: LongToken): String = token.value.toString + + override def distance(token1: LongToken, token2: LongToken): BigInt = { + val left = token1.value + val right = token2.value + if (right > left) BigInt(right) - BigInt(left) + else BigInt(right) - BigInt(left) + totalTokenCount + } + + override def tokenBucketing: MonotonicBucketing[LongToken] = + implicitly[MonotonicBucketing[LongToken]] + + override def tokenOrdering: Ordering[LongToken] = + Ordering.by(_.value) + } + + implicit object RandomPartitionerTokenFactory extends TokenFactory[BigInt, BigIntToken] { + override val minToken = BigIntToken(-1) + override val maxToken = BigIntToken(BigInt(2).pow(127)) + override val totalTokenCount = maxToken.value - minToken.value + override def tokenFromString(string: String): BigIntToken = BigIntToken(BigInt(string)) + override def tokenToString(token: BigIntToken): String = token.value.toString() + + override def distance(token1: BigIntToken, token2: BigIntToken): BigInt = { + val left = token1.value + val right = token2.value + + if (right > left) { + right - left + } else { + right - 
left + totalTokenCount + } + } + + override def tokenBucketing: MonotonicBucketing[BigIntToken] = + implicitly[MonotonicBucketing[BigIntToken]] + + override def tokenOrdering: Ordering[BigIntToken] = + Ordering.by(_.value) + } + + def forCassandraPartitioner(partitionerClassName: String): TokenFactory[V, T] = { + val partitioner = + partitionerClassName match { + case "org.apache.cassandra.dht.Murmur3Partitioner" => Murmur3TokenFactory + case "org.apache.cassandra.dht.RandomPartitioner" => RandomPartitionerTokenFactory + case _ => + throw new IllegalArgumentException(s"Unsupported partitioner: $partitionerClassName") + } + partitioner.asInstanceOf[TokenFactory[V, T]] + } +} diff --git a/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/dht/TokenRange.scala b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/dht/TokenRange.scala new file mode 100644 index 000000000..552bdb466 --- /dev/null +++ b/experiments/cassandra/src/main/scala/org/apache/gearpump/experiments/cassandra/lib/connector/partitioner/dht/TokenRange.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
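A few spot checks of the factory API (values are illustrative):

```scala
import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht.{
  LongToken, TokenFactory}

val tf = TokenFactory.forCassandraPartitioner(
  "org.apache.cassandra.dht.Murmur3Partitioner")

TokenFactory.Murmur3TokenFactory.tokenFromString("42") // LongToken(42)

// distance wraps around the ring, so MinValue -> 0 spans half of it
TokenFactory.Murmur3TokenFactory.ringFraction(
  LongToken(Long.MinValue), LongToken(0L))             // ~= 0.5
```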
+ */ +package org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.dht + +import java.net.InetAddress + +/** + * The original file (spark-cassandra-connector 1.6.0) was modified + */ +case class TokenRange[V, T <: Token[V]] ( + start: T, + end: T, + replicas: Set[InetAddress], + dataSize: Long) { + + def isWrappedAround(implicit tf: TokenFactory[V, T]): Boolean = + start >= end && end != tf.minToken + + def isFull(implicit tf: TokenFactory[V, T]): Boolean = + start == end && end == tf.minToken + + def isEmpty(implicit tf: TokenFactory[V, T]): Boolean = + start == end && end != tf.minToken + + def unwrap(implicit tf: TokenFactory[V, T]): Seq[TokenRange[V, T]] = { + val minToken = tf.minToken + + if (isWrappedAround) { + Seq( + TokenRange(start, minToken, replicas, dataSize / 2), + TokenRange(minToken, end, replicas, dataSize / 2)) + } else { + Seq(this) + } + } + + def contains(token: T)(implicit tf: TokenFactory[V, T]): Boolean = { + (end == tf.minToken && token > start + || start == tf.minToken && token <= end + || !isWrappedAround && token > start && token <= end + || isWrappedAround && (token > start || token <= end)) + } +} + diff --git a/experiments/cassandra/src/test/resources/log4j.properties b/experiments/cassandra/src/test/resources/log4j.properties new file mode 100644 index 000000000..052d21e87 --- /dev/null +++ b/experiments/cassandra/src/test/resources/log4j.properties @@ -0,0 +1,3 @@ +log4j.rootLogger=ERROR, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.SimpleLayout diff --git a/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraSinkSpec.scala b/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraSinkSpec.scala new file mode 100644 index 000000000..d5b798423 --- /dev/null +++ b/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraSinkSpec.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra + +import scala.collection.JavaConverters._ + +import com.twitter.bijection.Bijection +import org.apache.gearpump.Message +import org.apache.gearpump.experiments.cassandra.lib.BoundStatementBuilder.BoundStatementBuilder +import org.apache.gearpump.experiments.cassandra.lib.WriteConf +import org.apache.gearpump.streaming.task.TaskContext + +class CassandraSinkSpec extends CassandraSpecBase { + + private[this] val insertCql = + s""" + |INSERT INTO $keyspace.$table(partitioning_key, clustering_key, data) + |VALUES(?, ?, ?) 
+ """.stripMargin + + private def selectAll() = { + val session = connector.openSession() + session.execute(selectAllCql) + } + + "CassandraSink" should "write data to Cassandra" in { + implicit val builder: BoundStatementBuilder[(String, Int, String)] = + value => Seq(value._1, Bijection[Int, java.lang.Integer](value._2), value._3) + + val sink = new CassandraSink[(String, Int, String)]( + connectorConf, + WriteConf(), + insertCql) + + val taskContext = mock[TaskContext] + sink.open(taskContext) + + val message = Message(("1", 1, "data")) + sink.write(message) + + val data = selectAll().all().asScala + assert(data.size == 1) + val first = data.head + assert(first.getString("partitioning_key") == "1") + assert(first.getInt("clustering_key") == 1) + assert(first.getString("data") == "data") + + val message2 = Message(("1", 2, "data")) + sink.write(message2) + + val data2 = selectAll().all().asScala + assert(data2.size == 2) + val last = data2.last + assert(last.getString("partitioning_key") == "1") + assert(last.getInt("clustering_key") == 2) + assert(last.getString("data") == "data") + } +} diff --git a/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraSourceSpec.scala b/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraSourceSpec.scala new file mode 100644 index 000000000..cd43a5d53 --- /dev/null +++ b/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraSourceSpec.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.experiments.cassandra + +import java.time.Instant + +import org.apache.gearpump.experiments.cassandra.lib.ReadConf +import org.apache.gearpump.experiments.cassandra.lib.RowExtractor._ +import org.apache.gearpump.experiments.cassandra.lib.TimeStampExtractor.TimeStampExtractor +import org.apache.gearpump.experiments.cassandra.lib.connector.CqlWhereClause +import org.apache.gearpump.streaming.task.{TaskContext, TaskId} +import org.mockito.Mockito._ + +class CassandraSourceSpec extends CassandraSpecBase { + + override def beforeAll(): Unit = { + super.beforeAll() + storeTestData(10, 10) + } + + case class Data(partitioningKey: String, clusteringKey: Int, data: String) + + implicit val rowExtractor: RowExtractor[Data] = row => + Data( + row.getString("partitioning_key"), + row.getInt("clustering_key"), + row.getString("data")) + + implicit val timeStampExtractor: TimeStampExtractor = row => + row.getInt("clustering_key") + + "CassandraSource" should "read data from Cassandra without a where predicate" in { + + val source = new CassandraSource[Data]( + connectorConf, + ReadConf(), + keyspace, + table, + Seq("partitioning_key", "clustering_key", "data"), + Seq("partitioning_key"), + Seq("clustering_key"), + CqlWhereClause.empty, + None, + None) + + + val taskContext = mock[TaskContext] + when(taskContext.parallelism).thenReturn(1) + when(taskContext.taskId).thenReturn(TaskId(1, 0)) + + source.open(taskContext, Instant.now()) + + val result = source.read() + assert(result.timestamp == 0L) + assert(result.msg.asInstanceOf[Data].clusteringKey == 0) + assert(result.msg.asInstanceOf[Data].data == "data") + + val result2 = source.read() + assert(result2.timestamp == 1L) + assert(result2.msg.asInstanceOf[Data].data == "data") + assert(result2.msg.asInstanceOf[Data].clusteringKey == 1) + } + + it should "read data from Cassandra when where clause is specified" in { + val source = new CassandraSource[Data]( + connectorConf, + ReadConf(), + keyspace, + table, + Seq("partitioning_key", "clustering_key", "data"), + Seq("partitioning_key"), + Seq("clustering_key"), + CqlWhereClause( + Seq("partitioning_key = ?", "clustering_key >= ?"), + Seq("5", 5), + containsPartitionKey = true), + None, + None) + + val taskContext = mock[TaskContext] + when(taskContext.parallelism).thenReturn(1) + when(taskContext.taskId).thenReturn(TaskId(1, 0)) + + source.open(taskContext, Instant.now()) + + val result = source.read() + assert(result.timestamp == 5L) + assert(result.msg.asInstanceOf[Data].data == "data") + assert(result.msg.asInstanceOf[Data].partitioningKey == "5") + assert(result.msg.asInstanceOf[Data].clusteringKey == 5) + + val result2 = source.read() + assert(result2.timestamp == 6L) + assert(result2.msg.asInstanceOf[Data].data == "data") + assert(result2.msg.asInstanceOf[Data].partitioningKey == "5") + assert(result2.msg.asInstanceOf[Data].clusteringKey == 6) + } +} diff --git a/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraSpecBase.scala b/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraSpecBase.scala new file mode 100644 index 000000000..b97fc61d6 --- /dev/null +++ b/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraSpecBase.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.cassandra
+
+import org.cassandraunit.utils.EmbeddedCassandraServerHelper
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+trait CassandraSpecBase
+  extends FlatSpec
+  with Matchers
+  with BeforeAndAfterAll
+  with MockitoSugar
+  with CassandraSpecConnection {
+
+  protected val keyspace = "demo"
+  protected val table = "CassandraSourceEmbeddedSpec"
+
+  protected def createTables(): Unit = {
+    val session = connector.openSession()
+
+    session.execute(
+      s"""
+      |CREATE KEYSPACE IF NOT EXISTS $keyspace
+      |WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
+      """.stripMargin)
+
+    session.execute(
+      s"""
+      |CREATE TABLE IF NOT EXISTS $keyspace.$table(
+      |  partitioning_key text,
+      |  clustering_key int,
+      |  data text,
+      |  PRIMARY KEY(partitioning_key, clustering_key)
+      |)
+      """.stripMargin)
+  }
+
+  protected val selectAllCql = s"SELECT * FROM $keyspace.$table"
+
+  override def beforeAll(): Unit = {
+    EmbeddedCassandraServerHelper.startEmbeddedCassandra(120000)
+    createTables()
+  }
+
+  override def afterAll(): Unit = {
+    connector.evictCache()
+    EmbeddedCassandraServerHelper.cleanEmbeddedCassandra()
+  }
+
+  // `partitions` and `rows` are counts, so iterate with exclusive ranges
+  // and discard the results of the inserts
+  protected def storeTestData(partitions: Int, rows: Int): Unit = {
+    val session = connector.openSession()
+
+    (0 until partitions).foreach { partition =>
+      (0 until rows).foreach { row =>
+        session.execute(
+          s"""
+          |INSERT INTO $keyspace.$table(partitioning_key, clustering_key, data)
+          |VALUES('$partition', $row, 'data')
+          """.stripMargin)
+      }
+    }
+  }
+}
diff --git a/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraSpecConnection.scala b/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraSpecConnection.scala
new file mode 100644
index 000000000..e2c9287f1
--- /dev/null
+++ b/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraSpecConnection.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.gearpump.experiments.cassandra + +import java.net.InetAddress + +import org.apache.gearpump.experiments.cassandra.lib.CassandraConnector +import org.apache.gearpump.experiments.cassandra.lib.connector.CassandraConnectorConf + +trait CassandraSpecConnection { + + protected val connectorConf = CassandraConnectorConf( + port = 9142, + hosts = Set(InetAddress.getByName("127.0.0.1"))) + + protected val connector = new CassandraConnector(connectorConf) +} diff --git a/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraStoreSpec.scala b/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraStoreSpec.scala new file mode 100644 index 000000000..6e40b2d5e --- /dev/null +++ b/experiments/cassandra/src/test/scala/org/apache/gearpump/experiments/cassandra/CassandraStoreSpec.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.cassandra + +import org.apache.gearpump.experiments.cassandra.lib.StoreConf +import org.cassandraunit.utils.EmbeddedCassandraServerHelper +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +class CassandraStoreSpec + extends FlatSpec + with Matchers + with BeforeAndAfterAll + with CassandraSpecConnection { + + private[this] val keyspace = "demo" + private[this] val table = "CassandraStoreEmbeddedSpec" + private[this] val name = "name" + + private[this] val checkTableExistsCql = + s"SELECT * FROM $keyspace.$table" + + private[this] val storeConfig = StoreConf(keyspace, table) + + override def beforeAll(): Unit = { + EmbeddedCassandraServerHelper.startEmbeddedCassandra(120000) + } + + override def afterAll(): Unit = { + connector.evictCache() + EmbeddedCassandraServerHelper.cleanEmbeddedCassandra() + } + + "CassandraStoreFactory" should "create the appropriate tables" in { + val factory = new AbstractCassandraStoreFactory(connectorConf, storeConfig) + factory.getCheckpointStore(name) + + val session = connector.openSession() + assert(session.execute(checkTableExistsCql).all().isEmpty === true) + } + + it should "persist and recover snapshots" in { + val store = + new AbstractCassandraStoreFactory(connectorConf, storeConfig) + .getCheckpointStore(name) + + val checkpoint = "test" + val bytes = checkpoint.getBytes() + + store.persist(0L, bytes) + + val recovered = store.recover(0L) + assert(new String(recovered.get) === checkpoint) + } + + it should "not recover non existent snapshots" in { + val store = + new AbstractCassandraStoreFactory(connectorConf, storeConfig) + .getCheckpointStore(name) + + val recovered = store.recover(1L) + assert(recovered === None) + } +} diff --git a/project/Build.scala b/project/Build.scala index 4ce30532b..c4a1a215d 100644 --- 
a/project/Build.scala +++ b/project/Build.scala @@ -36,6 +36,7 @@ object Build extends sbt.Build { val copySharedSourceFiles = TaskKey[Unit]("copied shared services source code") val akkaVersion = "2.4.3" + val cassandraVersion = "3.0.2" val hadoopVersion = "2.6.0" val hbaseVersion = "1.0.0" val commonsHttpVersion = "3.1" @@ -213,10 +214,18 @@ object Build extends sbt.Build { }, assemblyJarName in assembly := { s"${name.value.split("-").last}-${scalaBinaryVersion.value}-${version.value}-assembly.jar" + }, + assemblyMergeStrategy in assembly := { + case PathList("META-INF", "io.netty.versions.properties") => + MergeStrategy.first + case x => + val oldStrategy = (assemblyMergeStrategy in assembly).value + oldStrategy(x) } ) val projectsWithDoc = inProjects( + cassandra, core, streaming, external_kafka, @@ -249,9 +258,9 @@ object Build extends sbt.Build { id = "gearpump", base = file("."), settings = commonSettings ++ noPublish ++ gearpumpUnidocSetting) - .aggregate(shaded, core, daemon, streaming, services, external_kafka, external_monoid, - external_serializer, examples, storm, yarn, external_hbase, packProject, - external_hadoopfs, integration_test).settings(Defaults.itSettings: _*) + .aggregate(shaded, cassandra, core, daemon, streaming, services, external_kafka, + external_monoid, external_serializer, examples, storm, yarn, external_hbase, + packProject, external_hadoopfs, integration_test).settings(Defaults.itSettings: _*) .disablePlugins(sbtassembly.AssemblyPlugin) lazy val core = Project( @@ -447,6 +456,22 @@ object Build extends sbt.Build { .dependsOn(services % "test->test;compile->compile", daemon % "provided", core % "provided") .disablePlugins(sbtassembly.AssemblyPlugin) + lazy val cassandra = Project( + id = "gearpump-experiments-cassandra", + base = file("experiments/cassandra"), + settings = commonSettings ++ noPublish ++ // myAssemblySettings ++ + Seq( + libraryDependencies ++= Seq( + "com.datastax.cassandra" % "cassandra-driver-core" % cassandraVersion, + "com.twitter" %% "bijection-core" % bijectionVersion, + "org.cassandraunit" % "cassandra-unit" % "3.0.0.1" % "test" + excludeAll ExclusionRule("org.slf4j") + ) + ) + ) + .dependsOn (streaming % "test->test; provided") + .disablePlugins(sbtassembly.AssemblyPlugin) + lazy val external_hbase = Project( id = "gearpump-external-hbase", base = file("external/hbase"), diff --git a/project/BuildExample.scala b/project/BuildExample.scala index 75fc9be43..fd9aba83c 100644 --- a/project/BuildExample.scala +++ b/project/BuildExample.scala @@ -28,7 +28,8 @@ object BuildExample extends sbt.Build { base = file("examples"), settings = commonSettings ++ noPublish ).aggregate(wordcount, wordcountJava, complexdag, sol, fsio, examples_kafka, - distributedshell, stockcrawler, transport, examples_state, pagerank, distributeservice). + distributedshell, stockcrawler, transport, examples_state, pagerank, + distributeservice, examples_cassandra). 
disablePlugins(sbtassembly.AssemblyPlugin) lazy val wordcountJava = Project( @@ -233,4 +234,18 @@ object BuildExample extends sbt.Build { CrossVersion.binaryScalaVersion(scalaVersion.value) ) ) dependsOn (streaming % "test->test; provided") + + lazy val examples_cassandra = Project( + id = "gearpump-examples-cassandra", + base = file("experiments/cassandra-examples/src/main/scala" + + "/org/apache/gearpump/experiments/cassandra"), + settings = commonSettings ++ noPublish ++ myAssemblySettings ++ + Seq( + mainClass in(Compile, packageBin) := + Some("org.apache.gearpump.experiments.cassandra.CassandraTransform"), + + target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / + CrossVersion.binaryScalaVersion(scalaVersion.value) + ) + ) dependsOn(streaming % "test->test; provided", cassandra) }
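Should another module need this connector, the wiring would presumably mirror the examples project above; a hypothetical sketch in the same `Build.scala` style (`my-cassandra-app` does not exist in this patch):

```scala
lazy val my_cassandra_app = Project(
  id = "my-cassandra-app",
  base = file("examples/my-cassandra-app"),
  settings = commonSettings ++ noPublish
).dependsOn(streaming % "test->test; provided", cassandra)
```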