Skip to content

Commit 3144857

Browse files
committed
Update on reads, tests
1 parent ee84f2b commit 3144857

File tree

11 files changed

+68
-27
lines changed

11 files changed

+68
-27
lines changed

src/Databases/DataLake/DatabaseDataLake.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -434,17 +434,18 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
434434
configuration,
435435
configuration->createObjectStorage(context_copy, /* is_readonly */ false),
436436
StorageID(getDatabaseName(), name),
437-
/* columns */columns,
438-
/* constraints */ConstraintsDescription{},
439-
/* partition_by */nullptr,
437+
/* columns */ columns,
438+
/* constraints */ ConstraintsDescription{},
439+
/* partition_by */ nullptr,
440440
context_copy,
441-
/* comment */"",
441+
/* comment */ "",
442442
getFormatSettings(context_copy),
443443
LoadingStrictnessLevel::CREATE,
444444
getCatalog(),
445-
/* if_not_exists*/true,
446-
/* is_datalake_query*/true,
447-
/* lazy_init */true);
445+
/* if_not_exists */ true,
446+
/* is_datalake_query */ true,
447+
/* is_table_function */ true,
448+
/* lazy_init */ true);
448449
}
449450

450451
void DatabaseDataLake::dropTable( /// NOLINT

src/Storages/IStorageCluster.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,8 @@ void IStorageCluster::read(
288288
return;
289289
}
290290

291+
updateConfigurationIfNeeded(context);
292+
291293
storage_snapshot->check(column_names);
292294

293295
const auto & settings = context->getSettingsRef();

src/Storages/IStorageCluster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ class IStorageCluster : public IStorage
9797
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName());
9898
}
9999

100+
/// Hook that read() invokes before checking the requested columns against the storage snapshot.
/// The default implementation does nothing; derived storages may override it to refresh their configuration.
virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {}
101+
100102
private:
101103
static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0);
102104

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
9191
std::shared_ptr<DataLake::ICatalog> catalog,
9292
bool if_not_exists,
9393
bool is_datalake_query,
94+
bool is_table_function,
9495
bool lazy_init)
9596
: IStorageCluster(
9697
cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name)))
@@ -145,6 +146,10 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
145146
tryLogCurrentException(log_);
146147
}
147148

149+
// Tables need to update their configuration on each read,
150+
// because the data may have changed since the previous update.
151+
update_configuration_on_read_write = !is_table_function;
152+
148153
ColumnsDescription columns{columns_in_table_or_function_definition};
149154
std::string sample_path;
150155
if (need_resolve_columns_or_format)
@@ -743,6 +748,18 @@ IDataLakeMetadata * StorageObjectStorageCluster::getExternalMetadata(ContextPtr
743748
return configuration->getExternalMetadata();
744749
}
745750

751+
/// Refreshes `configuration` from `object_storage` when `update_configuration_on_read_write` is set;
/// otherwise does nothing. The constructor sets that flag to `!is_table_function`, so only storages
/// constructed with `is_table_function = false` are refreshed here.
/// Overrides the no-op hook in IStorageCluster, which read() calls before validating column names.
/// NOTE(review): both update() flags are passed as false -- presumably this forces a refresh even if
/// one already happened and skips the consistency check against previous metadata; confirm against
/// the configuration's update() contract.
void StorageObjectStorageCluster::updateConfigurationIfNeeded(ContextPtr context)
752+
{
753+
if (update_configuration_on_read_write)
754+
{
755+
configuration->update(
756+
object_storage,
757+
context,
758+
/* if_not_updated_before */false,
759+
/* check_consistent_with_previous_metadata */false);
760+
}
761+
}
762+
746763
void StorageObjectStorageCluster::checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const
747764
{
748765
if (getClusterName(context).empty())

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ class StorageObjectStorageCluster : public IStorageCluster
2525
std::shared_ptr<DataLake::ICatalog> catalog,
2626
bool if_not_exists,
2727
bool is_datalake_query,
28-
bool lazy_init = false);
28+
bool is_table_function,
29+
bool lazy_init);
2930

