@@ -277,14 +277,14 @@ module Dispatch =
277277 // On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first
278278 let tryFillDispatcher ( potential : seq < Item < 'F >>) markStarted project markBusy =
279279 let xs = potential.GetEnumerator()
280- let startTimestamp = System.Diagnostics. Stopwatch.GetTimestamp ()
280+ let startTs = Stopwatch.timestamp ()
281281 let mutable hasCapacity , dispatched = true , false
282282 while xs.MoveNext() && hasCapacity do
283283 let item = xs.Current
284- let succeeded = inner.TryAdd( project struct ( startTimestamp , item))
284+ let succeeded = inner.TryAdd( project struct ( startTs , item))
285285 if succeeded then
286286 markBusy item.stream
287- markStarted ( item.stream, startTimestamp )
287+ markStarted ( item.stream, startTs )
288288 hasCapacity <- succeeded
289289 dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping
290290 struct ( dispatched, hasCapacity)
@@ -434,11 +434,6 @@ module Scheduling =
434434
435435 type [<Struct; NoEquality; NoComparison>] BufferState = Idle | Active | Full | Slipstreaming
436436
437- module StopwatchTicks =
438-
439- let inline elapsed ( sw : System.Diagnostics.Stopwatch ) = sw.ElapsedTicks
440- let inline toTimeSpan ticks = TimeSpan.FromSeconds( double ticks / double System.Diagnostics.Stopwatch.Frequency)
441-
442437 module Stats =
443438
444439 /// Manages state used to generate metrics (and summary logs) regarding streams currently being processed by a Handler
@@ -447,8 +442,8 @@ module Scheduling =
447442 type private StreamState = { ts : int64 ; mutable count : int }
448443 let private walkAges ( state : Dictionary < _ , _ >) =
449444 if state.Count = 0 then Seq.empty else
450- let currentTimestamp = System.Diagnostics. Stopwatch.GetTimestamp ()
451- seq { for x in state.Values -> struct ( currentTimestamp - x.ts, x.count) }
445+ let currentTs = Stopwatch.timestamp ()
446+ seq { for x in state.Values -> struct ( currentTs - x.ts, x.count) }
452447 let private renderState agesAndCounts =
453448 let mutable oldest , newest , streams , attempts = Int64.MinValue, Int64.MaxValue, 0 , 0
454449 for struct ( diff, count) in agesAndCounts do
@@ -457,7 +452,7 @@ module Scheduling =
457452 streams <- streams + 1
458453 attempts <- attempts + count
459454 if streams = 0 then oldest <- 0 L; newest <- 0 L
460- struct ( streams, attempts), struct ( StopwatchTicks.toTimeSpan oldest, StopwatchTicks.toTimeSpan newest)
455+ struct ( streams, attempts), struct ( Stopwatch.ticksToTimeSpan oldest, Stopwatch.ticksToTimeSpan newest)
461456 /// Manages the list of currently dispatched Handlers
462457 /// NOTE we are guaranteed we'll hear about a Start before a Finish (or another Start) per stream by the design of the Dispatcher
463458 type private Active () =
@@ -507,17 +502,17 @@ module Scheduling =
507502 type [<NoComparison; NoEquality>] Timers () =
508503 let mutable results , dispatch , merge , ingest , stats , sleep = 0 L, 0 L, 0 L, 0 L, 0 L, 0 L
509504 let sw = Stopwatch.start()
510- member _.RecordResults sw = results <- results + StopwatchTicks.elapsed sw
511- member _.RecordDispatch sw = dispatch <- dispatch + StopwatchTicks.elapsed sw
512- member _.RecordMerge sw = merge <- merge + StopwatchTicks.elapsed sw
513- member _.RecordIngest sw = ingest <- ingest + StopwatchTicks.elapsed sw
514- member _.RecordStats sw = stats <- stats + StopwatchTicks.elapsed sw
515- member _.RecordSleep sw = sleep <- sleep + StopwatchTicks.elapsed sw
505+ member _.RecordResults ts = results <- results + Stopwatch.elapsedTicks ts
506+ member _.RecordDispatch ts = dispatch <- dispatch + Stopwatch.elapsedTicks ts
507+ member _.RecordMerge ts = merge <- merge + Stopwatch.elapsedTicks ts
508+ member _.RecordIngest ts = ingest <- ingest + Stopwatch.elapsedTicks ts
509+ member _.RecordStats ts = stats <- stats + Stopwatch.elapsedTicks ts
510+ member _.RecordSleep ts = sleep <- sleep + Stopwatch.elapsedTicks ts
516511 member _.Dump ( log : ILogger ) =
517- let dt , ft , mt = StopwatchTicks.toTimeSpan results, StopwatchTicks.toTimeSpan dispatch, StopwatchTicks.toTimeSpan merge
518- let it , st , zt = StopwatchTicks.toTimeSpan ingest, StopwatchTicks.toTimeSpan stats, StopwatchTicks.toTimeSpan sleep
512+ let dt , ft , mt = Stopwatch.ticksToTimeSpan results, Stopwatch.ticksToTimeSpan dispatch, Stopwatch.ticksToTimeSpan merge
513+ let it , st , zt = Stopwatch.ticksToTimeSpan ingest, Stopwatch.ticksToTimeSpan stats, Stopwatch.ticksToTimeSpan sleep
519514 let m = Log.Metric.SchedulerCpu ( mt, it, ft, dt, st)
520- let tot = StopwatchTicks.toTimeSpan ( results + dispatch + merge + ingest + stats + sleep)
515+ let tot = Stopwatch.ticksToTimeSpan ( results + dispatch + merge + ingest + stats + sleep)
521516 ( log |> Log.withMetric m) .Information( " Cpu Streams {mt:n1}s Batches {it:n1}s Dispatch {ft:n1}s Results {dt:n1}s Stats {st:n1}s Sleep {zt:n1}s Total {tot:n1}s Interval {int:n1}s" ,
522517 mt.TotalSeconds, it.TotalSeconds, ft.TotalSeconds, dt.TotalSeconds, st.TotalSeconds, zt.TotalSeconds, tot.TotalSeconds, sw.ElapsedSeconds)
523518 results <- 0 L; dispatch <- 0 L; merge <- 0 L; ingest <- 0 L; stats <- 0 L; sleep <- 0 L
@@ -632,9 +627,9 @@ module Scheduling =
632627 static member Create ( inner ,
633628 project : struct ( FsCodec.StreamName * StreamSpan < 'F >) -> CancellationToken -> Task < struct ( bool * Choice < 'P , 'E >)>,
634629 interpretProgress , stats , dumpStreams ) =
635- let project struct ( startTicks , item : Dispatch.Item < 'F >) ( ct : CancellationToken ) = task {
630+ let project struct ( startTs , item : Dispatch.Item < 'F >) ( ct : CancellationToken ) = task {
636631 let! progressed , res = project ( item.stream, item.span) ct
637- return struct ( System.Diagnostics. Stopwatch.GetTimestamp () - startTicks |> StopwatchTicks.toTimeSpan , item.stream, progressed, res) }
632+ return struct ( Stopwatch.elapsed startTs , item.stream, progressed, res) }
638633 MultiDispatcher<_, _, _, _>( inner, project, interpretProgress, stats, dumpStreams)
639634 static member Create ( inner , handle , interpret , toIndex , stats , dumpStreams ) =
640635 let project item ct = task {
@@ -877,17 +872,17 @@ module Scheduling =
877872
878873 member _.Pump ( ct : CancellationToken ) = task {
879874 use _ = dispatcher.Result.Subscribe( fun struct ( t , s , pr , r ) -> writeResult ( Result ( t, s, pr, r)))
880- let inline ssw () = Stopwatch.start ()
875+ let inline ts () = Stopwatch.timestamp ()
881876 while not ct.IsCancellationRequested do
882877 let mutable s = { idle = true ; dispatcherState = Idle; remaining = maxCycles; waitForPending = false ; waitForCapacity = false }
883878 let t = dispatcher.Timers
884879 while s.remaining <> 0 do
885880 s.remaining <- s.remaining - 1
886881 // 1. propagate write write outcomes to buffer (can mark batches completed etc)
887- let processedResults = let sw = ssw () in let r = tryHandleResults () in t.RecordResults sw ; r
882+ let processedResults = let ts = ts () in let r = tryHandleResults () in t.RecordResults ts ; r
888883 // 2. top up provisioning of writers queue
889884 // On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first
890- let struct ( dispatched , hasCapacity ) = let sw = ssw () in let r = tryDispatch s.IsSlipStreaming in t.RecordDispatch sw ; r
885+ let struct ( dispatched , hasCapacity ) = let ts = ts () in let r = tryDispatch s.IsSlipStreaming in t.RecordDispatch ts ; r
891886 s.idle <- s.idle && not processedResults && not dispatched
892887 match s.dispatcherState with
893888 | Idle when not hasCapacity ->
@@ -899,8 +894,8 @@ module Scheduling =
899894 | Idle -> // need to bring more work into the pool as we can't fill the work queue from what we have
900895 // If we're going to fill the write queue with random work, we should bring all read events into the state first
901896 // Hence we potentially take more than one batch at a time based on maxBatches (but less buffered work is more optimal)
902- let mergeStreams batchStreams = let sw = ssw () in streams.Merge batchStreams; t.RecordMerge sw
903- let ingestBatch batch = let sw = ssw () in ingestBatch batch; t.RecordIngest sw
897+ let mergeStreams batchStreams = let ts = ts () in streams.Merge batchStreams; t.RecordMerge ts
898+ let ingestBatch batch = let ts = ts () in ingestBatch batch; t.RecordIngest ts
904899 let struct ( ingested , filled ) = ingestBatches mergeStreams ingestBatch maxBatches
905900 if ingested then s.waitForPending <- not filled // no need to wait if we ingested as many as needed
906901 elif slipstreamingEnabled then s.dispatcherState <- Slipstreaming; s.waitForPending <- true // try some slip-streaming, but wait for proper items too
@@ -911,17 +906,17 @@ module Scheduling =
911906 if s.remaining = 0 && hasCapacity then s.waitForPending <- true
912907 if s.remaining = 0 && not hasCapacity && not wakeForResults then s.waitForCapacity <- true
913908 // While the loop can take a long time, we don't attempt logging of stats per iteration on the basis that the maxCycles should be low
914- let sw = ssw () in dispatcher.RecordStats( pendingCount()); t.RecordStats sw
909+ let ts = ts () in dispatcher.RecordStats( pendingCount()); t.RecordStats ts
915910 // 4. Do a minimal sleep so we don't run completely hot when empty (unless we did something non-trivial)
916911 if s.idle then
917- let sleepSw = ssw ()
912+ let sleepTs = Stopwatch.timestamp ()
918913 let wakeConditions : Task array = [|
919914 if wakeForResults then awaitResults ct
920915 elif s.waitForCapacity then dispatcher.AwaitCapacity()
921916 if s.waitForPending then awaitPending ct
922917 Task.Delay( int sleepIntervalMs) |]
923918 do ! Task.WhenAny( wakeConditions) :> Task
924- t.RecordSleep sleepSw
919+ t.RecordSleep sleepTs
925920 // 3. Record completion state once per full iteration; dumping streams is expensive so needs to be done infrequently
926921 if dispatcher.RecordState( s.dispatcherState, streams, totalPurged) && purgeDue () then
927922 purge () }
@@ -1222,11 +1217,11 @@ module Sync =
12221217
12231218 let attemptWrite struct ( stream , span : FsCodec.ITimelineEvent < 'F > array ) ct = task {
12241219 let struct ( met , span' ) = StreamSpan.slice< 'F> sliceSize ( maxEvents, maxBytes) span
1225- let prepareSw = Stopwatch.start ()
1220+ let prepareTs = Stopwatch.timestamp ()
12261221 try let req = struct ( stream, span')
12271222 let! res , outcome = Async.StartImmediateAsTask( handle req, cancellationToken = ct)
12281223 let index ' = SpanResult.toIndex span' res
1229- return struct ( index' > span[ 0 ]. Index, Choice1Of2 struct ( index', struct ( met, prepareSw.Elapsed ), outcome))
1224+ return struct ( index' > span[ 0 ]. Index, Choice1Of2 struct ( index', struct ( met, Stopwatch.elapsed prepareTs ), outcome))
12301225 with e -> return struct ( false , Choice2Of2 struct ( met, e)) }
12311226
12321227 let interpretWriteResultProgress _streams ( stream : FsCodec.StreamName ) = function
0 commit comments