Skip to content

Commit 52b1653

Browse files
author
Jimmy Byrd
committed
Allowp passing of cancellationToken to Observable.taskUnfold
1 parent 68106d9 commit 52b1653

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

src/FSharp.Control.Redis.Streams.Reactive/FSharp.Control.Redis.Streams.Reactive.fs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ module Reactive =
1111
open FSharp.Control.Tasks.V2.ContextInsensitive
1212

1313
module Observable =
14-
let taskUnfold (fn: 's -> Task<('s * 'e) option>) (state: 's) =
14+
let taskUnfold (fn: 's -> CancellationToken -> Task<('s * 'e) option>) (state: 's) =
1515
Observable.Create(fun (obs : IObserver<_>) ->
1616
let cts = new CancellationTokenSource()
1717
let ct = cts.Token
@@ -21,7 +21,7 @@ module Reactive =
2121
try
2222
try
2323
while not ct.IsCancellationRequested || not isFinished do
24-
let! result = fn innerState
24+
let! result = fn innerState ct
2525
match result with
2626
| Some (newState, output) ->
2727
innerState <- newState
@@ -41,7 +41,7 @@ module Reactive =
4141
observable.SelectMany (fun x -> x :> seq<_>)
4242

4343
let pollStreamForever (redisdb : IDatabase) (streamName : RedisKey) (startingPosition : RedisValue) (pollOptions : PollOptions) =
44-
Observable.taskUnfold (fun (nextPosition, pollDelay) -> task {
44+
Observable.taskUnfold (fun (nextPosition, pollDelay) ct -> task {
4545
let! (response : StreamEntry []) = redisdb.StreamRangeAsync(streamName, minId = Nullable(nextPosition), count = (Option.toNullable pollOptions.CountToPullATime))
4646
match response with
4747
| EmptyArray ->

0 commit comments

Comments
 (0)