@@ -2015,6 +2015,224 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
20152015  ):  Stream [F2 , O2 ] = 
20162016    that.mergeHaltL(this )
20172017
2018+   /**  Merges two streams with priority given to the first stream. 
2019+     * 
2020+     * Internally, this uses two bounded queues (of one element). 
2021+     * Queue has tryTake() which allows to try a non-blocking read. 
2022+     * This is used to check for elements on the prioritized queue, 
2023+     * before a blocking read through racePair() is tried on both 
2024+     * queues, if no data is available on the prioritized queue. 
2025+     */  
2026+   def  mergePreferred [F2 [x] >:  F [x], O2  >:  O ](
2027+       that : Stream [F2 , O2 ]
2028+   )(implicit  F :  Concurrent [F2 ]):  Stream [F2 , O2 ] =  {
2029+     val  fstream :  F2 [Stream [F2 , O2 ]] = 
2030+       for  {
2031+         interrupt <-  F .deferred[Unit ]
2032+         resultL <-  F .deferred[Either [Throwable , Unit ]]
2033+         resultR <-  F .deferred[Either [Throwable , Unit ]]
2034+         resultQL <-  Queue .bounded[F2 , Option [Stream [F2 , O2 ]]](1 )
2035+         resultQR <-  Queue .bounded[F2 , Option [Stream [F2 , O2 ]]](1 )
2036+       } yield  {
2037+ 
2038+         def  watchInterrupted (str : Stream [F2 , O2 ]):  Stream [F2 , O2 ] = 
2039+           str.interruptWhen(interrupt.get.attempt)
2040+ 
2041+         //  action to signal that one stream is finished (by putting a None in it)
2042+         def  doneAndClose (q : Queue [F2 , Option [Stream [F2 , O2 ]]]):  F2 [Unit ] =  q.offer(None ).void
2043+ 
2044+         //  action to interrupt the processing of both streams by completing interrupt
2045+         val  signalInterruption :  F2 [Unit ] =  interrupt.complete(()).void
2046+ 
2047+         //  Read from a stream and (possibly blocking) write to the bounded queue for that stream
2048+         def  go (s : Stream [F2 , O2 ], q : Queue [F2 , Option [Stream [F2 , O2 ]]]):  Pull [F2 , Nothing , Unit ] = 
2049+           s.pull.uncons
2050+             .flatMap {
2051+               case  Some ((hd, tl)) => 
2052+                 val  send  =  q.offer(Some (Stream .chunk(hd)))
2053+                 Pull .eval(send) >>  go(tl, q)
2054+               case  None  => 
2055+                 Pull .done
2056+             }
2057+ 
2058+         def  runStream (
2059+             s : Stream [F2 , O2 ],
2060+             whenDone : Deferred [F2 , Either [Throwable , Unit ]],
2061+             q : Queue [F2 , Option [Stream [F2 , O2 ]]]
2062+         ):  F2 [Unit ] =  {
2063+           val  str  =  watchInterrupted(go(s, q).stream)
2064+           str.compile.drain.attempt
2065+             .flatMap {
2066+               //  signal completion of our side before we will signal interruption,
2067+               //  to make sure our result is always available to others
2068+               case  r @  Left (_) => 
2069+                 whenDone.complete(r) >>  signalInterruption
2070+               case  r @  Right (_) => 
2071+                 whenDone.complete(r) >>  doneAndClose(q)
2072+             }
2073+         }
2074+ 
2075+         //  Typedef for the fibres that read from the queues.
2076+         //  That's contained in the Either returned by racePair()
2077+         type  FBR  =  Fiber [F2 , Throwable , Option [Stream [F2 , O2 ]]]
2078+ 
2079+         //  An ADT for tracking state of the two queues.
2080+         //  The types describe the state, starting with BothActive.
2081+         //  Next state is either LeftDone or RightDone.
2082+         //  Final state is BothDone.
2083+         //  The members of those states store the loosing fibre
2084+         //  of a racePair()-call, which will be reused during the
2085+         //  next read.
2086+         sealed  trait  QueuesState 
2087+         final  case  class  BothActive (v : Option [Either [FBR , FBR ]]) extends  QueuesState 
2088+         final  case  class  LeftDone (rFbr : Option [FBR ]) extends  QueuesState 
2089+         final  case  class  RightDone (lFbr : Option [FBR ]) extends  QueuesState 
2090+         case  object  BothDone  extends  QueuesState 
2091+ 
2092+         //  Race the given effects, returning the result of the winner
2093+         //  plus the still active fibre of the looser
2094+         def  raceQueues (
2095+             lq : F2 [Option [Stream [F2 , O2 ]]],
2096+             rq : F2 [Option [Stream [F2 , O2 ]]]
2097+         ):  F2 [(Option [Stream [F2 , O2 ]], Either [FBR , FBR ])] = 
2098+           F .racePair(lq, rq)
2099+             .flatMap {
2100+               case  Left ((result, fiber)) => 
2101+                 result.embedError.map(_ ->  fiber.asRight[FBR ])
2102+               case  Right ((fiber, result)) => 
2103+                 result.embedError.map(_ ->  fiber.asLeft[FBR ])
2104+             }
2105+ 
2106+         //  stream that is generated from pumping out the elements of the queue.
2107+         val  pumpFromQueue :  Stream [F2 , O2 ] = 
2108+           Stream 
2109+             .unfoldEval[F2 , QueuesState , Stream [F2 , O2 ]](BothActive (None )) { s => 
2110+               //  Returning None from unfoldEval will stop the stream. If we read a None
2111+               //  from any queue, we cannot return that but must continue reading on the
2112+               //  other queue. Thus, we need a method which can be called recursively to
2113+               //  continue reading in case of None.
2114+               def  readNext (s : QueuesState ):  F2 [(Option [Stream [F2 , O2 ]], QueuesState )] = 
2115+                 s match  {
2116+                   //  The initial state, both queues are active and there are no fibres left over
2117+                   case  BothActive (None ) => 
2118+                     //  check available data on left, which would be prioritized
2119+                     resultQL.tryTake
2120+                       .flatMap {
2121+                         _.fold(
2122+                           //  no data available on prioritized queue, race both queues
2123+                           raceQueues(resultQL.take, resultQR.take)
2124+                             .flatMap[(Option [Stream [F2 , O2 ]], QueuesState )] {
2125+                               case  (None , Left (fbr)) => 
2126+                                 readNext(RightDone (fbr.some))
2127+                               case  (None , Right (fbr)) => 
2128+                                 readNext(LeftDone (fbr.some))
2129+                               case  (Some (s), fbr) => 
2130+                                 F .pure(s.some ->  BothActive (fbr.some))
2131+                             }
2132+                         )(os => 
2133+                           //  we read data from the prioritized queue, however, this sill could be a None,
2134+                           //  signalling that queue is done. Handle that:
2135+                           os.fold(readNext(LeftDone (None )))(ls => 
2136+                             F .pure(ls.some ->  BothActive (None ))
2137+                           )
2138+                         )
2139+                       }
2140+ 
2141+                   //  right was looser during the last run
2142+                   case  BothActive (Some (Right (fbr))) => 
2143+                     //  anyway, check for available data on left first, ignoring the incoming fibre for right
2144+                     resultQL.tryTake
2145+                       .flatMap(
2146+                         _.fold(
2147+                           //  use the incoming fibre to read from right queue
2148+                           raceQueues(resultQL.take, fbr.joinWithNever)
2149+                             .flatMap[(Option [Stream [F2 , O2 ]], QueuesState )] {
2150+                               case  (None , Left (fbr)) => 
2151+                                 readNext(RightDone (fbr.some))
2152+                               case  (None , Right (fbr)) => 
2153+                                 readNext(LeftDone (fbr.some))
2154+                               case  (Some (s), fbr) => 
2155+                                 F .pure(s.some ->  BothActive (fbr.some))
2156+                             }
2157+                         )(os => 
2158+                           //  important to reuse the incoming fibre here!
2159+                           os.fold(readNext(LeftDone (fbr.some)))(ls => 
2160+                             F .pure(ls.some ->  BothActive (fbr.asRight[FBR ].some))
2161+                           )
2162+                         )
2163+                       )
2164+ 
2165+                   //  left was looser during the last run
2166+                   case  BothActive (Some (Left (fbr))) => 
2167+                     //  Can't check for available data on left this time,
2168+                     //  because there's an active fibre reading from the left queue.
2169+                     //  Start a race and reuse that fibre for left.
2170+                     raceQueues(fbr.joinWithNever, resultQR.take)
2171+                       .flatMap[(Option [Stream [F2 , O2 ]], QueuesState )] {
2172+                         case  (None , Left (fbr)) => 
2173+                           readNext(RightDone (fbr.some))
2174+                         case  (None , Right (fbr)) => 
2175+                           readNext(LeftDone (fbr.some))
2176+                         case  (Some (s), fbr) => 
2177+                           F .pure(s.some ->  BothActive (fbr.some))
2178+                       }
2179+ 
2180+                   //  Left queue is done, but, it's possible we retrieve an active fibre for right.
2181+                   case  LeftDone (fbr) => 
2182+                     fbr
2183+                       .map(_.joinWithNever) //  join the incoming fibre if given
2184+                       .getOrElse(resultQR.take) //  ordinary take() if no fibre has been given
2185+                       .map {
2186+                         case  None  => 
2187+                           None  ->  BothDone 
2188+                         case  os => 
2189+                           os ->  LeftDone (None )
2190+                       }
2191+ 
2192+                   //  mirror case of above
2193+                   case  RightDone (fbr) => 
2194+                     fbr
2195+                       .map(_.joinWithNever)
2196+                       .getOrElse(resultQL.take)
2197+                       .map {
2198+                         case  None  => 
2199+                           None  ->  BothDone 
2200+                         case  os => 
2201+                           os ->  RightDone (None )
2202+                       }
2203+ 
2204+                   //  this should never happen, but we need to make the compiler happy
2205+                   case  BothDone  => 
2206+                     F .pure(None  ->  BothDone )
2207+                 }
2208+ 
2209+               //  readNext() returns None in _1 if and only if both queues are done
2210+               readNext(s).map {
2211+                 case  (None , _) => 
2212+                   None  //  finish the stream (unfoldEval)
2213+                 case  (Some (s), st) => 
2214+                   (s ->  st).some //  emit element and new state (unfoldEval)
2215+               }
2216+             }
2217+             .flatten //  we have Stream[F2, Stream[F2, O2]] and flatten that to Stream[F2, O2]
2218+ 
2219+         val  atRunEnd :  F2 [Unit ] = 
2220+           for  {
2221+             _ <-  signalInterruption //  interrupt so the upstreams have chance to complete
2222+             left <-  resultL.get
2223+             right <-  resultR.get
2224+             r <-  F .fromEither(CompositeFailure .fromResults(left, right))
2225+           } yield  r
2226+ 
2227+         val  runStreams  = 
2228+           runStream(this , resultL, resultQL).start >>  runStream(that, resultR, resultQR).start
2229+ 
2230+         Stream .bracket(runStreams)(_ =>  atRunEnd) >>  watchInterrupted(pumpFromQueue)
2231+       }
2232+     Stream .eval(fstream).flatten
2233+ 
2234+   }
2235+ 
20182236  /**  Given two sorted streams emits a single sorted stream, like in merge-sort. 
20192237    * For entries that are considered equal by the Order, left stream element is emitted first. 
20202238    * Note: both this and another streams MUST BE ORDERED already 
0 commit comments