Commit a2ca9bb

gaido committed

added Hbase methods
1 parent 77d4353 commit a2ca9bb

3 files changed: +47 -2 lines changed


SparkInputLib/build.sbt

Lines changed: 10 additions & 2 deletions
@@ -2,11 +2,19 @@ name := "spark_input_lib"
 
 organization := "it.mgaido"
 
-version := "0.0.3"
+version := "0.0.4"
 
 scalaVersion := "2.10.5"
 
-libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.2.0-cdh5.3.2" % "provided"
+autoScalaLibrary := false
+
+libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.1" % "provided"
+
+libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.1.2" % "provided" exclude("javax.servlet", "servlet-api")
+libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.1.2" % "provided" exclude("javax.servlet", "servlet-api")
+libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.1.2" % "provided" exclude("javax.servlet", "servlet-api")
+
+
 libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.1.3" % "test"
 
 resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
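Since the Spark and HBase artifacts above are declared as "provided", a project that uses this library has to supply them itself (or rely on the cluster classpath). A minimal sketch of a consumer build.sbt follows; the "spark_input_lib_2.10" coordinates are an assumption (e.g. after a local sbt publishLocal of this project with default cross-versioning), not a published artifact.

// Hypothetical consumer build.sbt; the spark_input_lib coordinates are assumptions.
scalaVersion := "2.10.5"

libraryDependencies += "it.mgaido" % "spark_input_lib_2.10" % "0.0.4"

// The library marks Spark and HBase as "provided", so the consumer decides how to
// supply them at runtime: cluster classpath, spark-submit --jars, or by declaring
// them here without the "provided" scope.
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.1" % "provided"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.1.2" exclude("javax.servlet", "servlet-api")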

SparkInputLib/src/main/scala-2.10/it/mgaido/spark/io/IOHelper.scala

Lines changed: 29 additions & 0 deletions
@@ -4,6 +4,13 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext
 import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
 
 /**
  * @author m.gaido
@@ -19,4 +26,26 @@ object IOHelper {
       .map(line => line._2.toString)
   }
 
+  def readHbaseTable(tableName:String):RDD[(ImmutableBytesWritable, Result)] = {
+    @transient val hconf = HBaseConfiguration.create()
+    hconf.set(TableInputFormat.INPUT_TABLE, tableName)
+    val job = Job.getInstance(hconf)
+    job.setInputFormatClass(classOf[TableInputFormat])
+
+    @transient val jobConf = job.getConfiguration
+    jobConf.set(TableInputFormat.INPUT_TABLE, tableName)
+    SparkContext.getOrCreate().newAPIHadoopRDD(jobConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
+  }
+
+  def upsertToHbaseTable(tableName:String, rdd:RDD[(ImmutableBytesWritable, Put)]) = {
+    @transient val hconf = HBaseConfiguration.create()
+    hconf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
+
+    val job = Job.getInstance(hconf)
+    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
+    @transient val jobConf = job.getConfiguration
+    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
+    rdd.saveAsNewAPIHadoopDataset(jobConf)
+  }
+
 }
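Taken together, the two new methods give a read path (newAPIHadoopRDD over TableInputFormat) and a write path (saveAsNewAPIHadoopDataset over TableOutputFormat). Below is a minimal sketch of calling them directly; the table name, column family and qualifier are assumptions, and HBase connection settings are expected to come from an hbase-site.xml on the classpath.

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import it.mgaido.spark.io.IOHelper

object HBaseRoundTrip {
  def main(args: Array[String]): Unit = {
    // readHbaseTable resolves the context via SparkContext.getOrCreate(),
    // so the context created here is reused.
    val sc = new SparkContext(new SparkConf().setAppName("hbase-roundtrip"))

    // Read the whole table as (rowkey, Result) pairs.
    val rows = IOHelper.readHbaseTable("test_table")   // table name is an assumption
    println(s"rows read: ${rows.count()}")

    // Write: build (rowkey, Put) pairs and upsert them.
    val puts = sc.parallelize(Seq("r1" -> "v1", "r2" -> "v2")).map { case (key, value) =>
      val put = new Put(Bytes.toBytes(key))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(value)) // assumed family/qualifier
      (new ImmutableBytesWritable(Bytes.toBytes(key)), put)
    }
    IOHelper.upsertToHbaseTable("test_table", puts)

    sc.stop()
  }
}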

SparkInputLib/src/main/scala-2.10/it/mgaido/spark/io/package.scala

Lines changed: 8 additions & 0 deletions
@@ -5,14 +5,22 @@ package it.mgaido.spark
  */
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.client.Put
 
 package object io {
   implicit class IOSparkContext(sparkContext: SparkContext){
 
     def textFileWithHeader(path:String, numHeaderLines:Int):RDD[String] = {
       IOHelper.readTextFilesWithHeader(sparkContext,path, numHeaderLines)
     }
+
+    def hbaseTable(tableName:String) = IOHelper.readHbaseTable(tableName)
 
   }
+
+  implicit class HBaseRDD(rdd:RDD[(ImmutableBytesWritable, Put)]){
+    def upsertToHbaseTable(tableName:String) = IOHelper.upsertToHbaseTable(tableName, rdd)
+  }
 }
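The package object then exposes the same functionality as extension methods: IOSparkContext adds hbaseTable(...) to SparkContext, and HBaseRDD adds upsertToHbaseTable(...) to RDD[(ImmutableBytesWritable, Put)]. A short sketch of the resulting call sites, e.g. from spark-shell with the library jar on the classpath; the table name and column family are again assumptions.

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import it.mgaido.spark.io._                 // brings the implicit classes into scope

val sc = SparkContext.getOrCreate(new SparkConf().setAppName("hbase-implicits"))

// Read: looks like a method on SparkContext thanks to IOSparkContext.
val rows = sc.hbaseTable("test_table")      // RDD[(ImmutableBytesWritable, Result)]

// Write: looks like a method on the RDD thanks to HBaseRDD.
val puts = sc.parallelize(Seq("r1" -> "v1")).map { case (key, value) =>
  val put = new Put(Bytes.toBytes(key))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(value)) // assumed family/qualifier
  (new ImmutableBytesWritable(Bytes.toBytes(key)), put)
}
puts.upsertToHbaseTable("test_table")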