Skip to content

Commit c4a7dbc

Browse files
committed
compile issues with watchTermination()
1 parent 3ce2c89 commit c4a7dbc

File tree

7 files changed

+12
-12
lines changed

7 files changed

+12
-12
lines changed

docs/src/test/scala/docs/http/scaladsl/HttpServerExampleSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ class HttpServerExampleSpec extends AnyWordSpec with Matchers
111111
val failureMonitor: ActorRef = system.actorOf(MyExampleMonitoringActor.props)
112112

113113
val reactToTopLevelFailures = Flow[IncomingConnection]
114-
.watchTermination()((_, termination) =>
114+
.watchTermination((_, termination) =>
115115
termination.failed.foreach {
116116
cause => failureMonitor ! cause
117117
})

http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
106106
(incoming: Tcp.IncomingConnection) =>
107107
try {
108108
httpPlusSwitching(http1, http2).addAttributes(prepareServerAttributes(settings, incoming))
109-
.watchTermination() {
109+
.watchTermination {
110110
case (connectionTerminatorF, future) =>
111111
connectionTerminatorF.foreach { connectionTerminator =>
112112
masterTerminator.registerConnection(connectionTerminator)(fm.executionContext)
@@ -170,7 +170,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
170170

171171
val serverLayer: Flow[ByteString, ByteString, Future[Done]] = Flow.fromGraph(
172172
Flow[HttpRequest]
173-
.watchTermination()(Keep.right)
173+
.watchTermination(Keep.right)
174174
.prepend(injectedRequest)
175175
.via(Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handler)(
176176
system.dispatcher))

http-core/src/main/scala/org/apache/pekko/http/scaladsl/Http.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,9 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
121121
handler: Flow[HttpRequest, HttpResponse, Any]): ServerLayerFlow =
122122
Flow.fromGraph(
123123
Flow[HttpRequest]
124-
.watchTermination()(Keep.right)
124+
.watchTermination(Keep.right)
125125
.via(handler)
126-
.watchTermination() { (termWatchBefore, termWatchAfter) =>
126+
.watchTermination { (termWatchBefore, termWatchAfter) =>
127127
// flag termination when the user handler has gotten (or has emitted) termination
128128
// signals in both directions
129129
termWatchBefore.flatMap(_ => termWatchAfter)(ExecutionContext.parasitic)
@@ -234,7 +234,7 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
234234
.mapAsyncUnordered(settings.maxConnections) { incoming =>
235235
try {
236236
fullLayer
237-
.watchTermination() {
237+
.watchTermination {
238238
case ((done, connectionTerminator), whenTerminates) =>
239239
whenTerminates.onComplete { _ =>
240240
masterTerminator.removeConnection(connectionTerminator)

http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/HostConnectionPoolSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,7 @@ class HostConnectionPoolSpec extends PekkoSpecWithMaterializer(
700700
Sink.fromSubscriber(serverRequests),
701701
Source.fromPublisher(serverResponses))
702702
.joinMat(clientServerImplementation.get(killSwitch))(Keep.right)
703-
.watchTermination()(Keep.both)
703+
.watchTermination(Keep.both)
704704
.join(
705705
Flow.fromSinkAndSource(
706706
Sink.fromSubscriber(responseSubscriber),

http-core/src/test/scala/org/apache/pekko/http/impl/engine/ws/WSClientAutobahnTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ object WSClientAutobahnTest extends App {
173173
Http().singleWebSocketRequest(uri, clientFlow)._2
174174

175175
def completionSignal[T]: Flow[T, T, Future[Done]] =
176-
Flow[T].watchTermination()((_, res) => res)
176+
Flow[T].watchTermination((_, res) => res)
177177

178178
/**
179179
* The autobahn tests define a weird API where every request must be a WebSocket request and

http-core/src/test/scala/org/apache/pekko/http/impl/engine/ws/WebSocketIntegrationSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ class WebSocketIntegrationSpec extends PekkoSpecWithMaterializer(
210210
val handlerTermination = Promise[Done]()
211211

212212
val handler = Flow[Message]
213-
.watchTermination()(Keep.right)
213+
.watchTermination(Keep.right)
214214
.mapMaterializedValue(handlerTermination.completeWith(_))
215215
.map(m => TextMessage.Strict(s"Echo [${m.asTextMessage.getStrictText}]"))
216216

http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/TelemetrySpiSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ abstract class TelemetrySpiSpec(useTls: Boolean) extends PekkoSpecWithMaterializ
126126
attrs.get[TelemetryAttributes.ClientMeta].foreach(probe.ref ! _)
127127
probe.ref ! requestId
128128
request.addAttribute(requestIdAttr, requestId).addHeader(headers.RawHeader("request-id", requestId.id))
129-
}.watchTermination() { (_, done) =>
129+
}.watchTermination { (_, done) =>
130130
done.onComplete {
131131
case Success(_) => probe.ref ! "close-seen" // this is the expected case
132132
case Failure(t) => probe.ref ! t.getMessage // useful to diagnose cases where there's a failure
@@ -193,7 +193,7 @@ abstract class TelemetrySpiSpec(useTls: Boolean) extends PekkoSpecWithMaterializ
193193
telemetryProbe.ref ! "connection-seen"
194194
telemetryProbe.ref ! connId
195195
conn.copy(flow = conn.flow.addAttributes(Attributes(connId)))
196-
}.watchTermination() { (notUsed, done) =>
196+
}.watchTermination { (notUsed, done) =>
197197
done.onComplete(_ => telemetryProbe.ref ! "unbind-seen")(system.dispatcher)
198198
notUsed
199199
}
@@ -203,7 +203,7 @@ abstract class TelemetrySpiSpec(useTls: Boolean) extends PekkoSpecWithMaterializ
203203
Flow[HttpResponse].map { response =>
204204
telemetryProbe.ref ! "response-seen"
205205
response
206-
}.watchTermination() { (_, done) =>
206+
}.watchTermination { (_, done) =>
207207
done.foreach(_ => telemetryProbe.ref ! "close-seen")(system.dispatcher)
208208
},
209209
StreamUtils.statefulAttrsMap[HttpRequest, HttpRequest](attributes => { request =>

0 commit comments

Comments
 (0)