From 449820e3bb191a7072a1a153b8f9ea7af2051d76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Tue, 1 Oct 2024 00:50:56 +0100 Subject: [PATCH 1/4] WIP: test reproducing the "Received unexpected frame of type RstStreamFrame for stream 1 in state HalfClosedLocalWaitingForPeerStream" --- .../http/impl/engine/http2/Http2StreamHandling.scala | 1 + .../pekko/http/impl/engine/http2/Http2ClientSpec.scala | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala index 2969be8a7..8ca91410f 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala @@ -40,6 +40,7 @@ import scala.util.control.NoStackTrace * Mixed into the Http2ServerDemux graph logic. */ @InternalApi +//noinspection ConvertibleToMethodValue,ScalaWeakerAccess,ScalaUnusedSymbol private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper { self => // required API from demux def isServer: Boolean diff --git a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala index 6220625a0..037db9ab5 100644 --- a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala +++ b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala @@ -72,6 +72,7 @@ import scala.collection.immutable * * if applicable: provide response frames * * validate the produced application-level responses */ +//noinspection TypeAnnotation class Http2ClientSpec extends PekkoSpecWithMaterializer(""" pekko.http.client.remote-address-header = on pekko.http.client.http2.log-frames = on @@ -291,6 +292,15 @@ class Http2ClientSpec extends PekkoSpecWithMaterializer(""" val dynamicTableUpdateTo8192 = ByteString(63, 225, 63) headerPayload.take(3) shouldBe dynamicTableUpdateTo8192 }) + "close stream if peer sends RST_STREAM frame with REFUSED_STREAM".inAssertAllStagesStopped(new TestSetup with NetProbes { + user.emitRequest(Post("/", HttpEntity("hello"))) + val TheStreamId = network.expect[HeadersFrame]().streamId + + network.sendRST_STREAM(TheStreamId, ErrorCode.REFUSED_STREAM) + + expectGracefulCompletion() + + }) } "support stream for response data" should { From a34a9902586ec7f7196dc481f104489f2bfc4668 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Tue, 1 Oct 2024 01:22:16 +0100 Subject: [PATCH 2/4] abuse 429 TooManyRequests --- .../pekko/http/impl/engine/http2/Http2StreamHandling.scala | 4 ++++ .../apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala index 8ca91410f..c55d9e61e 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala @@ -446,6 +446,10 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper case _: WindowUpdateFrame => // We're not planning on sending any data on this stream anymore, so we don't care about window updates. this + case rst@RstStreamFrame(streamId, _) => + //TODO if errorCode is REFUSED_STREAM, we should try to open a new stream + val frame = ParsedHeadersFrame(streamId, endStream = true, Seq((":status", "429")), None) + dispatchStream(streamId, frame, ByteString.empty, correlationAttributes, _ => Closed) case _ => expectIncomingStream(event, Closed, HalfClosedLocal(_), correlationAttributes) } diff --git a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala index 037db9ab5..4a8e2e9c3 100644 --- a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala +++ b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala @@ -298,7 +298,8 @@ class Http2ClientSpec extends PekkoSpecWithMaterializer(""" network.sendRST_STREAM(TheStreamId, ErrorCode.REFUSED_STREAM) - expectGracefulCompletion() + val response = user.expectResponse() + response.status should be(StatusCodes.TooManyRequests) }) } From 8fcc920a8bc1d4a531c7d3a717780d1790cf5646 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Tue, 1 Oct 2024 02:35:50 +0100 Subject: [PATCH 3/4] also use PeerClosedStreamException --- .../http/impl/engine/http2/Http2StreamHandling.scala | 6 +++--- .../pekko/http/impl/engine/http2/Http2ClientSpec.scala | 9 ++++++++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala index c55d9e61e..1bd369490 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala @@ -447,9 +447,9 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper // We're not planning on sending any data on this stream anymore, so we don't care about window updates. this case rst@RstStreamFrame(streamId, _) => - //TODO if errorCode is REFUSED_STREAM, we should try to open a new stream - val frame = ParsedHeadersFrame(streamId, endStream = true, Seq((":status", "429")), None) - dispatchStream(streamId, frame, ByteString.empty, correlationAttributes, _ => Closed) + val headers = ParsedHeadersFrame(streamId, endStream = false, Seq((":status", "429")), None) + dispatchSubstream(headers, Right(Source.failed(new PeerClosedStreamException(rst.streamId, rst.errorCode))), correlationAttributes) + Closed case _ => expectIncomingStream(event, Closed, HalfClosedLocal(_), correlationAttributes) } diff --git a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala index 4a8e2e9c3..33b868475 100644 --- a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala +++ b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala @@ -293,14 +293,21 @@ class Http2ClientSpec extends PekkoSpecWithMaterializer(""" headerPayload.take(3) shouldBe dynamicTableUpdateTo8192 }) "close stream if peer sends RST_STREAM frame with REFUSED_STREAM".inAssertAllStagesStopped(new TestSetup with NetProbes { - user.emitRequest(Post("/", HttpEntity("hello"))) + val data = ByteString("abcd") + user.emitRequest(Post("/", HttpEntity(data))) val TheStreamId = network.expect[HeadersFrame]().streamId + network.expectDATA(TheStreamId, endStream = true, data) network.sendRST_STREAM(TheStreamId, ErrorCode.REFUSED_STREAM) val response = user.expectResponse() response.status should be(StatusCodes.TooManyRequests) + val entityDataIn = ByteStringSinkProbe(response.entity.dataBytes) + val error = entityDataIn.expectError() + error.getMessage shouldBe "Stream with ID [1] was closed by peer with code REFUSED_STREAM(0x07)" + + connectionShouldStillBeUsable() }) } From 4d49c30cbcfa5d4a751b0070d5e3c14cd8ac736e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Tue, 1 Oct 2024 12:21:55 +0100 Subject: [PATCH 4/4] scalafmt --- .../engine/http2/Http2StreamHandling.scala | 8 +++--- .../impl/engine/http2/Http2ClientSpec.scala | 28 +++++++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala index 1bd369490..26decba61 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala @@ -40,7 +40,6 @@ import scala.util.control.NoStackTrace * Mixed into the Http2ServerDemux graph logic. */ @InternalApi -//noinspection ConvertibleToMethodValue,ScalaWeakerAccess,ScalaUnusedSymbol private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper { self => // required API from demux def isServer: Boolean @@ -446,9 +445,10 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper case _: WindowUpdateFrame => // We're not planning on sending any data on this stream anymore, so we don't care about window updates. this - case rst@RstStreamFrame(streamId, _) => - val headers = ParsedHeadersFrame(streamId, endStream = false, Seq((":status", "429")), None) - dispatchSubstream(headers, Right(Source.failed(new PeerClosedStreamException(rst.streamId, rst.errorCode))), correlationAttributes) + case rst: RstStreamFrame => + val headers = ParsedHeadersFrame(rst.streamId, endStream = false, Seq((":status", "429")), None) + dispatchSubstream(headers, Right(Source.failed(new PeerClosedStreamException(rst.streamId, rst.errorCode))), + correlationAttributes) Closed case _ => expectIncomingStream(event, Closed, HalfClosedLocal(_), correlationAttributes) diff --git a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala index 33b868475..97b665724 100644 --- a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala +++ b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientSpec.scala @@ -72,7 +72,6 @@ import scala.collection.immutable * * if applicable: provide response frames * * validate the produced application-level responses */ -//noinspection TypeAnnotation class Http2ClientSpec extends PekkoSpecWithMaterializer(""" pekko.http.client.remote-address-header = on pekko.http.client.http2.log-frames = on @@ -292,23 +291,24 @@ class Http2ClientSpec extends PekkoSpecWithMaterializer(""" val dynamicTableUpdateTo8192 = ByteString(63, 225, 63) headerPayload.take(3) shouldBe dynamicTableUpdateTo8192 }) - "close stream if peer sends RST_STREAM frame with REFUSED_STREAM".inAssertAllStagesStopped(new TestSetup with NetProbes { - val data = ByteString("abcd") - user.emitRequest(Post("/", HttpEntity(data))) - val TheStreamId = network.expect[HeadersFrame]().streamId - network.expectDATA(TheStreamId, endStream = true, data) + "close stream if peer sends RST_STREAM frame with REFUSED_STREAM".inAssertAllStagesStopped( + new TestSetup with NetProbes { + val data = ByteString("abcd") + user.emitRequest(Post("/", HttpEntity(data))) + val TheStreamId = network.expect[HeadersFrame]().streamId + network.expectDATA(TheStreamId, endStream = true, data) - network.sendRST_STREAM(TheStreamId, ErrorCode.REFUSED_STREAM) + network.sendRST_STREAM(TheStreamId, ErrorCode.REFUSED_STREAM) - val response = user.expectResponse() - response.status should be(StatusCodes.TooManyRequests) + val response = user.expectResponse() + response.status should be(StatusCodes.TooManyRequests) - val entityDataIn = ByteStringSinkProbe(response.entity.dataBytes) - val error = entityDataIn.expectError() - error.getMessage shouldBe "Stream with ID [1] was closed by peer with code REFUSED_STREAM(0x07)" + val entityDataIn = ByteStringSinkProbe(response.entity.dataBytes) + val error = entityDataIn.expectError() + error.getMessage shouldBe "Stream with ID [1] was closed by peer with code REFUSED_STREAM(0x07)" - connectionShouldStillBeUsable() - }) + connectionShouldStillBeUsable() + }) } "support stream for response data" should {