Skip to content

Commit dd9b3bd

Browse files
committed
Fixed a bug and added unit tests
1 parent 90bb27d commit dd9b3bd

File tree

2 files changed

+131
-49
lines changed

2 files changed

+131
-49
lines changed

core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -132,39 +132,27 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
132132

133133
private[this] val rand = new Random
134134

135-
// Reverse sorted list (by interval start time) of Merged beams we are currently aware of
136-
private[this] var beams: List[Beam[EventType]] =
137-
{
135+
private[this] var zkMetaCache = {
138136
try {
139137
val dataPath = zpathWithDefault("data", ClusteredBeamMeta.empty.toBytes(objectMapper))
140138
curator.sync().forPath(dataPath)
141-
val zkMetaData = ClusteredBeamMeta.fromBytes(objectMapper, curator.getData.forPath(dataPath)).fold(
139+
ClusteredBeamMeta.fromBytes(objectMapper, curator.getData.forPath(dataPath)).fold(
142140
e => {
143141
emitAlert(e, log, emitter, WARN, "Failed to read beam data from cache: %s" format identifier, alertMap)
144142
throw e
145143
},
146144
meta => meta
147145
)
148-
log.info("Synced from ZK, Found Beams - [%s]", zkMetaData)
149-
(
150-
zkMetaData.beamDictss map {
151-
beamDicts =>
152-
beamMergeFn(
153-
beamDicts._2.zipWithIndex map {
154-
case (beamDict, partitionNum) =>
155-
val decorate = beamDecorateFn(new Interval(beamDict.get("interval").get), partitionNum)
156-
decorate(beamMaker.fromDict(beamDict))
157-
}
158-
)
159-
}
160-
).toList sortBy (-_.getInterval().get.start.millis)
161146
}
162147
catch {
163148
case e: Throwable =>
164149
throw e
165150
}
166151
}
167152

153+
// Reverse sorted list (by interval start time) of Merged beams we are currently aware of
154+
private[this] var beams: List[Beam[EventType]] = Nil
155+
168156
// Lock updates to "localLatestCloseTime" and "beams" to prevent races.
169157
private[this] val beamWriteMonitor = new AnyRef
170158

@@ -184,6 +172,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
184172
)
185173
val newMeta = f(prevMeta)
186174
if (newMeta != prevMeta) {
175+
zkMetaCache = newMeta
187176
val newMetaBytes = newMeta.toBytes(objectMapper)
188177
log.info("Writing new beam data to[%s]: %s", dataPath, new String(newMetaBytes))
189178
curator.setData().forPath(dataPath, newMetaBytes)
@@ -219,7 +208,9 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
219208
case (interval, Some(foundBeam)) if foundBeam.getInterval().get.end + tuning.windowPeriod <= now => Future.value(None)
220209
case (interval, Some(foundBeam)) => Future.value(Some(foundBeam))
221210
case (interval, None) if interval.start <= localLatestCloseTime => Future.value(None)
211+
case (interval, None) if interval.end + tuning.windowPeriod <= now => Future.value(None)
222212
case (interval, None) if !creationInterval.overlaps(interval) => Future.value(None)
213+
case (interval, None) if interval.toDurationMillis == 0 => Future.value(None)
223214
case (interval, None) =>
224215
// We may want to create new merged beam(s). Acquire the zk mutex and examine the situation.
225216
// This could be more efficient, but it's happening infrequently so it's probably not a big deal.
@@ -228,9 +219,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
228219
log.info("Trying to create new beam with interval [%s]", interval)
229220
// We want to create this new beam
230221
// But first let us check if there is already any beam in ZK covering this interval
231-
232222
val beamDicts: Seq[untyped.Dict] = prev.beamDictss.collectFirst[Seq[untyped.Dict]]({
233-
case x if new Interval(x._2.head.get("interval").get).overlaps(interval) => x._2
223+
case x if new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC).overlaps(interval) => x._2
234224
}) getOrElse Nil
235225

236226
if (beamDicts.size >= tuning.partitions) {
@@ -250,7 +240,11 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
250240
interval.start
251241
)
252242
prev
253-
} else if (beamDicts.nonEmpty){
243+
} else if (beamDicts.nonEmpty &&
244+
!beamDicts.exists(
245+
beamDict => new Interval(beamDict.get("interval").get, ISOChronology.getInstanceUTC).contains(interval)
246+
))
247+
{
254248
throw new IllegalStateException(
255249
"WTF?? Requested to create a beam for interval [%s] which overlaps with existing beam [%s]" format(interval, beamDicts)
256250
)
@@ -280,7 +274,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
280274
val newBeamDictss: Map[Long, Seq[Dict]] = (prev.beamDictss filterNot {
281275
case (millis, beam) =>
282276
// Expire old beamDicts
283-
new Interval(beam.head.get("interval").get).end + tuning.windowPeriod < now
277+
new Interval(beam.head.get("interval").get, ISOChronology.getInstanceUTC).end + tuning.windowPeriod < now
284278
}) ++ (for (ts <- timestampsToCover) yield {
285279
val tsPrevDicts = prev.beamDictss.getOrElse(ts.millis, Nil)
286280
log.info(
@@ -390,22 +384,32 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
390384
// Most of the time the head beam can handle the event, as beams is a reverse-sorted list by interval start time
391385

392386
intervalBeamPair.find(_._1.contains(eventTimestamp)) match {
393-
case Some(mapEntry) => (mapEntry._1, mapEntry._2)
387+
case Some(x) => (x._1, x._2)
394388
case None =>
395-
val requiredInterval = tuning.segmentBucket(eventTimestamp)
389+
val requiredInterval = tuning.segmentBucket(eventTimestamp).withChronology(ISOChronology.getInstanceUTC)
396390
// Check to see if the interval for which we want to create a beam overlaps with interval of any existing beam
397391
// this may happen if segment granularity is changed
398-
val mayBeOverlappingInterval = beams.collectFirst {
399-
case x if x.getInterval().get.overlaps(requiredInterval) => x.getInterval().get
400-
} getOrElse new Interval(0,0)
401-
402-
(
403-
new Interval(
404-
Math.max(mayBeOverlappingInterval.end.millis, requiredInterval.start.millis),
405-
Math.max(mayBeOverlappingInterval.end.millis, requiredInterval.end.millis)
406-
),
407-
None
408-
)
392+
val actualInterval = zkMetaCache.beamDictss.toSeq.sortBy(-_._1) collectFirst {
393+
case x if new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC).contains(eventTimestamp) =>
394+
new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC)
395+
} match {
396+
case Some(x) => x
397+
case None =>
398+
zkMetaCache.beamDictss.toSeq.sortBy(-_._1) collectFirst {
399+
case x if new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC).overlaps(requiredInterval) =>
400+
new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC)
401+
} match {
402+
case None => requiredInterval
403+
case Some(x) =>
404+
new Interval(
405+
Math.max(x.end.millis, requiredInterval.start.millis),
406+
Math.max(x.end.millis, requiredInterval.end.millis),
407+
ISOChronology.getInstanceUTC
408+
)
409+
}
410+
}
411+
intervalBeamPair.put(actualInterval, None)
412+
(actualInterval, None)
409413
}
410414
}
411415

