-
Notifications
You must be signed in to change notification settings - Fork 17
Open
Description
Hey folks,
Thanks for the great library! I put together a small mix task to help me get started (this is my first time interacting with ClickHouse).
If you think it might be useful, I'm open to suggestions on how to make it good enough for inclusion in ecto_ch:
defmodule Mix.Tasks.Ecto.Ch.SchemaHint do
  @moduledoc """
  Provides ClickHouse schema hints based on PostgreSQL table data types.

  The task reads the columns of the table backing an Ecto schema from
  PostgreSQL's `information_schema.columns`, maps every column type to a
  ClickHouse type and prints a schema hint for it.

  ## Usage

      mix ecto.ch.schema_hint MyApp.Accounts.User MyApp.Repo
      mix ecto.ch.schema_hint MyApp.Accounts.User MyApp.Repo --with-migration

  ## Options

    * `--with-migration` - additionally print a migration hint

  The `id` column is always left out. `json`/`jsonb` columns and columns whose
  type has no known ClickHouse counterpart are left out as well, and a warning
  is logged for each of them.
  """

  use Mix.Task

  require Logger

  alias Ecto.Adapters.SQL, as: SQLAdapter

  @shortdoc "Prints a ClickHouse schema hint for an Ecto schema backed by PostgreSQL"

  @with_migration_opt "--with-migration"

  # Keys are the lowercased `data_type` values read from `information_schema.columns`.
  # `array` maps to a bare `Array`: the element type is not part of the queried
  # columns and has to be filled in by hand.
  @clickhouse_types %{
    "date" => "Date",
    "timestamp" => "DateTime",
    "timestamp without time zone" => "DateTime",
    "timestamp with time zone" => "DateTime",
    "real" => "Float32",
    "double precision" => "Float64",
    "decimal" => "Decimal",
    "numeric" => "Decimal",
    "smallint" => "Int16",
    "integer" => "Int32",
    "bigint" => "Int64",
    "serial" => "UInt32",
    "bigserial" => "UInt64",
    "text" => "String",
    "char" => "String",
    "character" => "String",
    "character varying" => "String",
    "array" => "Array",
    "boolean" => "Bool",
    "uuid" => "UUID"
  }

  @doc """
  Runs the task.

  Expects the Ecto schema module name and the repo module name as positional
  arguments, optionally accompanied by `--with-migration` in any position.
  Any other combination of arguments prints the usage text.
  """
  @impl Mix.Task
  def run(args) do
    case OptionParser.parse(args, strict: [with_migration: :boolean]) do
      {opts, [schema_name, repo_name], []} ->
        print_hints(schema_name, repo_name, Keyword.get(opts, :with_migration, false))

      _missing_or_invalid_args ->
        print_usage()
    end
  end

  defp print_usage do
    IO.puts("Provide the Ecto schema module name and the application repo module name.")
    IO.puts("#{IO.ANSI.yellow()}For example:#{IO.ANSI.reset()} mix ecto.ch.schema_hint MyApp.Accounts.User MyApp.Repo")
    IO.puts("Pass #{@with_migration_opt} to print migration hint.")
  end

  # Resolves both modules, starts the repo and prints the requested hints.
  defp print_hints(schema_name, repo_name, with_migration?) do
    # Compile the project and load its configuration first, so that the schema
    # and repo modules can be loaded and `repo.config()` sees the app config.
    Mix.Task.run("app.config")

    schema_module = to_elixir_module(schema_name)
    repo = to_elixir_module(repo_name)
    ensure_exports!(schema_module, :__schema__, 1, "an Ecto schema")
    ensure_exports!(repo, :config, 0, "an Ecto repo")

    start_ecto_dependencies!(repo)

    types =
      schema_module
      |> get_column_info(repo)
      |> infer_types()

    IO.puts(build_clickhouse_schema(schema_module, types))

    if with_migration? do
      IO.puts(build_clickhouse_migration(schema_module, types))
    end

    :ok
  end

  # Turns `%{column_name => {postgres_type, nullable?}}` into a list of field
  # definitions sorted by field name. Skipped columns contribute no entry.
  defp infer_types(column_info) do
    column_info
    |> Enum.flat_map(fn
      {"id", _type_info} ->
        []

      {field_name, {data_type, nullable}} ->
        infer_type(field_name, String.downcase(data_type), nullable)
    end)
    |> Enum.sort_by(& &1.field_name)
  end

  # Returns a one-element list with the field definition, or `[]` when the
  # column has to be skipped.
  defp infer_type(field_name, json_type, _nullable) when json_type in ["json", "jsonb"] do
    Logger.warning(
      "Ignoring #{json_type} column '#{field_name}'. Consult https://clickhouse.com/docs/en/integrations/data-formats/json"
    )

    []
  end

  defp infer_type(field_name, postgres_type, nullable) do
    case map_postgres_type_to_clickhouse(postgres_type) do
      {:ok, data_type} ->
        clickhouse_type = if nullable, do: "Nullable(#{data_type})", else: data_type

        [
          %{
            field_name: field_name,
            data_type: data_type,
            clickhouse_type: clickhouse_type,
            nullable: nullable
          }
        ]

      :error ->
        Logger.warning(
          "Ignoring column '#{field_name}' with unknown type '#{postgres_type}'. " <>
            "Consult https://clickhouse.com/docs/en/sql-reference/data-types"
        )

        []
    end
  end

  # Reads name, type and nullability of every column of the schema's table.
  # Returns `%{column_name => {data_type, nullable?}}`.
  #
  # NOTE: the lookup is by table name only, so same-named tables living in
  # different PostgreSQL schemas end up merged into one map.
  defp get_column_info(schema_module, repo) do
    table_name = schema_module.__schema__(:source)

    query =
      "SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_name = $1"

    case SQLAdapter.query(repo, query, [table_name]) do
      {:ok, %{rows: []}} ->
        Mix.raise("No columns found for table #{inspect(table_name)}. Does the table exist?")

      {:ok, %{rows: rows}} ->
        Map.new(rows, fn [column_name, data_type, is_nullable] ->
          {column_name, {data_type, is_nullable == "YES"}}
        end)

      {:error, reason} ->
        # An empty hint would hide the real problem, so fail loudly instead.
        Mix.raise(
          "Could not read column information for table #{inspect(table_name)}: #{inspect(reason)}"
        )
    end
  end

  # Returns `{:ok, clickhouse_type}` for a known PostgreSQL type name and
  # `:error` otherwise. The lookup is case-insensitive.
  defp map_postgres_type_to_clickhouse(data_type) do
    Map.fetch(@clickhouse_types, String.downcase(data_type))
  end

  # Renders the `schema ... do ... end` hint for the given field definitions.
  defp build_clickhouse_schema(schema_module, types) do
    schema_name = schema_module.__schema__(:source)

    fields =
      Enum.map_join(types, "\n  ", fn field_definition ->
        "field :#{field_definition.field_name}, Ch, type: \"#{field_definition.clickhouse_type}\""
      end)

    """
    schema "#{schema_name}" do
      #{fields}
    end
    """
  end

  # Renders the `create table(...)` migration hint for the given field definitions.
  defp build_clickhouse_migration(schema_module, fields_definitions) do
    schema_name = schema_module.__schema__(:source)

    add_column_entries =
      Enum.map_join(fields_definitions, "\n  ", fn field_definition ->
        null_option = if field_definition.nullable, do: ", null: true", else: ""
        "add :#{field_definition.field_name}, :#{field_definition.data_type}#{null_option}"
      end)

    """
    table_options = []
    engine_options = [order_by: "tuple()"] # skip sorting key. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#order_by
    options = table_options ++ engine_options

    create table(:#{schema_name}, primary_key: false, engine: "ReplicatedMergeTree", options: options) do
      #{add_column_entries}
    end
    """
  end

  # Resolves a module name given on the command line, e.g. "MyApp.Repo".
  # The atom is created from a command line argument in a short-lived VM, so
  # unbounded atom creation is not a concern here.
  defp to_elixir_module(module_name) do
    module = Module.concat([module_name])

    if Code.ensure_loaded?(module) do
      module
    else
      Mix.raise("Module #{inspect(module)} could not be loaded. Is the name spelled correctly?")
    end
  end

  # Raises a readable error when `module` lacks the function the task relies on.
  defp ensure_exports!(module, function, arity, description) do
    if function_exported?(module, function, arity) do
      :ok
    else
      Mix.raise(
        "#{inspect(module)} does not look like #{description}: #{function}/#{arity} is not exported"
      )
    end
  end

  # https://github.com/elixir-ecto/ecto_sql/blob/b4329a1fe6f2888b5b99b3b5b3316f246a838c3a/lib/ecto/migrator.ex#L149
  @doc """
  Starts `:ecto_sql`, the applications required by the repo's adapter and the
  repo itself with a pool size of 2.

  Returns `{:ok, :stop}` when the repo was started by this call and
  `{:ok, :restart}` when it was already running. Raises when an application
  or the repo cannot be started.
  """
  @spec start_ecto_dependencies!(module()) :: {:ok, :stop | :restart}
  def start_ecto_dependencies!(repo) do
    config = repo.config()
    mode = :permanent

    {:ok, _started} = Application.ensure_all_started(:ecto_sql, mode)
    {:ok, _repo_started} = repo.__adapter__().ensure_all_started(config, mode)

    case repo.start_link(pool_size: 2) do
      {:ok, _pid} ->
        {:ok, :stop}

      {:error, {:already_started, _pid}} ->
        {:ok, :restart}

      {:error, _reason} = error ->
        raise "error starting repo: #{inspect(error)}"
    end
  end
end
Metadata
Metadata
Assignees
Labels
No labels