Skip to content

Commit 58d2b88

Browse files
author
Jimmy Byrd
committed
Adds Reactive Extensions
1 parent 89cefa4 commit 58d2b88

File tree

6 files changed

+123
-24
lines changed

6 files changed

+123
-24
lines changed

paket.dependencies

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ nuget StackExchange.Redis
1616
nuget Hopac
1717
nuget Akka.Streams
1818
nuget TaskBuilder.fs
19-
19+
nuget System.Reactive
2020

2121
// [ FAKE GROUP ]
2222
group Build

paket.lock

Lines changed: 18 additions & 23 deletions
Large diffs are not rendered by default.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Auto-Generated by FAKE; do not edit
2+
namespace System
3+
open System.Reflection
4+
5+
[<assembly: AssemblyTitleAttribute("Redis.Streams")>]
6+
[<assembly: AssemblyProductAttribute("Redis.Streams")>]
7+
[<assembly: AssemblyVersionAttribute("0.1.0")>]
8+
[<assembly: AssemblyMetadataAttribute("ReleaseDate","2017-03-17T00:00:00.0000000")>]
9+
[<assembly: AssemblyFileVersionAttribute("0.1.0")>]
10+
[<assembly: AssemblyInformationalVersionAttribute("0.1.0")>]
11+
[<assembly: AssemblyMetadataAttribute("ReleaseChannel","release")>]
12+
[<assembly: AssemblyMetadataAttribute("GitHash","bb8964b54bee133e9af64d316dc2cfee16df7f72")>]
13+
do ()
14+
15+
module internal AssemblyVersionInformation =
16+
let [<Literal>] AssemblyTitle = "Redis.Streams"
17+
let [<Literal>] AssemblyProduct = "Redis.Streams"
18+
let [<Literal>] AssemblyVersion = "0.1.0"
19+
let [<Literal>] AssemblyMetadata_ReleaseDate = "2017-03-17T00:00:00.0000000"
20+
let [<Literal>] AssemblyFileVersion = "0.1.0"
21+
let [<Literal>] AssemblyInformationalVersion = "0.1.0"
22+
let [<Literal>] AssemblyMetadata_ReleaseChannel = "release"
23+
let [<Literal>] AssemblyMetadata_GitHash = "bb8964b54bee133e9af64d316dc2cfee16df7f72"
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
namespace FSharp.Control.Redis.Streams
2+
3+
module Hopac =
4+
open System
5+
open System.Threading
6+
open System.Threading.Tasks
7+
open System.Reactive
8+
open System.Reactive.Linq
9+
open StackExchange.Redis
10+
open FSharp.Control.Redis.Streams.Core
11+
open FSharp.Control.Tasks.V2.ContextInsensitive
12+
13+
14+
let pollStreamForever (redisdb : IDatabase) (streamName : RedisKey) (startingPosition : RedisValue) (pollOptions : PollOptions) =
15+
let calculateNextPollDelay (nextPollDelay) =
16+
let increment = (float pollOptions.MaxPollDelay.Ticks / pollOptions.MaxPollDelayBuckets)
17+
let nextPollDelay = nextPollDelay + TimeSpan.FromTicks(int64 increment)
18+
TimeSpan.Min nextPollDelay pollOptions.MaxPollDelay
19+
20+
Observable.Create(fun (obs : IObserver<_>) ->
21+
let cts = new CancellationTokenSource()
22+
let ct = cts.Token
23+
task {
24+
let mutable nextPollDelay = TimeSpan.Zero
25+
let mutable nextPosition = startingPosition
26+
try
27+
try
28+
while not ct.IsCancellationRequested do
29+
let! (response : StreamEntry []) = redisdb.StreamRangeAsync(streamName, minId = Nullable(nextPosition), count = (Option.toNullable pollOptions.CountToPullATime))
30+
match response with
31+
| EmptySeq ->
32+
nextPollDelay <- calculateNextPollDelay nextPollDelay
33+
do! Task.Delay(nextPollDelay, ct)
34+
| entries ->
35+
let lastEntry = Seq.last entries
36+
nextPosition <- EntryId.CalculateNextPosition lastEntry.Id
37+
nextPollDelay <- TimeSpan.Zero
38+
entries |> Array.iter obs.OnNext
39+
obs.OnCompleted()
40+
with e ->
41+
obs.OnError e
42+
finally
43+
cts.Dispose()
44+
} |> ignore
45+
46+
new Disposables.CancellationDisposable(cts) :> IDisposable
47+
)
48+
49+
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<Project Sdk="Microsoft.NET.Sdk">
3+
<PropertyGroup>
4+
<TargetFrameworks>netstandard2.0;net461</TargetFrameworks>
5+
<GenerateDocumentationFile>true</GenerateDocumentationFile>
6+
</PropertyGroup>
7+
<PropertyGroup>
8+
<Title>FSharp.Control.Redis.Streams.Reactive</Title>
9+
<Description>FSharp.Control.Redis.Streams.Reactive does the thing!</Description>
10+
11+
</PropertyGroup>
12+
<PropertyGroup Condition="'$(Configuration)'=='Release'">
13+
<Optimize>true</Optimize>
14+
<Tailcalls>true</Tailcalls>
15+
16+
</PropertyGroup>
17+
<ItemGroup>
18+
<Compile Include="AssemblyInfo.fs" />
19+
<Compile Include="FSharp.Control.Redis.Streams.Reactive.fs" />
20+
</ItemGroup>
21+
<ItemGroup>
22+
<ProjectReference Include="..\FSharp.Control.Redis.Streams\FSharp.Control.Redis.Streams.fsproj" />
23+
</ItemGroup>
24+
<!-- <Import Project="..\..\fsc.props" /> -->
25+
<Import Project="..\..\netfx.props" />
26+
<Import Project="..\..\.paket\Paket.Restore.targets" />
27+
</Project>
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
FSharp.Core
2+
Microsoft.SourceLink.GitHub
3+
StackExchange.Redis
4+
System.Reactive
5+
TaskBuilder.fs

0 commit comments

Comments
 (0)