@@ -10,34 +10,105 @@ module Reactive =
1010 open FSharp.Control .Redis .Streams .Core
1111 open FSharp.Control .Tasks .V2 .ContextInsensitive
1212
13- let pollStreamForever ( redisdb : IDatabase ) ( streamName : RedisKey ) ( startingPosition : RedisValue ) ( pollOptions : PollOptions ) =
14- Observable.Create ( fun ( obs : IObserver < _ >) ->
15- let cts = new CancellationTokenSource ()
16- let ct = cts.Token
17- task {
18- let mutable nextPollDelay = TimeSpan.Zero
19- let mutable nextPosition = startingPosition
20- try
13+ module Observable =
14+ let taskUnfold ( fn : 's -> Task <( 's * 'e ) option >) ( state : 's ) =
15+ Observable.Create ( fun ( obs : IObserver < _ >) ->
16+ let cts = new CancellationTokenSource ()
17+ let ct = cts.Token
18+ task {
19+ let mutable innerState = state
20+ let mutable isFinished = false
2121 try
22- while not ct.IsCancellationRequested do
23- let! ( response : StreamEntry []) = redisdb.StreamRangeAsync( streamName, minId = Nullable( nextPosition), count = ( Option.toNullable pollOptions.CountToPullATime))
24- match response with
25- | EmptyArray ->
26- nextPollDelay <- pollOptions.CalculateNextPollDelay nextPollDelay
27- do ! Task.Delay( nextPollDelay, ct)
28- | entries ->
29- let lastEntry = Seq.last entries
30- nextPosition <- EntryId.CalculateNextPositionIncr lastEntry.Id
31- nextPollDelay <- TimeSpan.Zero
32- entries |> Array.iter obs.OnNext
33- obs.OnCompleted()
34- with e ->
35- obs.OnError e
36- finally
37- cts.Dispose()
38- } |> ignore
39-
40- new Disposables.CancellationDisposable( cts) :> IDisposable
41- )
22+ try
23+ while not ct.IsCancellationRequested || not isFinished do
24+ let! result = fn innerState
25+ match result with
26+ | Some ( newState, output) ->
27+ innerState <- newState
28+ output |> obs.OnNext
29+ | None ->
30+ isFinished <- true
31+ obs.OnCompleted()
32+ with e ->
33+ obs.OnError e
34+ finally
35+ cts.Dispose()
36+ } |> ignore
37+
38+ new Disposables.CancellationDisposable( cts) :> IDisposable
39+ )
40+ let flattenArray ( observable : IObservable < array < _ >>) =
41+ observable.SelectMany ( fun x -> x :> seq<_>)
42+
43+ let pollStreamForever ( redisdb : IDatabase ) ( streamName : RedisKey ) ( startingPosition : RedisValue ) ( pollOptions : PollOptions ) =
44+ Observable.taskUnfold ( fun ( nextPosition , pollDelay ) -> task {
45+ let! ( response : StreamEntry []) = redisdb.StreamRangeAsync( streamName, minId = Nullable( nextPosition), count = ( Option.toNullable pollOptions.CountToPullATime))
46+ match response with
47+ | EmptyArray ->
48+ let nextPollDelay = pollOptions.CalculateNextPollDelay pollDelay
49+ do ! Task.Delay pollDelay
50+ return Some (( nextPosition, nextPollDelay ) , Array.empty )
51+ | entries ->
52+ let lastEntry = Seq.last entries
53+ let nextPosition = EntryId.CalculateNextPositionIncr lastEntry.Id
54+ let nextPollDelay = TimeSpan.Zero
55+ return Some (( nextPosition, nextPollDelay), entries )
56+
57+ }) ( startingPosition, TimeSpan.Zero)
58+ |> Observable.flattenArray
59+
60+ let readFromStream ( redisdb : IDatabase ) ( streamRead : ReadStreamConfig ) =
61+ let readForward ( newMinId : RedisValue ) =
62+ redisdb.StreamRangeAsync(
63+ key = streamRead.StreamName,
64+ minId = Nullable newMinId,
65+ maxId = Option.toNullable( streamRead.MaxId),
66+ count = Option.toNullable streamRead.CountToPullATime,
67+ messageOrder = streamRead.MessageOrder,
68+ flags = streamRead.Flags
69+ )
70+
71+ let readBackward ( newMaxId : RedisValue ) =
72+ redisdb.StreamRangeAsync(
73+ key = streamRead.StreamName,
74+ minId = Option.toNullable( streamRead.MinId),
75+ maxId = Nullable newMaxId,
76+ count = Option.toNullable streamRead.CountToPullATime,
77+ messageOrder = streamRead.MessageOrder,
78+ flags = streamRead.Flags
79+ )
80+
81+ let failureForMessageOrderCheck () =
82+ failwith " If there's more than two directions in a stream the universe is broken, consult a physicist."
83+
84+ let startingPosition =
85+ match streamRead.MessageOrder with
86+ | Order.Ascending -> streamRead.MinId |> Option.defaultValue StreamConstants.ReadMinValue
87+ | Order.Descending -> streamRead.MaxId |> Option.defaultValue StreamConstants.ReadMaxValue
88+ | _ -> failureForMessageOrderCheck ()
89+
90+ let readStream =
91+ match streamRead.MessageOrder with
92+ | Order.Ascending -> readForward
93+ | Order.Descending -> readBackward
94+ | _ -> failureForMessageOrderCheck ()
95+
96+ let calculateNextPosition =
97+ match streamRead.MessageOrder with
98+ | Order.Ascending -> EntryId.CalculateNextPositionIncr
99+ | Order.Descending -> EntryId.CalculateNextPositionDesc
100+ | _ -> failureForMessageOrderCheck ()
101+
102+ Observable.taskUnfold( fun nextPosition -> task {
103+ let! ( response : StreamEntry []) = readStream nextPosition
104+ match response with
105+ | EmptyArray ->
106+ return None
107+ | entries ->
108+ let lastEntry = Seq.last entries
109+ let nextPosition = calculateNextPosition lastEntry.Id
110+ return Some ( nextPosition, entries)
111+ }) startingPosition
112+ |> Observable.flattenArray
42113
43114
0 commit comments