project/Build.scala (11 changes: 7 additions & 4 deletions)
@@ -38,7 +38,8 @@ object SummingbirdBuild extends Build {
Opts.resolver.sonatypeReleases,
"Clojars Repository" at "http://clojars.org/repo",
"Conjars Repository" at "http://conjars.org/repo",
"Twitter Maven" at "http://maven.twttr.com"
"Twitter Maven" at "http://maven.twttr.com",
"Apache" at "http://repo.maven.apache.org/maven2"
),

parallelExecution in Test := true,
@@ -127,7 +128,7 @@ object SummingbirdBuild extends Build {
val utilVersion = "6.3.8"
val chillVersion = "0.3.3"
val tormentaVersion = "0.5.4"

val hbaseVersion = "0.94.6"
lazy val slf4jVersion = "1.6.6"

/**
@@ -229,7 +230,9 @@ object SummingbirdBuild extends Build {
"com.twitter" %% "chill-bijection" % chillVersion,
"commons-lang" % "commons-lang" % "2.6",
"com.twitter" %% "scalding-core" % scaldingVersion,
"com.twitter" %% "scalding-commons" % scaldingVersion
"com.twitter" %% "scalding-commons" % scaldingVersion,
"com.twitter" %% "storehaus-hbase" % storehausVersion,
"org.apache.hbase" % "hbase" % hbaseVersion % "provided"
)
).dependsOn(
summingbirdCore % "test->test;compile->compile",
@@ -251,5 +254,5 @@
"com.twitter" %% "tormenta-twitter" % tormentaVersion exclude("org.slf4j", "log4j-over-slf4j") exclude("ch.qos.logback", "logback-classic"),
"com.twitter" %% "storehaus-memcache" % storehausVersion
)
).dependsOn(summingbirdCore, summingbirdStorm)
).dependsOn(summingbirdCore, summingbirdStorm, summingbirdScalding)
}
@@ -0,0 +1,103 @@
package com.twitter.summingbird.example

import com.twitter.scalding.{ Hdfs, TextLine }
import com.twitter.summingbird.batch.{ Batcher, Timestamp, BatchID }
import com.twitter.summingbird.scalding.{ InitialBatchedStore, Scalding, HBaseVersionedStore }
import com.twitter.summingbird.scalding.state.HDFSState
import com.twitter.summingbird.scalding.store.VersionedStore
import com.twitter.summingbird.{ Platform, Producer, TimeExtractor }
import com.twitter.storehaus.ReadableStore
import com.twitter.util.Await
import java.util.Date
import org.apache.hadoop.conf.Configuration

/**
* The following object contains code to execute a Scalding job similar to
* the WordCount job defined in ExampleJob.scala. This job works on plain
* text files, as opposed to tweet Status objects.
* The example job uses a Store on top of HBase, which requires a locally
* running HBase with ZooKeeper.
*
* @author Josh Buffum
* @author Riju Kallivalappil
*/
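// Assumed setup (not spelled out in the original diff): a standalone HBase
// ($HBASE_HOME/bin/start-hbase.sh) should be sufficient here, since standalone
// mode runs its own ZooKeeper on localhost, which is the quorum this example
// points at.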

object ScaldingRunner {
final val MillisInHour = 60 * 60 * 1000

/**
* Directory under which the job stores its state and reads its input file.
*/
final val JobDir = "/user/mydir/wordcount"

/**
* pull in the serialization injections
*/
import Serialization._

implicit val batcher = Batcher.ofHours(1)

// taken from ExampleJob
def tokenize(text: String): TraversableOnce[String] =
text.toLowerCase
.replaceAll("[^a-zA-Z0-9\\s]", "")
.split("\\s+")

/**
* The actual Summingbird job. Works against text instead of tweet Status
*/
def wordCount[P <: Platform[P]](source: Producer[P, String], store: P#Store[String, Long]) = {
source
.filter(_ != null)
.flatMap { text: String => tokenize(text).map(_ -> 1L) }
.sumByKey(store)
}

// Always stamp events with a time one hour before now; with the hourly
// batcher this puts them in the previous batch. The Storm job uses the
// current hour, so the "merger" is exercised across two batches.
implicit val timeOf: TimeExtractor[String] = TimeExtractor(_ => new Date().getTime - MillisInHour)
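// Worked example (assuming the hourly batcher above): if the job runs at
// 12:30, events are stamped 11:30 and fall in the 11:00-12:00 batch, while a
// concurrently running Storm job owns the current 12:00-13:00 batch.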

val now = System.currentTimeMillis
val waitingState = HDFSState(JobDir + "/waitstate", startTime = Some(Timestamp(now - 2 * MillisInHour)),
numBatches = 3)

// read text lines in input.txt as job input
val src = Producer.source[Scalding, String](Scalding.pipeFactoryExact(_ => TextLine(JobDir + "/input.txt")))

/**
* Create the HBaseVersionedStore. Results from the Scalding job will be written
* as String => (BatchID, Long) pairs into the HBase cluster whose ZooKeeper
* quorum is at "localhost", in the table "wordcountJob".
*/
val versionedStore = HBaseVersionedStore[String, Long](
Seq("localhost"),
"wordcountJob"
)

/**
* wrap the HBaseVersionedStore with an InitialBatchedStore to take care of the early batches
*/
val store = new InitialBatchedStore(batcher.currentBatch - 2L, versionedStore)
val mode = Hdfs(false, new Configuration())

/**
* Entry point: create the Scalding job and run it.
*/
def runJob(args: Array[String]): Unit = {
val job = Scalding("wordcountJob")
job.run(waitingState, mode, job.plan(wordCount[Scalding](src, store)))
}

/**
* Look up a key's value in the HBase store.
*/
def lookup(key: String): Option[(BatchID, Long)] = {
val reader = versionedStore.toReadableStore

Await.result {
reader.get(key)
}
}

}
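For reference, a minimal sketch of driving this example from an sbt console, assuming a local HBase is running and input.txt exists under JobDir (runJob ignores its args; "hello" is just an illustrative key):

    import com.twitter.summingbird.example.ScaldingRunner
    ScaldingRunner.runJob(Array[String]())    // plan and run one word-count batch
    println(ScaldingRunner.lookup("hello"))   // Option[(BatchID, Long)] read back from HBase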
@@ -0,0 +1,138 @@
package com.twitter.summingbird.scalding

import cascading.flow.FlowDef
import cascading.tap.Tap
import cascading.tuple.Fields
import com.twitter.algebird.monad.Reader
import com.twitter.bijection.hbase.HBaseBijections.ImmutableBytesWritableBijection
import com.twitter.bijection.Injection
import com.twitter.bijection.Inversion.attempt
import com.twitter.maple.hbase.{HBaseScheme, HBaseTap}
import com.twitter.scalding.{AccessMode, Dsl, Mappable, Mode, Source, TupleConverter, TupleSetter, TypedPipe}
import com.twitter.scalding.typed.TypedSink
import com.twitter.storehaus.hbase.HBaseByteArrayStore
import com.twitter.storehaus.ReadableStore
import com.twitter.summingbird.batch.{Batcher, BatchID}
import com.twitter.util.{Await, Future}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader }
import scala.util.{Failure, Success}

import Injection._

/**
* Scalding implementation of the batch read and write components of a
* store backed by HBase (via maple's HBaseTap and HBaseScheme).
*
* @author Josh Buffum
*/


object HBaseVersionedStore {

def apply[K, V](quorum: Seq[String],
table: String)(
implicit
batcher: Batcher,
injection: Injection[(K, (BatchID,V)), (Array[Byte], Array[Byte])],
Review comment (Collaborator): do you need this? Doesn't it come for free with the keyInj + valueInj?

keyInj: Injection[K, Array[Byte]],
valueInj: Injection[(BatchID,V), Array[Byte]],
ordering: Ordering[K]): HBaseVersionedStore[K, V, K, (BatchID,V)] = {
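// A sketch of the comment the author promises in the review thread below,
// based on this code and on VersionedStore's description: pack stores the
// pairs aggregated through batch b under b.next, i.e. the first batch whose
// data is NOT yet included (an exclusive upper bound), while unpack hands the
// stored BatchID straight back. That convention is the asymmetry around .next.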
new HBaseVersionedStore[K, V, K, (BatchID,V)](quorum, table, batcher)(
{ case (batchID, (k, v)) => (k, (batchID.next, v)) })(
Review comment (Collaborator): can you comment the asymmetry on the .next? (I'm confused, actually.)

Reply (Author): Sorry for the delay. I'll add the comment here. Just for reference, I borrowed the logic here from https://github.com/twitter/summingbird/blob/develop/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/store/VersionedStore.scala. The comment I'll add will look very similar to the description of the VersionedStore object.

{ case (k, (batchID, v)) => (batchID, (k, v)) })
}
}

class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String],
table: String,
override val batcher: Batcher)
(pack: (BatchID, (K, V)) => (K2, V2))
(unpack: ((K2, V2)) => (BatchID, (K,V)))(
implicit
injection: Injection[(K2, V2), (Array[Byte], Array[Byte])],
keyInj: Injection[K, Array[Byte]],
Review comment (Collaborator): shouldn't this be K2?

valueInj: Injection[V2, Array[Byte]], override val ordering: Ordering[K]) extends BatchedScaldingStore[K, V]
{
val KeyColumnName = "key"
val ValColumnName = "value"
val ColumnFamily = "versionedstore"

val scheme = new HBaseScheme(new Fields(KeyColumnName), ColumnFamily, new Fields(ValColumnName))

implicit lazy val byteArray2BytesWritableInj: Injection[Array[Byte], ImmutableBytesWritable] =
    fromBijection[Array[Byte], ImmutableBytesWritable](ImmutableBytesWritableBijection[Array[Byte]])

implicit def kvpInjection: Injection[(K2, V2), (ImmutableBytesWritable,ImmutableBytesWritable)] = {
Injection.connect[(K2,V2), (Array[Byte],Array[Byte]), (ImmutableBytesWritable,ImmutableBytesWritable)]
}


// this is only used for client queries and does not need to be serialized out
// during the scalding job
@transient val hbaseStore = HBaseByteArrayStore(quorum, table, ColumnFamily, ValColumnName, true)
Review comment (Collaborator): does it need to be a val? Can it be a def?

Review comment (Collaborator): or perhaps a lazy val (and also transient)?

.convert[K,V2](keyInj)(valueInj)

/**
* Exposes a stream of (K, V) pairs, keeping for each key the value from the
* highest BatchID less than the input "exclusiveUB". See readVersions() for the
* creation of this stream. This method is called by BatchedScaldingStore.merge.
*/
override def readLast(exclusiveUB: BatchID, mode: Mode): Try[(BatchID, FlowProducer[TypedPipe[(K, V)]])] = {
Right((exclusiveUB, readVersions(exclusiveUB)))
Review comment (Collaborator): this is not quite right, is it? It will always return a Right. What if there is no data on disk? It is not clear that this will not have data loss.

Review comment (Collaborator): I think you need to store some state somewhere that records the last written batch.

}
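// A sketch of the guard the reviewers suggest above; hypothetical and not part
// of this change. `lastWrittenBatch` is an assumed helper that reads back a
// marker persisted by writeLast (e.g. a small HDFS file or a reserved HBase row):
//
//   override def readLast(exclusiveUB: BatchID, mode: Mode): Try[(BatchID, FlowProducer[TypedPipe[(K, V)]])] =
//     lastWrittenBatch(mode) match {
//       case Some(_) => Right((exclusiveUB, readVersions(exclusiveUB)))
//       case None    => Left(List("no batch written yet for table " + table))
//     }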


def readVersions(exclusiveUB: BatchID): FlowProducer[TypedPipe[(K, V)]] = Reader { (flowMode: (FlowDef, Mode)) =>
val mappable = new HBaseVersionedSource[K2, V2](table, scheme)

val filtered = TypedPipe.from(mappable)(flowMode._1, flowMode._2)
      .map { x: (K2, V2) => unpack(x) }
      .filter { _._1 < exclusiveUB } // (BatchID, (K, V))
      .map { unpacked: (BatchID, (K, V)) => (unpacked._2._1, (unpacked._1, unpacked._2._2)) } // (K, (BatchID, V))

implicit def batchOrderer = Ordering.by[(BatchID,V),BatchID](_._1)

filtered
.group
.max
.map{x: (K, (BatchID,V)) => (x._1, x._2._2)}
}


/**
* Write the (K, V) pairs aggregated up to batchID (inclusive) into the
* BatchedScaldingStore. In our case, this BatchedScaldingStore uses HBase
* as the mechanism to actually store the data.
*
* The data is written as serialized (K2, V2) pairs; with the default pack
* from the companion object, that is (K, (BatchID.next, V)).
*/
override def writeLast(batchID: BatchID, lastVals: TypedPipe[(K, V)])(implicit flowDef: FlowDef, mode: Mode): Unit = {
import Dsl._

lastVals.map{x: (K,V) => Injection[(K2,V2),(ImmutableBytesWritable,ImmutableBytesWritable)](pack(batchID, x))}
.toPipe(new Fields(KeyColumnName,ValColumnName))
.write(new HBaseVersionedSource[K2, V2](table, scheme))
Review comment (Collaborator): We also need to write some state noting that we have finished batchID.

}
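// Per the review comment above: a production implementation would persist a
// marker recording batchID here (e.g. a small HDFS file or a reserved HBase
// row) so that readLast can return a Left when nothing has been written yet;
// this is what the hypothetical lastWrittenBatch helper sketched near readLast
// would read back.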


def toReadableStore: ReadableStore[K,V2] = {
hbaseStore
}

}


class HBaseVersionedSource[K, V](table: String,
scheme: HBaseScheme )(
implicit injection: Injection[(K, V), (Array[Byte], Array[Byte])])
extends Source with Mappable[(K,V)] with TypedSink[(K,V)]
{
override def converter[U >: (K, V)] = TupleConverter.asSuperConverter[(K, V), U](TupleConverter.of[(K, V)])

override def setter[U <: (K, V)] = TupleSetter.asSubSetter[(K, V), U](TupleSetter.of[(K,V)])

override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_,_,_] = {
(new HBaseTap(table, scheme)).asInstanceOf[Tap[JobConf, RecordReader[_,_], OutputCollector[_,_]]]
}
}