Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@
"mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
"statistex": {:hex, :statistex, "1.1.0", "7fec1eb2f580a0d2c1a05ed27396a084ab064a40cfc84246dbfb0c72a5c761e5", [:mix], [], "hexpm", "f5950ea26ad43246ba2cce54324ac394a4e7408fdcf98b8e230f503a0cba9cf5"},
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
"telemetry": {:hex, :telemetry, "1.4.1", "ab6de178e2b29b58e8256b92b382ea3f590a47152ca3651ea857a6cae05ac423", [:rebar3], [], "hexpm", "2172e05a27531d3d31dd9782841065c50dd5c3c7699d95266b2edd54c2dafa1c"},
"tz": {:hex, :tz, "0.28.1", "717f5ffddfd1e475e2a233e221dc0b4b76c35c4b3650b060c8e3ba29dd6632e9", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:mint, "~> 1.6", [hex: :mint, repo: "hexpm", optional: true]}], "hexpm", "bfdca1aa1902643c6c43b77c1fb0cb3d744fd2f09a8a98405468afdee0848c8a"},
}
144 changes: 69 additions & 75 deletions test/ch/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -288,24 +288,22 @@ defmodule Ch.ConnectionTest do
end

test "values", %{table: table} = ctx do
assert {:ok, %{num_rows: 3}} =
parameterize_query(
ctx,
"insert into {table:Identifier} values (1, 'a'),(2,'b'), (null, null)",
%{"table" => table}
)
parameterize_query(
ctx,
"insert into {table:Identifier} values (1, 'a'),(2,'b'), (null, null)",
%{"table" => table}
)

assert {:ok, %{rows: rows}} =
parameterize_query(ctx, "select * from {table:Identifier}", %{"table" => table})

assert rows == [[1, "a"], [2, "b"], [1, ""]]

assert {:ok, %{num_rows: 2}} =
parameterize_query(
ctx,
"insert into {$0:Identifier}(a, b) values ({$1:UInt8},{$2:String}),({$3:UInt8},{$4:String})",
[table, 4, "d", 5, "e"]
)
parameterize_query(
ctx,
"insert into {$0:Identifier}(a, b) values ({$1:UInt8},{$2:String}),({$3:UInt8},{$4:String})",
[table, 4, "d", 5, "e"]
)

