Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 69 additions & 75 deletions test/ch/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -288,24 +288,22 @@ defmodule Ch.ConnectionTest do
end

test "values", %{table: table} = ctx do
assert {:ok, %{num_rows: 3}} =
parameterize_query(
ctx,
"insert into {table:Identifier} values (1, 'a'),(2,'b'), (null, null)",
%{"table" => table}
)
parameterize_query(
ctx,
"insert into {table:Identifier} values (1, 'a'),(2,'b'), (null, null)",
%{"table" => table}
)

assert {:ok, %{rows: rows}} =
parameterize_query(ctx, "select * from {table:Identifier}", %{"table" => table})

assert rows == [[1, "a"], [2, "b"], [1, ""]]

assert {:ok, %{num_rows: 2}} =
parameterize_query(
ctx,
"insert into {$0:Identifier}(a, b) values ({$1:UInt8},{$2:String}),({$3:UInt8},{$4:String})",
[table, 4, "d", 5, "e"]
)
parameterize_query(
ctx,
"insert into {$0:Identifier}(a, b) values ({$1:UInt8},{$2:String}),({$3:UInt8},{$4:String})",
[table, 4, "d", 5, "e"]
)

assert {:ok, %{rows: rows}} =
parameterize_query(ctx, "select * from {table:Identifier} where a >= 4", %{
Expand Down Expand Up @@ -333,7 +331,8 @@ defmodule Ch.ConnectionTest do
stmt = "insert into #{table}(a, b) format RowBinary"
types = ["UInt8", "String"]
rows = [[1, "a"], [2, "b"]]
assert %{num_rows: 2} = parameterize_query!(ctx, stmt, rows, types: types)

parameterize_query!(ctx, stmt, rows, types: types)

assert %{rows: rows} =
parameterize_query!(ctx, "select * from {table:Identifier}", %{"table" => table})
Expand All @@ -348,7 +347,7 @@ defmodule Ch.ConnectionTest do
rows = [[1, "a"], [2, "b"]]
data = RowBinary.encode_rows(rows, types)

assert %{num_rows: 2} = parameterize_query!(ctx, stmt, data, encode: false)
parameterize_query!(ctx, stmt, data, encode: false)

assert %{rows: rows} =
parameterize_query!(ctx, "select * from {table:Identifier}", %{"table" => table})
Expand All @@ -365,13 +364,12 @@ defmodule Ch.ConnectionTest do
|> Stream.chunk_every(2)
|> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)

assert {:ok, %{num_rows: 3}} =
parameterize_query(
ctx,
"insert into #{table}(a, b) format RowBinary",
stream,
encode: false
)
parameterize_query(
ctx,
"insert into #{table}(a, b) format RowBinary",
stream,
encode: false
)

assert {:ok, %{rows: rows}} =
parameterize_query(ctx, "select * from {table:Identifier}", %{"table" => table})
Expand All @@ -380,19 +378,17 @@ defmodule Ch.ConnectionTest do
end

test "select", %{table: table} = ctx do
assert {:ok, %{num_rows: 3}} =
parameterize_query(
ctx,
"insert into {table:Identifier} values (1, 'a'), (2, 'b'), (null, null)",
%{"table" => table}
)
parameterize_query(
ctx,
"insert into {table:Identifier} values (1, 'a'), (2, 'b'), (null, null)",
%{"table" => table}
)

assert {:ok, %{num_rows: 3}} =
parameterize_query(
ctx,
"insert into {table:Identifier}(a, b) select a, b from {table:Identifier}",
%{"table" => table}
)
parameterize_query(
ctx,
"insert into {table:Identifier}(a, b) select a, b from {table:Identifier}",
%{"table" => table}
)

assert {:ok, %{rows: rows}} =
parameterize_query(ctx, "select * from {table:Identifier}", %{"table" => table})
Expand Down Expand Up @@ -421,8 +417,7 @@ defmodule Ch.ConnectionTest do

on_exit(fn -> Ch.Test.query("drop table delete_t") end)

assert {:ok, %{num_rows: 2}} =
parameterize_query(ctx, "insert into delete_t values (1,'a'), (2,'b')")
parameterize_query(ctx, "insert into delete_t values (1,'a'), (2,'b')")

settings = [allow_experimental_lightweight_delete: 1]

Expand Down Expand Up @@ -496,18 +491,17 @@ defmodule Ch.ConnectionTest do
parameterize_query!(ctx, "create table fixed_string_t(a FixedString(3)) engine = Memory")
on_exit(fn -> Ch.Test.query("drop table fixed_string_t") end)

assert {:ok, %{num_rows: 4}} =
parameterize_query(
ctx,
"insert into fixed_string_t(a) format RowBinary",
[
[""],
["a"],
["aa"],
["aaa"]
],
types: ["FixedString(3)"]
)
parameterize_query(
ctx,
"insert into fixed_string_t(a) format RowBinary",
[
[""],
["a"],
["aa"],
["aaa"]
],
types: ["FixedString(3)"]
)

assert parameterize_query!(ctx, "select * from fixed_string_t").rows == [
[<<0, 0, 0>>],
Expand Down Expand Up @@ -541,17 +535,16 @@ defmodule Ch.ConnectionTest do
parameterize_query!(ctx, "create table decimal_t(d Decimal32(4)) engine = Memory")
on_exit(fn -> Ch.Test.query("drop table decimal_t") end)

assert %{num_rows: 3} =
parameterize_query!(
ctx,
"insert into decimal_t(d) format RowBinary",
_rows = [
[Decimal.new("2.66")],
[Decimal.new("2.6666")],
[Decimal.new("2.66666")]
],
types: ["Decimal32(4)"]
)
parameterize_query!(
ctx,
"insert into decimal_t(d) format RowBinary",
_rows = [
[Decimal.new("2.66")],
[Decimal.new("2.6666")],
[Decimal.new("2.66666")]
],
types: ["Decimal32(4)"]
)

assert parameterize_query!(ctx, "select * from decimal_t").rows == [
[Decimal.new("2.6600")],
Expand Down Expand Up @@ -852,16 +845,14 @@ defmodule Ch.ConnectionTest do
parameterize_query!(ctx, "CREATE TABLE tuples_t (`a` Tuple(String, Int64)) ENGINE = Memory")
on_exit(fn -> Ch.Test.query("DROP TABLE tuples_t") end)

assert %{num_rows: 2} =
parameterize_query!(ctx, "INSERT INTO tuples_t VALUES (('y', 10)), (('x',-10))")
parameterize_query!(ctx, "INSERT INTO tuples_t VALUES (('y', 10)), (('x',-10))")

assert %{num_rows: 2} =
parameterize_query!(
ctx,
"INSERT INTO tuples_t FORMAT RowBinary",
_rows = [[{"a", 20}], [{"b", 30}]],
types: ["Tuple(String, Int64)"]
)
parameterize_query!(
ctx,
"INSERT INTO tuples_t FORMAT RowBinary",
_rows = [[{"a", 20}], [{"b", 30}]],
types: ["Tuple(String, Int64)"]
)

assert parameterize_query!(ctx, "SELECT a FROM tuples_t ORDER BY a.1 ASC").rows == [
[{"a", 20}],
Expand Down Expand Up @@ -1341,13 +1332,12 @@ defmodule Ch.ConnectionTest do
parameterize_query(ctx, "SELECT n FROM nullable")

# weird thing about nullables is that, similar to bool, in binary format, any byte larger than 0 is `null`
assert {:ok, %{num_rows: 5}} =
parameterize_query(
ctx,
"insert into nullable format RowBinary",
<<1, 2, 3, 4, 5>>,
encode: false
)
parameterize_query(
ctx,
"insert into nullable format RowBinary",
<<1, 2, 3, 4, 5>>,
encode: false
)

assert %{num_rows: 1, rows: [[count]]} =
parameterize_query!(ctx, "select count(*) from nullable where n is null")
Expand Down Expand Up @@ -1815,7 +1805,11 @@ defmodule Ch.ConnectionTest do
]
]

assert {:ok, %{num_rows: 1}} = parameterize_query(ctx, stmt, rows, opts)
parameterize_query(ctx, stmt, rows, opts)

assert parameterize_query!(ctx, "select * from row_binary_names_and_types_t").rows == [
["AB", "rare", -42]
]
end

test "select with lots of columns", ctx do
Expand Down
7 changes: 0 additions & 7 deletions test/ch/settings_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,6 @@ defmodule Ch.SettingsTest do
{:ok, query_options: ctx[:query_options] || []}
end

test "can start without settings", %{query_options: query_options} do
assert {:ok, conn} = Ch.start_link()

assert {:ok, %{num_rows: 1, rows: [["async_insert", "Bool", "0"]]}} =
Ch.query(conn, "show settings like 'async_insert'", [], query_options)
Copy link
Copy Markdown
Collaborator Author

@ruslandoga ruslandoga Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default async_insert is now 1, and that's the reason for written_rows being 0.

end

test "can pass default settings", %{query_options: query_options} do
assert {:ok, conn} = Ch.start_link(settings: [async_insert: 1])

Expand Down
29 changes: 14 additions & 15 deletions test/ch/stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,20 @@ defmodule Ch.StreamTest do
Ch.query!(conn, "create table collect_stream(i UInt64) engine Memory")
on_exit(fn -> Ch.Test.query("DROP TABLE collect_stream") end)

assert %Ch.Result{command: :insert, num_rows: 1_000_000} =
DBConnection.run(conn, fn conn ->
Stream.repeatedly(fn -> [:rand.uniform(100)] end)
|> Stream.chunk_every(100_000)
|> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(10)
|> Enum.into(
Ch.stream(
conn,
"insert into collect_stream(i) format RowBinary",
_params = [],
Keyword.merge(query_options, encode: false)
)
)
end)
DBConnection.run(conn, fn conn ->
Stream.repeatedly(fn -> [:rand.uniform(100)] end)
|> Stream.chunk_every(100_000)
|> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(10)
|> Enum.into(
Ch.stream(
conn,
"insert into collect_stream(i) format RowBinary",
_params = [],
Keyword.merge(query_options, encode: false)
)
)
end)

assert Ch.query!(conn, "select count(*) from collect_stream").rows == [[1_000_000]]
end
Expand Down
Loading