Skip to content

Commit d6617d3

Browse files
committed
✨ Routing for Elasticsearch
1 parent e14b094 commit d6617d3

File tree

19 files changed

+297
-37
lines changed

19 files changed

+297
-37
lines changed

assets/svelte/consumers/SinkConsumerForm.svelte

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,13 @@
577577
bind:functionRefreshState
578578
/>
579579
{:else if consumer.type === "elasticsearch"}
580-
<ElasticsearchSinkForm errors={errors.consumer} bind:form />
580+
<ElasticsearchSinkForm
581+
errors={errors.consumer}
582+
bind:form
583+
{functions}
584+
{refreshFunctions}
585+
bind:functionRefreshState
586+
/>
581587
{/if}
582588

583589
<Card>

assets/svelte/consumers/dynamicRoutingDocs.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,14 @@ export const routedSinkDocs: Record<RoutedSinkType, RoutedSinkDocs> = {
113113
},
114114
},
115115
},
116+
elasticsearch: {
117+
fields: {
118+
index_name: {
119+
description: "Elasticsearch index name to publish to",
120+
staticValue: "<empty>",
121+
staticFormField: "index_name",
122+
dynamicDefault: "sequin.<database_name>.<table_schema>.<table_name>",
123+
},
124+
},
125+
},
116126
};

