@@ -18,7 +18,7 @@ import pekko.actor._
1818import pekko .http .scaladsl .Http
1919import pekko .http .scaladsl .model .{ HttpRequest , HttpResponse , Uri }
2020import pekko .stream .scaladsl .{ Flow , Sink , Source }
21- import pekko .stream .OverflowStrategy
21+ import pekko .stream .{ CompletionStrategy , OverflowStrategy }
2222import com .typesafe .config .{ Config , ConfigFactory }
2323
2424import scala .concurrent .Future
@@ -45,7 +45,16 @@ object ConnectionTestApp {
4545 val sourceActor = {
4646 // Our superPool expects (HttpRequest, Int) as input
4747 val source =
48- Source .actorRef[(HttpRequest , Int )](10000 , OverflowStrategy .dropNew).buffer(20000 , OverflowStrategy .fail)
48+ Source .actorRef[(HttpRequest , Int )](
49+ {
50+ case Status .Success (s : CompletionStrategy ) => s
51+ case Status .Success (_) => CompletionStrategy .Draining
52+ case Status .Success => CompletionStrategy .Draining
53+ },
54+ { case Status .Failure (cause) => cause },
55+ 10000 ,
56+ OverflowStrategy .dropNew)
57+ .buffer(20000 , OverflowStrategy .fail)
4958 val sink = Sink .foreach[(Try [HttpResponse ], Int )] {
5059 case (resp, id) => handleResponse(resp, id)
5160 }
0 commit comments