@@ -9,9 +9,11 @@ open Akka.Streams.Dsl
99open FSharp.Control .Redis .Streams .Core
1010open FSharp.Control .Redis .Streams .Akka
1111open FSharp.Control .Tasks .V2 .ContextInsensitive
12+ open FSharp.Control .Redis .Streams .Infrastructure .Tests
1213open Hopac
1314open Akka.Actor
1415open Akka.Streams
16+ open FSharp.Control .Redis .Streams
1517
1618let getUniqueKey ( keyType : string ) ( key : string ) =
1719 let suffix = Guid.NewGuid() .ToString()
@@ -127,61 +129,163 @@ let tests =
127129
128130 testCaseAsync " Stream should generate 20000 events" <| async {
129131 use system = ActorSystem.Create( " system" )
130- let total = 20000
132+ let numberOfEvents = 20000
131133 let db = redis.GetDatabase()
132- let key = getUniqueKey " stream" " Foo"
133- use expecter = new StreamExpect<_>( system, fun s -> s |> Seq.length = total)
134- pollStreamForever db key StreamPosition.Beginning PollOptions.Default
134+
135+ let streamName = getUniqueStreamKey " StreamGenerate20000Events"
136+ use _ = disposableStream db streamName
137+
138+ use expecter = new StreamExpect<_>( system, fun s -> s |> Seq.length = numberOfEvents)
139+ pollStreamForever db streamName StreamPosition.Beginning PollOptions.Default
135140 |> expecter.CaptureFromStream
136141
137- job {
138- do !
139- [ 0 .. total]
140- |> Seq.map( fun i ->
141- let values =
142- [|
143- NameValueEntry ( RedisValue.op_ Implicit " Field1" , RedisValue.op_ Implicit total)
144- |]
145- job {
146- let! x = db.StreamAddAsync( key, values) |> Async.AwaitTask
147- return ()
148- })
149- |> Stream.ofSeq
150- |> Stream.mapPipelinedJob ( Environment.ProcessorCount * 4096 * 2 ) id
151- |> Stream.iter
152- } |> start
142+ generateDataForStreamCon db streamName numberOfEvents 200
143+ |> Job.Ignore
144+ |> start
145+
153146
154147 do ! expecter.Await " Should have 20000 results" ( TimeSpan.FromSeconds( 30. )) |> Async.AwaitTask
155148 }
156149
157150
158151 testCaseAsync " Stream should generate large fields" <| async {
159152 use system = ActorSystem.Create( " system" )
160- let total = 200
153+ let numberOfEvents = 200
161154 let db = redis.GetDatabase()
162- let key = getUniqueKey " stream" " Foo"
163- use expecter = new StreamExpect<_>( system, fun s -> s |> Seq.length = total)
164- pollStreamForever db key StreamPosition.Beginning PollOptions.Default
155+ let streamName = getUniqueStreamKey " StreamGenerateLargeField"
156+ use _ = disposableStream db streamName
157+ use expecter = new StreamExpect<_>( system, fun s -> s |> Seq.length = numberOfEvents)
158+ pollStreamForever db streamName StreamPosition.Beginning PollOptions.Default
165159 |> expecter.CaptureFromStream
166160
167- job {
168- do !
169- [ 0 .. total]
170- |> Seq.map( fun i ->
171- let data = ranStr ( 20000 )
172- let values =
173- [|
174- NameValueEntry ( RedisValue.op_ Implicit " Field1" , RedisValue.op_ Implicit data)
175- |]
176- job {
177- let! x = db.StreamAddAsync( key, values) |> Async.AwaitTask
178- return ()
179- })
180- |> Stream.ofSeq
181- |> Stream.mapPipelinedJob ( Environment.ProcessorCount * 2 ) id
182- |> Stream.iter
183- } |> start
161+ generateDataForStreamCon db streamName numberOfEvents 20000
162+ |> Job.Ignore
163+ |> start
184164
185165 do ! expecter.Await " Should have 2 results" ( TimeSpan.FromSeconds( 30. )) |> Async.AwaitTask
186166 }
187- ]
167+ ]
168+
169+
170+
171+
172+
173+ [<Tests>]
174+ let readFromStreamTests =
175+ // testSequenced <|
176+ testList " readFromStream" [
177+ testCaseJob " Read forward all Ascending" <| job {
178+ use system = ActorSystem.Create( " system" )
179+ let numberOfEvents = 10
180+ let db = redis.GetDatabase()
181+ let streamName = getUniqueStreamKey " ReadForwardAll"
182+ use _ = disposableStream db streamName
183+ let! data = generateDataForStreamCon db streamName numberOfEvents 200
184+
185+ let expecter = new StreamExpect<_>( system, fun s -> s |> Seq.length = numberOfEvents)
186+
187+ ReadStreamConfig.fromStreamName streamName
188+ |> readFromStream db
189+ |> expecter.CaptureFromStream
190+
191+ do ! expecter.Await ( sprintf " Should have %d results" numberOfEvents) ( TimeSpan.FromSeconds( 10. ))
192+ let actualValues =
193+ expecter.Values
194+ |> Seq.collect( fun v -> v.Values)
195+
196+ let expected =
197+ data
198+ |> Seq.map snd
199+ |> Seq.collect id
200+ |> Seq.toList
201+ Expect.sequenceEqual actualValues expected " Should be same order"
202+ }
203+
204+ testCaseJob " Read forward withCountToPull Ascending" <| job {
205+ use system = ActorSystem.Create( " system" )
206+ let numberOfEvents = 10
207+ let db = redis.GetDatabase()
208+ let streamName = getUniqueStreamKey " ReadForwardwithCountToPull"
209+ use _ = disposableStream db streamName
210+ let! data = generateDataForStreamCon db streamName numberOfEvents 200
211+
212+ use expecter = new StreamExpect<_>( system, fun s -> s |> Seq.length = numberOfEvents)
213+
214+ ReadStreamConfig.fromStreamName streamName
215+ |> ReadStreamConfig.withCountToPullATime 3
216+ |> readFromStream db
217+ |> expecter.CaptureFromStream
218+
219+ do ! expecter.Await ( sprintf " Should have %d results" numberOfEvents) ( TimeSpan.FromSeconds( 10. ))
220+ let actualValues =
221+ expecter.Values
222+ |> Seq.collect( fun v -> v.Values)
223+
224+ let expected =
225+ data
226+ |> Seq.map snd
227+ |> Seq.collect id
228+ |> Seq.toList
229+ Expect.sequenceEqual actualValues expected " Should be same order"
230+ }
231+
232+ testCaseJob " Read backward all" <| job {
233+ use system = ActorSystem.Create( " system" )
234+ let numberOfEvents = 10
235+ let db = redis.GetDatabase()
236+ let streamName = getUniqueStreamKey " ReadbackwardAll"
237+ use _ = disposableStream db streamName
238+ let! data = generateDataForStreamCon db streamName numberOfEvents 200
239+
240+ use expecter = new StreamExpect<_>( system, fun s -> s |> Seq.length = numberOfEvents)
241+
242+ ReadStreamConfig.fromStreamName streamName
243+ |> ReadStreamConfig.withDescending
244+ |> readFromStream db
245+ |> expecter.CaptureFromStream
246+
247+ do ! expecter.Await ( sprintf " Should have %d results" numberOfEvents) ( TimeSpan.FromSeconds( 10. ))
248+ let actualValues =
249+ expecter.Values
250+ |> Seq.collect( fun v -> v.Values)
251+ |> Seq.toList
252+ let expected =
253+ data
254+ |> Seq.map snd
255+ |> Seq.collect id
256+ |> Seq.rev
257+ |> Seq.toList
258+ Expect.sequenceEqual actualValues expected " Should be same order"
259+ }
260+
261+ testCaseJob " Read backward count" <| job {
262+ use system = ActorSystem.Create( " system" )
263+ let numberOfEvents = 10
264+ let db = redis.GetDatabase()
265+ let streamName = getUniqueStreamKey " Readbackwardcount"
266+ use _ = disposableStream db streamName
267+ let! data = generateDataForStreamCon db streamName numberOfEvents 200
268+
269+ use expecter = new StreamExpect<_>( system, fun s -> s |> Seq.length = numberOfEvents)
270+
271+ ReadStreamConfig.fromStreamName streamName
272+ |> ReadStreamConfig.withDescending
273+ |> ReadStreamConfig.withCountToPullATime 1
274+ |> readFromStream db
275+ |> expecter.CaptureFromStream
276+
277+ do ! expecter.Await ( sprintf " Should have %d results" numberOfEvents) ( TimeSpan.FromSeconds( 10. ))
278+ let actualValues =
279+ expecter.Values
280+ |> Seq.collect( fun v -> v.Values)
281+ |> Seq.toList
282+ let expected =
283+ data
284+ |> Seq.map snd
285+ |> Seq.collect id
286+ |> Seq.rev
287+ |> Seq.toList
288+ Expect.sequenceEqual actualValues expected " Should be same order"
289+ }
290+
291+ ]
0 commit comments