@@ -430,7 +434,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
430434
!beams.exists(_.getInterval().get.contains(tbwTimestamp))
431435
) yield {
432436
// Create beam asynchronously
433-
beam((tuning.segmentBucket(tbwTimestamp), None), now)
437+
beam((tuning.segmentBucket(tbwTimestamp).withChronology(ISOChronology.getInstanceUTC), None), now)
434438
})
435439
// Propagate data
436440
val countFutures = for ((beamIntervalPair, eventGroup) <- grouped) yield {

core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala

Lines changed: 92 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import org.apache.curator.framework.CuratorFramework
4444
import org.joda.time.DateTimeZone
4545
import org.joda.time.DateTime
4646
import org.joda.time.Interval
47+
import org.joda.time.chrono.ISOChronology
4748
import org.scala_tools.time.Implicits._
4849
import org.scalatest.BeforeAndAfter
4950
import org.scalatest.FunSuite
@@ -68,7 +69,14 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
6869
SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "e")),
6970
SimpleEvent(new DateTime("2012-01-01T01:20Z"), Map("foo" -> "f")),
7071
SimpleEvent(new DateTime("2012-01-01T03:05Z"), Map("foo" -> "g")),
71-
SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h"))
72+
SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h")),
73+
SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "i")),
74+
SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "j")),
75+
SimpleEvent(new DateTime("2012-01-01T01:07Z"), Map("foo" -> "k")),
76+
SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "l")),
77+
SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "m")),
78+
SimpleEvent(new DateTime("2012-01-01T01:09Z"), Map("foo" -> "n")),
79+
SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "o"))
7280
) map {
7381
x => x.fields("foo") -> x
7482
}).toMap
@@ -79,14 +87,18 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
7987
val localZone = new DateTime().getZone
8088

