Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions lib/logflare/backends/backend.ex
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
defmodule Logflare.Backends.Backend do
@moduledoc false
use TypedEctoSchema

use TypedEctoSchema
import Ecto.Changeset

alias Ecto.Changeset
alias Logflare.Alerting.AlertQuery
alias Logflare.Backends.Adaptor
alias Logflare.Backends.Backend
alias Logflare.Endpoints.Query
alias Logflare.Rules.Rule
alias Logflare.Source
alias Logflare.User
alias Logflare.Rules.Rule
alias Logflare.Alerting.AlertQuery

@adaptor_mapping %{
webhook: Adaptor.WebhookAdaptor,
Expand Down Expand Up @@ -39,6 +41,8 @@ defmodule Logflare.Backends.Backend do
on_replace: :delete
)

has_many(:queries, Query)

field(:register_for_ingest, :boolean, virtual: true, default: true)
field :metadata, :map
timestamps()
Expand All @@ -56,7 +60,7 @@ defmodule Logflare.Backends.Backend do
end

# temp function
defp do_config_change(%Ecto.Changeset{changes: %{config: config}} = changeset) do
defp do_config_change(%Changeset{changes: %{config: config}} = changeset) do
changeset
|> put_change(:config_encrypted, config)
|> delete_change(:config)
Expand All @@ -66,10 +70,10 @@ defmodule Logflare.Backends.Backend do

# common config validation function
defp validate_config(%{valid?: true} = changeset) do
type = Ecto.Changeset.get_field(changeset, :type)
type = Changeset.get_field(changeset, :type)
mod = adaptor_mapping()[type]

Ecto.Changeset.validate_change(changeset, :config, fn :config, config ->
Changeset.validate_change(changeset, :config, fn :config, config ->
case Adaptor.cast_and_validate_config(mod, config) do
%{valid?: true} -> []
%{valid?: false, errors: errors} -> for {key, err} <- errors, do: {:"config.#{key}", err}
Expand Down
61 changes: 49 additions & 12 deletions lib/logflare/endpoints/query.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
defmodule Logflare.Endpoints.Query do
@moduledoc false

use TypedEctoSchema
import Ecto.Changeset
import Logflare.Utils.Guards
require Logger

alias Logflare.Endpoints.Query
alias Logflare.Endpoints
alias Ecto.Changeset
alias Logflare.Alerting
alias Logflare.Backends
alias Logflare.Backends.Backend
alias Logflare.Endpoints
alias Logflare.Endpoints.Query
alias Logflare.SingleTenant
alias Logflare.Sql
alias Logflare.User

@derive {Jason.Encoder,
only: [
Expand All @@ -28,7 +36,7 @@ defmodule Logflare.Endpoints.Query do
field(:name, :string)
field(:query, :string)
field(:description, :string)
field(:language, Ecto.Enum, values: [:bq_sql, :pg_sql, :lql], default: :bq_sql)
field(:language, Ecto.Enum, values: [:bq_sql, :lql, :pg_sql], default: :bq_sql)
field(:source_mapping, :map)
field(:sandboxable, :boolean)
field(:cache_duration_seconds, :integer, default: 3_600)
Expand All @@ -39,7 +47,8 @@ defmodule Logflare.Endpoints.Query do
field(:parsed_labels, :map, virtual: true)
field(:metrics, :map, virtual: true)

belongs_to(:user, Logflare.User)
belongs_to(:user, User)
belongs_to(:backend, Backend)

timestamps()
end
Expand Down Expand Up @@ -67,8 +76,10 @@ defmodule Logflare.Endpoints.Query do
:enable_auth,
:language,
:description,
:backend_id,
:labels
])
|> infer_language_from_backend()
|> validate_required([:name, :query, :language])
end

Expand All @@ -85,8 +96,10 @@ defmodule Logflare.Endpoints.Query do
:enable_auth,
:language,
:description,
:backend_id,
:labels
])
|> infer_language_from_backend()
|> validate_query(:query)
|> default_validations()
|> update_source_mapping()
Expand All @@ -100,8 +113,8 @@ defmodule Logflare.Endpoints.Query do
|> validate_number(:max_limit, greater_than: 0, less_than: 10_001)
end

def validate_query(changeset, field) when is_atom(field) do
language = Ecto.Changeset.get_field(changeset, :language, :bq_sql)
def validate_query(changeset, field) when is_atom_value(field) do
language = Changeset.get_field(changeset, :language, :bq_sql)
user = get_field(changeset, :user)
endpoint_name = get_field(changeset, :name)

Expand All @@ -121,38 +134,62 @@ defmodule Logflare.Endpoints.Query do

