Skip to content

Commit e1e5aa6

Browse files
committed
Extract Handler.fs for proProjector
1 parent 1c026e0 commit e1e5aa6

File tree

4 files changed

+48
-40
lines changed

4 files changed

+48
-40
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
2323
### Changed
2424

2525
- Target `Propulsion`.* v `2.0.0-rc3`
26+
- `proProjector`: extract `Handler.fs`
2627

2728
### Fixed
2829

propulsion-projector/Handler.fs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
module ProjectorTemplate.Handler
2+
3+
open Propulsion.Cosmos
4+
5+
//let replaceLongDataWithNull (x : FsCodec.ITimelineEvent<byte[]>) : FsCodec.ITimelineEvent<_> =
6+
// if x.Data.Length < 900_000 then x
7+
// else FsCodec.Core.TimelineEvent.Create(x.Index, x.EventType, null, x.Meta, timestamp=x.Timestamp)
8+
//
9+
//let hackDropBigBodies (e : Propulsion.Streams.StreamEvent<_>) : Propulsion.Streams.StreamEvent<_> =
10+
// { stream = e.stream; event = replaceLongDataWithNull e.event }
11+
12+
let mapToStreamItems (docs : Microsoft.Azure.Documents.Document seq) : Propulsion.Streams.StreamEvent<_> seq =
13+
docs
14+
|> Seq.collect EquinoxCosmosParser.enumStreamEvents
15+
// TODO use Seq.filter and/or Seq.map to adjust what's being sent etc
16+
// |> Seq.map hackDropBigBodies
17+
18+
#if kafka
19+
#if parallelOnly
20+
type ExampleOutput = { Id : string }
21+
22+
let render (doc : Microsoft.Azure.Documents.Document) : string * string =
23+
let equinoxPartition, documentId = doc.GetPropertyValue "p", doc.Id
24+
equinoxPartition, FsCodec.NewtonsoftJson.Serdes.Serialize { Id = documentId }
25+
#else
26+
let render (stream: FsCodec.StreamName, span: Propulsion.Streams.StreamSpan<_>) = async {
27+
let value =
28+
span
29+
|> Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream
30+
|> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize
31+
return FsCodec.StreamName.toString stream, value }
32+
#endif
33+
#else
34+
let handle (_stream, span: Propulsion.Streams.StreamSpan<_>) = async {
35+
let r = System.Random()
36+
let ms = r.Next(1, span.events.Length)
37+
do! Async.Sleep ms }
38+
#endif

propulsion-projector/Program.fs

Lines changed: 8 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -168,62 +168,30 @@ module Logging =
168168
c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t)
169169
|> fun c -> c.CreateLogger()
170170

171-
let replaceLongDataWithNull (x : FsCodec.ITimelineEvent<byte[]>) : FsCodec.ITimelineEvent<_> =
172-
if x.Data.Length < 900_000 then x
173-
else FsCodec.Core.TimelineEvent.Create(x.Index, x.EventType, null, x.Meta, timestamp=x.Timestamp)
174-
175-
let hackDropBigBodies (e : Propulsion.Streams.StreamEvent<_>) : Propulsion.Streams.StreamEvent<_> =
176-
{ stream = e.stream; event = replaceLongDataWithNull e.event }
177-
178-
let mapToStreamItems (docs : Microsoft.Azure.Documents.Document seq) : Propulsion.Streams.StreamEvent<_> seq =
179-
docs
180-
|> Seq.collect EquinoxCosmosParser.enumStreamEvents
181-
// TODO use Seq.filter and/or Seq.map to adjust what's being sent etc
182-
// |> Seq.map hackDropBigBodies
183-
184-
#if kafka && nostreams
185-
type ExampleOutput = { Id : string }
186-
#endif
187-
188171
let [<Literal>] AppName = "ProjectorTemplate"
189172

190173
let build (args : CmdParser.Arguments) =
191174
let discovery, source, connector = args.Cosmos.BuildConnectionDetails()
192175
let aux, leaseId, startFromTail, maxDocuments, lagFrequency, (maxReadAhead, maxConcurrentStreams) = args.BuildChangeFeedParams()
193176
#if kafka
194177
let (broker, topic) = args.Target.BuildTargetParams()
195-
#if parallelOnly
196-
let render (doc : Microsoft.Azure.Documents.Document) : string * string =
197-
let equinoxPartition, documentId = doc.GetPropertyValue "p", doc.Id
198-
equinoxPartition, FsCodec.NewtonsoftJson.Serdes.Serialize { Id = documentId }
199178
let producer = Propulsion.Kafka.Producer(Log.Logger, AppName, broker, topic)
200-
let projector =
201-
Propulsion.Kafka.ParallelProducerSink.Start(maxReadAhead, maxConcurrentStreams, render, producer, statsInterval=TimeSpan.FromMinutes 1.)
202-
let createObserver () = CosmosSource.CreateObserver(Log.Logger, projector.StartIngester, fun x -> upcast x)
179+
#if parallelOnly
180+
let sink = Propulsion.Kafka.ParallelProducerSink.Start(maxReadAhead, maxConcurrentStreams, Handler.render, producer, statsInterval=TimeSpan.FromMinutes 1.)
181+
let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, fun x -> upcast x)
203182
#else
204-
let render (stream: FsCodec.StreamName, span: Propulsion.Streams.StreamSpan<_>) = async {
205-
let value =
206-
span
207-
|> Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream
208-
|> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize
209-
return FsCodec.StreamName.toString stream, value }
210-
let producer = Propulsion.Kafka.Producer(Log.Logger, AppName, broker, topic)
211-
let projector =
183+
let sink =
212184
Propulsion.Kafka.StreamsProducerSink.Start(
213-
Log.Logger, maxReadAhead, maxConcurrentStreams, render, producer,
185+
Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.render, producer,
214186
statsInterval=TimeSpan.FromMinutes 1., stateInterval=TimeSpan.FromMinutes 2.)
215-
let createObserver () = CosmosSource.CreateObserver(Log.Logger, projector.StartIngester, mapToStreamItems)
187+
let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, Handler.mapToStreamItems)
216188
#endif
217189
#else
218-
let project (_stream, span: Propulsion.Streams.StreamSpan<_>) = async {
219-
let r = Random()
220-
let ms = r.Next(1, span.events.Length)
221-
do! Async.Sleep ms }
222190
let sink =
223191
Propulsion.Streams.StreamsProjector.Start(
224-
Log.Logger, maxReadAhead, maxConcurrentStreams, project,
192+
Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.handle,
225193
statsInterval=TimeSpan.FromMinutes 1., stateInterval=TimeSpan.FromMinutes 5.)
226-
let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, mapToStreamItems)
194+
let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, Handler.mapToStreamItems)
227195
#endif
228196
let runSourcePipeline =
229197
CosmosSource.Run(

propulsion-projector/Projector.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
<ItemGroup>
1010
<None Include="README.md" />
11+
<Compile Include="Handler.fs" />
1112
<Compile Include="Program.fs" />
1213
</ItemGroup>
1314

0 commit comments

Comments
 (0)