3031
std::string getName() const override;
3132

@@ -154,6 +155,8 @@ class StorageObjectStorageCluster : public IStorageCluster
154155
ContextPtr context,
155156
bool async_insert) override;
156157

158+
void updateConfigurationIfNeeded(ContextPtr context) override;
159+
157160
/*
158161
In case the table was created with `object_storage_cluster` setting,
159162
modify the AST query object so that it uses the table function implementation
@@ -176,6 +179,7 @@ class StorageObjectStorageCluster : public IStorageCluster
176179

177180
/// non-clustered storage to fall back on pure realisation if needed
178181
std::shared_ptr<StorageObjectStorage> pure_storage;
182+
bool update_configuration_on_read_write;
179183
};
180184

181185
}

src/Storages/ObjectStorage/registerStorageObjectStorage.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
9898
args.mode,
9999
configuration->getCatalog(context, args.query.attach),
100100
args.query.if_not_exists,
101-
/* is_datalake_query*/ false);
101+
/* is_datalake_query */ false,
102+
/* is_table_function */ false,
103+
/* lazy_init */ false);
102104
}
103105

104106
#endif

src/TableFunctions/TableFunctionObjectStorage.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,11 @@ StoragePtr TableFunctionObjectStorage<Definition, Configuration, is_data_lake>::
204204
/* comment */ String{},
205205
/* format_settings */ std::nullopt, /// No format_settings
206206
/* mode */ LoadingStrictnessLevel::CREATE,
207-
configuration->getCatalog(context, /*attach*/ false),
207+
configuration->getCatalog(context, /* attach */ false),
208208
/* if_not_exists */ false,
209-
/* is_datalake_query*/ false);
209+
/* is_datalake_query */ false,
210+
/* is_table_function */ true,
211+
/* lazy_init */ false);
210212

211213
storage->startup();
212214
return storage;

src/TableFunctions/TableFunctionObjectStorageCluster.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ StoragePtr TableFunctionObjectStorageCluster<Definition, Configuration, is_data_
7676
/* mode */ LoadingStrictnessLevel::CREATE,
7777
/* catalog*/ nullptr,
7878
/* if_not_exists */ false,
79-
/* is_datalake_query*/ false,
79+
/* is_datalake_query */ false,
80+
/* is_table_function */ true,
8081
/* lazy_init */ true);
8182
}
8283

tests/integration/helpers/iceberg_utils.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -288,22 +288,26 @@ def get_creation_expression(
288288
)
289289

290290
elif storage_type == "local":
291-
assert not run_on_cluster
292-
293-
if table_function:
291+
if run_on_cluster:
292+
assert table_function
294293
return f"""
295-
iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
294+
iceberg{engine_part}Cluster('cluster_single_node', {storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
296295
"""
297296
else:
298-
return (
299-
f"""
300-
DROP TABLE IF EXISTS {table_name};
301-
CREATE TABLE {if_not_exists_prefix} {table_name} {schema}
302-
ENGINE=Iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
303-
{partition_by}
304-
{settings_expression}
297+
if table_function:
298+
return f"""
299+
iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
305300
"""
306-
)
301+
else:
302+
return (
303+
f"""
304+
DROP TABLE IF EXISTS {table_name};
305+
CREATE TABLE {if_not_exists_prefix} {table_name} {schema}
306+
ENGINE=Iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
307+
{partition_by}
308+
{settings_expression}
309+
"""
310+
)
307311

308312
else:
309313
raise Exception(f"Unknown iceberg storage type: {storage_type}")

tests/integration/test_storage_iceberg/configs/config.d/cluster.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,13 @@
1616
</replica>
1717
</shard>
1818
</cluster_simple>
19+
<cluster_single_node>
20+
<shard>
21+
<replica>
22+
<host>node1</host>
23+
<port>9000</port>
24+
</replica>
25+
</shard>
26+
</cluster_single_node>
1927
</remote_servers>
2028
</clickhouse>

0 commit comments

Comments
 (0)