Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -482,3 +482,20 @@ Resources:
- [low level documentation of the client settings](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
- [thorough free book](https://www.confluent.io/wp-content/uploads/confluent-kafka-definitive-guide-complete.pdf)
- [medium post covering some high level structures that Jet explored in this space](https://medium.com/@eulerfx/scaling-event-sourcing-at-jet-9c873cac33b8).


# Telemetry

Propulsion emits OpenTelemetry spans for stream processing. All span attributes are prefixed with the `propulsion.`
namespace.

## {category} process

Emitted once per stream batch handled by the processing sink; `{category}` in the span name is replaced by the stream's category.

| Attribute | Description |
|-------------------|------------------------------------------------------------------------------------------------------------------|
| `category` | The category being processed |
| `stream_name` | The full stream name being processed |
| `stream_id` | The id of the stream being processed |
| `batch_size` | The size of the batch being processed |
| `first_timestamp` | The receive timestamp of the first event in the batch being handled |
| `lead_time_ms` | The [lead time](https://www.merriam-webster.com/dictionary/lead%20time) in milliseconds for processing the batch |
4 changes: 1 addition & 3 deletions src/Propulsion.Feed/FeedSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,7 @@ type TailingFeedSource

let crawl trancheId (wasLast, startPos) ct = taskSeq {
if wasLast then do! Task.delay tailSleepInterval ct
try let batches = crawl.Invoke(trancheId, startPos, ct)
for batch in batches do
yield batch
try yield! crawl.Invoke(trancheId, startPos, ct)
with e -> // Swallow (and sleep, if requested) if there's an issue reading from a tailing log
match logReadFailure with None -> log.ForContext("tranche", trancheId).ForContext<TailingFeedSource>().Warning(e, "Read failure") | Some l -> l e
match readFailureSleepInterval with None -> () | Some interval -> do! Task.delay interval ct }
Expand Down
21 changes: 19 additions & 2 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace Propulsion.Streams

open System.Diagnostics
open Propulsion
open Propulsion.Internal
open Serilog
Expand Down Expand Up @@ -848,6 +849,8 @@ module Dispatcher =
member _.AwaitCapacity(ct) = inner.AwaitButRelease(ct)
member _.TryReplenish(pending, markStarted, project) = tryFillDispatcher pending markStarted project

let private source = new ActivitySource("Propulsion")

/// Implementation of IDispatcher that feeds items to an item dispatcher that maximizes concurrent requests (within a limit)
type Concurrent<'P, 'R, 'E, 'F> internal
( inner : ItemDispatcher<Result<'P, 'E>, 'F>,
Expand All @@ -857,9 +860,23 @@ module Dispatcher =
( maxDop,
project : FsCodec.StreamName -> FsCodec.ITimelineEvent<'F>[] -> CancellationToken -> Task<struct (bool * Result<'P, 'E>)>,
interpretProgress) =
let project struct (startTs, item : Scheduling.Item<'F>) (ct : CancellationToken) = task {
let project struct (startTs: int64, item : Scheduling.Item<'F>) (ct : CancellationToken) = task {
use act = source.StartActivity("process", ActivityKind.Consumer)
if act <> null then
let struct(category, streamId) = FsCodec.StreamName.splitCategoryAndStreamId item.stream
act.DisplayName <- $"{category} process"
act.SetTag("propulsion.stream_name", item.stream)
.SetTag("propulsion.stream_id", streamId)
.SetTag("propulsion.category", category)
.SetTag("propulsion.batch_size", item.span.Length)
.SetTag("propulsion.first_timestamp", item.span[0].Timestamp)
|> ignore
let! struct (progressed, res) = project item.stream item.span ct
return struct (Stopwatch.elapsed startTs, item.stream, progressed, res) }
let elapsed = Stopwatch.elapsed startTs
if act <> null then
let oldestItemTs = item.span[0].Timestamp
act.SetTag("propulsion.lead_time_ms", (DateTimeOffset.UtcNow - oldestItemTs).TotalMilliseconds) |> ignore
return struct (elapsed, item.stream, progressed, res) }
Concurrent<_, _, _, _>(ItemDispatcher(maxDop), project, interpretProgress)
static member Create(maxDop, prepare : Func<_, _, _>, handle : Func<_, _, CancellationToken, Task<_>>, toIndex : Func<_, 'R, int64>) =
let project sn span ct = task {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.MessageDb" Version="4.0.0-rc.11" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10" />
<PackageReference Include="OpenTelemetry" Version="1.5.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.5.0" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.5.0" />
<PackageReference Include="TypeShape" Version="10.0.0" />
</ItemGroup>
<ItemGroup>
Expand Down
193 changes: 99 additions & 94 deletions tests/Propulsion.MessageDb.Integration/Tests.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Propulsion.MessageDb.Integration.Tests
module Propulsion.MessageDb.Integration

open Npgsql
open NpgsqlTypes
open Propulsion.Internal
open Propulsion.MessageDb
open Swensen.Unquote
Expand All @@ -10,22 +8,21 @@ open System.Collections.Generic
open System.Diagnostics
open System.Threading.Tasks
open Xunit
open OpenTelemetry
open OpenTelemetry.Trace
open OpenTelemetry.Resources

let source = new ActivitySource("Propulsion.MessageDb.Integration")

module Simple =
type Hello = { name : string}
type Event =
| Hello of Hello
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.SystemTextJson.Codec.Create<Event>()

let createStreamMessage streamName =
let cmd = NpgsqlBatchCommand()
cmd.CommandText <- "select 1 from write_message(@Id::text, @StreamName, @EventType, @Data, null, null)"
cmd.Parameters.AddWithValue("Id", NpgsqlDbType.Uuid, Guid.NewGuid()) |> ignore
cmd.Parameters.AddWithValue("StreamName", NpgsqlDbType.Text, streamName) |> ignore
cmd.Parameters.AddWithValue("EventType", NpgsqlDbType.Text, "Hello") |> ignore
cmd.Parameters.AddWithValue("Data", NpgsqlDbType.Jsonb, """{"name": "world"}""") |> ignore
cmd
type State = unit
let initial = ()
let fold state events = state

let ConnectionString =
match Environment.GetEnvironmentVariable "MSG_DB_CONNECTION_STRING" with
Expand All @@ -36,25 +33,18 @@ let CheckpointConnectionString =
| null -> "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres"
| s -> s

let decider categoryName id =
let client = Equinox.MessageDb.MessageDbClient(ConnectionString)
let ctx = Equinox.MessageDb.MessageDbContext(client)
let category = Equinox.MessageDb.MessageDbCategory(ctx, Simple.codec, Simple.fold, Simple.initial)
Equinox.Decider.resolve Serilog.Log.Logger category categoryName (Equinox.StreamId.gen string id)

let connect () = task {
let conn = new NpgsqlConnection(ConnectionString)
do! conn.OpenAsync()
return conn
}

let writeMessagesToStream (conn: NpgsqlConnection) streamName = task {
let batch = conn.CreateBatch()
for _ in 1..20 do
let cmd = createStreamMessage streamName
batch.BatchCommands.Add(cmd)
do! batch.ExecuteNonQueryAsync() :> Task }

let writeMessagesToCategory conn category = task {
let writeMessagesToCategory category = task {
for _ in 1..50 do
let streamName = $"{category}-{Guid.NewGuid():N}"
do! writeMessagesToStream conn streamName
}
let streamId = Guid.NewGuid().ToString("N")
let decider = decider category streamId
let decide _ = List.replicate 20 (Simple.Event.Hello { name = "world" })
do! decider.Transact(decide, load = Equinox.LoadOption.AssumeEmpty) }

let stats log = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, TimeSpan.FromMinutes 1)
with member _.HandleOk x = ()
Expand All @@ -65,47 +55,6 @@ let makeCheckpoints consumerGroup = task {
do! checkpoints.CreateSchemaIfNotExists()
return checkpoints }

[<Fact>]
let ``It processes events for a category`` () = task {
use! conn = connect ()
let log = Serilog.Log.Logger
let consumerGroup = $"{Guid.NewGuid():N}"
let category1 = $"{Guid.NewGuid():N}"
let category2 = $"{Guid.NewGuid():N}"
do! writeMessagesToCategory conn category1
do! writeMessagesToCategory conn category2
let! checkpoints = makeCheckpoints consumerGroup
let stats = stats log
let mutable stop = ignore
let handled = HashSet<_>()
let handle stream (events: Propulsion.Sinks.Event[]) _ct = task {
lock handled (fun _ ->
for evt in events do
handled.Add((stream, evt.Index)) |> ignore)
test <@ Array.chooseV Simple.codec.TryDecode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @>
if handled.Count >= 2000 then
stop ()
return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats)
let source = MessageDbSource(
log, TimeSpan.FromMinutes 1,
ConnectionString, 1000, TimeSpan.FromMilliseconds 100,
checkpoints, sink, [| category1; category2 |])
use src = source.Start()
stop <- src.Stop

Task.Delay(TimeSpan.FromSeconds 30).ContinueWith(fun _ -> src.Stop()) |> ignore

do! src.Await()

// 2000 total events
test <@ handled.Count = 2000 @>
// 20 in each stream
test <@ handled |> Array.ofSeq |> Array.groupBy fst |> Array.map (snd >> Array.length) |> Array.forall ((=) 20) @>
// they were handled in order within streams
let ordering = handled |> Seq.groupBy fst |> Seq.map (snd >> Seq.map snd >> Seq.toArray) |> Seq.toArray
test <@ ordering |> Array.forall ((=) [| 0L..19L |]) @> }

type ActivityCapture() =
let operations = ResizeArray()
let listener =
Expand All @@ -121,29 +70,85 @@ type ActivityCapture() =
interface IDisposable with
member _.Dispose() = listener.Dispose()

[<Fact>]
let ``It doesn't read the tail event again`` () = task {
let log = Serilog.LoggerConfiguration().CreateLogger()
let consumerGroup = $"{Guid.NewGuid():N}"
let category = $"{Guid.NewGuid():N}"
use! conn = connect ()
do! writeMessagesToStream conn $"{category}-1"
let! checkpoints = makeCheckpoints consumerGroup

let stats = stats log
type Tests() =
let sdk =
Sdk.CreateTracerProviderBuilder()
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName = "Tests"))
.AddSource("Equinox")
.AddSource("Equinox.MessageDb")
.AddSource("Propulsion")
.AddSource("Propulsion.MessageDb.Integration")
.AddSource("Npgsql")
.AddOtlpExporter(fun opts -> opts.Endpoint <- Uri("http://localhost:4317"))
.AddConsoleExporter()
.Build()

[<Fact>]
let ``It processes events for a category`` () = task {
use _ = source.StartActivity("It processes events for a category", ActivityKind.Server)
let log = Serilog.Log.Logger
let consumerGroup = $"{Guid.NewGuid():N}"
let category1 = $"{Guid.NewGuid():N}"
let category2 = $"{Guid.NewGuid():N}"
do! writeMessagesToCategory category1
do! writeMessagesToCategory category2
let! checkpoints = makeCheckpoints consumerGroup
let stats = stats log
let mutable stop = ignore
let handled = HashSet<_>()
let handle stream (events: Propulsion.Sinks.Event[]) _ct = task {
lock handled (fun _ -> for evt in events do handled.Add((stream, evt.Index)) |> ignore)
test <@ Array.chooseV Simple.codec.TryDecode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @>
if handled.Count >= 2000 then stop ()
return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats)
let source = MessageDbSource(
log, TimeSpan.FromMinutes 1,
ConnectionString, 1000, TimeSpan.FromMilliseconds 100,
checkpoints, sink, [| category1; category2 |])
use src = source.Start()
stop <- src.Stop

Task.Delay(TimeSpan.FromSeconds 30).ContinueWith(fun _ -> src.Stop()) |> ignore

do! src.Await()

// 2000 total events
test <@ handled.Count = 2000 @>
// 20 in each stream
test <@ handled |> Array.ofSeq |> Array.groupBy fst |> Array.map (snd >> Array.length) |> Array.forall ((=) 20) @>
// they were handled in order within streams
let ordering = handled |> Seq.groupBy fst |> Seq.map (snd >> Seq.map snd >> Seq.toArray) |> Seq.toArray
test <@ ordering |> Array.forall ((=) [| 0L..19L |]) @> }

[<Fact>]
let ``It doesn't read the tail event again`` () = task {
use _ = source.StartActivity("It doesn't read the tail event again", ActivityKind.Server)
let log = Serilog.LoggerConfiguration().CreateLogger()
let consumerGroup = $"{Guid.NewGuid():N}"
let category = $"{Guid.NewGuid():N}"
let decider = decider category "1"
do! decider.Transact((fun _ -> List.replicate 20 (Simple.Hello { name = "world" })), load = Equinox.LoadOption.AssumeEmpty)
let! checkpoints = makeCheckpoints consumerGroup

let stats = stats log

let handle _ _ _ = task {
return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 1, 1, handle, stats)
let batchSize = 10
let source = MessageDbSource(
log, TimeSpan.FromMilliseconds 1000,
ConnectionString, batchSize, TimeSpan.FromMilliseconds 10000,
checkpoints, sink, [| category |])

use capture = new ActivityCapture()

do! source.RunUntilCaughtUp(TimeSpan.FromSeconds(10), stats.StatsInterval) :> Task

// 3 batches fetched, 1 checkpoint read, and 1 checkpoint write
test <@ capture.Operations.Count = 5 @> }

let handle _ _ _ = task {
return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 1, 1, handle, stats)
let batchSize = 10
let source = MessageDbSource(
log, TimeSpan.FromMilliseconds 1000,
ConnectionString, batchSize, TimeSpan.FromMilliseconds 1000,
checkpoints, sink, [| category |])

use capture = new ActivityCapture()

do! source.RunUntilCaughtUp(TimeSpan.FromSeconds(10), stats.StatsInterval) :> Task

// 3 batches fetched, 1 checkpoint read, and 1 checkpoint write
test <@ capture.Operations.Count = 5 @> }
interface IDisposable with
member _.Dispose() = sdk.Shutdown() |> ignore