Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class HttpServerExampleSpec extends AnyWordSpec with Matchers
val failureMonitor: ActorRef = system.actorOf(MyExampleMonitoringActor.props)

val reactToTopLevelFailures = Flow[IncomingConnection]
.watchTermination()((_, termination) =>
.watchTermination((_, termination) =>
termination.failed.foreach {
cause => failureMonitor ! cause
})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# changes needed to uptake Pekko Core 2.0.0
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.http.impl.engine.client.PoolInterface#Logic.onDownstreamFinish")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#IncomingStreamBuffer.onDownstreamFinish")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.hpack.HandleOrPassOnStage#State.onDownstreamFinish")
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
(incoming: Tcp.IncomingConnection) =>
try {
httpPlusSwitching(http1, http2).addAttributes(prepareServerAttributes(settings, incoming))
.watchTermination() {
.watchTermination {
case (connectionTerminatorF, future) =>
connectionTerminatorF.foreach { connectionTerminator =>
masterTerminator.registerConnection(connectionTerminator)(fm.executionContext)
Expand Down Expand Up @@ -170,7 +170,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)

val serverLayer: Flow[ByteString, ByteString, Future[Done]] = Flow.fromGraph(
Flow[HttpRequest]
.watchTermination()(Keep.right)
.watchTermination(Keep.right)
.prepend(injectedRequest)
.via(Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handler)(
system.dispatcher))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,12 +721,11 @@ private[http] object HttpServerBluePrint {
})

private var activeTimers = 0
private val timeout: FiniteDuration = {
private val timeout: FiniteDuration =
inheritedAttributes.get[ActorAttributes.StreamSubscriptionTimeout] match {
case Some(attr) => attr.timeout
case None => 5.minutes // should not happen
}
}
private def addTimeout(s: SubscriptionTimeout): Unit = {
if (activeTimers == 0) setKeepGoing(true)
activeTimers += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package org.apache.pekko.http.impl.util
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.stream.stage.GraphStageLogic
import pekko.event.{ LogSource, LoggingAdapter, NoLogging }
import pekko.stream.ActorMaterializer
import pekko.event.{ LogSource, LoggingAdapter }

// TODO Try to reconcile with what Pekko provides in StageLogging.
// We thought this could be removed when https://github.com/akka/akka/issues/18793 had been implemented
Expand All @@ -43,10 +42,7 @@ private[pekko] trait StageLoggingWithOverride extends GraphStageLogic {
_log =
logOverride match {
case DefaultNoLogging =>
materializer match {
case a: ActorMaterializer => pekko.event.Logging(a.system, logSource)(LogSource.fromClass)
case _ => NoLogging
}
pekko.event.Logging(materializer.system, logSource)(LogSource.fromClass)
case x => x
}
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
handler: Flow[HttpRequest, HttpResponse, Any]): ServerLayerFlow =
Flow.fromGraph(
Flow[HttpRequest]
.watchTermination()(Keep.right)
.watchTermination(Keep.right)
.via(handler)
.watchTermination() { (termWatchBefore, termWatchAfter) =>
.watchTermination { (termWatchBefore, termWatchAfter) =>
// flag termination when the user handler has gotten (or has emitted) termination
// signals in both directions
termWatchBefore.flatMap(_ => termWatchAfter)(ExecutionContext.parasitic)
Expand Down Expand Up @@ -234,7 +234,7 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
.mapAsyncUnordered(settings.maxConnections) { incoming =>
try {
fullLayer
.watchTermination() {
.watchTermination {
case ((done, connectionTerminator), whenTerminates) =>
whenTerminates.onComplete { _ =>
masterTerminator.removeConnection(connectionTerminator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ class HostConnectionPoolSpec extends PekkoSpecWithMaterializer(
Sink.fromSubscriber(serverRequests),
Source.fromPublisher(serverResponses))
.joinMat(clientServerImplementation.get(killSwitch))(Keep.right)
.watchTermination()(Keep.both)
.watchTermination(Keep.both)
.join(
Flow.fromSinkAndSource(
Sink.fromSubscriber(responseSubscriber),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ object WSClientAutobahnTest extends App {
Http().singleWebSocketRequest(uri, clientFlow)._2

def completionSignal[T]: Flow[T, T, Future[Done]] =
Flow[T].watchTermination()((_, res) => res)
Flow[T].watchTermination((_, res) => res)

/**
* The autobahn tests define a weird API where every request must be a WebSocket request and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class WebSocketIntegrationSpec extends PekkoSpecWithMaterializer(
val handlerTermination = Promise[Done]()

val handler = Flow[Message]
.watchTermination()(Keep.right)
.watchTermination(Keep.right)
.mapMaterializedValue(handlerTermination.completeWith(_))
.map(m => TextMessage.Strict(s"Echo [${m.asTextMessage.getStrictText}]"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

package org.apache.pekko.http.javadsl.server;

import org.apache.pekko.japi.pf.FI;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.japi.function.Predicate;
import org.apache.pekko.japi.pf.PFBuilder;

public class ExceptionHandlerBuilder {
Expand All @@ -35,7 +36,7 @@ private ExceptionHandlerBuilder(PFBuilder<Throwable, Route> delegate) {
* @return a builder with the case statement added
*/
public <P extends Throwable> ExceptionHandlerBuilder match(
final Class<P> type, FI.Apply<P, Route> apply) {
final Class<P> type, final Function<P, Route> apply) {
delegate.match(type, apply);
return this;
}
Expand All @@ -50,7 +51,7 @@ public <P extends Throwable> ExceptionHandlerBuilder match(
* @return a builder with the case statement added
*/
public <P extends Throwable> ExceptionHandlerBuilder match(
final Class<P> type, final FI.TypedPredicate<P> predicate, final FI.Apply<P, Route> apply) {
final Class<P> type, final Predicate<P> predicate, final Function<P, Route> apply) {
delegate.match(type, predicate, apply);
return this;
}
Expand All @@ -63,7 +64,7 @@ public <P extends Throwable> ExceptionHandlerBuilder match(
* @return a builder with the case statement added
*/
public <P extends Throwable> ExceptionHandlerBuilder matchEquals(
final P object, final FI.Apply<P, Route> apply) {
final P object, final Function<P, Route> apply) {
delegate.matchEquals(object, apply);
return this;
}
Expand All @@ -74,7 +75,7 @@ public <P extends Throwable> ExceptionHandlerBuilder matchEquals(
* @param apply an action to apply to the argument
* @return a builder with the case statement added
*/
public ExceptionHandlerBuilder matchAny(final FI.Apply<Throwable, Route> apply) {
public ExceptionHandlerBuilder matchAny(final Function<Throwable, Route> apply) {
delegate.matchAny(apply);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# refactor Java DSL ExceptionHandler (due to function changes in Pekko Core 2.0.0)
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.http.javadsl.server.ExceptionHandlerBuilder.match")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.http.javadsl.server.ExceptionHandlerBuilder.matchEquals")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.http.javadsl.server.ExceptionHandlerBuilder.matchAny")
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ abstract class TelemetrySpiSpec(useTls: Boolean) extends PekkoSpecWithMaterializ
attrs.get[TelemetryAttributes.ClientMeta].foreach(probe.ref ! _)
probe.ref ! requestId
request.addAttribute(requestIdAttr, requestId).addHeader(headers.RawHeader("request-id", requestId.id))
}.watchTermination() { (_, done) =>
}.watchTermination { (_, done) =>
done.onComplete {
case Success(_) => probe.ref ! "close-seen" // this is the expected case
case Failure(t) => probe.ref ! t.getMessage // useful to diagnose cases where there's a failure
Expand Down Expand Up @@ -193,7 +193,7 @@ abstract class TelemetrySpiSpec(useTls: Boolean) extends PekkoSpecWithMaterializ
telemetryProbe.ref ! "connection-seen"
telemetryProbe.ref ! connId
conn.copy(flow = conn.flow.addAttributes(Attributes(connId)))
}.watchTermination() { (notUsed, done) =>
}.watchTermination { (notUsed, done) =>
done.onComplete(_ => telemetryProbe.ref ! "unbind-seen")(system.dispatcher)
notUsed
}
Expand All @@ -203,7 +203,7 @@ abstract class TelemetrySpiSpec(useTls: Boolean) extends PekkoSpecWithMaterializ
Flow[HttpResponse].map { response =>
telemetryProbe.ref ! "response-seen"
response
}.watchTermination() { (_, done) =>
}.watchTermination { (_, done) =>
done.foreach(_ => telemetryProbe.ref ! "close-seen")(system.dispatcher)
},
StreamUtils.statefulAttrsMap[HttpRequest, HttpRequest](attributes => { request =>
Expand Down
3 changes: 2 additions & 1 deletion project/PekkoCoreDependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoCoreDependency extends PekkoDependency {
override val checkProject: String = "pekko-cluster-sharding-typed"
override val module: Option[String] = None
override val currentVersion: String = "1.3.0"
override val minVersion: String = "2.0.0-M1"
override val currentVersion: String = "2.0.0-M1"
}