assets/svelte/consumers/types.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ export type SequinStreamConsumer = BaseConsumer & {
187187
export type GcpPubsubConsumer = BaseConsumer & {
188188
sink: {
189189
type: "gcp_pubsub";
190+
elasticsearch;
190191
project_id: string;
191192
topic_id: string;
192193
use_emulator: boolean;
@@ -234,7 +235,7 @@ export type ElasticsearchConsumer = BaseConsumer & {
234235
type: "elasticsearch";
235236
endpoint_url: string;
236237
index_name: string;
237-
auth_type: "api_key" | "basic" | "bearer";
238+
auth_type: "none" | "api_key" | "basic" | "bearer";
238239
auth_value: string;
239240
batch_size: number;
240241
timeout_seconds: number;
@@ -268,6 +269,7 @@ export const SinkTypeValues = [
268269
"kafka",
269270
"sequin_stream",
270271
"gcp_pubsub",
272+
"elasticsearch",
271273
"nats",
272274
"rabbitmq",
273275
"typesense",
@@ -286,6 +288,7 @@ export const RoutedSinkTypeValues = [
286288
"gcp_pubsub",
287289
"typesense",
288290
"meilisearch",
291+
"elasticsearch",
289292
] as const;
290293

291294
export type RoutedSinkType = (typeof RoutedSinkTypeValues)[number];

assets/svelte/functions/Edit.svelte

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
gcp_pubsub: "GCP PubSub",
106106
typesense: "Typesense",
107107
meilisearch: "Meilisearch",
108+
elasticsearch: "Elasticsearch",
108109
};
109110
110111
let errorKeyOrder = ["description", "snippet", "line", "column"];
Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,87 @@
11
<script lang="ts">
22
import { ExternalLink } from "lucide-svelte";
3-
import { Card, CardContent } from "$lib/components/ui/card";
3+
import {
4+
Card,
5+
CardContent,
6+
CardHeader,
7+
CardTitle,
8+
} from "$lib/components/ui/card";
49
import { Button } from "$lib/components/ui/button";
510
import type { ElasticsearchConsumer } from "../../consumers/types";
611
712
export let consumer: ElasticsearchConsumer;
813
</script>
914

1015
<Card>
16+
<CardHeader>
17+
<CardTitle>Elasticsearch Configuration</CardTitle>
18+
</CardHeader>
1119
<CardContent class="p-6">
12-
<div class="flex justify-between items-center mb-4">
13-
<h2 class="text-lg font-semibold">Elasticsearch Configuration</h2>
14-
</div>
15-
1620
<div class="grid grid-cols-2 gap-4">
1721
<div>
1822
<span class="text-sm text-gray-500">Endpoint URL</span>
1923
<div class="mt-2">
2024
<div
21-
class="font-mono bg-slate-50 pl-1 pr-4 py-1 border border-slate-100 rounded-md break-all w-fit"
25+
class="font-mono bg-slate-50 px-2 py-1 border border-slate-100 rounded-md break-all w-fit"
2226
>
2327
<span>{consumer.sink.endpoint_url}</span>
2428
</div>
2529
</div>
2630
</div>
2731

32+
<div>
33+
<span class="text-sm text-gray-500">Authentication</span>
34+
<div class="mt-2">
35+
<span
36+
class="font-mono bg-slate-50 px-2 py-1 border border-slate-100 rounded-md break-all w-fit"
37+
>
38+
{#if consumer.sink.auth_type === "none"}
39+
No authentication
40+
{:else if consumer.sink.auth_type === "api_key"}
41+
API Key
42+
{:else if consumer.sink.auth_type === "basic"}
43+
Basic Auth
44+
{:else if consumer.sink.auth_type === "bearer"}
45+
Bearer Token
46+
{/if}
47+
</span>
48+
</div>
49+
</div>
50+
</div>
51+
</CardContent>
52+
</Card>
53+
<Card>
54+
<CardHeader>
55+
<CardTitle>Routing</CardTitle>
56+
</CardHeader>
57+
<CardContent>
58+
<div class="grid grid-cols-1 lg:grid-cols-2 gap-4">
2859
<div>
2960
<span class="text-sm text-gray-500">Index Name</span>
3061
<div class="mt-2">
3162
<span
32-
class="font-mono bg-slate-50 pl-1 pr-4 py-1 border border-slate-100 rounded-md whitespace-nowrap"
33-
>{consumer.sink.index_name}</span
63+
class="font-mono bg-slate-50 px-2 py-1 border border-slate-100 rounded-md break-all w-fit"
3464
>
65+
{#if consumer.routing_id}
66+
determined-by-router
67+
{:else}
68+
{consumer.sink.index_name}
69+
{/if}
70+
</span>
3571
</div>
3672
</div>
3773
</div>
74+
75+
{#if consumer.routing}
76+
<div class="mt-4">
77+
<span class="text-sm text-gray-500">Router</span>
78+
<div class="mt-2">
79+
<pre
80+
class="font-mono bg-slate-50 p-2 border border-slate-100 rounded-md text-sm overflow-x-auto w-full"><code
81+
>{consumer.routing.function.code}</code
82+
></pre>
83+
</div>
84+
</div>
85+
{/if}
3886
</CardContent>
3987
</Card>

assets/svelte/sinks/elasticsearch/ElasticsearchSinkForm.svelte

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import type { ElasticsearchConsumer } from "$lib/consumers/types";
1111
import { Label } from "$lib/components/ui/label";
1212
import { Eye, EyeOff, Info } from "lucide-svelte";
13+
import DynamicRoutingForm from "$lib/consumers/DynamicRoutingForm.svelte";
1314
import {
1415
Select,
1516
SelectContent,
@@ -19,10 +20,15 @@
1920
} from "$lib/components/ui/select";
2021
2122
export let form: ElasticsearchConsumer;
23+
export let functions: Array<any> = [];
24+
export let refreshFunctions: () => void;
25+
export let functionRefreshState: "idle" | "refreshing" | "done" = "idle";
26+
let selectedDynamic = form.routingMode === "dynamic";
2227
export let errors: any = {};
2328
let showPassword = false;
2429
2530
const authTypeOptions = [
31+
{ value: "none", label: "None" },
2632
{ value: "api_key", label: "API Key" },
2733
{ value: "basic", label: "Basic Auth" },
2834
{ value: "bearer", label: "Bearer Token" },
@@ -71,18 +77,6 @@
7177
</p>
7278
</div>
7379

74-
<div class="space-y-2">
75-
<Label for="index_name">Index name</Label>
76-
<Input
77-
id="index_name"
78-
bind:value={form.sink.index_name}
79-
placeholder="my-index"
80-
/>
81-
{#if errors.sink?.index_name}
82-
<p class="text-destructive text-sm">{errors.sink.index_name}</p>
83-
{/if}
84-
</div>
85-
8680
<div class="space-y-2">
8781
<Label for="auth_type">Authentication type</Label>
8882
<Select
@@ -126,11 +120,14 @@
126120
id="auth_value"
127121
type={showPassword ? "text" : "password"}
128122
bind:value={form.sink.auth_value}
129-
placeholder={form.sink.auth_type === "api_key"
130-
? "API Key"
131-
: form.sink.auth_type === "basic"
132-
? "username:password"
133-
: "Bearer Token"}
123+
disabled={form.sink.auth_type === "none"}
124+
placeholder={form.sink.auth_type === "none"
125+
? "N/A"
126+
: form.sink.auth_type === "api_key"
127+
? "API Key"
128+
: form.sink.auth_type === "basic"
129+
? "username:password"
130+
: "Bearer Token"}
134131
data-1p-ignore
135132
autocomplete="off"
136133
/>
@@ -150,7 +147,9 @@
150147
<p class="text-destructive text-sm">{errors.sink.auth_value}</p>
151148
{/if}
152149
<p class="text-sm text-muted-foreground">
153-
{#if form.sink.auth_type === "api_key"}
150+
{#if form.sink.auth_type === "none"}
151+
No authentication required
152+
{:else if form.sink.auth_type === "api_key"}
154153
Your Elasticsearch API key
155154
{:else if form.sink.auth_type === "basic"}
156155
Basic auth credentials in format username:password
@@ -179,3 +178,35 @@
179178
</div>
180179
</CardContent>
181180
</Card>
181+
182+
<Card>
183+
<CardHeader>
184+
<CardTitle>Routing</CardTitle>
185+
</CardHeader>
186+
<CardContent class="space-y-4">
187+
<DynamicRoutingForm
188+
bind:form
189+
routedSinkType="elasticsearch"
190+
{functions}
191+
{refreshFunctions}
192+
bind:functionRefreshState
193+
bind:selectedDynamic
194+
{errors}
195+
/>
196+
197+
{#if !selectedDynamic}
198+
<div class="space-y-2">
199+
<Label for="index_name">Index name</Label>
200+
<Input
201+
id="index_name"
202+
name="sink[index_name]"
203+
bind:value={form.sink.index_name}
204+
placeholder="my-index"
205+
/>
206+
{#if errors.sink?.index_name}
207+
<p class="text-destructive text-sm">{errors.sink.index_name}</p>
208+
{/if}
209+
</div>
210+
{/if}
211+
</CardContent>
212+
</Card>

docker-compose.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,23 @@ services:
120120
KAFKA_MAX_REQUEST_SIZE: 10485760
121121
KAFKA_REPLICA_FETCH_MAX_BYTES: 10485760
122122

123+
elasticsearch:
124+
profiles: [elasticsearch]
125+
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.1
126+
environment:
127+
- discovery.type=single-node
128+
- xpack.security.enabled=false
129+
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
130+
ports:
131+
- "127.0.0.1:9200:9200"
132+
volumes:
133+
- sequin_dev_elasticsearch:/usr/share/elasticsearch/data
134+
135+
123136
volumes:
124137
sequin_dev_postgres:
125138
sequin_dev_nats:
126139
sequin_dev_rabbitmq:
140+
sequin_dev_elasticsearch:
127141
prometheus_data:
128142
grafana_data:

docs/reference/routing.mdx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ The following sinks support dynamic routing:
3939
- GCP PubSub
4040
- Typesense
4141
- Meilisearch
42+
- Elasticsearch
4243

4344
Each sink type has different fields that can be routed:
4445

@@ -91,6 +92,14 @@ For [GCP PubSub](/reference/sinks/gcp-pubsub), your routing function must return
9192
|-----|------|-------------|---------|
9293
| `topic_id` | String | The topic ID to publish to | `"users.created"` |
9394

95+
### Elasticsearch sink
96+
97+
For [Elasticsearch](/reference/sinks/elasticsearch), your routing function must return a map with these keys:
98+
99+
| Key | Type | Description | Example |
100+
|-----|------|-------------|---------|
101+
| `index_name` | String | The index to publish to | `"users"` |
102+
94103
### Typesense sink
95104

96105
For [Typesense](/reference/sinks/typesense), your routing function must return a map with these keys:

docs/reference/sinks/elasticsearch.mdx

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,19 @@ Errors from Elasticsearch are shown verbatim in the **Messages** tab. For bulk r
104104
- One sink targets one index; multiple indices require multiple sinks.
105105
- Only `index` and `delete` bulk operations are issued. Other operations (`update`, `create`, etc.) are not used.
106106

107+
## Routing
108+
109+
The Elasticsearch sink supports dynamic routing of the `index_name` with [routing functions](/reference/routing).
110+
111+
Example routing function:
112+
113+
```elixir
114+
def route(action, record, changes, metadata) do
115+
%{
116+
index_name: metadata.table_name
117+
}
118+
end
119+
```
120+
121+
When not using a routing function, messages will be indexed into the statically configured index.
122+

lib/sequin/consumers/elasticsearch_sink.ex

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,48 @@ defmodule Sequin.Consumers.ElasticsearchSink do
1212
field :type, Ecto.Enum, values: [:elasticsearch], default: :elasticsearch
1313
field :endpoint_url, :string
1414
field :index_name, :string
15-
field :auth_type, Ecto.Enum, values: [:api_key, :basic, :bearer], default: :api_key
15+
field :auth_type, Ecto.Enum, values: [:none, :api_key, :basic, :bearer], default: :api_key
1616
field :auth_value, Sequin.Encrypted.Binary
1717
field :batch_size, :integer, default: 100
18+
field :routing_mode, Ecto.Enum, values: [:dynamic, :static]
1819
end
1920

2021
def changeset(struct, params) do
2122
struct
22-
|> cast(params, [:endpoint_url, :index_name, :auth_type, :auth_value, :batch_size])
23-
|> validate_required([:endpoint_url, :index_name, :auth_type, :auth_value])
23+
|> cast(params, [:endpoint_url, :index_name, :auth_type, :auth_value, :batch_size, :routing_mode])
24+
|> validate_required([:endpoint_url, :auth_type])
25+
|> validate_auth()
26+
|> validate_routing()
2427
|> validate_endpoint_url()
2528
|> validate_length(:index_name, max: 1024)
2629
|> validate_number(:batch_size, greater_than: 0, less_than_or_equal_to: 10_000)
2730
end
2831

32+
defp validate_auth(changeset) do
33+
auth_type = get_field(changeset, :auth_type)
34+
35+
if auth_type == :none do
36+
put_change(changeset, :auth_value, nil)
37+
else
38+
validate_required(changeset, [:auth_value])
39+
end
40+
end
41+
42+
defp validate_routing(changeset) do
43+
routing_mode = get_field(changeset, :routing_mode)
44+
45+
cond do
46+
routing_mode == :dynamic ->
47+
put_change(changeset, :index_name, nil)
48+
49+
routing_mode == :static ->
50+
validate_required(changeset, [:index_name])
51+
52+
true ->
53+
add_error(changeset, :routing_mode, "is required")
54+
end
55+
end
56+
2957
defp validate_endpoint_url(changeset) do
3058
changeset
3159
|> validate_change(:endpoint_url, fn :endpoint_url, url ->

0 commit comments

Comments
 (0)