Skip to content

Commit 2bbe428

Browse files
committed
Arg processing cleanup
1 parent 4aa793e commit 2bbe428

File tree

2 files changed

+55
-61
lines changed

2 files changed

+55
-61
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
1414
- `Propulsion.CosmosStore3`: Special cased version of `Propulsion.CosmosStore` to target `Equinox.CosmosStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.CosmosStore` by updating `Equinox.CosmosStore` dependencies to `4.0.0`** [#139](https://github.com/jet/propulsion/pull/139)
1515
- `Propulsion.DynamoStore`: `Equinox.CosmosStore`-equivalent functionality for `Equinox.DynamoStore`. Combines elements of `CosmosStore`, `SqlStreamStore`, `Feed` [#140](https://github.com/jet/propulsion/pull/140)
1616
- `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141)
17-
- `Propulsion.Tool`: Add support for [autoscaling throughput](https://docs.microsoft.com/en-us/azure/cosmos-db/provision-throughput-autoscale) of Cosmos containers and databases [#142](https://github.com/jet/propulsion/pull/142)
17+
- `Propulsion.Tool`: Add support for [autoscaling throughput](https://docs.microsoft.com/en-us/azure/cosmos-db/provision-throughput-autoscale) of Cosmos containers and databases [#142](https://github.com/jet/propulsion/pull/142) :pray: [@brihadish](https://github.com/brihadish)
1818

1919
### Changed
2020

tools/Propulsion.Tool/Program.fs

Lines changed: 54 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,16 @@ and [<NoComparison; NoEquality>] InitAuxParameters =
3737
| Suffix _ -> "Specify Container Name suffix (default: `-aux`)."
3838
| Cosmos _ -> "Cosmos Connection parameters."
3939

40-
and CosmosInitInfo(args : ParseResults<InitAuxParameters>) =
41-
40+
and CosmosInitInfo(p : ParseResults<InitAuxParameters>) =
4241
let throughputSpec =
43-
match args.Contains Autoscale with
44-
| true -> Equinox.CosmosStore.Core.Initialization.Throughput.Autoscale (args.GetResult(Rus, 4000))
45-
| false -> Equinox.CosmosStore.Core.Initialization.Throughput.Manual (args.GetResult(Rus, 400))
46-
47-
member _.ProvisioningMode =
48-
match args.GetResult(Mode, CosmosModeType.Container) with
42+
if p.Contains Autoscale then Equinox.CosmosStore.Core.Initialization.Throughput.Autoscale (p.GetResult(Rus, 4000))
43+
else Equinox.CosmosStore.Core.Initialization.Throughput.Manual (p.GetResult(Rus, 400))
44+
member val ProvisioningMode =
45+
match p.GetResult(Mode, CosmosModeType.Container) with
4946
| CosmosModeType.Container -> Equinox.CosmosStore.Core.Initialization.Provisioning.Container throughputSpec
5047
| CosmosModeType.Db -> Equinox.CosmosStore.Core.Initialization.Provisioning.Database throughputSpec
5148
| CosmosModeType.Serverless ->
52-
if args.Contains Rus || args.Contains Autoscale then missingArg "Cannot specify RU/s or Autoscale in Serverless mode"
49+
if p.Contains Rus || p.Contains Autoscale then missingArg "Cannot specify RU/s or Autoscale in Serverless mode"
5350
Equinox.CosmosStore.Core.Initialization.Provisioning.Serverless
5451
and [<NoEquality; NoComparison>] CheckpointParameters =
5552
| [<AltCommandLine "-s"; Mandatory>] Source of Propulsion.Feed.SourceId
@@ -108,57 +105,55 @@ let [<Literal>] appName = "propulsion-tool"
108105

109106
module CosmosInit =
110107

111-
let aux (c, a : ParseResults<InitAuxParameters>) = async {
112-
match a.TryGetSubCommand() with
113-
| Some (InitAuxParameters.Cosmos sa) ->
114-
let args = Args.Cosmos.Arguments(c, sa)
115-
let mode = (CosmosInitInfo a).ProvisioningMode
116-
let client = args.ConnectLeases()
108+
let aux (c, p : ParseResults<InitAuxParameters>) =
109+
match p.GetSubCommand() with
110+
| InitAuxParameters.Cosmos sa ->
111+
let a = Args.Cosmos.Arguments(c, sa)
112+
let mode = (CosmosInitInfo p).ProvisioningMode
113+
let client = a.ConnectLeases()
117114
match mode with
118115
| Equinox.CosmosStore.Core.Initialization.Provisioning.Container throughput ->
119-
let modeStr = "Container"
120116
match throughput with
121117
| Equinox.CosmosStore.Core.Initialization.Throughput.Autoscale rus ->
122-
Log.Information("Provisioning Leases Container having autoscale throughput of max {rus:n0} RU/s", rus)
118+
Log.Information("Provisioning Leases Container with Autoscale throughput of up to {rus:n0} RU/s", rus)
123119
| Equinox.CosmosStore.Core.Initialization.Throughput.Manual rus ->
124-
Log.Information("Provisioning Leases Container having manual throughput of {rus:n0} RU/s", rus)
120+
Log.Information("Provisioning Leases Container with {rus:n0} RU/s", rus)
125121
| Equinox.CosmosStore.Core.Initialization.Provisioning.Database throughput ->
126122
let modeStr = "Database"
127123
match throughput with
128124
| Equinox.CosmosStore.Core.Initialization.Throughput.Autoscale rus ->
129-
Log.Information("Provisioning Leases Container at {modeStr:l} level having autoscale throughput of max {rus:n0} RU/s", modeStr, rus)
125+
Log.Information("Provisioning Leases Container at {modeStr:l} level with Autoscale throughput of up to {rus:n0} RU/s", modeStr, rus)
130126
| Equinox.CosmosStore.Core.Initialization.Throughput.Manual rus ->
131-
Log.Information("Provisioning Leases Container at {modeStr:l} level having manual mode with {rus:n0} RU/s", modeStr, rus)
127+
Log.Information("Provisioning Leases Container at {modeStr:l} level with {rus:n0} RU/s", modeStr, rus)
132128
| Equinox.CosmosStore.Core.Initialization.Provisioning.Serverless ->
133129
let modeStr = "Serverless"
134130
Log.Information("Provisioning Leases Container in {modeStr:l} mode with automatic throughput RU/s as configured in account", modeStr)
135-
return! Equinox.CosmosStore.Core.Initialization.initAux client.Database.Client (client.Database.Id, client.Id) mode
136-
| _ -> return failwith "please specify a `cosmos` endpoint" }
131+
Equinox.CosmosStore.Core.Initialization.initAux client.Database.Client (client.Database.Id, client.Id) mode
132+
| x -> missingArg $"unexpected subcommand %A{x}"
137133

138134
module Checkpoints =
139135

140-
type Arguments(c, a : ParseResults<CheckpointParameters>) =
141-
136+
type Arguments(c, p : ParseResults<CheckpointParameters>) =
142137
member val StoreArgs =
143-
match a.GetSubCommand() with
138+
match p.GetSubCommand() with
144139
| CheckpointParameters.Cosmos p -> Choice1Of2 (Args.Cosmos.Arguments (c, p))
145140
| CheckpointParameters.Dynamo p -> Choice2Of2 (Args.Dynamo.Arguments (c, p))
146-
| _ -> Args.missingArg "Must specify `cosmos` or `dynamo` store"
141+
| _ -> missingArg "Must specify `cosmos` or `dynamo` store"
147142

148-
let readOrOverride (c, a : ParseResults<CheckpointParameters>) = async {
149-
let args = Arguments(c, a)
150-
let source, tranche, group = a.GetResult Source, a.GetResult Tranche, a.GetResult Group
143+
let readOrOverride (c, p : ParseResults<CheckpointParameters>) = async {
144+
let a = Arguments(c, p)
145+
let source, tranche, group = p.GetResult Source, p.GetResult Tranche, p.GetResult Group
151146
let! store, storeSpecFragment, overridePosition = async {
152147
let cache = Equinox.Cache (appName, sizeMb = 1)
153-
match args.StoreArgs with
148+
match a.StoreArgs with
154149
| Choice1Of2 a ->
155150
let! store = a.CreateCheckpointStore(group, cache, Log.forMetrics)
156151
return (store : Propulsion.Feed.IFeedCheckpointStore), "cosmos", fun pos -> store.Override(source, tranche, pos)
157152
| Choice2Of2 a ->
158153
let store = a.CreateCheckpointStore(group, cache, Log.forMetrics)
159154
return store, $"dynamo -t {a.IndexTable}", fun pos -> store.Override(source, tranche, pos) }
160155
Log.Information("Checkpoint Source {source} Tranche {tranche} Consumer Group {group}", source, tranche, group)
161-
match a.TryGetResult OverridePosition with
156+
match p.TryGetResult OverridePosition with
162157
| None ->
163158
let! interval, pos = store.Start(source, tranche)
164159
Log.Information("Checkpoint position {pos}; Checkpoint event frequency {checkpointEventIntervalM:f0}m", pos, interval.TotalMinutes)
@@ -171,28 +166,28 @@ module Checkpoints =
171166

172167
module Project =
173168

174-
type KafkaArguments(c, a : ParseResults<KafkaParameters>) =
175-
member _.Broker = a.TryGetResult Broker |> Option.defaultWith (fun () -> c.KafkaBroker)
176-
member _.Topic = a.TryGetResult Topic |> Option.defaultWith (fun () -> c.KafkaTopic)
169+
type KafkaArguments(c, p : ParseResults<KafkaParameters>) =
170+
member _.Broker = p.TryGetResult Broker |> Option.defaultWith (fun () -> c.KafkaBroker)
171+
member _.Topic = p.TryGetResult Topic |> Option.defaultWith (fun () -> c.KafkaTopic)
177172
member val StoreArgs =
178-
match a.GetSubCommand() with
173+
match p.GetSubCommand() with
179174
| KafkaParameters.Cosmos p -> Choice1Of2 (Args.Cosmos.Arguments (c, p))
180175
| KafkaParameters.Dynamo p -> Choice2Of2 (Args.Dynamo.Arguments (c, p))
181-
| _ -> Args.missingArg "Must specify `cosmos` or `dynamo` store"
176+
| x -> missingArg $"unexpected subcommand %A{x}"
182177

183-
type StatsArguments(c, a : ParseResults<StatsParameters>) =
178+
type StatsArguments(c, p : ParseResults<StatsParameters>) =
184179
member val StoreArgs =
185-
match a.GetSubCommand() with
180+
match p.GetSubCommand() with
186181
| StatsParameters.Cosmos p -> Choice1Of2 (Args.Cosmos.Arguments (c, p))
187182
| StatsParameters.Dynamo p -> Choice2Of2 (Args.Dynamo.Arguments (c, p))
188183

189-
type Arguments(c, a : ParseResults<ProjectParameters>) =
184+
type Arguments(c, p : ParseResults<ProjectParameters>) =
190185
member val IdleDelay = TimeSpan.FromMilliseconds 10.
191186
member val StoreArgs =
192-
match a.GetSubCommand() with
187+
match p.GetSubCommand() with
193188
| Kafka a -> KafkaArguments(c, a).StoreArgs
194189
| Stats a -> StatsArguments(c, a).StoreArgs
195-
| x -> Args.missingArg $"Invalid subcommand %A{x}"
190+
| x -> missingArg $"unexpected subcommand %A{x}"
196191

197192
type Stats(statsInterval, statesInterval, logExternalStats) =
198193
inherit Propulsion.Streams.Stats<unit>(Log.Logger, statsInterval = statsInterval, statesInterval = statesInterval)
@@ -203,25 +198,25 @@ module Project =
203198
base.DumpStats()
204199
logExternalStats Log.Logger
205200

206-
let run (c : Args.Configuration, a : ParseResults<ProjectParameters>) = async {
207-
let args = Arguments(c, a)
201+
let run (c : Args.Configuration, p : ParseResults<ProjectParameters>) = async {
202+
let a = Arguments(c, p)
208203
let storeArgs, dumpStoreStats =
209-
match args.StoreArgs with
204+
match a.StoreArgs with
210205
| Choice1Of2 sa -> Choice1Of2 sa, Equinox.CosmosStore.Core.Log.InternalMetrics.dump
211206
| Choice2Of2 sa -> Choice2Of2 sa, Equinox.DynamoStore.Core.Log.InternalMetrics.dump
212-
let group, startFromTail, maxItems = a.GetResult ConsumerGroupName, a.Contains FromTail, a.TryGetResult MaxItems
207+
let group, startFromTail, maxItems = p.GetResult ConsumerGroupName, p.Contains FromTail, p.TryGetResult MaxItems
213208
match maxItems with None -> () | Some bs -> Log.Information("ChangeFeed Max items Count {changeFeedMaxItems}", bs)
214209
if startFromTail then Log.Warning("ChangeFeed (If new projector group) Skipping projection of all existing events.")
215210
let producer =
216-
match a.GetSubCommand() with
211+
match p.GetSubCommand() with
217212
| Kafka a ->
218213
let a = KafkaArguments(c, a)
219214
let linger = FsKafka.Batching.BestEffortSerial (TimeSpan.FromMilliseconds 100.)
220215
let cfg = FsKafka.KafkaProducerConfig.Create(appName, a.Broker, Confluent.Kafka.Acks.Leader, linger, Confluent.Kafka.CompressionType.Lz4)
221216
let p = FsKafka.KafkaProducer.Create(Log.Logger, cfg, a.Topic)
222217
Some p
223218
| Stats _ -> None
224-
| x -> Args.missingArg $"Invalid subcommand %A{x}"
219+
| x -> missingArg $"unexpected subcommand %A{x}"
225220
let stats = Stats(TimeSpan.FromMinutes 1., TimeSpan.FromMinutes 5., logExternalStats = dumpStoreStats)
226221
let sink =
227222
let maxReadAhead, maxConcurrentStreams = 2, 16
@@ -231,7 +226,7 @@ module Project =
231226
| Some producer ->
232227
let json = Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream span |> Newtonsoft.Json.JsonConvert.SerializeObject
233228
let! _ = producer.ProduceAsync(FsCodec.StreamName.toString stream, json) in () }
234-
Propulsion.Streams.StreamsProjector.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, handle, stats, stats.StatsInterval, idleDelay = args.IdleDelay)
229+
Propulsion.Streams.StreamsProjector.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, handle, stats, stats.StatsInterval, idleDelay = a.IdleDelay)
235230
let source =
236231
match storeArgs with
237232
| Choice1Of2 sa ->
@@ -273,20 +268,19 @@ let parseCommandLine argv =
273268

274269
[<EntryPoint>]
275270
let main argv =
276-
let args = parseCommandLine argv
277-
let verbose, verboseConsole, verboseStore = args.Contains Verbose, args.Contains VerboseConsole, args.Contains VerboseStore
278-
let metrics = Sinks.equinoxMetricsOnly
279-
Log.Logger <- LoggerConfiguration().Configure(verbose).Sinks(metrics, verboseConsole, verboseStore).CreateLogger()
280-
281-
try try let c = Args.Configuration(Environment.GetEnvironmentVariable >> Option.ofObj)
282-
try match args.GetSubCommand() with
271+
try let a = parseCommandLine argv
272+
let verbose, verboseConsole, verboseStore = a.Contains Verbose, a.Contains VerboseConsole, a.Contains VerboseStore
273+
let metrics = Sinks.equinoxMetricsOnly
274+
try Log.Logger <- LoggerConfiguration().Configure(verbose).Sinks(metrics, verboseConsole, verboseStore).CreateLogger()
275+
let c = Args.Configuration(Environment.GetEnvironmentVariable >> Option.ofObj)
276+
try match a.GetSubCommand() with
283277
| Init a -> CosmosInit.aux (c, a) |> Async.Ignore<Microsoft.Azure.Cosmos.Container> |> Async.RunSynchronously
284278
| Checkpoint a -> Checkpoints.readOrOverride (c, a) |> Async.RunSynchronously
285279
| Project a -> Project.run (c, a) |> Async.RunSynchronously
286-
| _ -> Args.missingArg "Please specify a valid subcommand :- init, checkpoint or project"
280+
| x -> missingArg $"unexpected subcommand %A{x}"
287281
0
288-
with e when not (e :? Args.MissingArg || e :? ArguParseException) -> Log.Fatal(e, "Exiting"); 2
282+
with e when not (e :? MissingArg || e :? ArguParseException) -> Log.Fatal(e, "Exiting"); 2
289283
finally Log.CloseAndFlush()
290-
with Args.MissingArg msg -> eprintfn $"ERROR: %s{msg}"; 1
291-
| :? ArguParseException as e -> eprintfn $"%s{e.Message}"; 1
292-
| e -> eprintfn $"Exception %s{e.Message}"; 1
284+
with :? ArguParseException as e -> eprintfn $"%s{e.Message}"; 1
285+
| MissingArg msg -> eprintfn $"ERROR: %s{msg}"; 1
286+
| e -> eprintfn $"EXCEPTION: %s{e.Message}"; 1

0 commit comments

Comments
 (0)