Skip to content

Commit da19a17

Browse files
committed
remove ConsumerRecord
1 parent 144d2c9 commit da19a17

File tree

71 files changed

+492
-1772
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+492
-1772
lines changed

assets/svelte/components/FilterForm.svelte

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import { Switch } from "$lib/components/ui/switch";
44
import FunctionPicker from "$lib/consumers/FunctionPicker.svelte";
55
6-
export let messageKind: string;
76
export let form: {
87
actions: string[];
98
filterId: string;
@@ -35,29 +34,27 @@
3534
<Label class="text-base font-medium">Filters</Label>
3635
{/if}
3736

38-
{#if messageKind === "event"}
39-
<div class="flex flex-col gap-4">
40-
<div class="text-base font-medium">Operations to capture</div>
41-
<div class="flex items-center gap-4">
42-
{#each switches as { id, label }}
43-
<div class="flex items-center gap-2">
44-
<Label for={id} class="cursor-pointer">{label}</Label>
45-
<Switch
46-
{id}
47-
disabled={!form.postgresDatabaseId && !form.tableOid}
48-
checked={actions.includes(id)}
49-
onCheckedChange={(checked) => {
50-
const newActions = checked
51-
? [...actions, id]
52-
: actions.filter((a) => a !== id);
53-
form.actions = newActions;
54-
}}
55-
/>
56-
</div>
57-
{/each}
58-
</div>
37+
<div class="flex flex-col gap-4">
38+
<div class="text-base font-medium">Operations to capture</div>
39+
<div class="flex items-center gap-4">
40+
{#each switches as { id, label }}
41+
<div class="flex items-center gap-2">
42+
<Label for={id} class="cursor-pointer">{label}</Label>
43+
<Switch
44+
{id}
45+
disabled={!form.postgresDatabaseId && !form.tableOid}
46+
checked={actions.includes(id)}
47+
onCheckedChange={(checked) => {
48+
const newActions = checked
49+
? [...actions, id]
50+
: actions.filter((a) => a !== id);
51+
form.actions = newActions;
52+
}}
53+
/>
54+
</div>
55+
{/each}
5956
</div>
60-
{/if}
57+
</div>
6158

6259
<!-- Filter Function -->
6360
<div class="flex flex-col gap-4">

assets/svelte/consumers/SinkConsumerForm.svelte

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,8 @@
7979
};
8080
export let isSelfHosted: boolean;
8181
82-
type MessageKind = "event" | "record";
83-
8482
interface FormState {
8583
type: string;
86-
messageKind: MessageKind;
8784
messageGrouping: boolean;
8885
maxMemoryMb: number;
8986
postgresDatabaseId: string | null;
@@ -116,7 +113,6 @@
116113
117114
let initialForm: FormState = {
118115
type: consumer.type,
119-
messageKind: (consumer.message_kind || "event") as MessageKind,
120116
messageGrouping: consumer.message_grouping,
121117
maxMemoryMb: Number(consumer.max_memory_mb),
122118
postgresDatabaseId: consumer.postgres_database_id,
@@ -385,7 +381,6 @@
385381
<CardContent>
386382
<FilterForm
387383
{functions}
388-
messageKind={form.messageKind}
389384
{selectedDatabase}
390385
bind:form
391386
{refreshFunctions}

assets/svelte/consumers/types.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ export type BaseConsumer = {
2929
name: string;
3030
annotations: Record<string, boolean>;
3131
status: "active" | "paused" | "disabled";
32-
message_kind: string;
3332
message_grouping: boolean;
3433
ack_wait_ms: number;
3534
max_ack_pending: number;

assets/svelte/databases/Show.svelte

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
interface Consumer {
3030
id: string;
3131
name: string;
32-
message_kind: string;
3332
consumer_kind: string;
3433
source_tables: Table[];
3534
href: string;

assets/svelte/sinks/nats/NatsSinkCard.svelte

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,9 @@
9191
class="underline">router</a
9292
>
9393
<ExternalLink class="h-4 w-4 inline" />
94-
{:else if consumer.message_kind === "event"}
94+
{:else}
9595
sequin.changes.{consumer.database
9696
.name}.&lt;table_schema&gt;.&lt;table_name&gt;.&lt;action&gt;
97-
{:else}
98-
sequin.rows.{consumer.database
99-
.name}.&lt;table_schema&gt;.&lt;table_name&gt;
10097
{/if}
10198
</span>
10299
</div>

assets/svelte/wal_pipelines/Form.svelte

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,6 @@
157157

158158
{#if selectedSourceTable}
159159
<FilterForm
160-
messageKind="event"
161160
selectedTable={selectedSourceTable}
162161
bind:form
163162
onFilterChange={(filters) => (form.sourceTableFilters = filters)}

lib/sequin/changeset.ex

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,7 @@ defmodule Sequin.Changeset do
3333
end
3434

3535
def cast_embed(%Ecto.Changeset{} = changeset, :source_tables) do
36-
case get_field(changeset, :message_kind) do
37-
:record ->
38-
cast_embed(changeset, :source_tables, with: &SourceTable.record_changeset(&1, &2))
39-
40-
:event ->
41-
cast_embed(changeset, :source_tables, with: &SourceTable.event_changeset(&1, &2))
42-
end
36+
cast_embed(changeset, :source_tables, with: &SourceTable.event_changeset(&1, &2))
4337
end
4438

4539
# See: https://github.com/sequinstream/sequin/issues/1465

lib/sequin/consumers/acknowledged_messages/acknowledged_messages.ex

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ defmodule Sequin.Consumers.AcknowledgedMessages do
33

44
alias Sequin.Consumers.AcknowledgedMessages.AcknowledgedMessage
55
alias Sequin.Consumers.ConsumerEvent
6-
alias Sequin.Consumers.ConsumerRecord
76
alias Sequin.Error
87
alias Sequin.Redis
98

@@ -12,7 +11,7 @@ defmodule Sequin.Consumers.AcknowledgedMessages do
1211
@doc """
1312
Stores messages for a given consumer_id in a Redis sorted set and trims to the latest @max_messages.
1413
"""
15-
@spec store_messages(String.t(), list(ConsumerEvent.t() | ConsumerRecord.t()), non_neg_integer()) ::
14+
@spec store_messages(String.t(), list(ConsumerEvent.t()), non_neg_integer()) ::
1615
:ok | {:error, Error.t()}
1716
def store_messages(consumer_id, messages, max_messages \\ @max_messages) do
1817
# Add messages to the sorted set
@@ -65,30 +64,6 @@ defmodule Sequin.Consumers.AcknowledgedMessages do
6564
"acknowledged_messages:v0:#{consumer_id}"
6665
end
6766

68-
def to_acknowledged_message(%ConsumerRecord{} = record) do
69-
deliver_count = if record.deliver_count == 0, do: 1, else: record.deliver_count
70-
71-
%AcknowledgedMessage{
72-
id: record.id,
73-
consumer_id: record.consumer_id,
74-
seq: record.commit_lsn + record.commit_idx,
75-
commit_lsn: record.commit_lsn,
76-
commit_idx: record.commit_idx,
77-
commit_timestamp: record.data.metadata.commit_timestamp,
78-
ack_id: record.ack_id,
79-
deliver_count: deliver_count,
80-
last_delivered_at: record.last_delivered_at,
81-
record_pks: record.record_pks,
82-
table_oid: record.table_oid,
83-
not_visible_until: record.not_visible_until,
84-
inserted_at: record.inserted_at,
85-
trace_id: record.replication_message_trace_id,
86-
state: record.state,
87-
table_name: record.data.metadata.table_name,
88-
table_schema: record.data.metadata.table_schema
89-
}
90-
end
91-
9267
def to_acknowledged_message(%ConsumerEvent{} = event) do
9368
deliver_count = if event.deliver_count == 0, do: 1, else: event.deliver_count
9469

0 commit comments

Comments
 (0)