assert {:ok, %{rows: rows}} =
parameterize_query(ctx, "select * from {table:Identifier} where a >= 4", %{
Expand Down Expand Up @@ -333,7 +331,8 @@ defmodule Ch.ConnectionTest do
stmt = "insert into #{table}(a, b) format RowBinary"
types = ["UInt8", "String"]
rows = [[1, "a"], [2, "b"]]
assert %{num_rows: 2} = parameterize_query!(ctx, stmt, rows, types: types)

parameterize_query!(ctx, stmt, rows, types: types)

assert %{rows: rows} =
parameterize_query!(ctx, "select * from {table:Identifier}", %{"table" => table})
Expand All @@ -348,7 +347,7 @@ defmodule Ch.ConnectionTest do
rows = [[1, "a"], [2, "b"]]
data = RowBinary.encode_rows(rows, types)

assert %{num_rows: 2} = parameterize_query!(ctx, stmt, data, encode: false)
parameterize_query!(ctx, stmt, data, encode: false)

assert %{rows: rows} =
parameterize_query!(ctx, "select * from {table:Identifier}", %{"table" => table})
Expand All @@ -365,13 +364,12 @@ defmodule Ch.ConnectionTest do
|> Stream.chunk_every(2)
|> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)

assert {:ok, %{num_rows: 3}} =
parameterize_query(
ctx,
"insert into #{table}(a, b) format RowBinary",
stream,
encode: false
)
parameterize_query(
ctx,
"insert into #{table}(a, b) format RowBinary",
stream,
encode: false
)

assert {:ok, %{rows: rows}} =
parameterize_query(ctx, "select * from {table:Identifier}", %{"table" => table})
Expand All @@ -380,19 +378,17 @@ defmodule Ch.ConnectionTest do
end

test "select", %{table: table} = ctx do
assert {:ok, %{num_rows: 3}} =
parameterize_query(
ctx,
"insert into {table:Identifier} values (1, 'a'), (2, 'b'), (null, null)",
%{"table" => table}
)
parameterize_query(
ctx,
"insert into {table:Identifier} values (1, 'a'), (2, 'b'), (null, null)",
%{"table" => table}
)

assert {:ok, %{num_rows: 3}} =
parameterize_query(
ctx,
"insert into {table:Identifier}(a, b) select a, b from {table:Identifier}",
%{"table" => table}
)
parameterize_query(
ctx,
"insert into {table:Identifier}(a, b) select a, b from {table:Identifier}",
%{"table" => table}
)

assert {:ok, %{rows: rows}} =
parameterize_query(ctx, "select * from {table:Identifier}", %{"table" => table})
Expand Down Expand Up @@ -421,8 +417,7 @@ defmodule Ch.ConnectionTest do

on_exit(fn -> Ch.Test.query("drop table delete_t") end)

assert {:ok, %{num_rows: 2}} =
parameterize_query(ctx, "insert into delete_t values (1,'a'), (2,'b')")
parameterize_query(ctx, "insert into delete_t values (1,'a'), (2,'b')")

settings = [allow_experimental_lightweight_delete: 1]

Expand Down Expand Up @@ -496,18 +491,17 @@ defmodule Ch.ConnectionTest do
parameterize_query!(ctx, "create table fixed_string_t(a FixedString(3)) engine = Memory")
on_exit(fn -> Ch.Test.query("drop table fixed_string_t") end)

assert {:ok, %{num_rows: 4}} =
parameterize_query(
ctx,
"insert into fixed_string_t(a) format RowBinary",
[
[""],
["a"],
["aa"],
["aaa"]
],
types: ["FixedString(3)"]
)
parameterize_query(
ctx,
"insert into fixed_string_t(a) format RowBinary",
[
[""],
["a"],
["aa"],
["aaa"]
],
types: ["FixedString(3)"]
)

assert parameterize_query!(ctx, "select * from fixed_string_t").rows == [
[<<0, 0, 0>>],
Expand Down Expand Up @@ -541,17 +535,16 @@ defmodule Ch.ConnectionTest do
parameterize_query!(ctx, "create table decimal_t(d Decimal32(4)) engine = Memory")
on_exit(fn -> Ch.Test.query("drop table decimal_t") end)

assert %{num_rows: 3} =
parameterize_query!(
ctx,
"insert into decimal_t(d) format RowBinary",
_rows = [
[Decimal.new("2.66")],
[Decimal.new("2.6666")],
[Decimal.new("2.66666")]
],
types: ["Decimal32(4)"]
)
parameterize_query!(
ctx,
"insert into decimal_t(d) format RowBinary",
_rows = [
[Decimal.new("2.66")],
[Decimal.new("2.6666")],
[Decimal.new("2.66666")]
],
types: ["Decimal32(4)"]
)

assert parameterize_query!(ctx, "select * from decimal_t").rows == [
[Decimal.new("2.6600")],
Expand Down Expand Up @@ -852,16 +845,14 @@ defmodule Ch.ConnectionTest do
parameterize_query!(ctx, "CREATE TABLE tuples_t (`a` Tuple(String, Int64)) ENGINE = Memory")
on_exit(fn -> Ch.Test.query("DROP TABLE tuples_t") end)

assert %{num_rows: 2} =
parameterize_query!(ctx, "INSERT INTO tuples_t VALUES (('y', 10)), (('x',-10))")
parameterize_query!(ctx, "INSERT INTO tuples_t VALUES (('y', 10)), (('x',-10))")

assert %{num_rows: 2} =
parameterize_query!(
ctx,
"INSERT INTO tuples_t FORMAT RowBinary",
_rows = [[{"a", 20}], [{"b", 30}]],
types: ["Tuple(String, Int64)"]
)
parameterize_query!(
ctx,
"INSERT INTO tuples_t FORMAT RowBinary",
_rows = [[{"a", 20}], [{"b", 30}]],
types: ["Tuple(String, Int64)"]
)

assert parameterize_query!(ctx, "SELECT a FROM tuples_t ORDER BY a.1 ASC").rows == [
[{"a", 20}],
Expand Down Expand Up @@ -1341,13 +1332,12 @@ defmodule Ch.ConnectionTest do
parameterize_query(ctx, "SELECT n FROM nullable")

# weird thing about nullables is that, similar to bool, in binary format, any byte larger than 0 is `null`
assert {:ok, %{num_rows: 5}} =
parameterize_query(
ctx,
"insert into nullable format RowBinary",
<<1, 2, 3, 4, 5>>,
encode: false
)
parameterize_query(
ctx,
"insert into nullable format RowBinary",
<<1, 2, 3, 4, 5>>,
encode: false
)

assert %{num_rows: 1, rows: [[count]]} =
parameterize_query!(ctx, "select count(*) from nullable where n is null")
Expand Down Expand Up @@ -1815,7 +1805,11 @@ defmodule Ch.ConnectionTest do
]
]

assert {:ok, %{num_rows: 1}} = parameterize_query(ctx, stmt, rows, opts)
parameterize_query(ctx, stmt, rows, opts)

assert parameterize_query!(ctx, "select * from row_binary_names_and_types_t").rows == [
["AB", "rare", -42]
]
end

test "select with lots of columns", ctx do
Expand Down
7 changes: 0 additions & 7 deletions test/ch/settings_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,6 @@ defmodule Ch.SettingsTest do
{:ok, query_options: ctx[:query_options] || []}
end

test "can start without settings", %{query_options: query_options} do
assert {:ok, conn} = Ch.start_link()

assert {:ok, %{num_rows: 1, rows: [["async_insert", "Bool", "0"]]}} =
Ch.query(conn, "show settings like 'async_insert'", [], query_options)
Copy link
Copy Markdown
Collaborator Author

@ruslandoga ruslandoga Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default async_insert is now 1, and that's the reason for written_rows being 0.

end

test "can pass default settings", %{query_options: query_options} do
assert {:ok, conn} = Ch.start_link(settings: [async_insert: 1])

Expand Down
29 changes: 14 additions & 15 deletions test/ch/stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,20 @@ defmodule Ch.StreamTest do
Ch.query!(conn, "create table collect_stream(i UInt64) engine Memory")
on_exit(fn -> Ch.Test.query("DROP TABLE collect_stream") end)

assert %Ch.Result{command: :insert, num_rows: 1_000_000} =
DBConnection.run(conn, fn conn ->
Stream.repeatedly(fn -> [:rand.uniform(100)] end)
|> Stream.chunk_every(100_000)
|> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(10)
|> Enum.into(
Ch.stream(
conn,
"insert into collect_stream(i) format RowBinary",
_params = [],
Keyword.merge(query_options, encode: false)
)
)
end)
DBConnection.run(conn, fn conn ->
Stream.repeatedly(fn -> [:rand.uniform(100)] end)
|> Stream.chunk_every(100_000)
|> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(10)
|> Enum.into(
Ch.stream(
conn,
"insert into collect_stream(i) format RowBinary",
_params = [],
Keyword.merge(query_options, encode: false)
)
)
end)

assert Ch.query!(conn, "select count(*) from collect_stream").rows == [[1_000_000]]
end
Expand Down
Loading