validate_change(changeset, field, fn field, value ->
{:ok, expanded_query} =
Logflare.Sql.expand_subqueries(
Sql.expand_subqueries(
language,
value,
queries
)

case Logflare.Sql.transform(language, expanded_query, user) do
case Sql.transform(language, expanded_query, user) do
{:ok, _} -> []
{:error, error} -> [{field, error}]
end
end)
end

# Only update source mapping if there are no errors
def update_source_mapping(%Ecto.Changeset{errors: [], changes: %{query: query}} = changeset)
when is_binary(query) do
case Logflare.Sql.sources(query, get_field(changeset, :user)) do
def update_source_mapping(%Changeset{errors: [], changes: %{query: query}} = changeset)
when is_non_empty_binary(query) do
case Sql.sources(query, get_field(changeset, :user)) do
{:ok, source_mapping} -> put_change(changeset, :source_mapping, source_mapping)
{:error, error} -> add_error(changeset, :query, error)
end
end

def update_source_mapping(changeset), do: changeset

@spec infer_language_from_backend(%Changeset{}) :: %Changeset{}
defp infer_language_from_backend(%Changeset{} = changeset) do
case get_change(changeset, :language) do
nil ->
case get_field(changeset, :backend_id) do
nil ->
changeset

backend_id ->
backend = Backends.get_backend(backend_id)
language = map_backend_to_language(backend, SingleTenant.supabase_mode?())
put_change(changeset, :language, language)
end

_ ->
changeset
end
end

# Only set to `pg_sql` if postgres backend and single tenant / supabase mode is false
@spec map_backend_to_language(Backend.t(), boolean()) :: :bq_sql | :pg_sql
defp map_backend_to_language(%{type: :postgres}, false), do: :pg_sql
defp map_backend_to_language(_backend, _supabase_mode), do: :bq_sql

@doc """
Replaces a query with latest source names.
"""
@spec map_query_sources(Query.t()) :: Query.t()
def map_query_sources(
%__MODULE__{query: query, source_mapping: source_mapping, user_id: user_id} = q
) do
case Logflare.Sql.source_mapping(query, user_id, source_mapping) do
case Sql.source_mapping(query, user_id, source_mapping) do
{:ok, query} ->
Map.put(q, :query, query)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Logflare.Repo.Migrations.AddBackendIdToEndpointQueries do
use Ecto.Migration

def change do
alter table(:endpoint_queries) do
add :backend_id, references(:backends, on_delete: :nilify_all), null: true
end

create index(:endpoint_queries, [:backend_id])
end
end
49 changes: 49 additions & 0 deletions test/logflare/endpoints_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,55 @@ defmodule Logflare.EndpointsTest do
assert stored_sql =~ "my.date"
end

describe "language inference from backend" do
test "postgres backend maps to `pg_sql` language" do
user = insert(:user)
backend = insert(:backend, user: user, type: :postgres)

assert {:ok, endpoint} =
Endpoints.create_query(user, %{
name: "postgres-endpoint",
query: "select current_date as date",
backend_id: backend.id
# Note: no language specified - should be inferred
})

assert endpoint.language == :pg_sql
assert endpoint.backend_id == backend.id
end

test "bigquery backend maps to `bq_sql` language" do
user = insert(:user)
backend = insert(:backend, user: user, type: :bigquery)

assert {:ok, endpoint} =
Endpoints.create_query(user, %{
name: "bigquery-endpoint",
query: "select current_date() as date",
backend_id: backend.id
})

assert endpoint.language == :bq_sql
assert endpoint.backend_id == backend.id
end

test "backend does not overwrite explicit language definition" do
user = insert(:user)
backend = insert(:backend, user: user, type: :bigquery)

assert {:ok, endpoint} =
Endpoints.create_query(user, %{
name: "bigquery-endpoint-lql-test",
query: "select current_date() as date",
backend_id: backend.id,
language: :pg_sql
})

assert endpoint.language == :pg_sql
assert endpoint.backend_id == backend.id
end
end

describe "running queries in bigquery backends" do
test "run an endpoint query without caching" do
pid = self()
Expand Down
12 changes: 9 additions & 3 deletions test/support/factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,21 @@ defmodule Logflare.Factory do
%UserPreferences{}
end

def endpoint_factory do
def endpoint_factory(attrs \\ %{}) do
user = Map.get(attrs, :user, build(:user))
backend = Map.get(attrs, :backend)
language = Map.get(attrs, :language, :bq_sql)

%Query{
user: build(:user),
user: user,
description: "some desc #{TestUtils.random_string()}",
token: Ecto.UUID.generate(),
query: "select current_date() as date",
language: :bq_sql,
language: language,
backend: backend,
name: TestUtils.random_string()
}
|> merge_attributes(Map.drop(attrs, [:backend, :language, :user]))
end

def child_endpoint_factory do
Expand Down
Loading