2 changes: 2 additions & 0 deletions README.md
@@ -125,4 +125,6 @@ The netty transport code work is based on [Apache Storm](http://storm.apache.org)

The cgroup code work is based on [JStorm](https://github.com/alibaba/jstorm). Thanks JStorm contributors.

The Cassandra integration code work is based on [spark-cassandra-connector](https://github.com/datastax/spark-cassandra-connector). Thanks spark-cassandra-connector contributors.

Thanks to JetBrains for providing an [IntelliJ IDEA Free Open Source License](https://www.jetbrains.com/buy/opensource/?product=idea).
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.experiments.cassandra

import java.net.InetAddress
import java.util.Date

import akka.actor.ActorSystem
import com.twitter.bijection.Bijection
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import org.apache.gearpump.experiments.cassandra.lib.BoundStatementBuilder.BoundStatementBuilder
import org.apache.gearpump.experiments.cassandra.lib.RowExtractor.RowExtractor
import org.apache.gearpump.experiments.cassandra.lib.TimeStampExtractor.TimeStampExtractor
import org.apache.gearpump.experiments.cassandra.lib._
import org.apache.gearpump.experiments.cassandra.lib.connector.{CassandraConnectorConf, CqlWhereClause}
import org.apache.gearpump.streaming.StreamApplication
import org.apache.gearpump.streaming.sink.DataSinkProcessor
import org.apache.gearpump.streaming.source.DataSourceProcessor
import org.apache.gearpump.util.Graph._
import org.apache.gearpump.util.{AkkaApp, Graph}

object CassandraTransform extends AkkaApp with ArgumentsParser {

  // CREATE KEYSPACE example
  //   WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }

  // CREATE TABLE example.sensor_data(
  //   id text,
  //   inserted timestamp,
  //   temperature int,
  //   location text,
  //   PRIMARY KEY(id, inserted)
  // )

  // CREATE TABLE example.sensor_data_by_location(
  //   id text,
  //   inserted timestamp,
  //   temperature int,
  //   location text,
  //   PRIMARY KEY(location, inserted)
  // )

  // INSERT INTO example.sensor_data(id, inserted, temperature, location)
  //   VALUES('1', '2016-07-01 15:00:00', 26, 'New York')

  // INSERT INTO example.sensor_data(id, inserted, temperature, location)
  //   VALUES('2', '2016-07-01 15:00:00', 28, 'New York')

  // INSERT INTO example.sensor_data(id, inserted, temperature, location)
  //   VALUES('2', '2016-07-02 15:00:00', 25, 'New York')

  // INSERT INTO example.sensor_data(id, inserted, temperature, location)
  //   VALUES('3', '2016-07-01 15:00:00', 14, 'London')

  // `query` is currently unused: the source below scans the table directly
  private[this] val query =
    """
      |SELECT * FROM example.sensor_data
    """.stripMargin

  private[this] val insert =
    """
      |INSERT INTO example.sensor_data_by_location(id, inserted, temperature, location)
      |VALUES(?, ?, ?, ?)
    """.stripMargin

  override val options: Array[(String, CLIOption[Any])] = Array(
    "host" -> CLIOption[String]("<cassandra host>", required = false,
      defaultValue = Some("127.0.0.1")),
    "port" -> CLIOption[Int]("<cassandra port>", required = false,
      defaultValue = Some(9042)),
    "source" -> CLIOption[Int]("<how many cassandra source tasks>", required = false,
      defaultValue = Some(1)),
    "sink" -> CLIOption[Int]("<how many cassandra sink tasks>", required = false,
      defaultValue = Some(1))
  )
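
  // Hypothetical launch, assuming the stock Gearpump `gear` CLI and an assembled
  // examples jar (the jar path is illustrative):
  //   bin/gear app -jar cassandra-examples.jar \
  //     org.apache.gearpump.experiments.cassandra.CassandraTransform \
  //     -host 127.0.0.1 -port 9042 -source 1 -sink 1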

  def application(config: ParseResult, system: ActorSystem): StreamApplication = {
    implicit val actorSystem = system
    val sourceNum = config.getInt("source")
    val sinkNum = config.getInt("sink")
    val cassandraHost = InetAddress.getByName(config.getString("host"))
    val cassandraPort = config.getInt("port")

    val appConfig = UserConfig.empty

    val connectorConf = CassandraConnectorConf(hosts = Set(cassandraHost), port = cassandraPort)

    case class SensorData(id: String, inserted: Date, temperature: Int, location: String)

    // Not referenced by the source below; apparently a placeholder for bound reads
    implicit val statementBuilder: BoundStatementBuilder[Long] = value => Seq()

    // Event time is taken from the `inserted` column
    implicit val timeStampExtractor: TimeStampExtractor = row =>
      row.getTimestamp("inserted").getTime

    // Materialise a SensorData from each row returned by the scan
    implicit val rowExtractor: RowExtractor[SensorData] = row =>
      SensorData(
        row.getString("id"),
        row.getTimestamp("inserted"),
        row.getInt("temperature"),
        row.getString("location"))

    val source = new CassandraSource[SensorData](
      connectorConf,
      ReadConf(),
      "example",
      "sensor_data",
      Seq("id", "inserted", "temperature", "location"),
      Seq("id"),
      Seq("inserted"),
      CqlWhereClause.empty)

    val sourceProcessor = DataSourceProcessor(source, sourceNum)

    // Bind the four placeholders of `insert` in declaration order; the Bijection
    // boxes the Scala Int into the java.lang.Integer the driver expects
    implicit val statementBuilder2: BoundStatementBuilder[SensorData] = value =>
      Seq(
        value.id,
        value.inserted,
        Bijection[Int, java.lang.Integer](value.temperature),
        value.location)

    val sink = new CassandraSink[SensorData](connectorConf, WriteConf(), insert)
    val sinkProcessor = DataSinkProcessor(sink, sinkNum)

    val computation = sourceProcessor ~> sinkProcessor
    StreamApplication("CassandraTransform", Graph(computation), appConfig)
  }

  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val config = parse(args)
    val context = ClientContext(akkaConf)
    context.submit(application(config, context.system))
    context.close()
  }
}
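
For reference, the three implicit type classes this example supplies come from the lib package. The shapes below are inferred purely from how they are used above, not from the lib's actual definitions; the wrapper object name is hypothetical.

// Sketch only: shapes inferred from usage in CassandraTransform above
object TypeClassShapesSketch {
  import com.datastax.driver.core.Row

  type RowExtractor[T] = Row => T       // materialise a T from a result row
  type TimeStampExtractor = Row => Long // event time (ms) attached to each Message
  // Values bound to a prepared statement; presumably Seq[AnyRef], which would
  // explain the Bijection[Int, java.lang.Integer] boxing above
  type BoundStatementBuilder[T] = T => Seq[AnyRef]
}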
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.experiments.cassandra

import com.datastax.driver.core.Session
import org.apache.gearpump.Message
import org.apache.gearpump.experiments.cassandra.lib.BoundStatementBuilder.BoundStatementBuilder
import org.apache.gearpump.experiments.cassandra.lib._
import org.apache.gearpump.experiments.cassandra.lib.connector.CassandraConnectorConf
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.task.TaskContext

// TODO: Analyse query, compute token ranges, automatically convert types, batch, ...
class CassandraSink[T: BoundStatementBuilder](
    connectorConf: CassandraConnectorConf,
    conf: WriteConf,
    writeCql: String)
  extends DataSink
  with Logging {

  private[this] var connector: CassandraConnector = _
  private[this] var session: Session = _

  private[this] var writer: Option[TableWriter[T]] = None

  // TODO: Non blocking
  def open(context: TaskContext): Unit = {
    connector = new CassandraConnector(connectorConf)
    session = connector.openSession()

    writer = Some(new TableWriter[T](connector, session.prepare(writeCql), conf))
  }

  def write(message: Message): Unit = writer.foreach(_.write(message.msg.asInstanceOf[T]))

  def close(): Unit = connector.evictCache()
}
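
As a usage sketch, the sink wires up inside an application body exactly as CassandraTransform does above. Everything below reuses the example schema; the host, port, and parallelism are illustrative, and DataSinkProcessor needs an implicit ActorSystem in scope:

import java.net.InetAddress
import java.util.Date
import org.apache.gearpump.experiments.cassandra.lib.BoundStatementBuilder.BoundStatementBuilder
import org.apache.gearpump.experiments.cassandra.lib.WriteConf
import org.apache.gearpump.experiments.cassandra.lib.connector.CassandraConnectorConf
import org.apache.gearpump.streaming.sink.DataSinkProcessor

case class SensorData(id: String, inserted: Date, temperature: Int, location: String)

// Bind the INSERT's four placeholders in declaration order (Int.box performs
// the Int -> java.lang.Integer conversion the driver expects)
implicit val builder: BoundStatementBuilder[SensorData] = v =>
  Seq(v.id, v.inserted, Int.box(v.temperature), v.location)

val sink = new CassandraSink[SensorData](
  CassandraConnectorConf(hosts = Set(InetAddress.getByName("127.0.0.1")), port = 9042),
  WriteConf(),
  "INSERT INTO example.sensor_data_by_location(id, inserted, temperature, location) " +
    "VALUES(?, ?, ?, ?)")

val sinkProcessor = DataSinkProcessor(sink, 1)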
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.experiments.cassandra

import java.time.Instant

import com.datastax.driver.core.{Row, Session, Statement}
import org.apache.gearpump.Message
import org.apache.gearpump.experiments.cassandra.lib.RowExtractor.RowExtractor
import org.apache.gearpump.experiments.cassandra.lib.TimeStampExtractor.TimeStampExtractor
import org.apache.gearpump.experiments.cassandra.lib._
import org.apache.gearpump.experiments.cassandra.lib.connector._
import org.apache.gearpump.experiments.cassandra.lib.connector.partitioner.{CassandraPartitionGenerator, CqlTokenRange}
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.task.TaskContext

// TODO: Analyse query, automatically convert types, ...
// TODO: Make a TimeReplayableSource
class CassandraSource[T: RowExtractor](
    connectorConf: CassandraConnectorConf,
    conf: ReadConf,
    keyspace: String,
    table: String,
    columns: Seq[String],
    partitionKeyColumns: Seq[String],
    clusteringKeyColumns: Seq[String],
    where: CqlWhereClause,
    clusteringOrder: Option[ClusteringOrder] = None,
    limit: Option[Long] = None
  )(implicit timeStampExtractor: TimeStampExtractor)
  extends DataSource
  with Logging {

  private[this] var iterator: Option[Iterator[Row]] = None
  private[this] var connector: CassandraConnector = _
  private[this] var session: Session = _

  private[this] var watermark: Instant = Instant.EPOCH

  protected val rowExtractor = implicitly[RowExtractor[T]]

  private def tokenRangeToCqlQuery(range: CqlTokenRange[_, _]): (String, Seq[Any]) = {
    // If the WHERE clause already pins the partition key there is nothing to add;
    // otherwise restrict the scan to this task's token range
    val (cql, values) = if (where.containsPartitionKey) {
      ("", Seq.empty)
    } else {
      range.cql(partitionKeyColumns.mkString(","))
    }
    val filter = (cql +: where.predicates).filter(_.nonEmpty).mkString(" AND ")
    val limitClause = limit.map(limit => s"LIMIT $limit").getOrElse("")
    val orderBy = clusteringOrder.map(_.toCql(clusteringKeyColumns)).getOrElse("")
    val selectColumns = columns.mkString(",")
    val queryTemplate =
      s"SELECT $selectColumns " +
      s"FROM $keyspace.$table " +
      s"WHERE $filter $orderBy $limitClause ALLOW FILTERING"
    val queryParamValues = values ++ where.values
    (queryTemplate, queryParamValues)
  }
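
  // For the example's sensor_data table, and assuming the connector's range.cql
  // renders a token-range predicate over the partition key (an assumption about
  // the lib, not verified here), the generated template would look roughly like:
  //   SELECT id,inserted,temperature,location FROM example.sensor_data
  //   WHERE token(id) > ? AND token(id) <= ?  ALLOW FILTERING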

  private def createStatement(session: Session, cql: String, values: Any*): Statement = {
    val stmt = session.prepare(cql)
    stmt.setConsistencyLevel(conf.consistencyLevel)
    val bstm = stmt.bind(values.map(_.asInstanceOf[AnyRef]): _*)
    bstm.setFetchSize(conf.fetchSizeInRows)
    bstm
  }

  private def fetchTokenRange(
      session: Session,
      range: CqlTokenRange[_, _]): Iterator[Row] = {
    val (cql, values) = tokenRangeToCqlQuery(range)
    val stmt = createStatement(session, cql, values: _*)
    val rs = session.execute(stmt)
    new PrefetchingResultSetIterator(rs, conf.fetchSizeInRows)
  }

  // TODO: Non blocking
  override def open(context: TaskContext, startTime: Instant): Unit = {
    connector = new CassandraConnector(connectorConf)
    session = connector.openSession()

    val partitioner = if (where.containsPartitionKey) {
      CassandraPartitionGenerator(connector, keyspace, table, Some(1), conf.splitSizeInMB)
    } else {
      CassandraPartitionGenerator(connector, keyspace, table, conf.splitCount, conf.splitSizeInMB)
    }

    val assignedTokenRanges =
      new DefaultPartitionGrouper()
        .group(context.parallelism, context.taskId.index, partitioner.partitions)

    this.watermark = startTime

    iterator =
      Some(
        assignedTokenRanges
          .iterator
          .flatMap(fetchTokenRange(session, _: CqlTokenRange[_, _])))
  }

  override def read(): Message =
    iterator.map { i =>
      if (i.hasNext) {
        val row = i.next()
        val timeStamp = timeStampExtractor(row)

        Message(rowExtractor(row), timeStamp)
      } else {
        // Token ranges exhausted: null signals "no message available"
        null
      }
    }.orNull

  override def close(): Unit = connector.evictCache()

  // TODO: Watermark only set at open. Make this a TimeReplayableSource
  override def getWatermark: Instant = watermark
}