Skip to content

Commit ab3e1c7

Browse files
authored
feat(propulsion sync): Add --requireAll (#262)
1 parent 13d163b commit ab3e1c7

File tree

9 files changed

+28
-18
lines changed

9 files changed

+28
-18
lines changed

src/Propulsion.CosmosStore/CosmosStoreSink.fs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ type CosmosStoreSink =
180180
/// Starts a <c>Sink</c> that ingests all submitted events into the supplied <c>context</c>
181181
static member Start
182182
( log: ILogger, maxReadAhead, eventsContext, maxConcurrentStreams, stats: CosmosStoreSinkStats,
183-
?purgeInterval, ?wakeForResults, ?idleDelay,
183+
?purgeInterval, ?wakeForResults, ?idleDelay, ?requireAll,
184184
// Default: 16384
185185
?maxEvents,
186186
// Default: 256KB (limited by maximum size of a CosmosDB stored procedure invocation)
@@ -190,6 +190,6 @@ type CosmosStoreSink =
190190
let scheduler =
191191
let dumpStreams logStreamStates _log = logStreamStates Event.storedSize
192192
Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5, prioritizeStreamsBy = Event.storedSize,
193-
?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay)
193+
?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay, ?requireAll = requireAll)
194194
Factory.Start(log, scheduler.Pump, maxReadAhead, scheduler,
195195
ingesterStateInterval = defaultArg ingesterStateInterval stats.StateInterval.Period, ?commitInterval = commitInterval)

src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
<PropertyGroup>
66
<TargetFramework>net6.0</TargetFramework>
77
<!-- <PackageValidationBaselineVersion>3.0.0-rc.12</PackageValidationBaselineVersion>-->
8+
<!-- <PackageValidationBaselineVersion>3.0.0-rc.12.1</PackageValidationBaselineVersion>-->
89
</PropertyGroup>
910

1011
<ItemGroup>
@@ -27,7 +28,7 @@
2728

2829
<ItemGroup>
2930
<ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="..\Propulsion\Propulsion.fsproj" />
30-
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.12, 4.0.0)" />
31+
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.13, 4.0.0)" />
3132
</ItemGroup>
3233

3334
</Project>

src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
<PropertyGroup>
66
<TargetFramework>net6.0</TargetFramework>
7-
<PackageValidationBaselineVersion>3.0.0-rc.12</PackageValidationBaselineVersion>
7+
<!-- <PackageValidationBaselineVersion>3.0.0-rc.12</PackageValidationBaselineVersion>-->
88
<DefineConstants>COSMOSV3</DefineConstants>
99
</PropertyGroup>
1010

@@ -46,7 +46,7 @@
4646

4747
<ItemGroup>
4848
<ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="..\Propulsion\Propulsion.fsproj" />
49-
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.12, 4.0.0)" />
49+
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.13, 4.0.0)" />
5050
</ItemGroup>
5151

5252
</Project>

src/Propulsion.DynamoStore.Notifier/Propulsion.DynamoStore.Notifier.fsproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.7.4.10" />
4444
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
4545
<ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="..\Propulsion\Propulsion.fsproj" />
46-
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.12, 4.0.0)" />
46+
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.13, 4.0.0)" />
4747
</ItemGroup>
4848

4949
<!-- The Notifier dotnet new template extracts the published binaries from the tools folder using logic in https://github.com/jet/propulsion/pull/143 -->

src/Propulsion.EventStore/Propulsion.EventStore.fsproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
<ItemGroup>
2727
<ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="..\Propulsion\Propulsion.fsproj" />
28-
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.12, 4.0.0)" />
28+
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.13, 4.0.0)" />
2929
</ItemGroup>
3030

3131
</Project>

src/Propulsion.Feed/Propulsion.Feed.fsproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
<ItemGroup>
2525
<ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="..\Propulsion\Propulsion.fsproj" />
26-
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.12, 4.0.0)" />
26+
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.13, 4.0.0)" />
2727
</ItemGroup>
2828

2929
</Project>

src/Propulsion.Kafka/Propulsion.Kafka.fsproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
<ItemGroup>
2525
<ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="..\Propulsion\Propulsion.fsproj" />
26-
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.12, 4.0.0)" />
26+
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.13, 4.0.0)" />
2727
</ItemGroup>
2828

2929
</Project>

src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
<ItemGroup>
2929
<ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="..\Propulsion\Propulsion.fsproj" />
30-
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.12, 4.0.0)" />
30+
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.13, 4.0.0)" />
3131
</ItemGroup>
3232

3333
</Project>

tools/Propulsion.Tool/Sync.fs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ type [<NoEquality; NoComparison; RequireSubcommand>] Parameters =
1111
| [<AltCommandLine "-w"; Unique>] MaxWriters of int
1212
| [<AltCommandLine "-Z"; Unique>] FromTail
1313
| [<AltCommandLine "-F"; Unique>] Follow
14+
| [<AltCommandLine "-A"; Unique>] RequireAll
1415
| [<AltCommandLine "-C"; Unique>] Categorize
1516
| [<AltCommandLine "-b"; Unique>] MaxItems of int
1617

