Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions lib/sequin/changeset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ defmodule Sequin.Changeset do
alias Sequin.Consumers.SourceTable
alias Sequin.Replication.WalPipeline

require Logger

def validate_name(%Ecto.Changeset{} = changeset) do
name = Ecto.Changeset.get_field(changeset, :name)

Expand Down Expand Up @@ -39,4 +41,55 @@ defmodule Sequin.Changeset do
cast_embed(changeset, :source_tables, with: &SourceTable.event_changeset(&1, &2))
end
end

# See: https://github.com/sequinstream/sequin/issues/1465
def put_deserializers(%Ecto.Changeset{} = changeset, field, deserializer_field) do
case get_field(changeset, field) do
nil ->
changeset

field_value ->
deserializers =
field_value
|> Enum.map(fn
{key, %Date{}} -> {key, "date"}
{key, %DateTime{}} -> {key, "datetime"}
{key, %NaiveDateTime{}} -> {key, "naive_datetime"}
{key, %Decimal{}} -> {key, "decimal"}
{key, _} -> {key, nil}
end)
|> Enum.filter(fn {_key, type} -> type != nil end)
|> Map.new()

put_change(changeset, deserializer_field, deserializers)
end
end

def deserialize(map, nil), do: map

def deserialize(map, deserializers) when is_map(map) and is_map(deserializers) do
Enum.reduce(map, map, fn {key, value}, acc ->
case Map.get(deserializers, key) do
nil ->
acc

"date" ->
Map.replace!(acc, key, Date.from_iso8601!(value))

"datetime" ->
{:ok, datetime, _} = DateTime.from_iso8601(value)
Map.replace!(acc, key, datetime)

"naive_datetime" ->
Map.replace!(acc, key, NaiveDateTime.from_iso8601!(value))

"decimal" ->
Map.replace!(acc, key, Decimal.new(value))

unknown ->
Logger.warning("Unknown deserializer: #{inspect(unknown)}")
acc
end
end)
end
end
23 changes: 8 additions & 15 deletions lib/sequin/consumers/consumer_event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Sequin.Consumers.ConsumerEvent do
import Ecto.Changeset
import Ecto.Query

alias Sequin.Consumers.ConsumerEvent
alias Sequin.Consumers.ConsumerEventData

@primary_key false
Expand Down Expand Up @@ -106,21 +107,6 @@ defmodule Sequin.Consumers.ConsumerEvent do
Enum.map(pks, &to_string/1)
end

def from_map(attrs) do
attrs =
attrs
|> Sequin.Map.atomize_keys()
|> Map.update!(:record_pks, &stringify_record_pks/1)
|> Map.update!(:data, fn data ->
data = Sequin.Map.atomize_keys(data)
metadata = Sequin.Map.atomize_keys(data.metadata)
data = Map.put(data, :metadata, struct!(ConsumerEventData.Metadata, metadata))
struct!(ConsumerEventData, data)
end)

struct!(__MODULE__, attrs)
end

def where_consumer_id(query \\ base_query(), consumer_id) do
from([consumer_event: ce] in query, where: ce.consumer_id == ^consumer_id)
end
Expand Down Expand Up @@ -190,4 +176,11 @@ defmodule Sequin.Consumers.ConsumerEvent do
defp base_query(query \\ __MODULE__) do
from(ce in query, as: :consumer_event)
end

def deserialize(%ConsumerEvent{} = consumer_event) do
%ConsumerEvent{
consumer_event
| data: ConsumerEventData.deserialize(consumer_event.data)
}
end
end
16 changes: 15 additions & 1 deletion lib/sequin/consumers/consumer_event_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule Sequin.Consumers.ConsumerEventData do

import Ecto.Changeset

alias __MODULE__

@type t :: %__MODULE__{
record: map(),
changes: map() | nil,
Expand All @@ -12,11 +14,13 @@ defmodule Sequin.Consumers.ConsumerEventData do
}

@primary_key false
@derive Jason.Encoder
@derive {Jason.Encoder, except: [:record_deserializers, :changes_deserializers]}

embedded_schema do
field :record, :map
field :record_deserializers, :map
field :changes, :map
field :changes_deserializers, :map
field :action, Ecto.Enum, values: [:insert, :update, :delete, :read]

