Skip to content

Commit fca7c43

Browse files
author
Jimmy Byrd
committed
Adds Akka Streams tests
1 parent ef23387 commit fca7c43

File tree

6 files changed

+256
-0
lines changed

6 files changed

+256
-0
lines changed

FSharp.Control.Redis.Streams.sln

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharp.Control.Redis.Stream
1919
EndProject
2020
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharp.Control.Redis.Streams.Reactive", "src\FSharp.Control.Redis.Streams.Reactive\FSharp.Control.Redis.Streams.Reactive.fsproj", "{F260F85B-FAC8-484F-AD86-88808E271F9B}"
2121
EndProject
22+
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharp.Control.Redis.Streams.Akka.Tests", "tests\FSharp.Control.Redis.Streams.Akka.Tests\FSharp.Control.Redis.Streams.Akka.Tests.fsproj", "{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43}"
23+
EndProject
2224
Global
2325
GlobalSection(SolutionConfigurationPlatforms) = preSolution
2426
Debug|Any CPU = Debug|Any CPU
@@ -104,6 +106,18 @@ Global
104106
{F260F85B-FAC8-484F-AD86-88808E271F9B}.Release|x64.Build.0 = Release|Any CPU
105107
{F260F85B-FAC8-484F-AD86-88808E271F9B}.Release|x86.ActiveCfg = Release|Any CPU
106108
{F260F85B-FAC8-484F-AD86-88808E271F9B}.Release|x86.Build.0 = Release|Any CPU
109+
{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
110+
{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43}.Debug|Any CPU.Build.0 = Debug|Any CPU
111+
{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43}.Debug|x64.ActiveCfg = Debug|Any CPU
112+
{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43}.Debug|x64.Build.0 = Debug|Any CPU
113+
{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43}.Debug|x86.ActiveCfg = Debug|Any CPU
114+
{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43}.Debug|x86.Build.0 = Debug|Any CPU
115+
{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43}.Release|Any CPU.ActiveCfg = Release|Any CPU
116+
{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43}.Release|Any CPU.Build.0 = Release|Any CPU
117+
{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43}.Release|x64.ActiveCfg = Release|Any CPU
118+
{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43}.Release|x64.Build.0 = Release|Any CPU
119+
{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43}.Release|x86.ActiveCfg = Release|Any CPU
120+
{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43}.Release|x86.Build.0 = Release|Any CPU
107121
EndGlobalSection
108122
GlobalSection(NestedProjects) = preSolution
109123
{5D30E174-2538-47AC-8443-318C8C5DC2C9} = {C397A34C-84F1-49E7-AEBC-2F9F2B196216}
@@ -112,5 +126,6 @@ Global
112126
{C1AF5E8A-28CC-4CC3-B3A5-30EAEE1D22F0} = {C397A34C-84F1-49E7-AEBC-2F9F2B196216}
113127
{2CCEB944-20F6-4461-A517-31555822C36F} = {ACBEE43C-7A88-4FB1-9B06-DB064D22B29F}
114128
{F260F85B-FAC8-484F-AD86-88808E271F9B} = {C397A34C-84F1-49E7-AEBC-2F9F2B196216}
129+
{B1FE2B5B-2AFD-43E9-ACBF-997DE13CFF43} = {ACBEE43C-7A88-4FB1-9B06-DB064D22B29F}
115130
EndGlobalSection
116131
EndGlobal
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Auto-Generated by FAKE; do not edit
2+
namespace System
3+
open System.Reflection
4+
5+
[<assembly: AssemblyTitleAttribute("Redis.Streams.Tests")>]
6+
[<assembly: AssemblyProductAttribute("Redis.Streams")>]
7+
[<assembly: AssemblyVersionAttribute("0.1.0")>]
8+
[<assembly: AssemblyMetadataAttribute("ReleaseDate","2017-03-17T00:00:00.0000000")>]
9+
[<assembly: AssemblyFileVersionAttribute("0.1.0")>]
10+
[<assembly: AssemblyInformationalVersionAttribute("0.1.0")>]
11+
[<assembly: AssemblyMetadataAttribute("ReleaseChannel","release")>]
12+
[<assembly: AssemblyMetadataAttribute("GitHash","bb8964b54bee133e9af64d316dc2cfee16df7f72")>]
13+
do ()
14+
15+
module internal AssemblyVersionInformation =
16+
let [<Literal>] AssemblyTitle = "Redis.Streams.Tests"
17+
let [<Literal>] AssemblyProduct = "Redis.Streams"
18+
let [<Literal>] AssemblyVersion = "0.1.0"
19+
let [<Literal>] AssemblyMetadata_ReleaseDate = "2017-03-17T00:00:00.0000000"
20+
let [<Literal>] AssemblyFileVersion = "0.1.0"
21+
let [<Literal>] AssemblyInformationalVersion = "0.1.0"
22+
let [<Literal>] AssemblyMetadata_ReleaseChannel = "release"
23+
let [<Literal>] AssemblyMetadata_GitHash = "bb8964b54bee133e9af64d316dc2cfee16df7f72"
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<Project Sdk="Microsoft.NET.Sdk">
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFrameworks>netcoreapp2.2;net461</TargetFrameworks>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<Compile Include="AssemblyInfo.fs" />
9+
<Compile Include="Tests.fs" />
10+
<Compile Include="Main.fs" />
11+
</ItemGroup>
12+
<ItemGroup>
13+
<ProjectReference Include="../../src/FSharp.Control.Redis.Streams.Akka/FSharp.Control.Redis.Streams.Akka.fsproj" />
14+
</ItemGroup>
15+
<!-- <Import Project="..\..\fsc.props" /> -->
16+
<Import Project="..\..\netfx.props" />
17+
<Import Project="..\..\.paket\Paket.Restore.targets" />
18+
</Project>
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
module ExpectoTemplate
2+
open Expecto
3+
4+
[<EntryPoint>]
5+
let main argv =
6+
Tests.runTestsInAssembly defaultConfig argv
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
module Tests
2+
3+
open System
4+
open Expecto
5+
open StackExchange.Redis
6+
open System.Threading
7+
open System.Threading.Tasks
8+
open Akka.Streams.Dsl
9+
open FSharp.Control.Redis.Streams.Core
10+
open FSharp.Control.Redis.Streams.Akka
11+
open FSharp.Control.Tasks.V2.ContextInsensitive
12+
open Hopac
13+
open Akka.Actor
14+
open Akka.Streams
15+
16+
let getUniqueKey (keyType : string) (key : string) =
17+
let suffix = Guid.NewGuid().ToString()
18+
sprintf "%s:%s:%s" keyType key suffix
19+
|> RedisKey.op_Implicit
20+
21+
let printNameEntryValues (nve : NameValueEntry) =
22+
printfn "Key: %A - Value: %A" nve.Name nve.Value
23+
let printStreamEntry (se : StreamEntry) =
24+
printfn "Id: %A" se.Id
25+
se.Values |> Seq.iter(printNameEntryValues )
26+
let printRedisStream (rs : RedisStream) =
27+
printfn "key: %A" rs.Key
28+
rs.Entries |> Seq.iter(printStreamEntry)
29+
30+
let rkey (s : string) = RedisKey.op_Implicit s
31+
let rval (s : string) = RedisValue.op_Implicit s
32+
let nve name value = NameValueEntry(rval name, rval value)
33+
34+
let inline map (fn: 't -> 'u) (source) : Source<'u, 'mat> =
35+
SourceOperations.Select(source, Func<_, _>(fn))
36+
37+
let inline runForEach (mat: #IMaterializer) (fn: 't -> unit) (source: Source<'t, 'mat>) = task {
38+
do! source.RunForeach(Action<_>(fn), mat)
39+
}
40+
41+
let failDueToTimeout message (ts : TimeSpan) =
42+
failtestf "%s. Expected task to complete but failed after timeout of %f ms" message ts.TotalMilliseconds
43+
44+
type StreamExpect<'a> (system : ActorSystem, predicate : seq<'a> -> bool) =
45+
let mat = system.Materializer()
46+
let values = ResizeArray<'a>()
47+
let cts = new CancellationTokenSource()
48+
let predicateConditionMet = TaskCompletionSource<unit>()
49+
let mutable debugPrint : Option<'a -> unit> = None
50+
// let mutable subscription : IDisposable = new Disposables.BooleanDisposable() :> IDisposable
51+
let checkPredicate () =
52+
if predicate values then
53+
predicateConditionMet.TrySetResult ()
54+
|> ignore
55+
56+
member this.Values
57+
with get () = values
58+
59+
member this.DebugPrint
60+
with get () = debugPrint
61+
and set (v) = debugPrint <- v
62+
member this.CaptureFromStream (s : Source<_,_>) =
63+
let printer =
64+
match debugPrint with
65+
| Some d -> d
66+
| None -> ignore
67+
68+
s
69+
|> map (fun x -> x |> printer; x)
70+
|> runForEach mat (values.Add >> checkPredicate)
71+
|> ignore
72+
73+
74+
member this.Await message (timeout : TimeSpan) = task {
75+
let generateTimeout () = task {
76+
do! Task.Delay(timeout,cts.Token)
77+
failDueToTimeout message timeout
78+
}
79+
80+
let! firstFinishedTask = Task.WhenAny [
81+
predicateConditionMet.Task
82+
generateTimeout ()
83+
]
84+
cts.Cancel()
85+
do! firstFinishedTask
86+
}
87+
88+
interface IDisposable with
89+
member x.Dispose() =
90+
mat.Dispose()
91+
cts.Dispose()
92+
93+
let ranStr n : string =
94+
let r = Random()
95+
String(Array.init n (fun _ -> char (r.Next(97,123))))
96+
97+
98+
99+
let options = ConfigurationOptions.Parse("localhost", ResponseTimeout = 100000, SyncTimeout=100000, ConnectTimeout=100000)
100+
let redis = ConnectionMultiplexer.Connect(options)
101+
[<Tests>]
102+
let tests =
103+
testSequenced <|
104+
testList "samples" [
105+
testCaseAsync "Stream should generate 2 events" <| async {
106+
use system = ActorSystem.Create("system")
107+
let db = redis.GetDatabase()
108+
let key = getUniqueKey "stream" "Foo"
109+
use expecter = new StreamExpect<_>(system, fun s -> s |> Seq.length = 2)
110+
pollStreamForever db key StreamPosition.Beginning PollOptions.Default
111+
|> expecter.CaptureFromStream
112+
113+
let values =
114+
[|
115+
nve "Field1" "Value1"
116+
nve "Field2" "Value3"
117+
|]
118+
let! x = db.StreamAddAsync(key, values) |> Async.AwaitTask
119+
let values =
120+
[|
121+
nve "Field1" "Value4"
122+
nve "Field2" "Value6"
123+
|]
124+
let! x = db.StreamAddAsync(key, values) |> Async.AwaitTask
125+
do! expecter.Await "Should have 2 results" (TimeSpan.FromSeconds(1.)) |> Async.AwaitTask
126+
}
127+
128+
testCaseAsync "Stream should generate 20000 events" <| async {
129+
use system = ActorSystem.Create("system")
130+
let total = 20000
131+
let db = redis.GetDatabase()
132+
let key = getUniqueKey "stream" "Foo"
133+
use expecter = new StreamExpect<_>(system, fun s -> s |> Seq.length = total)
134+
pollStreamForever db key StreamPosition.Beginning PollOptions.Default
135+
|> expecter.CaptureFromStream
136+
137+
job {
138+
do!
139+
[0..total]
140+
|> Seq.map(fun i ->
141+
let values =
142+
[|
143+
NameValueEntry (RedisValue.op_Implicit "Field1", RedisValue.op_Implicit total)
144+
|]
145+
job {
146+
let! x = db.StreamAddAsync(key, values) |> Async.AwaitTask
147+
return ()
148+
})
149+
|> Stream.ofSeq
150+
|> Stream.mapPipelinedJob (Environment.ProcessorCount * 4096 * 2) id
151+
|> Stream.iter
152+
} |> start
153+
154+
do! expecter.Await "Should have 20000 results" (TimeSpan.FromSeconds(30.)) |> Async.AwaitTask
155+
}
156+
157+
158+
testCaseAsync "Stream should generate large fields" <| async {
159+
use system = ActorSystem.Create("system")
160+
let total = 200
161+
let db = redis.GetDatabase()
162+
let key = getUniqueKey "stream" "Foo"
163+
use expecter = new StreamExpect<_>(system, fun s -> s |> Seq.length = total)
164+
pollStreamForever db key StreamPosition.Beginning PollOptions.Default
165+
|> expecter.CaptureFromStream
166+
167+
job {
168+
do!
169+
[0..total]
170+
|> Seq.map(fun i ->
171+
let data = ranStr (20000)
172+
let values =
173+
[|
174+
NameValueEntry (RedisValue.op_Implicit "Field1", RedisValue.op_Implicit data)
175+
|]
176+
job {
177+
let! x = db.StreamAddAsync(key, values) |> Async.AwaitTask
178+
return ()
179+
})
180+
|> Stream.ofSeq
181+
|> Stream.mapPipelinedJob (Environment.ProcessorCount * 2) id
182+
|> Stream.iter
183+
} |> start
184+
185+
do! expecter.Await "Should have 2 results" (TimeSpan.FromSeconds(30.)) |> Async.AwaitTask
186+
}
187+
]
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
Expecto
2+
FSharp.Core
3+
dotnet-mono
4+
Microsoft.NET.Test.Sdk
5+
YoloDev.Expecto.TestSdk
6+
altcover
7+
Hopac

0 commit comments

Comments
 (0)