17-
| [<AltCommandLine "-I"; AltCommandLine "--include-indexes"; Unique>] IncIdx
18+
| [<AltCommandLine "-I"; AltCommandLine "--include-system"; Unique>] IncSys
1819
| [<AltCommandLine "-cat"; AltCommandLine "--include-category">] IncCat of regex: string
1920
| [<AltCommandLine "-ncat"; AltCommandLine "--exclude-category">] ExcCat of regex: string
2021
| [<AltCommandLine "-sn"; AltCommandLine "--include-streamname">] IncStream of regex: string
@@ -32,10 +33,14 @@ type [<NoEquality; NoComparison; RequireSubcommand>] Parameters =
3233
| MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 8 (Cosmos: 16)."
3334
| FromTail -> "(iff fresh projection) - force starting from present Position. Default: Ensure each and every event is projected from the start."
3435
| Follow -> "Stop when the Tail is reached."
36+
| RequireAll -> "Wait for out of order events to arrive (including waiting for event 0 per stream) before dispatching for any stream. " +
37+
"NOTE normally a large `MaxReadAhead` and `cosmos -b` is required to avoid starving the scheduler. " +
38+
"NOTE This mode does not make sense to apply unless the ProcessorName is fresh; if the consumer group name is not fresh (and hence items are excluded from the feed), there will inevitably be missing events, and processing will stall. " +
39+
"Default: assume events arrive from the changefeed (and/or the input JSON file) without any gaps or out of order deliveries for any stream."
3540
| Categorize -> "Gather handler latency stats by category"
3641
| MaxItems _ -> "Controls checkpointing granularity by adjusting the batch size being loaded from the feed. Default: Unlimited"
3742

38-
| IncIdx -> "Include Index streams. Default: Exclude Index Streams, identified by a $ prefix."
43+
| IncSys -> "Include System streams. Default: Exclude Index Streams, identified by a $ prefix."
3944
| IncCat _ -> "Allow Stream Category. Multiple values are combined with OR. Default: include all, subject to Category Deny and Stream Deny rules."
4045
| ExcCat _ -> "Deny Stream Category. Specified values/regexes are applied after the Category Allow rule(s)."
4146
| IncStream _ -> "Allow Stream Name. Multiple values are combined with OR. Default: Allow all streams that pass the category Allow test, Fail the Category and Stream deny tests."
@@ -51,7 +56,7 @@ and Arguments(c, p: ParseResults<Parameters>) =
5156
member val Filters = Propulsion.StreamFilter(
5257
allowCats = p.GetResults IncCat, denyCats = p.GetResults ExcCat,
5358
allowSns = p.GetResults IncStream, denySns = p.GetResults ExcStream,
54-
incIndexes = p.Contains IncIdx,
59+
includeSystem = p.Contains IncSys,
5560
allowEts = p.GetResults IncEvent, denyEts = p.GetResults ExcEvent)
5661
member val Categorize = p.Contains Categorize
5762
member val Command =
@@ -217,7 +222,7 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
217222
| Some x, _ -> x
218223
| None, Json _ -> System.Guid.NewGuid() |> _.ToString("N")
219224
| None, _ -> p.Raise "ConsumerGroupName is mandatory, unless consuming from a JSON file"
220-
let startFromTail, follow, maxItems = p.Contains FromTail, p.Contains Follow, p.TryGetResult MaxItems
225+
let startFromTail, follow, requireAll, maxItems = p.Contains FromTail, p.Contains Follow, p.Contains RequireAll, p.TryGetResult MaxItems
221226
let producer =
222227
match a.Command with
223228
| SubCommand.Kafka a ->
@@ -240,15 +245,19 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
240245
match producer with
241246
| None -> ()
242247
| Some producer ->
243-
let json = Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream events |> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize
248+
let json = Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream events
249+
|> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize
244250
do! producer.ProduceAsync(FsCodec.StreamName.toString stream, json) |> Async.Ignore
245251
return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.render_ stream ham spam 0 }
246-
Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentProcessors, handle a.Filters.EventFilter, stats)
252+
Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentProcessors, handle a.Filters.EventFilter, stats,
253+
requireAll = requireAll)
247254
| SubCommand.Sync sa ->
248255
let eventsContext = sa.ConnectEvents() |> Async.RunSynchronously
249-
let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, statsInterval, stateInterval, logExternalStats = dumpStoreStats, Categorize = a.Categorize)
256+
let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, statsInterval, stateInterval,
257+
logExternalStats = dumpStoreStats, Categorize = a.Categorize)
250258
Propulsion.CosmosStore.CosmosStoreSink.Start(Metrics.log, maxReadAhead, eventsContext, maxConcurrentProcessors, stats,
251-
purgeInterval = TimeSpan.hours 1, maxBytes = sa.MaxBytes)
259+
maxBytes = sa.MaxBytes, requireAll = requireAll,
260+
?purgeInterval = if requireAll then None else Some (TimeSpan.hours 1))
252261
let source =
253262
match a.Command.Source with
254263
| Cosmos sa ->

0 commit comments

Comments
 (0)