embeds_one :metadata, Metadata, primary_key: false do
Expand Down Expand Up @@ -44,11 +48,21 @@ defmodule Sequin.Consumers.ConsumerEventData do
|> cast(attrs, [:record, :changes, :action])
|> cast_embed(:metadata, required: true, with: &metadata_changeset/2)
|> validate_required([:record, :action, :metadata])
|> Sequin.Changeset.put_deserializers(:record, :record_deserializers)
|> Sequin.Changeset.put_deserializers(:changes, :changes_deserializers)
end

def metadata_changeset(metadata, attrs) do
metadata
|> cast(attrs, [:table_schema, :table_name, :commit_timestamp, :commit_lsn, :database_name, :transaction_annotations])
|> validate_required([:table_schema, :table_name, :commit_timestamp, :commit_lsn])
end

def deserialize(%ConsumerEventData{} = data) do
%ConsumerEventData{
data
| record: Sequin.Changeset.deserialize(data.record, data.record_deserializers),
changes: Sequin.Changeset.deserialize(data.changes, data.changes_deserializers)
}
end
end
8 changes: 8 additions & 0 deletions lib/sequin/consumers/consumer_record.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Sequin.Consumers.ConsumerRecord do
import Ecto.Changeset
import Ecto.Query

alias Sequin.Consumers.ConsumerRecord
alias Sequin.Consumers.ConsumerRecordData

@primary_key false
Expand Down Expand Up @@ -197,4 +198,11 @@ defmodule Sequin.Consumers.ConsumerRecord do
defp base_query(query \\ __MODULE__) do
from(cr in query, as: :consumer_record)
end

def deserialize(%ConsumerRecord{} = consumer_record) do
%ConsumerRecord{
consumer_record
| data: ConsumerRecordData.deserialize(consumer_record.data)
}
end
end
10 changes: 9 additions & 1 deletion lib/sequin/consumers/consumer_record_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ defmodule Sequin.Consumers.ConsumerRecordData do

import Ecto.Changeset

alias __MODULE__

@type t :: %__MODULE__{
record: map(),
metadata: map()
}

@primary_key false
@derive {Jason.Encoder, except: [:action]}
@derive {Jason.Encoder, except: [:action, :record_deserializers]}
embedded_schema do
field :record, :map
field :record_deserializers, :map
field :action, Ecto.Enum, values: [:insert, :update, :delete, :read]

embeds_one :metadata, Metadata, primary_key: false do
Expand All @@ -39,11 +42,16 @@ defmodule Sequin.Consumers.ConsumerRecordData do
|> cast(attrs, [:record, :action])
|> cast_embed(:metadata, required: true, with: &metadata_changeset/2)
|> validate_required([:record, :metadata])
|> Sequin.Changeset.put_deserializers(:record, :record_deserializers)
end

def metadata_changeset(metadata, attrs) do
metadata
|> cast(attrs, [:table_schema, :table_name, :commit_timestamp, :commit_lsn, :database_name])
|> validate_required([:table_schema, :table_name, :commit_timestamp, :commit_lsn])
end

def deserialize(%ConsumerRecordData{} = data) do
%ConsumerRecordData{data | record: Sequin.Changeset.deserialize(data.record, data.record_deserializers)}
end
end
62 changes: 4 additions & 58 deletions lib/sequin/consumers/consumers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -432,33 +432,6 @@ defmodule Sequin.Consumers do

# ConsumerEvent

def get_consumer_event(consumer_id, ack_id: ack_id) do
consumer_id
|> ConsumerEvent.where_consumer_id()
|> ConsumerEvent.where_ack_id(ack_id)
|> Repo.one()
end

def get_consumer_event(consumer_id, commit_lsn) do
consumer_event =
consumer_id
|> ConsumerEvent.where_consumer_id()
|> ConsumerEvent.where_commit_lsn(commit_lsn)
|> Repo.one()

case consumer_event do
nil -> {:error, Error.not_found(entity: :consumer_event)}
consumer_event -> {:ok, consumer_event}
end
end

def get_consumer_event!(consumer_id, commit_lsn) do
case get_consumer_event(consumer_id, commit_lsn) do
{:ok, consumer_event} -> consumer_event
{:error, _} -> raise Error.not_found(entity: :consumer_event)
end
end

def list_consumer_messages_for_consumer(%SinkConsumer{} = consumer, params \\ [], opts \\ []) do
case consumer.message_kind do
:event -> list_consumer_events_for_consumer(consumer.id, params, opts)
Expand Down Expand Up @@ -496,7 +469,9 @@ defmodule Sequin.Consumers do
ConsumerEvent.where_wal_cursor_in(query, wal_cursors)
end)

