diff --git a/lib/sequin/runtime/typesense_pipeline.ex b/lib/sequin/runtime/typesense_pipeline.ex index 675ac9b83..647f0c083 100644 --- a/lib/sequin/runtime/typesense_pipeline.ex +++ b/lib/sequin/runtime/typesense_pipeline.ex @@ -82,8 +82,19 @@ defmodule Sequin.Runtime.TypesensePipeline do jsonl = encode_as_jsonl(external_messages) case Client.import_documents(client, sink.collection_name, jsonl, action: sink.import_action) do - {:error, error} -> {:error, error} - {:ok, _} -> {:ok, messages, context} + {:error, error} -> + {:error, error} + + {:ok, results} -> + messages = + Enum.zip_with(messages, results, fn message, result -> + case result do + :ok -> message + {:error, error_message} -> Broadway.Message.failed(message, error_message) + end + end) + + {:ok, messages, context} end end end diff --git a/lib/sequin/sinks/typesense/client.ex b/lib/sequin/sinks/typesense/client.ex index 6397ab5c7..6a222bbf0 100644 --- a/lib/sequin/sinks/typesense/client.ex +++ b/lib/sequin/sinks/typesense/client.ex @@ -70,14 +70,9 @@ defmodule Sequin.Sinks.Typesense.Client do responses = body |> String.split("\n", trim: true) - |> Enum.map(&Jason.decode!/1) + |> Enum.map(&parse_batch_item_result/1) - if Enum.all?(responses, &Map.get(&1, "success", false)) do - {:ok, responses} - else - msg = extract_error_message(responses) - {:error, Error.service(service: :typesense, message: "Batch import failed: #{msg}")} - end + {:ok, responses} {:ok, %{status: status, body: body}} -> error_message = extract_error_message(body) @@ -251,16 +246,17 @@ defmodule Sequin.Sinks.Typesense.Client do end end - defp extract_error_message(body) when is_list(body) do - body - |> Enum.map(&extract_error_message/1) - |> Enum.frequencies() - |> Enum.map_join("\n", fn {msg, n} -> - "#{n}x #{msg}" - end) - end - defp extract_error_message(body) do inspect(body) end + + defp parse_batch_item_result(json_line) do + res = Jason.decode!(json_line) + + if Map.get(res, "success", false) do + :ok + else + {:error, extract_error_message(res)} + end + end end