Skip to content

Commit e3bed7b

Browse files
committed
Improve handling of close frames in web sockets
1 parent 081cc97 commit e3bed7b

File tree

5 files changed

+70
-32
lines changed

5 files changed

+70
-32
lines changed

build.sbt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ val scala3_M2 = "3.0.0-M2"
88
val scala3_M3 = "3.0.0-M3"
99
val scala3 = List(scala3_M2, scala3_M3)
1010

11-
val sttpModelVersion = "1.2.0-RC11"
11+
val sttpModelVersion = "1.2.0-RC12"
1212

1313
val scalaTestVersion = "3.2.3"
1414
val scalaNativeTestInterfaceVersion = "0.4.0-M2"
@@ -74,7 +74,7 @@ lazy val projectAggregates: Seq[ProjectReference] = if (sys.env.isDefinedAt("STT
7474
println("[info] STTP_NATIVE *not* defined, *not* including sttp-native in the aggregate projects")
7575
scala2.flatMap(v => List[ProjectReference](core.js(v), ws.js(v))) ++
7676
scala2.flatMap(v => List[ProjectReference](core.jvm(v), ws.jvm(v), fs2.jvm(v), monix.jvm(v))) ++
77-
scala3.flatMap(v => List[ProjectReference](core.jvm(v), ws.jvm(v), zio.jvm(v))) ++
77+
scala3.flatMap(v => List[ProjectReference](core.jvm(v), ws.jvm(v))) ++ // TODO , zio.jvm(v)
7878
List[ProjectReference](
7979
akka.jvm(scala2_12),
8080
akka.jvm(scala2_13),

ws/src/main/scala/sttp/ws/WebSocket.scala

Lines changed: 60 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,79 +4,113 @@ import sttp.model.Headers
44
import sttp.monad.MonadError
55
import sttp.monad.syntax._
66

7-
/** The `send` and `receive` methods may result in a failed effect, with either one of [[WebSocketException]]
8-
* exceptions, or a backend-specific exception.
7+
/** The `send*` and `receive*` methods may result in a failed effect, with either one of [[WebSocketException]]
8+
* exceptions, or a backend-specific exception. Specifically, they will fail with [[WebSocketClosed]] if the
9+
* web socket is closed.
10+
*
11+
* See the `either` and `eitherClose` method to lift web socket closed events to the value level.
912
*/
1013
trait WebSocket[F[_]] {
1114

12-
/** After receiving a close frame, no further interactions with the web socket should happen. Subsequent invocations
13-
* of `receive`, as well as `send`, will fail with the [[WebSocketClosed]] exception.
15+
/** Receive the next frame from the web socket. This can be a data frame, or a control frame including
16+
* [[WebSocketFrame.Close]]. After receiving a close frame, no further interactions with the web socket should
17+
* happen.
18+
*
19+
* However, not all implementations expose the close frame, and web sockets might also get closed without the proper
20+
* close frame exchange. In such cases, as well as when invoking `receive`/`send` after receiving a close frame,
21+
* this effect will fail with the [[WebSocketClosed]] exception.
1422
*/
1523
def receive(): F[WebSocketFrame]
1624
def send(f: WebSocketFrame, isContinuation: Boolean = false): F[Unit]
1725
def isOpen(): F[Boolean]
1826

1927
/** Receive a single data frame, ignoring others. The frame might be a fragment.
28+
* Will fail with [[WebSocketClosed]] if the web socket is closed, or if a close frame is received.
2029
* @param pongOnPing Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
2130
*/
22-
def receiveDataFrame(pongOnPing: Boolean = true): F[Either[WebSocketFrame.Close, WebSocketFrame.Data[_]]] =
31+
def receiveDataFrame(pongOnPing: Boolean = true): F[WebSocketFrame.Data[_]] =
2332
receive().flatMap {
24-
case close: WebSocketFrame.Close => (Left(close): Either[WebSocketFrame.Close, WebSocketFrame.Data[_]]).unit
25-
case d: WebSocketFrame.Data[_] => (Right(d): Either[WebSocketFrame.Close, WebSocketFrame.Data[_]]).unit
33+
case close: WebSocketFrame.Close => monad.error(WebSocketClosed(Some(close)))
34+
case d: WebSocketFrame.Data[_] => monad.unit(d)
2635
case WebSocketFrame.Ping(payload) if pongOnPing =>
2736
send(WebSocketFrame.Pong(payload)).flatMap(_ => receiveDataFrame(pongOnPing))
2837
case _ => receiveDataFrame(pongOnPing)
2938
}
3039

3140
/** Receive a single text data frame, ignoring others. The frame might be a fragment. To receive whole messages,
3241
* use [[receiveText]].
42+
* Will fail with [[WebSocketClosed]] if the web socket is closed, or if a close frame is received.
3343
* @param pongOnPing Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
3444
*/
35-
def receiveTextFrame(pongOnPing: Boolean = true): F[Either[WebSocketFrame.Close, WebSocketFrame.Text]] =
45+
def receiveTextFrame(pongOnPing: Boolean = true): F[WebSocketFrame.Text] =
3646
receiveDataFrame(pongOnPing).flatMap {
37-
case Left(close) => (Left(close): Either[WebSocketFrame.Close, WebSocketFrame.Text]).unit
38-
case Right(t: WebSocketFrame.Text) => (Right(t): Either[WebSocketFrame.Close, WebSocketFrame.Text]).unit
39-
case _ => receiveTextFrame(pongOnPing)
47+
case t: WebSocketFrame.Text => t.unit
48+
case _ => receiveTextFrame(pongOnPing)
4049
}
4150

4251
/** Receive a single binary data frame, ignoring others. The frame might be a fragment. To receive whole messages,
4352
* use [[receiveBinary]].
53+
* Will fail with [[WebSocketClosed]] if the web socket is closed, or if a close frame is received.
4454
* @param pongOnPing Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
4555
*/
46-
def receiveBinaryFrame(pongOnPing: Boolean = true): F[Either[WebSocketFrame.Close, WebSocketFrame.Binary]] =
56+
def receiveBinaryFrame(pongOnPing: Boolean = true): F[WebSocketFrame.Binary] =
4757
receiveDataFrame(pongOnPing).flatMap {
48-
case Left(close) => (Left(close): Either[WebSocketFrame.Close, WebSocketFrame.Binary]).unit
49-
case Right(t: WebSocketFrame.Binary) => (Right(t): Either[WebSocketFrame.Close, WebSocketFrame.Binary]).unit
50-
case _ => receiveBinaryFrame(pongOnPing)
58+
case t: WebSocketFrame.Binary => t.unit
59+
case _ => receiveBinaryFrame(pongOnPing)
5160
}
5261

5362
/** Receive a single text message (which might come from multiple, fragmented frames).
5463
* Ignores non-text frames and returns combined results.
64+
* Will fail with [[WebSocketClosed]] if the web socket is closed, or if a close frame is received.
5565
* @param pongOnPing Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
5666
*/
57-
def receiveText(pongOnPing: Boolean = true): F[Either[WebSocketFrame.Close, String]] =
67+
def receiveText(pongOnPing: Boolean = true): F[String] =
5868
receiveConcat(() => receiveTextFrame(pongOnPing), _ + _)
5969

6070
/** Receive a single binary message (which might come from multiple, fragmented frames).
6171
* Ignores non-binary frames and returns combined results.
72+
* Will fail with [[WebSocketClosed]] if the web socket is closed, or if a close frame is received.
6273
* @param pongOnPing Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
6374
*/
64-
def receiveBinary(pongOnPing: Boolean): F[Either[WebSocketFrame.Close, Array[Byte]]] =
75+
def receiveBinary(pongOnPing: Boolean): F[Array[Byte]] =
6576
receiveConcat(() => receiveBinaryFrame(pongOnPing), _ ++ _)
6677

78+
/** Extracts the received close frame (if available) as the left side of an either, or returns the original result
79+
* on the right.
80+
*
81+
* Will fail with [[WebSocketClosed]] if the web socket is closed, but no close frame is available.
82+
*
83+
* @param f The effect describing web socket interactions.
84+
*/
85+
def eitherClose[T](f: => F[T]): F[Either[WebSocketFrame.Close, T]] =
86+
f.map(t => Right(t): Either[WebSocketFrame.Close, T]).handleError { case WebSocketClosed(Some(close)) =>
87+
(Left(close): Either[WebSocketFrame.Close, T]).unit
88+
}
89+
90+
/** Returns an effect computing a:
91+
*
92+
* - `Left` if the web socket is closed - optionally with the received close frame (if available).
93+
* - `Right` with the original result otherwise.
94+
*
95+
* Will never fail with a [[WebSocketClosed]].
96+
*
97+
* @param f The effect describing web socket interactions.
98+
*/
99+
def either[T](f: => F[T]): F[Either[Option[WebSocketFrame.Close], T]] =
100+
f.map(t => Right(t): Either[Option[WebSocketFrame.Close], T]).handleError { case WebSocketClosed(close) =>
101+
(Left(close): Either[Option[WebSocketFrame.Close], T]).unit
102+
}
103+
67104
private def receiveConcat[T, U <: WebSocketFrame.Data[T]](
68-
receiveSingle: () => F[Either[WebSocketFrame.Close, U]],
105+
receiveSingle: () => F[U],
69106
combine: (T, T) => T
70-
): F[Either[WebSocketFrame.Close, T]] = {
107+
): F[T] = {
71108
receiveSingle().flatMap {
72-
case Left(close) => (Left(close): Either[WebSocketFrame.Close, T]).unit
73-
case Right(data) if !data.finalFragment =>
74-
receiveConcat(receiveSingle, combine).flatMap {
75-
case Left(close) => (Left(close): Either[WebSocketFrame.Close, T]).unit
76-
case Right(t) => (Right(combine(data.payload, t)): Either[WebSocketFrame.Close, T]).unit
109+
case data if !data.finalFragment =>
110+
receiveConcat(receiveSingle, combine).map { t =>
111+
combine(data.payload, t)
77112
}
78-
case Right(data) /* if data.finalFragment */ =>
79-
(Right(data.payload): Either[WebSocketFrame.Close, T]).unit
113+
case data /* if data.finalFragment */ => data.payload.unit
80114
}
81115
}
82116

ws/src/main/scala/sttp/ws/WebSocketException.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package sttp.ws
22

33
abstract class WebSocketException(msg: String) extends Exception(msg)
44

5-
class WebSocketClosed() extends WebSocketException(null)
5+
/** @param frame The received closing frame, if available.
6+
*/
7+
case class WebSocketClosed(frame: Option[WebSocketFrame.Close]) extends WebSocketException(null)
68

7-
class WebSocketBufferFull(capacity: Int) extends WebSocketException(s"Buffered $capacity messages")
9+
case class WebSocketBufferFull(capacity: Int) extends WebSocketException(s"Buffered $capacity messages")

ws/src/main/scala/sttp/ws/testing/WebSocketStub.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class WebSocketStub[S](
7171
new WebSocket[F] {
7272
private var state: S = initialState
7373
private var _isOpen: Boolean = true
74+
private var closeFrame: Option[WebSocketFrame.Close] = None
7475
private var responses = initialResponses
7576

7677
override def monad: MonadError[F] = m
@@ -83,6 +84,7 @@ class WebSocketStub[S](
8384
responses.headOption match {
8485
case Some(Success(close: WebSocketFrame.Close)) =>
8586
_isOpen = false
87+
closeFrame = Some(close)
8688
monad.unit(close)
8789
case Some(Success(response)) =>
8890
responses = responses.tail
@@ -94,7 +96,7 @@ class WebSocketStub[S](
9496
monad.error(new IllegalStateException("Unexpected 'receive', no more prepared responses."))
9597
}
9698
} else {
97-
monad.error(new WebSocketClosed())
99+
monad.error(WebSocketClosed(closeFrame))
98100
}
99101
}
100102
})

ws/src/test/scala/sttp/ws/testing/WebSocketStubTests.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ class WebSocketStubTests extends AnyFlatSpec with Matchers with ScalaFutures {
121121
try rt()
122122
catch {
123123
case e if h.isDefinedAt(e) => h.apply(e)()
124-
case e => throw e
124+
case e: Exception => throw e
125125
}
126126
override def ensure[T](f: () => T, e: => () => Unit): () => T =
127127
() =>

0 commit comments

Comments
 (0)