Repo.all(query, opts)
query
|> Repo.all(opts)
|> Enum.map(&ConsumerEvent.deserialize/1)
end

def upsert_consumer_messages(%SinkConsumer{} = consumer, messages) do
Expand Down Expand Up @@ -541,40 +516,11 @@ defmodule Sequin.Consumers do

# ConsumerRecord

def get_consumer_record(consumer_id, id) when is_integer(id) do
consumer_record =
consumer_id
|> ConsumerRecord.where_consumer_id()
|> ConsumerRecord.where_id(id)
|> Repo.one()

case consumer_record do
nil -> {:error, Error.not_found(entity: :consumer_record)}
consumer_record -> {:ok, consumer_record}
end
end

def get_consumer_record(consumer_id, params) when is_list(params) or is_map(params) do
consumer_id
|> consumer_record_query(params)
|> Repo.one()
|> case do
%ConsumerRecord{} = consumer_record -> {:ok, consumer_record}
nil -> {:error, Error.not_found(entity: :consumer_record)}
end
end

def get_consumer_record!(consumer_id, id) when is_integer(id) do
case get_consumer_record(consumer_id, id) do
{:ok, consumer_record} -> consumer_record
{:error, error} -> raise error
end
end

def list_consumer_records_for_consumer(consumer_id, params \\ [], opts \\ []) do
consumer_id
|> consumer_record_query(params)
|> Repo.all(opts)
|> Enum.map(&ConsumerRecord.deserialize/1)
end

defp consumer_record_query(consumer_id, params) do
Expand Down
2 changes: 2 additions & 0 deletions lib/sequin/postgres/postgres.ex
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ defmodule Sequin.Postgres do
end
end

# If you add a new type here, you may need to modify deserializers in changeset.ex
# See: https://github.com/sequinstream/sequin/issues/1465
def pg_type_to_ecto_type(pg_type) do
case pg_type do
"integer" -> :integer
Expand Down
79 changes: 79 additions & 0 deletions test/changeset_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
defmodule Sequin.ChangesetTest do
use Sequin.Case, async: true

import Ecto.Changeset

alias Sequin.Changeset

describe "put_deserializers/3 and deserialize/2 integration" do
test "properly handles serialization and deserialization of complex types" do
# Create a record with complex types
now = DateTime.truncate(DateTime.utc_now(), :second)
today = Date.utc_today()
naive_now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
decimal_value = Decimal.new("123.45")

# Build initial data for schemaless changeset
data = %{
record: %{
"id" => 1,
"name" => "Test Record",
"amount" => decimal_value,
"created_at" => now,
"date" => today,
"updated_at" => naive_now
}
}

# Define types for schemaless changeset
types = %{
record: :map,
record_deserializers: :map
}

# Create a schemaless changeset
changeset =
{data, types}
|> cast(%{}, [:record])
|> Changeset.put_deserializers(:record, :record_deserializers)

# Verify that deserializers are correctly generated
deserializers =
changeset
|> get_change(:record_deserializers)
|> Map.new()

assert Map.has_key?(deserializers, "amount")
assert Map.has_key?(deserializers, "created_at")
assert Map.has_key?(deserializers, "date")
assert Map.has_key?(deserializers, "updated_at")

assert deserializers["amount"] == "decimal"
assert deserializers["created_at"] == "datetime"
assert deserializers["date"] == "date"
assert deserializers["updated_at"] == "naive_datetime"

# Simulate the Repo insert and query
json_record = JSON.decode!(JSON.encode!(data.record))

# Now deserialize the JSON-like map back to complex types
deserialized_record = Changeset.deserialize(json_record, deserializers)

# Verify all complex types are properly converted back
assert deserialized_record["id"] == 1
assert deserialized_record["name"] == "Test Record"

assert %Decimal{} = deserialized_record["amount"]
assert Decimal.equal?(deserialized_record["amount"], decimal_value)

assert %DateTime{} = deserialized_record["created_at"]
assert DateTime.compare(deserialized_record["created_at"], now) == :eq

assert %Date{} = deserialized_record["date"]
assert Date.compare(deserialized_record["date"], today) == :eq

assert %NaiveDateTime{} = deserialized_record["updated_at"]
assert NaiveDateTime.compare(deserialized_record["updated_at"], naive_now) == :eq
end
end
end
Loading
Loading