Skip to content

Commit 2d85628

Browse files
committed
✨ Add batch_timeout_ms to webhooks
1 parent 2df5ba3 commit 2d85628

File tree

5 files changed

+44
-4
lines changed

5 files changed

+44
-4
lines changed

assets/svelte/consumers/SinkHttpPushForm.svelte

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,29 @@
170170
<AccordionTrigger>Advanced configuration</AccordionTrigger>
171171
<AccordionContent>
172172
<div class="space-y-4 pt-4">
173+
<div class="space-y-2">
174+
<Label for="batch-timeout">Batch timeout</Label>
175+
<div class="flex items-center space-x-2">
176+
<Input
177+
id="batch-timeout"
178+
type="number"
179+
bind:value={form.batchTimeoutMs}
180+
class="w-24"
181+
min="1"
182+
/>
183+
<span class="text-sm text-muted-foreground">ms</span>
184+
</div>
185+
<p class="text-sm text-muted-foreground">
186+
The maximum time to wait for a batch to reach its full size
187+
before sending. Defaults to 50ms.
188+
</p>
189+
{#if errors.batch_timeout_ms}
190+
<p class="text-destructive text-sm">
191+
{errors.batch_timeout_ms}
192+
</p>
193+
{/if}
194+
</div>
195+
173196
<div class="space-y-2">
174197
<Label for="max-ack-pending">Max ack pending</Label>
175198
<Input

lib/sequin/consumers/sink_consumer.ex

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ defmodule Sequin.Consumers.SinkConsumer do
4848
:health,
4949
:max_memory_mb,
5050
:legacy_transform,
51-
:timestamp_format
51+
:timestamp_format,
52+
:batch_timeout_ms
5253
]}
5354
typed_schema "sink_consumers" do
5455
field :name, :string
@@ -61,6 +62,7 @@ defmodule Sequin.Consumers.SinkConsumer do
6162
field :status, Ecto.Enum, values: [:active, :disabled, :paused], default: :active
6263
field :seq, :integer, read_after_writes: true
6364
field :batch_size, :integer, default: 1
65+
field :batch_timeout_ms, :integer, default: nil
6466
field :annotations, :map, default: %{}
6567
field :max_memory_mb, :integer, default: 1024
6668
field :partition_count, :integer, default: 1
@@ -165,7 +167,8 @@ defmodule Sequin.Consumers.SinkConsumer do
165167
:legacy_transform,
166168
:transform_id,
167169
:routing_id,
168-
:timestamp_format
170+
:timestamp_format,
171+
:batch_timeout_ms
169172
])
170173
|> cast_polymorphic_embed(:sink, required: true)
171174
|> Sequin.Changeset.cast_embed(:source_tables)

lib/sequin/runtime/sink_pipeline.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,12 +259,13 @@ defmodule Sequin.Runtime.SinkPipeline do
259259

260260
defp batchers_config(pipeline_mod, consumer) do
261261
concurrency = min(System.schedulers_online() * 2, 80)
262+
batch_timeout = consumer.batch_timeout_ms || 50
262263

263264
default = [
264265
default: [
265266
concurrency: concurrency,
266267
batch_size: consumer.batch_size,
267-
batch_timeout: 50
268+
batch_timeout: batch_timeout
268269
]
269270
]
270271

lib/sequin_web/live/components/consumer_form.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ defmodule SequinWeb.Components.ConsumerForm do
540540
"group_column_attnums" => form["groupColumnAttnums"]
541541
},
542542
"batch_size" => form["batchSize"],
543+
"batch_timeout_ms" => form["batchTimeoutMs"],
543544
"initial_backfill" => decode_initial_backfill(form),
544545
"transform_id" => if(form["transform"] === "none", do: nil, else: form["transform"]),
545546
"routing_id" => if(form["routingId"] === "none", do: nil, else: form["routingId"]),
@@ -734,7 +735,8 @@ defmodule SequinWeb.Components.ConsumerForm do
734735
"id" => consumer.id,
735736
"name" => consumer.name || Name.generate(999),
736737
"ack_wait_ms" => consumer.ack_wait_ms,
737-
"batch_size" => Map.get(consumer, :batch_size),
738+
"batch_size" => consumer.batch_size,
739+
"batch_timeout_ms" => consumer.batch_timeout_ms,
738740
"max_memory_mb" => consumer.max_memory_mb,
739741
"group_column_attnums" => source_table && source_table.group_column_attnums,
740742
"max_ack_pending" => consumer.max_ack_pending,
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
defmodule Sequin.Repo.Migrations.AddBatchTimeoutMsToSinkConsumers do
2+
use Ecto.Migration
3+
4+
@config_schema Application.compile_env(:sequin, [Sequin.Repo, :config_schema_prefix])
5+
6+
def change do
7+
alter table(:sink_consumers, prefix: @config_schema) do
8+
add :batch_timeout_ms, :integer, default: nil
9+
end
10+
end
11+
end

0 commit comments

Comments
 (0)