Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ a904e917d5851ebffc5e1eabbee70cf1685c4263

# Scala Steward: Reformat with scalafmt 3.10.3
245df4a95b09802b6c1ea2dbc429259b368bd0ef

# Scala Steward: Reformat with scalafmt 3.10.4
3a1f225850c219c27f8f1b30699aec4eea45923e
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = 3.10.3
version = 3.10.4
runner.dialect = scala213
project.git = true
style = defaultWithAlign
Expand Down
9 changes: 6 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ inThisBuild(Def.settings(
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
onLoad in Global := {
sLog.value.info(
s"Building Pekko HTTP ${version.value} against Pekko ${PekkoCoreDependency.version} on Scala ${(httpCore / scalaVersion).value}")
s"Building Pekko HTTP ${version.value} against Pekko ${PekkoCoreDependency.version} on Scala ${(httpCore /
scalaVersion).value}")
(onLoad in Global).value
},
projectInfoVersion := (if (isSnapshot.value) "snapshot" else version.value),
Expand Down Expand Up @@ -424,8 +425,10 @@ lazy val docs = project("docs")
"javadoc.org.apache.pekko.link_style" -> "direct",
"scaladoc.org.apache.pekko.base_url" -> s"https://pekko.apache.org/api/pekko/${PekkoCoreDependency.default.link}",
"scaladoc.org.apache.pekko.link_style" -> "direct",
"javadoc.org.apache.pekko.http.base_url" -> s"https://pekko.apache.org/japi/pekko-http/${projectInfoVersion.value}",
"scaladoc.org.apache.pekko.http.base_url" -> s"https://pekko.apache.org/api/pekko-http/${projectInfoVersion.value}",
"javadoc.org.apache.pekko.http.base_url" ->
s"https://pekko.apache.org/japi/pekko-http/${projectInfoVersion.value}",
"scaladoc.org.apache.pekko.http.base_url" ->
s"https://pekko.apache.org/api/pekko-http/${projectInfoVersion.value}",
"github.base_url" -> GitHub.url(version.value, isSnapshot.value)),
apidocRootPackage := "org.apache.pekko",
ValidatePR / additionalTasks += Compile / paradox)
Expand Down
65 changes: 31 additions & 34 deletions docs/src/test/scala/docs/http/scaladsl/HttpServerExampleSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -361,19 +361,18 @@ class HttpServerExampleSpec extends AnyWordSpec with Matchers
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher

val route =
(put & path("lines")) {
withoutSizeLimit {
extractDataBytes { bytes =>
val finishedWriting = bytes.runWith(FileIO.toPath(new File("/tmp/example.out").toPath))

// we only want to respond once the incoming data has been handled:
onComplete(finishedWriting) { ioResult =>
complete("Finished writing data: " + ioResult)
}
val route = (put & path("lines")) {
withoutSizeLimit {
extractDataBytes { bytes =>
val finishedWriting = bytes.runWith(FileIO.toPath(new File("/tmp/example.out").toPath))

// we only want to respond once the incoming data has been handled:
onComplete(finishedWriting) { ioResult =>
complete("Finished writing data: " + ioResult)
}
}
}
}
// #consume-raw-dataBytes
}

Expand All @@ -388,19 +387,18 @@ class HttpServerExampleSpec extends AnyWordSpec with Matchers
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher

val route =
(put & path("lines")) {
withoutSizeLimit {
extractRequest { (r: HttpRequest) =>
val finishedWriting = r.discardEntityBytes().future

// we only want to respond once the incoming data has been handled:
onComplete(finishedWriting) { done =>
complete("Drained all data from connection... (" + done + ")")
}
val route = (put & path("lines")) {
withoutSizeLimit {
extractRequest { (r: HttpRequest) =>
val finishedWriting = r.discardEntityBytes().future

// we only want to respond once the incoming data has been handled:
onComplete(finishedWriting) { done =>
complete("Drained all data from connection... (" + done + ")")
}
}
}
}
// #discard-discardEntityBytes
}

Expand All @@ -416,22 +414,21 @@ class HttpServerExampleSpec extends AnyWordSpec with Matchers
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher

val route =
(put & path("lines")) {
withoutSizeLimit {
extractDataBytes { data =>
// Closing connections, method 1 (eager):
// we deem this request as illegal, and close the connection right away:
data.runWith(Sink.cancelled) // "brutally" closes the connection

// Closing connections, method 2 (graceful):
// consider draining connection and replying with `Connection: Close` header
// if you want the client to close after this request/reply cycle instead:
respondWithHeader(Connection("close"))
complete(StatusCodes.Forbidden -> "Not allowed!")
}
val route = (put & path("lines")) {
withoutSizeLimit {
extractDataBytes { data =>
// Closing connections, method 1 (eager):
// we deem this request as illegal, and close the connection right away:
data.runWith(Sink.cancelled) // "brutally" closes the connection

// Closing connections, method 2 (graceful):
// consider draining connection and replying with `Connection: Close` header
// if you want the client to close after this request/reply cycle instead:
respondWithHeader(Connection("close"))
complete(StatusCodes.Forbidden -> "Not allowed!")
}
}
}
// #discard-close-connections
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,11 @@ class CaseClassExtractionExamplesSpec extends RoutingSpec with Inside {
// #example-3
case class Color(name: String, red: Int, green: Int, blue: Int)

val route =
(path("color" / Segment) & parameters("r".as[Int], "g".as[Int], "b".as[Int]))
.as(Color.apply _) { color =>
// ... route working with the `color` instance
doSomethingWith(color) // #hide
}
val route = (path("color" / Segment) & parameters("r".as[Int], "g".as[Int], "b".as[Int]))
.as(Color.apply _) { color =>
// ... route working with the `color` instance
doSomethingWith(color) // #hide
}
Get("/color/abc?r=1&g=2&b=3") ~> route ~> check { responseAs[String] shouldEqual "Color(abc,1,2,3)" } // #hide
// #example-3
}
Expand All @@ -77,7 +76,7 @@ class CaseClassExtractionExamplesSpec extends RoutingSpec with Inside {
"example 4 test" in {
val route =
(path("color" / Segment) &
parameters("r".as[Int], "g".as[Int], "b".as[Int])).as(Color.apply _) { color =>
parameters("r".as[Int], "g".as[Int], "b".as[Int])).as(Color.apply _) { color =>
// ... route working with the `color` instance
doSomethingWith(color) // #hide
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,9 @@ class DirectiveExamplesSpec extends RoutingSpec with CompileOnlySpec {
"example-6" in {
// #example-6
val getOrPut = get | put
val route =
(path("order" / IntNumber) & getOrPut & extractMethod) { (id, m) =>
complete(s"Received ${m.name} request for order $id")
}
val route = (path("order" / IntNumber) & getOrPut & extractMethod) { (id, m) =>
complete(s"Received ${m.name} request for order $id")
}
verify(route) // #hide
// #example-6
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,9 @@ class HeaderDirectivesExamplesSpec extends RoutingSpec with CompileOnlySpec with
case _ => None
}

val route =
(headerValue(extractExampleHeader) | provide("newValue")) { value =>
complete(s"headerValue $value")
}
val route = (headerValue(extractExampleHeader) | provide("newValue")) { value =>
complete(s"headerValue $value")
}

// tests:
Get("/") ~> RawHeader("exampleHeaderValue", "theHeaderValue") ~> route ~> check {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ class ParameterDirectivesExamplesSpec extends RoutingSpec with CompileOnlySpec w

Get("/?color=blue&count=blub") ~> Route.seal(route) ~> check {
status shouldEqual StatusCodes.BadRequest
responseAs[String] shouldEqual "The query parameter 'count' was malformed:\n'blub'" +
responseAs[String] shouldEqual
"The query parameter 'count' was malformed:\n'blub'" +
" is not a valid 32-bit signed integer value"
}
// #mapped-value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ class RouteDirectivesExamplesSpec extends RoutingSpec with CompileOnlySpec {

Get("/foo") ~> route ~> check {
status shouldEqual StatusCodes.PermanentRedirect
responseAs[String] shouldEqual """The request, and all future requests should be repeated using <a href="/foo/">this URI</a>."""
responseAs[String] shouldEqual
"""The request, and all future requests should be repeated using <a href="/foo/">this URI</a>."""
}
// #redirect-examples
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,7 @@ import scala.concurrent.Await
import scala.concurrent.duration.{ Duration, DurationInt }

import org.openjdk.jmh.annotations.{
Benchmark,
BenchmarkMode,
Mode,
OperationsPerInvocation,
OutputTimeUnit,
Scope,
State,
TearDown
Benchmark, BenchmarkMode, Mode, OperationsPerInvocation, OutputTimeUnit, Scope, State, TearDown
}

import org.apache.pekko
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,7 @@ import org.apache.pekko
import pekko.http.impl.engine.http2.FrameEvent.{ DataFrame, HeadersFrame }
import pekko.http.impl.engine.http2.framing.FrameRenderer
import pekko.http.scaladsl.model.{
AttributeKeys,
ContentTypes,
HttpEntity,
HttpMethods,
HttpRequest,
HttpResponse,
Trailer
AttributeKeys, ContentTypes, HttpEntity, HttpMethods, HttpRequest, HttpResponse, Trailer
}
import pekko.http.scaladsl.model.HttpEntity.{ Chunk, LastChunk }
import pekko.http.scaladsl.model.headers.RawHeader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,7 @@ package org.apache.pekko.http.impl.engine.client
import org.apache.pekko
import pekko.Done
import pekko.actor.{
Actor,
ActorLogging,
ActorRef,
DeadLetterSuppression,
Deploy,
ExtendedActorSystem,
NoSerializationVerificationNeeded,
Actor, ActorLogging, ActorRef, DeadLetterSuppression, Deploy, ExtendedActorSystem, NoSerializationVerificationNeeded,
Props
}
import pekko.annotation.InternalApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ private[client] object NewHostConnectionPool {
state = Unconnected
}

if (!previousState.isIdle && state.isIdle && !(state == Unconnected && currentEmbargo != Duration.Zero)) {
if (!previousState.isIdle && state.isIdle &&
!(state == Unconnected && currentEmbargo != Duration.Zero)) {
debug("Slot became idle... Trying to pull")
idleSlots.add(this)
pullIfNeeded()
Expand Down Expand Up @@ -345,7 +346,8 @@ private[client] object NewHostConnectionPool {
OptionVal.Some(Event.onNewConnectionEmbargo.preApply(currentEmbargo))
// numConnectedSlots might be slow for big numbers of connections, so avoid calling if minConnections feature is disabled
case s
if !s.isConnected && s.isIdle && settings.minConnections > 0 && numConnectedSlots < settings.minConnections =>
if !s.isConnected && s.isIdle && settings.minConnections > 0 &&
numConnectedSlots < settings.minConnections =>
debug(s"Preconnecting because number of connected slots fell down to $numConnectedSlots")
OptionVal.Some(Event.onPreConnect)
case _ => OptionVal.None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,13 @@ package org.apache.pekko.http.impl.engine.http2

import org.apache.pekko
import pekko.actor.{
ActorSystem,
ClassicActorSystemProvider,
ExtendedActorSystem,
Extension,
ExtensionId,
ExtensionIdProvider
ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider
}
import pekko.annotation.InternalApi
import pekko.event.LoggingAdapter
import pekko.http.impl.engine.HttpConnectionIdleTimeoutBidi
import pekko.http.impl.engine.server.{
GracefulTerminatorStage,
MasterServerTerminator,
ServerTerminator,
UpgradeToOtherProtocolResponseHeader
GracefulTerminatorStage, MasterServerTerminator, ServerTerminator, UpgradeToOtherProtocolResponseHeader
}
import pekko.http.impl.util.LogByteStringTools
import pekko.http.scaladsl.Http.OutgoingConnection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ import pekko.http.impl.util.LogByteStringTools.logTLSBidiBySetting
import pekko.http.impl.util.StreamUtils
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.settings.{
ClientConnectionSettings,
Http2ClientSettings,
Http2ServerSettings,
ParserSettings,
ServerSettings
ClientConnectionSettings, Http2ClientSettings, Http2ServerSettings, ParserSettings, ServerSettings
}
import pekko.stream.{ BidiShape, Graph, StreamTcpException, ThrottleMode }
import pekko.stream.TLSProtocol._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ import pekko.stream.Inlet
import pekko.stream.Outlet
import pekko.stream.scaladsl.Source
import pekko.stream.stage.{
GraphStageLogic,
GraphStageWithMaterializedValue,
InHandler,
OutHandler,
StageLogging,
TimerGraphStageLogic
GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler, StageLogging, TimerGraphStageLogic
}
import pekko.util.ByteString
import pekko.util.OptionVal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
}

debug(
s"Incoming side of stream [$streamId] changed state: ${oldState.stateName} -> ${newState.stateName} after handling [$event${if (eventArg ne null)
s"Incoming side of stream [$streamId] changed state: ${oldState.stateName} -> ${newState.stateName} after handling [$event${if (eventArg ne
null)
s"($eventArg)"
else ""}]")

Expand Down Expand Up @@ -794,7 +795,8 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
// TODO: Check that buffer is not too much over the limit (which we might warn the user about)
// The problem here is that backpressure will only work properly if batch elements like
// ByteString have a reasonable size.
if (!upstreamClosed && buffer.length < multiplexer.maxBytesToBufferPerSubstream && !inlet.hasBeenPulled && !inlet.isClosed)
if (!upstreamClosed && buffer.length < multiplexer.maxBytesToBufferPerSubstream && !inlet.hasBeenPulled &&
!inlet.isClosed)
inlet.pull()

/** Cleans up internal state (but not external) */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,8 @@ private[http] object BodyPartParser {
val boyerMoore: BoyerMoore = new BoyerMoore(needle)

def isBoundary(input: ByteString, offset: Int, ix: Int = eolLength): Boolean = {
@tailrec def process(input: ByteString, offset: Int, ix: Int): Boolean =
(ix == needle.length) || (byteAt(input, offset + ix - eol.length) == needle(ix)) && process(input, offset,
@tailrec def process(input: ByteString, offset: Int, ix: Int): Boolean = (ix == needle.length) ||
(byteAt(input, offset + ix - eol.length) == needle(ix)) && process(input, offset,
ix + 1)

process(input, offset, ix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ import pekko.http.impl.util.HttpConstants._
import pekko.http.scaladsl.model.{ ErrorInfo, HttpHeader, MediaTypes, StatusCode, StatusCodes }
import pekko.http.scaladsl.model.headers.{ EmptyHeader, RawHeader }
import pekko.http.scaladsl.settings.ParserSettings.{
ErrorLoggingVerbosity,
IllegalResponseHeaderNameProcessingMode,
IllegalResponseHeaderValueProcessingMode
ErrorLoggingVerbosity, IllegalResponseHeaderNameProcessingMode, IllegalResponseHeaderValueProcessingMode
}
import pekko.util.ByteString

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ private[http] class HttpResponseRendererFactory(
def render(h: HttpHeader) = r ~~ h

def mustRenderTransferEncodingChunkedHeader =
entity.isChunked && (!entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD) && (ctx.requestProtocol == `HTTP/1.1`)
entity.isChunked && (!entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD) &&
(ctx.requestProtocol == `HTTP/1.1`)

def renderHeaders(headers: immutable.Seq[HttpHeader], alwaysClose: Boolean = false): Unit = {
var connHeader: Connection = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ import pekko.http.impl.engine.parsing.ParserOutput._
import pekko.http.impl.engine.parsing._
import pekko.http.impl.engine.rendering.ResponseRenderingContext.CloseRequested
import pekko.http.impl.engine.rendering.{
DateHeaderRendering,
HttpResponseRendererFactory,
ResponseRenderingContext,
ResponseRenderingOutput
DateHeaderRendering, HttpResponseRendererFactory, ResponseRenderingContext, ResponseRenderingOutput
}
import pekko.http.impl.util._
import pekko.http.javadsl.model
Expand Down Expand Up @@ -513,8 +510,7 @@ private[http] object HttpServerBluePrint {
s"Sending an 2xx 'early' response before end of request for ${requestStart.uri} received... " +
"Note that the connection will be closed after this response. Also, many clients will not read early responses! " +
"Consider only issuing this response after the request data has been completely read!")
val forceClose =
(requestStart.expect100Continue && oneHundredContinueResponsePending) ||
val forceClose = (requestStart.expect100Continue && oneHundredContinueResponsePending) ||
(isClosed(requestParsingIn) && openRequests.isEmpty) ||
isEarlyResponse

Expand Down
Loading