Skip to content

Commit 8966129

Browse files
authored
http2: add client-side cancellation propagation delay (#4027)
1 parent 751d7ae commit 8966129

File tree

4 files changed

+11
-5
lines changed

4 files changed

+11
-5
lines changed

akka-http-core/src/main/scala/akka/http/impl/engine/http2/Http2.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
234234
TLS(createEngine _, closing = TLSClosing.eagerClose)
235235

236236
stack.joinMat(clientConnectionSettings.transport.connectTo(host, port, clientConnectionSettings)(system.classicSystem))(Keep.right)
237+
.addAttributes(Http.cancellationStrategyAttributeForDelay(clientConnectionSettings.streamCancellationDelay))
237238
}
238239

239240
def outgoingConnectionPriorKnowledge(host: String, port: Int, clientConnectionSettings: ClientConnectionSettings, log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
@@ -243,6 +244,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
243244
TLSPlacebo()
244245

245246
stack.joinMat(clientConnectionSettings.transport.connectTo(host, port, clientConnectionSettings)(system.classicSystem))(Keep.right)
247+
.addAttributes(Http.cancellationStrategyAttributeForDelay(clientConnectionSettings.streamCancellationDelay))
246248
}
247249

248250
private def prepareClientAttributes(serverHost: String, port: Int): Attributes =

akka-http-core/src/main/scala/akka/http/impl/engine/http2/client/PersistentConnection.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import akka.http.scaladsl.settings.Http2ClientSettings
1313
import akka.stream.scaladsl.{ Flow, Keep, Source }
1414
import akka.stream.stage.TimerGraphStageLogic
1515
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, StageLogging }
16-
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
16+
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet, StreamTcpException }
1717
import akka.util.PrettyDuration
1818

1919
import java.util.concurrent.ThreadLocalRandom
@@ -133,8 +133,10 @@ private[http2] object PersistentConnection {
133133
}
134134
}
135135
val onFailed = getAsyncCallback[Throwable] { cause =>
136+
// If the materialized value is failed, then the stream should be broken by design.
137+
// Nevertheless also kick our ends of the stream.
136138
responseIn.cancel()
137-
requestOut.fail(new RuntimeException("connection broken", cause))
139+
requestOut.fail(new StreamTcpException("connection broken"))
138140

139141
if (connectsLeft.contains(0)) {
140142
failStage(new RuntimeException(s"Connection failed after $maxAttempts attempts", cause))

akka-http2-support/src/test/scala/akka/http/impl/engine/http2/Http2ClientServerSpec.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import akka.http.scaladsl.settings.ClientConnectionSettings
1313
import akka.http.scaladsl.unmarshalling.Unmarshal
1414
import akka.http.scaladsl.Http
1515
import akka.http.scaladsl.settings.ServerSettings
16+
import akka.stream.StreamTcpException
1617
import akka.stream.scaladsl.{ Sink, Source }
1718
import akka.stream.testkit.{ TestPublisher, TestSubscriber }
1819
import akka.testkit.TestProbe
@@ -98,7 +99,8 @@ class Http2ClientServerSpec extends AkkaSpecWithMaterializer(
9899
clientResponsesIn.ensureSubscription()
99100
Thread.sleep(500)
100101
clientRequestsOut.expectCancellation()
101-
clientResponsesIn.expectComplete()
102+
// expect idle timeout connection abort exception to propagate to user
103+
clientResponsesIn.expectError() shouldBe a[StreamTcpException]
102104
}
103105
"support client-side idle-timeout" in new TestSetup {
104106
override def clientSettings: ClientConnectionSettings = super.clientSettings.withIdleTimeout(100.millis)

akka-http2-support/src/test/scala/akka/http/impl/engine/http2/Http2PersistentClientSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import akka.http.scaladsl.settings.Http2ClientSettings
1515
import akka.http.scaladsl.unmarshalling.Unmarshal
1616
import akka.http.scaladsl.{ ClientTransport, Http }
1717
import akka.http.scaladsl.settings.ServerSettings
18-
import akka.stream.{ KillSwitches, UniqueKillSwitch }
18+
import akka.stream.{ KillSwitches, StreamTcpException, UniqueKillSwitch }
1919
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
2020
import akka.stream.testkit.scaladsl.StreamTestKit
2121
import akka.stream.testkit.{ TestPublisher, TestSubscriber }
@@ -364,7 +364,7 @@ abstract class Http2PersistentClientSpec(tls: Boolean) extends AkkaSpecWithMater
364364
})
365365

366366
val killProbe = TestProbe()
367-
def killConnection(): Unit = killProbe.expectMsgType[UniqueKillSwitch].abort(new RuntimeException("connection was killed"))
367+
def killConnection(): Unit = killProbe.expectMsgType[UniqueKillSwitch].abort(new StreamTcpException("connection was killed"))
368368

369369
object server {
370370
private lazy val requestProbe = TestProbe()

0 commit comments

Comments
 (0)