8189
def buffers = _lock.synchronized {
82-
_buffers.values.map(x => (x.timestamp.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet
90+
_buffers.values.map(x => (x.interval.start.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet
91+
}
92+
93+
def buffersWithInterval = _lock.synchronized {
94+
_buffers.values.map(x => (x.interval, x.partition, x.open, x.buffer.toSeq)).toSet
8395
}
8496

8597
def beamsList = _lock.synchronized {
8698
_beams.toList
8799
}
88100

89-
class EventBuffer(val timestamp: DateTime, val partition: Int)
101+
class EventBuffer(val interval: Interval, val partition: Int)
90102
{
91103
val buffer: mutable.Buffer[SimpleEvent] = mutable.ListBuffer()
92104
@volatile var open: Boolean = true
@@ -113,20 +125,20 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
113125
def getInterval() = None
114126
}
115127

116-
class TestingBeam(val timestamp: DateTime, val partition: Int, val uuid: String = UUID.randomUUID().toString)
128+
class TestingBeam(val interval: Interval, val partition: Int, val uuid: String = UUID.randomUUID().toString)
117129
extends Beam[SimpleEvent]
118130
{
119131
_lock.synchronized {
120132
_beams += this
121133
}
122134

123-
def getInterval() = None
135+
def getInterval() = Some(interval)
124136

125137
def propagate(_events: Seq[SimpleEvent]) = _lock.synchronized {
126138
if (_events.contains(events("defunct"))) {
127139
Future.exception(new DefunctBeamException("Defunct"))
128140
} else {
129-
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition))
141+
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition))
130142
buffer.open = true
131143
buffer.buffer ++= _events
132144
Future.value(_events.size)
@@ -135,35 +147,35 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
135147

136148
def close() = _lock.synchronized {
137149
_beams -= this
138-
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition))
150+
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition))
139151
buffer.open = false
140152
Future.Done
141153
}
142154

143155
def toDict = Dict(
144-
"timestamp" -> timestamp.toString(),
156+
"interval" -> interval.toString,
145157
"partition" -> partition,
146158
"uuid" -> uuid
147159
)
148160
}
149161

150162
class TestingBeamMaker extends BeamMaker[SimpleEvent, TestingBeam]
151163
{
152-
def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval.start, partition)
164+
def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval, partition)
153165

154166
def toDict(beam: TestingBeam) = {
155167
Dict(
156-
"timestamp" -> beam.timestamp.toString(),
168+
"interval" -> beam.interval.toString,
157169
"partition" -> beam.partition,
158170
"uuid" -> beam.uuid
159171
)
160172
}
161173

