Skip to content

Commit 514e2a5

Browse files
Channel improvements (#233)
1 parent e2151f8 commit 514e2a5

File tree

3 files changed

+135
-32
lines changed

3 files changed

+135
-32
lines changed

sample/ChannelsSample/ChannelsSample.fs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@ open Saturn.Channels
88
open Microsoft.Extensions.Logging
99
open Giraffe.HttpStatusCodeHandlers
1010

11-
11+
//Normal router for Http request, brodcasting messages to all connected clients
1212
let browserRouter = router {
13-
get "/" (text "Hello world")
1413
get "/ping" (fun next ctx -> task {
1514
let hub = ctx.GetService<Saturn.Channels.ISocketHub>()
1615
match ctx.TryGetQueryStringValue "message" with
@@ -22,13 +21,24 @@ let browserRouter = router {
2221
})
2322
}
2423

24+
//Sample channel implementation
2525
let sampleChannel = channel {
2626
join (fun ctx si -> task {
2727
ctx.GetLogger().LogInformation("Connected! Socket Id: " + si.SocketId.ToString())
2828
return Ok
2929
})
3030

31-
handle "topic" (fun ctx si msg ->
31+
//Handles can be typed if needed.
32+
handle "topic" (fun ctx si (msg: Message<string>) ->
33+
task {
34+
let logger = ctx.GetLogger()
35+
logger.LogInformation("got message {message} from client with Socket Id: {socketId}", msg, si.SocketId)
36+
return ()
37+
}
38+
)
39+
40+
//Handles can specifiy it's own payload type - different topic in one channel may have different payloads
41+
handle "othertopic" (fun ctx si (msg: Message<int>) ->
3242
task {
3343
let logger = ctx.GetLogger()
3444
logger.LogInformation("got message {message} from client with Socket Id: {socketId}", msg, si.SocketId)

src/Saturn/Channels.fs

Lines changed: 121 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,31 +17,46 @@ module Socket = ThreadSafeWebSocket
1717

1818
module Channels =
1919

20-
type Message<'a> = { Topic: string; Ref: string; Payload: 'a}
21-
type Message = Message<obj>
20+
///Url (relative to root application url) on which channel is hosted. Type alias for `string`
21+
type ChannelPath = string
22+
23+
///Topic of the channel. Type alias for `string`
24+
type Topic = string
25+
26+
/// Types representing channels message.
27+
/// It always includes topic, reference id of the message (random GUID), and payload object.
28+
type Message<'a> = { Topic: Topic; Ref: string; Payload: 'a}
29+
30+
///Socket Id. Type alias for `Guid`
2231
type SocketId = Guid
23-
type SocketInfo = { SocketId: SocketId }
32+
33+
///Type representing information about client that has executed some channel action
34+
///It's passed as an argument in channel actions (`join`, `handle`, `terminate`)
35+
type ClientInfo = { SocketId: SocketId }
2436
with
2537
static member New socketId =
2638
{ SocketId = socketId }
27-
type ChannelPath = string
28-
type Topic = string
2939

40+
///Type representing result of `join` action. It can be either succesful (`Ok`) or you can reject client connection (`Rejected`)
3041
type JoinResult =
3142
| Ok
3243
| Rejected of reason: string
3344

45+
/// Interface of the internal representation of the channel.
46+
/// Shouldn't be used manually, you get its instance from the `channel` Computation Expression
3447
type IChannel =
35-
abstract member Join: HttpContext * SocketInfo -> Task<JoinResult>
36-
abstract member HandleMessage: HttpContext * SocketInfo * Message -> Task<unit>
37-
abstract member Terminate: HttpContext * SocketInfo -> Task<unit>
48+
abstract member Join: HttpContext * ClientInfo -> Task<JoinResult>
49+
abstract member HandleMessage: HttpContext * ClientInfo * IJsonSerializer * string -> Task<unit>
50+
abstract member Terminate: HttpContext * ClientInfo -> Task<unit>
3851

52+
/// Interface representing server side Socket Hub, giving you ability to brodcast messages (either to particular socket or to all sockets).
53+
/// You can get instance of it with `ctx.GetService&lt;Saturn.Channels.ISocketHub>()` from any place that has access to HttpContext instance (`controller` actions, `channel` actions, normal `HttpHandler`)
3954
type ISocketHub =
4055
abstract member SendMessageToClients: ChannelPath -> Topic -> 'a -> Task<unit>
4156
abstract member SendMessageToClient: ChannelPath -> SocketId -> Topic -> 'a -> Task<unit>
4257

4358
/// A type that wraps access to connected websockets by endpoint
44-
type SocketHub(serializer: IJsonSerializer) =
59+
type internal SocketHub(serializer: IJsonSerializer) =
4560
let sockets = Dictionary<ChannelPath, ConcurrentDictionary<SocketId, Socket.ThreadSafeWebSocket>>()
4661

4762
let sendMessage (msg: 'a Message) (socket: Socket.ThreadSafeWebSocket) = task {
@@ -80,7 +95,7 @@ module Channels =
8095
| _ -> ()
8196
}
8297

83-
type SocketMiddleware(next : RequestDelegate, serializer: IJsonSerializer, path: string, channel: IChannel, sockets: SocketHub, logger: ILogger<SocketMiddleware>) =
98+
type internal SocketMiddleware(next : RequestDelegate, serializer: IJsonSerializer, path: string, channel: IChannel, sockets: SocketHub, logger: ILogger<SocketMiddleware>) =
8499
do sockets.NewPath path
85100

86101
member __.Invoke(ctx : HttpContext) =
@@ -91,7 +106,7 @@ module Channels =
91106
let logger = ctx.RequestServices.GetRequiredService<ILogger<SocketMiddleware>>()
92107
logger.LogTrace("Promoted websocket request")
93108
let socketId = Guid.NewGuid()
94-
let socketInfo = SocketInfo.New socketId
109+
let socketInfo = ClientInfo.New socketId
95110
let! joinResult = channel.Join(ctx, socketInfo)
96111
match joinResult with
97112
| Ok ->
@@ -107,8 +122,7 @@ module Channels =
107122
| Result.Ok (WebSocket.ReceiveUTF8Result.String msg) ->
108123
logger.LogTrace("received message {0}", msg)
109124
try
110-
let msg = serializer.Deserialize<Message> msg
111-
do! channel.HandleMessage(ctx, socketInfo, msg)
125+
do! channel.HandleMessage(ctx, socketInfo, serializer, msg)
112126
with
113127
| ex ->
114128
// typically a deserialization error, swallow
@@ -138,40 +152,117 @@ module Channels =
138152

139153

140154
[<AutoOpen>]
155+
///Module with `channel` computation expression
141156
module ChannelBuilder =
142157
open Channels
143158

159+
///Type representing internal state of the `channel` computation expression
144160
type ChannelBuilderState = {
145-
Join: (HttpContext -> SocketInfo -> Task<JoinResult>) option
146-
Handlers: Map<string, (HttpContext-> SocketInfo -> Message -> Task<unit>)>
147-
Terminate: (HttpContext -> SocketInfo -> Task<unit>) option
148-
NotFoundHandler: (HttpContext -> SocketInfo -> Message -> Task<unit>) option
149-
ErrorHandler: HttpContext -> SocketInfo -> Message -> Exception -> Task<unit>
161+
Join: (HttpContext -> ClientInfo -> Task<JoinResult>) option
162+
Handlers: Map<string, (IJsonSerializer -> HttpContext-> ClientInfo -> string -> Task<unit>)>
163+
Terminate: (HttpContext -> ClientInfo -> Task<unit>) option
164+
NotFoundHandler: (HttpContext -> ClientInfo -> Message<obj> -> Task<unit>) option
165+
ErrorHandler: HttpContext -> ClientInfo -> Message<obj> -> Exception -> Task<unit>
150166
}
151167

168+
///Computation expression used to create channels - an `controller`-like abstraction over WebSockets allowing real-time, and push-based communication between server and the client
169+
/// The messages handled by channels should be json-encoded, in a following form: `{Topic = "my topic"; Ref = "unique-message-id"; Payload = {...} }`
170+
///
171+
///The result of the computation expression is the `IChannel` instance that can be registered in the `application` computation expression using `add_channel` operation.
172+
///
173+
///**Example:**
174+
///
175+
/// ```fsharp
176+
///
177+
/// let browserRouter = router {
178+
/// get "/ping" (fun next ctx -> task {
179+
/// let hub = ctx.GetService&lt;Saturn.Channels.ISocketHub>()
180+
/// match ctx.TryGetQueryStringValue "message" with
181+
/// | None ->
182+
/// do! hub.SendMessageToClients "/channel" "greeting" "hello"
183+
/// | Some message ->
184+
/// do! hub.SendMessageToClients "/channel" "greeting" (sprintf "hello, %s" message)
185+
/// return! Successful.ok (text "Pinged the clients") next ctx
186+
/// })
187+
/// }
188+
///
189+
/// let sampleChannel = channel {
190+
/// join (fun ctx si -> task {
191+
/// ctx.GetLogger().LogInformation("Connected! Socket Id: " + si.SocketId.ToString())
192+
/// return Ok
193+
/// })
194+
///
195+
/// handle "topic" (fun ctx si msg ->
196+
/// task {
197+
/// let logger = ctx.GetLogger()
198+
/// logger.LogInformation("got message {message} from client with Socket Id: {socketId}", msg, si.SocketId)
199+
/// return ()
200+
/// })
201+
/// }
202+
///
203+
/// let app = application {
204+
/// use_router browserRouter
205+
/// url "http://localhost:8085/"
206+
/// add_channel "/channel" sampleChannel
207+
/// }
208+
/// ```
152209
type ChannelBuilder internal () =
153210
member __.Yield(_) : ChannelBuilderState =
154211
{Join = None; Handlers = Map.empty; Terminate = None; NotFoundHandler = None; ErrorHandler = fun _ _ _ ex -> raise ex }
155212

156213
[<CustomOperation("join")>]
214+
///Action executed when client tries to join the channel.
215+
///You can either return `Ok` if channel allows join, or reject it with `Rejected`
216+
///Typical cases for rejection may include authorization/authentication,
217+
///not being able to handle more connections or other business logic reasons.
218+
///
219+
/// As arguments, `join` action gets:
220+
/// * current `HttpContext` for the request
221+
/// * `ClientInfo` instance representing additional information about client sending request
157222
member __.Join(state, handler) =
158223
{state with Join= Some handler}
159224

160225
[<CustomOperation("handle")>]
161-
member __.Handle(state, topic, handler) =
162-
{state with Handlers=state.Handlers.Add(topic, handler)}
226+
///Action executed when client sends a message to the channel to the given topic.
227+
///
228+
/// As arguments, `handle` action gets:
229+
/// * current `HttpContext` for the request
230+
/// * `ClientInfo` instance representing additional information about client sending request
231+
/// * `Message<'a>` instance representing message sent from client to the channel
232+
member __.Handle<'a>(state, topic, (handler : HttpContext -> ClientInfo -> Message<'a> -> Task<unit>)) =
233+
let objHandler = fun (serializer: IJsonSerializer) ctx ci (msg: string) ->
234+
let nmsg = serializer.Deserialize<Message<'a>> msg
235+
handler ctx ci nmsg
236+
237+
{state with Handlers=state.Handlers.Add(topic, objHandler)}
163238

164239
[<CustomOperation("terminate")>]
240+
///Action executed when client disconnects from the channel
241+
///
242+
/// As arguments, `join` action gets:
243+
/// * current `HttpContext` for the request
244+
/// * `ClientInfo` instance representing additional information about client sending request
165245
member __.Terminate(state, handler) =
166246
{state with Terminate= Some handler}
167247

168248
[<CustomOperation("not_found_handler")>]
169-
member __.NotFoundHandler(state : ChannelBuilderState, handler) =
170-
{state with NotFoundHandler= Some handler}
249+
///Action executed when clients sends a message to the topic for which `handle` was not registered
250+
///
251+
/// As arguments, `not_found_handler` action gets:
252+
/// * current `HttpContext` for the request
253+
/// * `ClientInfo` instance representing additional information about client sending request
254+
/// * `Message<'a>` instance representing message sent from client to the channel
255+
member __.NotFoundHandler(state, handler) =
256+
{state with ChannelBuilderState.NotFoundHandler= Some handler}
171257

172258
[<CustomOperation("error_handler")>]
173-
member __.ErrorHandler(state : ChannelBuilderState, handler) =
174-
{state with ErrorHandler= handler}
259+
///Action executed when unhandled exception happens in the
260+
/// As arguments, `not_found_handler` action gets:
261+
/// * current `HttpContext` for the request
262+
/// * `ClientInfo` instance representing additional information about client sending request
263+
/// * `Message<'a>` instance representing message sent from client to the channel
264+
member __.ErrorHandler(state, handler) =
265+
{state with ChannelBuilderState.ErrorHandler= handler}
175266

176267
member __.Run(state: ChannelBuilderState) : IChannel =
177268
if state.Join.IsNone then failwith "Join is required operation for any channel. Please use `join` operation in your `channel` CE to define it."
@@ -187,8 +278,9 @@ module ChannelBuilder =
187278
state.Handlers.TryFind msgTopic
188279

189280
let handler =
190-
fun (ctx: HttpContext) (si: SocketInfo) (msg : Message) -> task {
281+
fun (serializer: IJsonSerializer) (ctx: HttpContext) (si: ClientInfo) (rawMsg : string) -> task {
191282
let logger = ctx.RequestServices.GetRequiredService<ILogger<IChannel>>()
283+
let msg = serializer.Deserialize<Message<obj>> rawMsg
192284
logger.LogInformation("got message {message}", msg)
193285
try
194286
match findHandler msg.Topic with
@@ -202,7 +294,7 @@ module ChannelBuilder =
202294
return ()
203295
| Some hdl ->
204296
logger.LogInformation("found handler for topic {topic}", msg.Topic)
205-
return! hdl ctx si msg
297+
return! hdl serializer ctx si rawMsg
206298
with
207299
| ex ->
208300
logger.LogError(ex, "error while handling message {message}", msg)
@@ -216,8 +308,9 @@ module ChannelBuilder =
216308

217309
member __.Terminate(ctx, si) = terminate ctx si
218310

219-
member __.HandleMessage(ctx, si, msg) =
220-
handler ctx si msg
311+
member __.HandleMessage(ctx, si, serializer, msg) =
312+
handler serializer ctx si msg
221313
}
222314

315+
///Computation expression used to create channels
223316
let channel = ChannelBuilder()

tests/Saturn.UnitTests/Helpers.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ let getEmptyContext (method: string) (path : string) =
4848

4949
ctx.RequestServices
5050
.GetService(typeof<Json.IJsonSerializer>)
51-
.Returns(new NewtonsoftJsonSerializer(NewtonsoftJsonSerializer.DefaultSettings))
51+
.Returns(NewtonsoftJsonSerializer(NewtonsoftJsonSerializer.DefaultSettings))
5252
|> ignore
5353

5454
ctx.RequestServices

0 commit comments

Comments
 (0)