@@ -25,27 +25,55 @@ import collection.mutable.{ Map => MutableMap }
2525object Memory {
2626 implicit def toSource [T ](traversable : TraversableOnce [T ])(implicit mf : Manifest [T ]): Producer [Memory , T ] =
2727 Producer .source[Memory , T ](traversable)
28+
29+ case class Identity [T <: AnyRef ](val unwrap : T ) {
30+ override def equals (that : Any ) = that match {
31+ case i : Identity [_] => unwrap.eq(i.unwrap)
32+ case _ => false
33+ }
34+ override def hashCode = System .identityHashCode(unwrap)
35+ }
36+
2837}
2938
3039trait MemoryService [- K , + V ] {
3140 def get (k : K ): Option [V ]
3241}
3342
3443class Memory (implicit jobID : JobId = JobId (" default.memory.jobId" )) extends Platform [Memory ] {
44+ import Memory .Identity
45+
3546 type Source [T ] = TraversableOnce [T ]
3647 type Store [K , V ] = MutableMap [K , V ]
3748 type Sink [- T ] = (T => Unit )
3849 type Service [- K , + V ] = MemoryService [K , V ]
3950 type Plan [T ] = Stream [T ]
4051
4152 private type Prod [T ] = Producer [Memory , T ]
42- private type JamfMap = HMap [Prod , Stream ]
53+ private type IProd [T ] = Identity [Producer [Memory , T ]]
54+
55+ // Key is wrapped in Identity to get reference equality semantics
56+ private type JamfMap = HMap [IProd , Stream ]
4357
4458 def counter (group : Group , name : Name ): Option [Long ] =
4559 MemoryStatProvider .getCountersForJob(jobID).flatMap { _.get(group.getString + " /" + name.getString).map { _.get } }
4660
61+ /**
62+ * On the memory platform the notion of summingbird stream literally
63+ * translates to scala streams. We plan the topology by creating
64+ * streams from sources and transforming them using other components
65+ * of the topology. Execution is then just a matter of forcing this stream.
66+ *
67+ * Since the topology is a DAG, some parts of the graph can be shared
68+ * between multiple roots(TailProducers combined with AlsoProducer). To
69+ * avoid duplicating the shared parts of the graph we keep track of
70+ * planned portions in a map. This map uses reference equality because
71+ * we care about the root components themselves and not their content.
72+ * e.g. summer uses a mutable map for store. If the content of this
73+ * mutable map changes it doesn't mean that we have a different summer.
74+ */
4775 private def toStream [T ](outerProducer : Prod [T ], jamfs : JamfMap ): (Stream [T ], JamfMap ) =
48- jamfs.get(outerProducer) match {
76+ jamfs.get(Identity ( outerProducer) ) match {
4977 case Some (s) => (s, jamfs)
5078 case None =>
5179 val (s, m) = outerProducer match {
@@ -83,6 +111,7 @@ class Memory(implicit jobID: JobId = JobId("default.memory.jobId")) extends Plat
83111 // Plan the first one, but ignore it
84112 val (left, leftM) = toStream(l, jamfs)
85113 val (right, rightM) = toStream(r, leftM)
114+
86115 // We need to force all of left to make sure any
87116 // side effects in write happen
88117 lazy val lforcedEmpty = left.filter(_ => false )
@@ -113,7 +142,7 @@ class Memory(implicit jobID: JobId = JobId("default.memory.jobId")) extends Plat
113142 }
114143 // scala can't infer that s is the right type through case statements above
115144 val st = s.asInstanceOf [Stream [T ]]
116- (st, m + (outerProducer -> st))
145+ (st, m + (Identity ( outerProducer) -> st))
117146 }
118147
119148 def plan [T ](prod : TailProducer [Memory , T ]): Stream [T ] = {
0 commit comments