Skip to content

Commit 2ae0c26

Browse files
committed
Merge pull request #54 from gianm/default-fsp
Provide a default firehoseServicePattern. (fix #46)
2 parents d03dfd8 + 9b5cef2 commit 2ae0c26

File tree

3 files changed

+67
-39
lines changed

3 files changed

+67
-39
lines changed

README.md

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ You can set up and use a Finagle Service like this:
1919

2020
```java
2121
final String indexService = "overlord"; // Your overlord's service name.
22-
final String firehosePattern = "druid:firehose:%s"; // Make up a service pattern, include %s somewhere in it.
2322
final String discoveryPath = "/druid/discovery"; // Your overlord's druid.discovery.curator.path
2423
final String dataSource = "foo";
2524
final List<String> dimensions = ImmutableList.of("bar", "qux");
@@ -57,13 +56,7 @@ final Service<List<Map<String, Object>>, Integer> druidService = DruidBeams
5756
.builder(timestamper)
5857
.curator(curator)
5958
.discoveryPath(discoveryPath)
60-
.location(
61-
DruidLocation.create(
62-
indexService,
63-
firehosePattern,
64-
dataSource
65-
)
66-
)
59+
.location(DruidLocation.create(indexService, dataSource))
6760
.timestampSpec(timestampSpec)
6861
.rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularity.MINUTE))
6962
.tuning(
@@ -95,7 +88,6 @@ Or in Scala:
9588

9689
```scala
9790
val indexService = "overlord" // Your overlord's druid.service, with slashes replaced by colons.
98-
val firehosePattern = "druid:firehose:%s" // Make up a service pattern, include %s somewhere in it.
9991
val discoveryPath = "/druid/discovery" // Your overlord's druid.discovery.curator.path.
10092
val dataSource = "foo"
10193
val dimensions = Seq("bar", "qux")
@@ -111,7 +103,7 @@ val druidService = DruidBeams
111103
.builder(timestamper)
112104
.curator(curator)
113105
.discoveryPath(discoveryPath)
114-
.location(DruidLocation(indexService, firehosePattern, dataSource))
106+
.location(DruidLocation(indexService, dataSource))
115107
.rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularity.MINUTE))
116108
.tuning(
117109
ClusteredBeamTuning(
@@ -248,7 +240,6 @@ class MyBeamFactory extends BeamFactory[Map[String, Any]]
248240
curator.start()
249241

250242
val indexService = "overlord" // Your overlord's druid.service, with slashes replaced by colons.
251-
val firehosePattern = "druid:firehose:%s" // Make up a service pattern, include %s somewhere in it.
252243
val discoveryPath = "/druid/discovery" // Your overlord's druid.discovery.curator.path.
253244
val dataSource = "foo"
254245
val dimensions = Seq("bar")
@@ -258,7 +249,7 @@ class MyBeamFactory extends BeamFactory[Map[String, Any]]
258249
.builder((eventMap: Map[String, Any]) => new DateTime(eventMap("timestamp")))
259250
.curator(curator)
260251
.discoveryPath(discoveryPath)
261-
.location(DruidLocation(indexService, firehosePattern, dataSource))
252+
.location(DruidLocation(indexService, dataSource))
262253
.rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularity.MINUTE))
263254
.tuning(
264255
ClusteredBeamTuning(

core/src/main/scala/com/metamx/tranquility/druid/DruidEnvironment.scala

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class DruidEnvironment(
3232

3333
override def equals(other: Any) = other match {
3434
case that: DruidEnvironment =>
35-
(indexService, firehoseServicePattern) == (that.indexService, that.firehoseServicePattern)
35+
(indexService, firehoseServicePattern) ==(that.indexService, that.firehoseServicePattern)
3636
case _ => false
3737
}
3838

@@ -41,21 +41,41 @@ class DruidEnvironment(
4141

4242
object DruidEnvironment
4343
{
44+
def apply(indexServiceMaybeWithSlashes: String): DruidEnvironment = {
45+
new DruidEnvironment(indexServiceMaybeWithSlashes, defaultFirehoseServicePattern(indexServiceMaybeWithSlashes))
46+
}
47+
4448
def apply(indexServiceMaybeWithSlashes: String, firehoseServicePattern: String): DruidEnvironment = {
4549
new DruidEnvironment(indexServiceMaybeWithSlashes, firehoseServicePattern)
4650
}
4751

4852
/**
49-
* Factory method for creating DruidEnvironment objects. DruidEnvironments represent a Druid indexing service
50-
* cluster, locatable through service discovery.
51-
*
52-
* @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with
53-
* colons before searching for this in service discovery, because Druid does the
54-
* same thing before announcing.
55-
* @param firehoseServicePattern Make up a service pattern, include %s somewhere in it. This will be used for
56-
* internal service-discovery purposes, to help Tranquility find Druid indexing tasks.
57-
*/
53+
* Factory method for creating DruidEnvironment objects. DruidEnvironments represent a Druid indexing service
54+
* cluster, locatable through service discovery.
55+
*
56+
* @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with
57+
* colons before searching for this in service discovery, because Druid does the
58+
* same thing before announcing.
59+
* @param firehoseServicePattern Make up a service pattern, include %s somewhere in it. This will be used for
60+
* internal service-discovery purposes, to help Tranquility find Druid indexing tasks.
61+
*/
5862
def create(indexServiceMaybeWithSlashes: String, firehoseServicePattern: String): DruidEnvironment = {
59-
new DruidEnvironment(indexServiceMaybeWithSlashes, firehoseServicePattern)
63+
apply(indexServiceMaybeWithSlashes, firehoseServicePattern)
64+
}
65+
66+
/**
67+
* Factory method for creating DruidEnvironment objects. DruidEnvironments represent a Druid indexing service
68+
* cluster, locatable through service discovery.
69+
*
70+
* @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with
71+
* colons before searching for this in service discovery, because Druid does the
72+
* same thing before announcing.
73+
*/
74+
def create(indexServiceMaybeWithSlashes: String): DruidEnvironment = {
75+
apply(indexServiceMaybeWithSlashes)
76+
}
77+
78+
private def defaultFirehoseServicePattern(indexServiceMaybeWithSlashes: String) = {
79+
s"firehose:${indexServiceMaybeWithSlashes.replace('/', ':')}:%s"
6080
}
6181
}

core/src/main/scala/com/metamx/tranquility/druid/DruidLocation.scala

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,27 +33,27 @@ object DruidLocation
3333
}
3434

3535
/**
36-
* Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a
37-
* specific Druid indexing service cluster.
38-
*
39-
* @param environment the Druid indexing service
40-
* @param dataSource the Druid dataSource
41-
*/
36+
* Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a
37+
* specific Druid indexing service cluster.
38+
*
39+
* @param environment the Druid indexing service
40+
* @param dataSource the Druid dataSource
41+
*/
4242
def create(environment: DruidEnvironment, dataSource: String): DruidLocation = {
4343
DruidLocation(environment, dataSource)
4444
}
4545

4646
/**
47-
* Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a
48-
* specific Druid indexing service cluster.
49-
*
50-
* @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with
51-
* colons before searching for this in service discovery, because Druid does the
52-
* same thing before announcing.
53-
* @param firehoseServicePattern Make up a service pattern, include %s somewhere in it. This will be used for
54-
* internal service-discovery purposes, to help Tranquility find Druid indexing tasks.
55-
* @param dataSource the Druid dataSource
56-
*/
47+
* Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a
48+
* specific Druid indexing service cluster.
49+
*
50+
* @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with
51+
* colons before searching for this in service discovery, because Druid does the
52+
* same thing before announcing.
53+
* @param firehoseServicePattern Make up a service pattern, include %s somewhere in it. This will be used for
54+
* internal service-discovery purposes, to help Tranquility find Druid indexing tasks.
55+
* @param dataSource the Druid dataSource
56+
*/
5757
def create(
5858
indexServiceMaybeWithSlashes: String,
5959
firehoseServicePattern: String,
@@ -62,4 +62,21 @@ object DruidLocation
6262
{
6363
DruidLocation(DruidEnvironment(indexServiceMaybeWithSlashes, firehoseServicePattern), dataSource)
6464
}
65+
66+
/**
67+
* Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a
68+
* specific Druid indexing service cluster.
69+
*
70+
* @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with
71+
* colons before searching for this in service discovery, because Druid does the
72+
* same thing before announcing.
73+
* @param dataSource the Druid dataSource
74+
*/
75+
def create(
76+
indexServiceMaybeWithSlashes: String,
77+
dataSource: String
78+
): DruidLocation =
79+
{
80+
DruidLocation(DruidEnvironment(indexServiceMaybeWithSlashes), dataSource)
81+
}
6582
}

0 commit comments

Comments
 (0)