@@ -30,10 +30,12 @@ import com.metamx.emitter.service.ServiceEmitter
3030import com .metamx .tranquility .beam .Beam
3131import com .metamx .tranquility .beam .ClusteredBeam
3232import com .metamx .tranquility .beam .ClusteredBeamTuning
33- import com .metamx .tranquility .beam .HashPartitionBeam
33+ import com .metamx .tranquility .beam .MergingPartitioningBeam
3434import com .metamx .tranquility .finagle .BeamService
3535import com .metamx .tranquility .finagle .FinagleRegistry
3636import com .metamx .tranquility .finagle .FinagleRegistryConfig
37+ import com .metamx .tranquility .partition .GenericTimeAndDimsPartitioner
38+ import com .metamx .tranquility .partition .Partitioner
3739import com .metamx .tranquility .typeclass .JavaObjectWriter
3840import com .metamx .tranquility .typeclass .JsonWriter
3941import com .metamx .tranquility .typeclass .ObjectWriter
@@ -50,29 +52,29 @@ import scala.collection.JavaConverters._
5052import scala .language .reflectiveCalls
5153
5254/**
53- * Builds Beams or Finagle services that send events to the Druid indexing service.
54- *
55- * {{{
56- * val curator = CuratorFrameworkFactory.newClient("localhost:2181", new BoundedExponentialBackoffRetry(100, 30000, 30))
57- * curator.start()
58- * val dataSource = "foo"
59- * val dimensions = Seq("bar")
60- * val aggregators = Seq(new LongSumAggregatorFactory("baz", "baz"))
61- * val service = DruidBeams
62- * .builder[Map[String, Any]](eventMap => new DateTime(eventMap("timestamp")))
63- * .curator(curator)
64- * .discoveryPath("/test/discovery")
65- * .location(DruidLocation(new DruidEnvironment("druid:local:indexer", "druid:local:firehose:%s"), dataSource))
66- * .rollup(DruidRollup(dimensions, aggregators, QueryGranularity.MINUTE))
67- * .tuning(new ClusteredBeamTuning(Granularity.HOUR, 10.minutes, 1, 1))
68- * .buildService()
69- * val future = service(Seq(Map("timestamp" -> "2010-01-02T03:04:05.678Z", "bar" -> "hey", "baz" -> 3)))
70- * println("result = %s" format Await.result(future))
71- * }}}
72- *
73- * Your event type (in this case, {{{Map[String, Any]}}} must be serializable via Jackson to JSON that Druid can
74- * understand. If Jackson is not an appropriate choice, you can provide an ObjectWriter via {{{.objectWriter(...)}}}.
75- */
55+ * Builds Beams or Finagle services that send events to the Druid indexing service.
56+ *
57+ * {{{
58+ * val curator = CuratorFrameworkFactory.newClient("localhost:2181", new BoundedExponentialBackoffRetry(100, 30000, 30))
59+ * curator.start()
60+ * val dataSource = "foo"
61+ * val dimensions = Seq("bar")
62+ * val aggregators = Seq(new LongSumAggregatorFactory("baz", "baz"))
63+ * val service = DruidBeams
64+ * .builder[Map[String, Any]](eventMap => new DateTime(eventMap("timestamp")))
65+ * .curator(curator)
66+ * .discoveryPath("/test/discovery")
67+ * .location(DruidLocation(new DruidEnvironment("druid:local:indexer", "druid:local:firehose:%s"), dataSource))
68+ * .rollup(DruidRollup(dimensions, aggregators, QueryGranularity.MINUTE))
69+ * .tuning(new ClusteredBeamTuning(Granularity.HOUR, 10.minutes, 1, 1))
70+ * .buildService()
71+ * val future = service(Seq(Map("timestamp" -> "2010-01-02T03:04:05.678Z", "bar" -> "hey", "baz" -> 3)))
72+ * println("result = %s" format Await.result(future))
73+ * }}}
74+ *
75+ * Your event type (in this case, {{{Map[String, Any]}}} must be serializable via Jackson to JSON that Druid can
76+ * understand. If Jackson is not an appropriate choice, you can provide an ObjectWriter via {{{.objectWriter(...)}}}.
77+ */
7678object DruidBeams
7779{
7880 val DefaultTimestampSpec = new TimestampSpec (" timestamp" , " iso" , null )
@@ -108,52 +110,77 @@ object DruidBeams
108110
109111 def rollup (rollup : DruidRollup ) = new Builder [EventType ](config.copy(_rollup = Some (rollup)))
110112
111- def timestampSpec (timestampSpec : TimestampSpec ) = new Builder [EventType ](config.copy(_timestampSpec = Some (timestampSpec)))
113+ def timestampSpec (timestampSpec : TimestampSpec ) = {
114+ new Builder [EventType ](config.copy(_timestampSpec = Some (timestampSpec)))
115+ }
112116
113- def clusteredBeamZkBasePath (path : String ) = new Builder [EventType ](config.copy(_clusteredBeamZkBasePath = Some (path)))
117+ def clusteredBeamZkBasePath (path : String ) = {
118+ new Builder [EventType ](config.copy(_clusteredBeamZkBasePath = Some (path)))
119+ }
114120
115121 def clusteredBeamIdent (ident : String ) = new Builder [EventType ](config.copy(_clusteredBeamIdent = Some (ident)))
116122
117- def druidBeamConfig (beamConfig : DruidBeamConfig ) = new Builder [EventType ](config.copy(_druidBeamConfig = Some (beamConfig)))
123+ def druidBeamConfig (beamConfig : DruidBeamConfig ) = {
124+ new Builder [EventType ](config.copy(_druidBeamConfig = Some (beamConfig)))
125+ }
118126
119127 def emitter (emitter : ServiceEmitter ) = new Builder [EventType ](config.copy(_emitter = Some (emitter)))
120128
121- def finagleRegistry (registry : FinagleRegistry ) = new Builder [EventType ](config.copy(_finagleRegistry = Some (registry)))
129+ def finagleRegistry (registry : FinagleRegistry ) = {
130+ new Builder [EventType ](config.copy(_finagleRegistry = Some (registry)))
131+ }
122132
123133 def timekeeper (timekeeper : Timekeeper ) = new Builder [EventType ](config.copy(_timekeeper = Some (timekeeper)))
124134
125- def beamDecorateFn (f : (Interval , Int ) => Beam [EventType ] => Beam [EventType ]) = new
126- Builder (config.copy(_beamDecorateFn = Some (f)))
135+ def beamDecorateFn (f : (Interval , Int ) => Beam [EventType ] => Beam [EventType ]) = {
136+ new Builder (config.copy(_beamDecorateFn = Some (f)))
137+ }
138+
139+ def beamMergeFn (f : Seq [Beam [EventType ]] => Beam [EventType ]) = {
140+ if (config._partitioner.nonEmpty) {
141+ throw new IllegalStateException (" Cannot set both 'beamMergeFn' and 'partitioner'" )
142+ }
143+ new Builder [EventType ](config.copy(_beamMergeFn = Some (f)))
144+ }
127145
128- def beamMergeFn (f : Seq [Beam [EventType ]] => Beam [EventType ]) = new Builder [EventType ](config.copy(_beamMergeFn = Some (f)))
146+ def partitioner (partitioner : Partitioner [EventType ]) = {
147+ if (config._beamMergeFn.nonEmpty) {
148+ throw new IllegalStateException (" Cannot set both 'beamMergeFn' and 'partitioner'" )
149+ }
150+ new Builder [EventType ](config.copy(_partitioner = Some (partitioner)))
151+ }
129152
130153 def alertMap (d : Dict ) = new Builder [EventType ](config.copy(_alertMap = Some (d)))
131154
132155 @ deprecated(" use .objectWriter(...)" , " 0.2.21" )
133- def eventWriter (writer : ObjectWriter [EventType ]) = new Builder [EventType ](config.copy(_objectWriter = Some (writer)))
156+ def eventWriter (writer : ObjectWriter [EventType ]) = {
157+ new Builder [EventType ](config.copy(_objectWriter = Some (writer)))
158+ }
134159
135- def objectWriter (writer : ObjectWriter [EventType ]) = new Builder [EventType ](config.copy(_objectWriter = Some (writer)))
160+ def objectWriter (writer : ObjectWriter [EventType ]) = {
161+ new Builder [EventType ](config.copy(_objectWriter = Some (writer)))
162+ }
136163
137164 def objectWriter (writer : JavaObjectWriter [EventType ]) = {
138165 new Builder [EventType ](config.copy(_objectWriter = Some (ObjectWriter .wrap(writer))))
139166 }
140167
141- def eventTimestamped (timeFn : EventType => DateTime ) = new Builder [EventType ](
142- config.copy(
143- _timestamper = Some (
144- new Timestamper [EventType ]
145- {
146- def timestamp (a : EventType ) = timeFn(a)
147- }
168+ def eventTimestamped (timeFn : EventType => DateTime ) = {
169+ new Builder [EventType ](
170+ config.copy(
171+ _timestamper = Some (
172+ new Timestamper [EventType ]
173+ {
174+ def timestamp (a : EventType ) = timeFn(a)
175+ }
176+ )
148177 )
149178 )
150- )
179+ }
151180
152181 def buildBeam (): Beam [EventType ] = {
153182 val things = config.buildAll()
154- implicit val eventTimestamped = things.timestamper getOrElse {
155- throw new IllegalArgumentException (" WTF?! Should have had a Timestamperable event..." )
156- }
183+ implicit val eventTimestamped = things.timestamper
157184 val lifecycle = new Lifecycle
158185 val indexService = new IndexService (
159186 things.location.environment,
@@ -222,6 +249,7 @@ object DruidBeams
222249 _finagleRegistry : Option [FinagleRegistry ] = None ,
223250 _timekeeper : Option [Timekeeper ] = None ,
224251 _beamDecorateFn : Option [(Interval , Int ) => Beam [EventType ] => Beam [EventType ]] = None ,
252+ _partitioner : Option [Partitioner [EventType ]] = None ,
225253 _beamMergeFn : Option [Seq [Beam [EventType ]] => Beam [EventType ]] = None ,
226254 _alertMap : Option [Dict ] = None ,
227255 _objectWriter : Option [ObjectWriter [EventType ]] = None ,
@@ -280,19 +308,30 @@ object DruidBeams
280308 val beamDecorateFn = _beamDecorateFn getOrElse {
281309 (interval : Interval , partition : Int ) => (beam : Beam [EventType ]) => beam
282310 }
283- val beamMergeFn = _beamMergeFn getOrElse {
284- (beams : Seq [Beam [EventType ]]) => new HashPartitionBeam [EventType ](beams.toIndexedSeq)
285- }
286311 val alertMap = _alertMap getOrElse Map .empty
287312 val objectWriter = _objectWriter getOrElse {
288313 new JsonWriter [EventType ]
289314 {
290- protected def viaJsonGenerator (a : EventType , jg : JsonGenerator ) {
315+ override protected def viaJsonGenerator (a : EventType , jg : JsonGenerator ): Unit = {
291316 scalaObjectMapper.writeValue(jg, a)
292317 }
293318 }
294319 }
295- val timestamper = _timestamper
320+ val timestamper = _timestamper getOrElse {
321+ throw new IllegalArgumentException (" WTF?! Should have had a Timestamperable event..." )
322+ }
323+ val beamMergeFn = _beamMergeFn getOrElse {
324+ val partitioner = _partitioner getOrElse {
325+ GenericTimeAndDimsPartitioner .create(
326+ timestamper,
327+ timestampSpec,
328+ rollup
329+ )
330+ }
331+ (beams : Seq [Beam [EventType ]]) => {
332+ new MergingPartitioningBeam [EventType ](partitioner, beams.toIndexedSeq)
333+ }
334+ }
296335 }
297336 }
298337
0 commit comments