Skip to content

Commit 2bd50a4

Browse files
committed
Merge pull request #2 from dclim/kafka-support
add Kafka support
2 parents c6baba4 + 82d6791 commit 2bd50a4

File tree

5 files changed

+94
-13
lines changed

5 files changed

+94
-13
lines changed

build.sbt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,20 @@ ReleaseKeys.publishArtifactsAction := PgpKeys.publishSigned.value
5555

5656
libraryDependencies ++= Seq(
5757
"com.metamx" %% "scala-util" % "1.11.3" exclude("log4j", "log4j") force(),
58-
"com.ircclouds.irc" % "irc-api" % "1.0-0014"
58+
"com.ircclouds.irc" % "irc-api" % "1.0-0014",
59+
"org.apache.kafka" % "kafka-clients" % "0.9.0.0",
60+
"ch.qos.logback" % "logback-core" % "1.1.2",
61+
"ch.qos.logback" % "logback-classic" % "1.1.2"
5962
)
6063

64+
lazy val root = project.in(file(".")).enablePlugins(JavaAppPackaging)
65+
6166
//
6267
// Test stuff
6368
//
6469

6570
libraryDependencies ++= Seq(
6671
"org.scalatest" %% "scalatest" % "2.2.5" % "test",
6772
"junit" % "junit" % "4.11" % "test",
68-
"com.novocode" % "junit-interface" % "0.11-RC1" % "test",
69-
"ch.qos.logback" % "logback-core" % "1.1.2" % "test",
70-
"ch.qos.logback" % "logback-classic" % "1.1.2" % "test"
73+
"com.novocode" % "junit-interface" % "0.11-RC1" % "test"
7174
)

project/plugins.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ addSbtPlugin("com.github.gseitz" % "sbt-release" % "0.8.5")
99
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
1010

1111
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
12+
13+
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.1.0")

src/main/scala/io/imply/wikiticker/ConsoleTicker.scala

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,28 @@ import com.metamx.common.scala.Jackson
2121
import com.metamx.common.scala.Logging
2222
import com.metamx.common.scala.lifecycle._
2323
import com.twitter.app.Flags
24-
import java.io.File
25-
import java.io.FileOutputStream
26-
import java.io.PrintStream
2724

2825
object ConsoleTicker extends Logging
2926
{
3027
def main(args: Array[String]) {
3128
val flags = new Flags("wikiticker-console")
32-
val out = flags("out", "-", "write to file")
29+
val out = flags("out", "console", "output destination [console, file, kafka]")
30+
val fileName = flags("filename", "wikiticker.out", "file name for output")
31+
val brokers = flags("brokers", "localhost:9092", "Kafka [bootstrap.servers]")
32+
val topic = flags("topic", "wikipedia", "Kafka topic")
33+
3334
flags.parseArgs(args)
3435

35-
val outStream = out() match {
36-
case "-" => System.out
37-
case fileName => new PrintStream(new FileOutputStream(new File(fileName)))
36+
val writer = out() match {
37+
case "console" => new ConsoleWriter
38+
case "file" => new FileWriter(fileName())
39+
case "kafka" => new KafkaWriter(brokers(), topic())
40+
case _ => throw new IllegalArgumentException("-out must be one of [console, file, kafka]")
3841
}
3942

4043
val listener = new MessageListener {
4144
override def process(message: Message) = {
42-
outStream.println(Jackson.generate(message.toMap))
45+
writer.write(Jackson.generate(message.toMap))
4346
}
4447
}
4548

@@ -112,6 +115,7 @@ object ConsoleTicker extends Logging
112115
ticker.start()
113116
} onStop {
114117
ticker.stop()
118+
writer.shutdown()
115119
}
116120

117121
try {
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2015 Imply Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.imply.wikiticker
18+
19+
import java.io.File
20+
import java.io.FileOutputStream
21+
import java.io.PrintStream
22+
import java.util.Properties
23+
import org.apache.kafka.clients.producer.KafkaProducer
24+
import org.apache.kafka.clients.producer.ProducerRecord
25+
import org.apache.kafka.common.serialization.StringSerializer
26+
27+
sealed trait Writer
28+
{
29+
def write(data: String): Unit
30+
31+
def shutdown(): Unit
32+
}
33+
34+
class ConsoleWriter extends Writer
35+
{
36+
override def write(data: String): Unit = {
37+
System.out.println(data)
38+
}
39+
40+
override def shutdown(): Unit = {}
41+
}
42+
43+
class FileWriter(fileName: String) extends Writer
44+
{
45+
private val outStream = new PrintStream(new FileOutputStream(new File(fileName)))
46+
47+
override def write(data: String): Unit = {
48+
outStream.println(data)
49+
}
50+
51+
override def shutdown(): Unit = {
52+
outStream.close()
53+
}
54+
}
55+
56+
class KafkaWriter(brokers: String, topic: String) extends Writer
57+
{
58+
private val props = new Properties()
59+
props.put("bootstrap.servers", brokers)
60+
props.put("acks", "all")
61+
props.put("retries", "3")
62+
63+
private val producer = new KafkaProducer[String, String](props, new StringSerializer(), new StringSerializer())
64+
65+
override def write(data: String): Unit = {
66+
producer.send(new ProducerRecord[String, String](topic, data)).get()
67+
}
68+
69+
override def shutdown(): Unit = {
70+
producer.close()
71+
}
72+
}

version.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version in ThisBuild := "0.3-SNAPSHOT"
1+
version in ThisBuild := "0.3"

0 commit comments

Comments
 (0)