@@ -59,7 +59,6 @@ and Arguments(c, p: ParseResults<Parameters>) =
5959 | x -> p.Raise $" unexpected subcommand %A {x}"
6060 member val StatsInterval = TimeSpan.minutes 1
6161 member val StateInterval = TimeSpan.minutes 5
62- member val IdleDelay = TimeSpan.ms 10.
6362and [<NoEquality; NoComparison; RequireSubcommand>] SourceParameters =
6463 | [<CliPrefix( CliPrefix.None); Last; Unique>] Cosmos of ParseResults < Args.Cosmos.Parameters >
6564 | [<CliPrefix( CliPrefix.None); Last; Unique>] Dynamo of ParseResults < Args.Dynamo.Parameters >
@@ -189,6 +188,7 @@ type Stats(log, statsInterval, stateInterval, verboseStore, logExternalStats) =
189188 handled <- 0 ; ignored <- 0
190189 intervalLats.Dump( log, " EVENTS" )
191190 intervalLats.Clear()
191+ base .DumpStats()
192192 override _.DumpState purge =
193193 accEventTypeLats.Dump( log, " ΣEVENTS" )
194194 for cat in Seq.append accHam.Categories accSpam.Categories |> Seq.distinct |> Seq.sort do
@@ -243,7 +243,7 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
243243 let json = Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream events |> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize
244244 do ! producer.ProduceAsync( FsCodec.StreamName.toString stream, json) |> Async.Ignore
245245 return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.render_ stream ham spam 0 }
246- Propulsion.Sinks.Factory.StartConcurrent( Log.Logger, maxReadAhead, maxConcurrentProcessors, handle a.Filters.EventFilter, stats, idleDelay = a.IdleDelay )
246+ Propulsion.Sinks.Factory.StartConcurrent( Log.Logger, maxReadAhead, maxConcurrentProcessors, handle a.Filters.EventFilter, stats)
247247 | SubCommand.Sync a ->
248248 let eventsContext = a.ConnectEvents() |> Async.RunSynchronously
249249 let stats = Propulsion.CosmosStore.CosmosStoreSinkStats( Log.Logger, statsInterval, stateInterval)
0 commit comments