
Commit 3f46deb

Merge pull request ClickHouse#79416 from ClickHouse/delta-kernel-rs-support-local-storage
Support local storage in delta-kernel
2 parents 6123a3a + 47023ac commit 3f46deb

File tree: 3 files changed, +173 −31 lines

3 files changed

+173
-31
lines changed

src/Storages/ObjectStorage/DataLakes/DeltaLake/KernelHelper.cpp

Lines changed: 36 additions & 0 deletions
```diff
@@ -2,6 +2,7 @@
 
 #if USE_DELTA_KERNEL_RS
 #include <Storages/ObjectStorage/S3/Configuration.h>
+#include <Storages/ObjectStorage/Local/Configuration.h>
 #include "KernelHelper.h"
 #include "KernelUtils.h"
 #include <Common/logger_useful.h>
@@ -114,6 +115,36 @@ class S3KernelHelper final : public IKernelHelper
     }
 };
 
+/// A helper class to manage local fs storage.
+class LocalKernelHelper final : public IKernelHelper
+{
+public:
+    explicit LocalKernelHelper(const std::string & path_) : table_location(getTableLocation(path_)), path(path_) {}
+
+    const std::string & getTableLocation() const override { return table_location; }
+
+    const std::string & getDataPath() const override { return path; }
+
+    ffi::EngineBuilder * createBuilder() const override
+    {
+        ffi::EngineBuilder * builder = KernelUtils::unwrapResult(
+            ffi::get_engine_builder(
+                KernelUtils::toDeltaString(table_location),
+                &KernelUtils::allocateError),
+            "get_engine_builder");
+
+        return builder;
+    }
+
+private:
+    const std::string table_location;
+    const std::string path;
+
+    static std::string getTableLocation(const std::string & path)
+    {
+        return "file://" + path + "/";
+    }
+};
 }
 
 namespace DB
@@ -148,6 +179,11 @@ DeltaLake::KernelHelperPtr getKernelHelper(
                 s3_credentials.GetSessionToken(),
                 auth_settings[S3AuthSetting::no_sign_request]);
         }
+        case DB::ObjectStorageType::Local:
+        {
+            const auto * local_conf = dynamic_cast<const DB::StorageLocalConfiguration *>(configuration.get());
+            return std::make_shared<DeltaLake::LocalKernelHelper>(local_conf->getPath());
+        }
         default:
         {
             throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED,
```
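The new `LocalKernelHelper` mirrors `S3KernelHelper`: delta-kernel-rs expects the table root as a URL, so the helper wraps the filesystem path in a `file://` scheme with a trailing slash, while keeping the raw path for data reads. A minimal standalone sketch of that path-to-URL rule (not ClickHouse code; the table path is a made-up example):

```cpp
#include <cassert>
#include <string>

/// Same rule as LocalKernelHelper::getTableLocation: prefix the path with
/// the "file://" scheme and append a trailing slash so the kernel treats
/// the location as a directory.
static std::string getTableLocation(const std::string & path)
{
    return "file://" + path + "/";
}

int main()
{
    /// "/var/lib/clickhouse/delta/events" is a hypothetical table path.
    assert(getTableLocation("/var/lib/clickhouse/delta/events")
        == "file:///var/lib/clickhouse/delta/events/");
    return 0;
}
```

Note the resulting triple slash: two from the scheme, one from the absolute path, which is the standard form for local-file URLs.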

src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp

Lines changed: 7 additions & 2 deletions
```diff
@@ -124,13 +124,18 @@ class TableSnapshot::Iterator final : public DB::IObjectIterator
         if (have_scan_data_res)
         {
             std::unique_lock lock(next_mutex);
-            if (!shutdown.load() && data_files.size() >= list_batch_size)
+            if (!shutdown.load() && list_batch_size && data_files.size() >= list_batch_size)
             {
-                schedule_next_batch_cv.wait(lock, [&]() { return (data_files.size() < list_batch_size) || shutdown.load(); });
+                LOG_TEST(log, "List batch size is {}/{}", data_files.size(), list_batch_size);
+
+                schedule_next_batch_cv.wait(
+                    lock,
+                    [&]() { return (data_files.size() < list_batch_size) || shutdown.load(); });
             }
         }
         else
         {
+            LOG_TEST(log, "All data files were listed");
             {
                 std::lock_guard lock(next_mutex);
                 iterator_finished = true;
```
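The `list_batch_size &&` guard matters when the batch size is 0 (presumably "unlimited"): since `data_files.size()` is unsigned, the old condition `data_files.size() >= list_batch_size` was always true for 0, and the wait predicate `data_files.size() < list_batch_size` could never hold, so the listing thread would park until shutdown. A minimal sketch of the corrected back-pressure pattern, using member names from the diff but simplified types (e.g. `shutdown` is a plain bool under the lock here, not the atomic used in the real code):

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>

/// Producer-side throttle: once a full batch of listed files is buffered,
/// wait for the consumer to drain it. A list_batch_size of 0 disables the
/// throttle entirely instead of entering a wait that can never finish.
struct BatchedListing
{
    std::mutex next_mutex;
    std::condition_variable schedule_next_batch_cv;
    std::deque<std::string> data_files;
    size_t list_batch_size = 0;
    bool shutdown = false;

    void throttleProducer()
    {
        std::unique_lock lock(next_mutex);
        if (!shutdown && list_batch_size && data_files.size() >= list_batch_size)
            schedule_next_batch_cv.wait(
                lock,
                [&] { return data_files.size() < list_batch_size || shutdown; });
    }

    /// Consumer side: pop one file and wake the producer.
    bool pop(std::string & out)
    {
        std::lock_guard lock(next_mutex);
        if (data_files.empty())
            return false;
        out = std::move(data_files.front());
        data_files.pop_front();
        schedule_next_batch_cv.notify_one();
        return true;
    }
};

int main()
{
    BatchedListing listing;
    listing.list_batch_size = 0;                  /// 0: producer never waits.
    listing.data_files = {"a.parquet", "b.parquet"};
    listing.throttleProducer();                   /// Returns immediately thanks to the guard.

    std::string file;
    while (listing.pop(file)) { /* consume */ }
    return 0;
}
```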

tests/integration/test_database_delta/test.py

Lines changed: 130 additions & 29 deletions
```diff
@@ -36,13 +36,13 @@ def started_cluster():
         user_configs=[],
         image="clickhouse/integration-test-with-unity-catalog",
         with_installed_binary=False,
-        tag=os.environ.get("DOCKER_BASE_WITH_UNITY_CATALOG_TAG", "latest")
+        tag=os.environ.get("DOCKER_BASE_WITH_UNITY_CATALOG_TAG", "latest"),
     )
 
     logging.info("Starting cluster...")
     cluster.start()
 
-    start_unity_catalog(cluster.instances['node1'])
+    start_unity_catalog(cluster.instances["node1"])
 
     yield cluster
 
@@ -68,65 +68,166 @@ def execute_spark_query(node, query_text, ignore_exit_code=False):
             --conf "spark.sql.defaultCatalog=unity" \\
             -S -e "{query_text}" | grep -v 'loading settings'
             """,
-        ], nothrow=ignore_exit_code
+        ],
+        nothrow=ignore_exit_code,
     )
 
+
 def execute_multiple_spark_queries(node, queries_list, ignore_exit_code=False):
-    return execute_spark_query(node, ';'.join(queries_list), ignore_exit_code)
+    return execute_spark_query(node, ";".join(queries_list), ignore_exit_code)
+
 
 def test_embedded_database_and_tables(started_cluster):
-    node1 = started_cluster.instances['node1']
-    node1.query("create database unity_test engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') settings warehouse = 'unity', catalog_type='unity', vended_credentials=false", settings={"allow_experimental_database_unity_catalog": "1"})
-    default_tables = list(sorted(node1.query("SHOW TABLES FROM unity_test LIKE 'default%'", settings={'use_hive_partitioning':'0'}).strip().split('\n')))
+    node1 = started_cluster.instances["node1"]
+    node1.query(
+        "create database unity_test engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') settings warehouse = 'unity', catalog_type='unity', vended_credentials=false",
+        settings={"allow_experimental_database_unity_catalog": "1"},
+    )
+    default_tables = list(
+        sorted(
+            node1.query(
+                "SHOW TABLES FROM unity_test LIKE 'default%'",
+                settings={"use_hive_partitioning": "0"},
+            )
+            .strip()
+            .split("\n")
+        )
+    )
     print("Default tables", default_tables)
-    assert default_tables == ['default.marksheet', 'default.marksheet_uniform', 'default.numbers', 'default.user_countries']
+    assert default_tables == [
+        "default.marksheet",
+        "default.marksheet_uniform",
+        "default.numbers",
+        "default.user_countries",
+    ]
 
     for table in default_tables:
         if table == "default.marksheet_uniform":
             continue
         assert "DeltaLake" in node1.query(f"show create table unity_test.`{table}`")
-        if table in ('default.marksheet', 'default.user_countries'):
-            data_clickhouse = TSV(node1.query(f"SELECT * FROM unity_test.`{table}` ORDER BY 1,2,3"))
-            data_spark = TSV(execute_spark_query(node1, f"SELECT * FROM unity.{table} ORDER BY 1,2,3"))
+        if table in ("default.marksheet", "default.user_countries"):
+            data_clickhouse = TSV(
+                node1.query(f"SELECT * FROM unity_test.`{table}` ORDER BY 1,2,3")
+            )
+            data_spark = TSV(
+                execute_spark_query(
+                    node1, f"SELECT * FROM unity.{table} ORDER BY 1,2,3"
+                )
+            )
             print("Data ClickHouse\n", data_clickhouse)
             print("Data Spark\n", data_spark)
             assert data_clickhouse == data_spark
 
 
 def test_multiple_schemes_tables(started_cluster):
-    node1 = started_cluster.instances['node1']
-    execute_multiple_spark_queries(node1, [f'CREATE SCHEMA test_schema{i}' for i in range(10)], True)
-    execute_multiple_spark_queries(node1, [f'CREATE TABLE test_schema{i}.test_table{i} (col1 int, col2 double) using Delta location \'/tmp/test_schema{i}/test_table{i}\'' for i in range(10)], True)
-    execute_multiple_spark_queries(node1, [f'INSERT INTO test_schema{i}.test_table{i} VALUES ({i}, {i}.0)' for i in range(10)], True)
+    node1 = started_cluster.instances["node1"]
+    execute_multiple_spark_queries(
+        node1, [f"CREATE SCHEMA test_schema{i}" for i in range(10)], True
+    )
+    execute_multiple_spark_queries(
+        node1,
+        [
+            f"CREATE TABLE test_schema{i}.test_table{i} (col1 int, col2 double) using Delta location '/tmp/test_schema{i}/test_table{i}'"
+            for i in range(10)
+        ],
+        True,
+    )
+    execute_multiple_spark_queries(
+        node1,
+        [
+            f"INSERT INTO test_schema{i}.test_table{i} VALUES ({i}, {i}.0)"
+            for i in range(10)
+        ],
+        True,
+    )
 
-    node1.query("create database multi_schema_test engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') settings warehouse = 'unity', catalog_type='unity', vended_credentials=false", settings={"allow_experimental_database_unity_catalog": "1"})
-    multi_schema_tables = list(sorted(node1.query("SHOW TABLES FROM multi_schema_test LIKE 'test_schema%'", settings={'use_hive_partitioning':'0'}).strip().split('\n')))
+    node1.query(
+        "create database multi_schema_test engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') settings warehouse = 'unity', catalog_type='unity', vended_credentials=false",
+        settings={"allow_experimental_database_unity_catalog": "1"},
+    )
+    multi_schema_tables = list(
+        sorted(
+            node1.query(
+                "SHOW TABLES FROM multi_schema_test LIKE 'test_schema%'",
+                settings={"use_hive_partitioning": "0"},
+            )
+            .strip()
+            .split("\n")
+        )
+    )
     print(multi_schema_tables)
 
     for i, table in enumerate(multi_schema_tables):
-        assert node1.query(f"SELECT col1 FROM multi_schema_test.`{table}`").strip() == str(i)
-        assert int(node1.query(f"SELECT col2 FROM multi_schema_test.`{table}`").strip()) == i
+        assert node1.query(
+            f"SELECT col1 FROM multi_schema_test.`{table}`"
+        ).strip() == str(i)
+        assert (
+            int(node1.query(f"SELECT col2 FROM multi_schema_test.`{table}`").strip())
+            == i
+        )
 
 
-def test_complex_table_schema(started_cluster):
-    node1 = started_cluster.instances['node1']
-    execute_spark_query(node1, "CREATE SCHEMA schema_with_complex_tables", ignore_exit_code=True)
+@pytest.mark.parametrize("use_delta_kernel", ["1", "0"])
+def test_complex_table_schema(started_cluster, use_delta_kernel):
+    node1 = started_cluster.instances["node1"]
+    schema_name = f"schema_with_complex_tables_{use_delta_kernel}"
+    execute_spark_query(
+        node1, f"CREATE SCHEMA {schema_name}", ignore_exit_code=True
+    )
+    table_name = f"complex_table_{use_delta_kernel}"
     schema = "event_date DATE, event_time TIMESTAMP, hits ARRAY<integer>, ids MAP<int, string>, really_complex STRUCT<f1:int,f2:string>"
-    create_query = f"CREATE TABLE schema_with_complex_tables.complex_table ({schema}) using Delta location '/tmp/complex_schema/complex_table'"
+    create_query = f"CREATE TABLE {schema_name}.{table_name} ({schema}) using Delta location '/tmp/complex_schema/{table_name}'"
     execute_spark_query(node1, create_query, ignore_exit_code=True)
-    execute_spark_query(node1, "insert into schema_with_complex_tables.complex_table SELECT to_date('2024-10-01', 'yyyy-MM-dd'), to_timestamp('2024-10-01 00:12:00'), array(42, 123, 77), map(7, 'v7', 5, 'v5'), named_struct(\\\"f1\\\", 34, \\\"f2\\\", 'hello')", ignore_exit_code=True)
+    execute_spark_query(
+        node1,
+        f"insert into {schema_name}.{table_name} SELECT to_date('2024-10-01', 'yyyy-MM-dd'), to_timestamp('2024-10-01 00:12:00'), array(42, 123, 77), map(7, 'v7', 5, 'v5'), named_struct(\\\"f1\\\", 34, \\\"f2\\\", 'hello')",
+        ignore_exit_code=True,
+    )
 
-    node1.query("create database complex_schema engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') settings warehouse = 'unity', catalog_type='unity', vended_credentials=false", settings={"allow_experimental_database_unity_catalog": "1"})
+    node1.query(
+        f"""
+        drop database if exists complex_schema;
+        create database complex_schema
+        engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog')
+        settings warehouse = 'unity', catalog_type='unity', vended_credentials=false, allow_experimental_delta_kernel_rs={use_delta_kernel}
+        """,
+        settings={"allow_experimental_database_unity_catalog": "1"},
+    )
 
-    complex_schema_tables = list(sorted(node1.query("SHOW TABLES FROM complex_schema LIKE 'schema_with_complex_tables%'", settings={'use_hive_partitioning':'0'}).strip().split('\n')))
+    complex_schema_tables = list(
+        sorted(
+            node1.query(
+                f"SHOW TABLES FROM complex_schema LIKE '{schema_name}%'",
+                settings={"use_hive_partitioning": "0"},
+            )
+            .strip()
+            .split("\n")
+        )
+    )
 
     assert len(complex_schema_tables) == 1
 
-    print(node1.query("SHOW CREATE TABLE complex_schema.`schema_with_complex_tables.complex_table`"))
-    complex_data = node1.query("SELECT * FROM complex_schema.`schema_with_complex_tables.complex_table`").strip().split('\t')
+    print(
+        node1.query(
+            f"SHOW CREATE TABLE complex_schema.`{schema_name}.{table_name}`"
+        )
+    )
+    complex_data = (
+        node1.query(
+            f"SELECT * FROM complex_schema.`{schema_name}.{table_name}`"
+        )
+        .strip()
+        .split("\t")
+    )
     print(complex_data)
     assert complex_data[0] == "2024-10-01"
-    assert complex_data[1] == "2024-10-01 00:12:00.000000"
+    if use_delta_kernel == "1":
+        assert complex_data[1] == "2024-10-01 00:12:00.000"  # FIXME
+    else:
+        assert complex_data[1] == "2024-10-01 00:12:00.000000"
     assert complex_data[2] == "[42,123,77]"
     assert complex_data[3] == "{7:'v7',5:'v5'}"
    assert complex_data[4] == "(34,'hello')"
+
+    if use_delta_kernel == "1":
+        assert node1.contains_in_log(f"DeltaLakeMetadata: Initializing snapshot")
```