162174
def fromDict(d: Dict) = {
163-
val timestamp = new DateTime(d("timestamp"))
175+
val interval= new Interval(d("interval"))
164176
val partition = int(d("partition"))
165177
val uuid = str(d("uuid"))
166-
new TestingBeam(timestamp, partition, uuid)
178+
new TestingBeam(interval, partition, uuid)
167179
}
168180
}
169181

@@ -357,6 +369,72 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
357369
}
358370
}
359371

372+
test("IncreaseGranularity") {
373+
withLocalCurator {
374+
curator =>
375+
val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.MINUTE, windowPeriod = 1.minute)
376+
val newTuning = oldTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE)
377+
378+
val beamsA = newBeams(curator, oldTuning)
379+
beamsA.timekeeper.now = start
380+
beamsA.blockagate(Seq("i") map events)
381+
beamsA.blockagate(Seq("i") map events)
382+
beamsA.timekeeper.now = start + 1.minute
383+
beamsA.blockagate(Seq("j") map events)
384+
beamsA.blockagate(Seq("j") map events)
385+
386+
val beamsB = newBeams(curator, newTuning)
387+
beamsB.timekeeper.now = start + 2.minute
388+
beamsB.blockagate(Seq("k") map events)
389+
beamsB.blockagate(Seq("k") map events)
390+
beamsB.blockagate(Seq("l") map events)
391+
beamsB.blockagate(Seq("l") map events)
392+
beamsB.blockagate(Seq("m") map events)
393+
beamsB.blockagate(Seq("m") map events)
394+
beamsB.blockagate(Seq("n") map events)
395+
beamsB.blockagate(Seq("n") map events)
396+
397+
Await.result(beamsA.close())
398+
399+
assert(buffersWithInterval === Set(
400+
(new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 0, false, Seq("i") map events),
401+
(new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 1, false, Seq("i") map events),
402+
// "j" and "l" end up in the same partition because different beams were used to propagate them
403+
(new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 0, false, Seq("j", "l") map events),
404+
(new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 1, false, Seq("j", "l") map events),
405+
(new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, true, Seq("k", "n") map events),
406+
(new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, true, Seq("k", "n") map events)
407+
))
408+
}
409+
}
410+
411+
test("DecreaseGranularity") {
412+
withLocalCurator {
413+
curator =>
414+
val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE)
415+
val newTuning = oldTuning.copy(segmentGranularity = Granularity.MINUTE)
416+
417+
val beamsA = newBeams(curator, oldTuning)
418+
beamsA.timekeeper.now = start
419+
beamsA.blockagate(Seq("i") map events)
420+
421+
val beamsB = newBeams(curator, newTuning)
422+
beamsB.timekeeper.now = start + 4.minute
423+
beamsB.blockagate(Seq("j") map events)
424+
beamsB.blockagate(Seq("n") map events)
425+
beamsB.blockagate(Seq("o") map events)
426+
beamsB.blockagate(Seq("o") map events)
427+
Await.result(beamsB.close())
428+
429+
assert(buffersWithInterval === Set(
430+
(new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, false, Seq("i", "j") map events),
431+
(new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, false, Seq("n") map events),
432+
(new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 0, false, Seq("o") map events),
433+
(new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 1, false, Seq("o") map events)
434+
))
435+
}
436+
}
437+
360438
test("DefunctBeam") {
361439
withLocalCurator {
362440
curator =>
@@ -389,10 +467,10 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
389467
))
390468
val desired = List("2012-01-01T00Z", "2012-01-01T00Z", "2012-01-01T01Z", "2012-01-01T01Z").map(new DateTime(_))
391469
val startTime = System.currentTimeMillis()
392-
while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.timestamp).sortBy(_.millis) != desired) {
470+
while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.interval.start).sortBy(_.millis) != desired) {
393471
Thread.sleep(100)
394472
}
395-
assert(beamsList.map(_.timestamp).sortBy(_.millis) === desired)
473+
assert(beamsList.map(_.interval.start).sortBy(_.millis) === desired)
396474
}
397475
}
398476

0 commit comments

Comments
 (0)