Skip to content

Commit 9d7edd8

Browse files
committed
More local context
1 parent 5ff409a commit 9d7edd8

File tree

6 files changed

+70
-36
lines changed

6 files changed

+70
-36
lines changed

src/Databases/DataLake/RestCatalog.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,7 @@ bool RestCatalog::getTableMetadataImpl(
627627
{
628628
// int format_version = metadata_object->getValue<int>("format-version");
629629
auto schema_processor = DB::Iceberg::IcebergSchemaProcessor(context_);
630-
auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, log);
630+
auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, context_, log);
631631
auto schema = schema_processor.getClickhouseTableSchemaById(id);
632632
result.setSchema(*schema);
633633
}

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ IcebergMetadata::IcebergMetadata(
148148
updateState(context_, metadata_object_);
149149
}
150150

151-
void IcebergMetadata::addTableSchemaById(Int32 schema_id, Poco::JSON::Object::Ptr metadata_object) const
151+
void IcebergMetadata::addTableSchemaById(Int32 schema_id, Poco::JSON::Object::Ptr metadata_object, ContextPtr context_) const
152152
{
153153
if (persistent_components.schema_processor->hasClickhouseTableSchemaById(schema_id))
154154
return;
@@ -163,7 +163,7 @@ void IcebergMetadata::addTableSchemaById(Int32 schema_id, Poco::JSON::Object::Pt
163163
auto current_schema = schemas->getObject(i);
164164
if (current_schema->has(f_schema_id) && current_schema->getValue<int>(f_schema_id) == schema_id)
165165
{
166-
persistent_components.schema_processor->addIcebergTableSchema(current_schema);
166+
persistent_components.schema_processor->addIcebergTableSchema(current_schema, context_);
167167
return;
168168
}
169169
}
@@ -174,21 +174,24 @@ void IcebergMetadata::addTableSchemaById(Int32 schema_id, Poco::JSON::Object::Pt
174174
}
175175

176176
Int32 IcebergMetadata::parseTableSchema(
177-
const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger)
177+
const Poco::JSON::Object::Ptr & metadata_object,
178+
IcebergSchemaProcessor & schema_processor,
179+
ContextPtr context_,
180+
LoggerPtr metadata_logger)
178181
{
179182
const auto format_version = metadata_object->getValue<Int32>(f_format_version);
180183
if (format_version == 2)
181184
{
182185
auto [schema, current_schema_id] = parseTableSchemaV2Method(metadata_object);
183-
schema_processor.addIcebergTableSchema(schema);
186+
schema_processor.addIcebergTableSchema(schema, context_);
184187
return current_schema_id;
185188
}
186189
else
187190
{
188191
try
189192
{
190193
auto [schema, current_schema_id] = parseTableSchemaV1Method(metadata_object);
191-
schema_processor.addIcebergTableSchema(schema);
194+
schema_processor.addIcebergTableSchema(schema, context_);
192195
return current_schema_id;
193196
}
194197
catch (const Exception & first_error)
@@ -198,7 +201,7 @@ Int32 IcebergMetadata::parseTableSchema(
198201
try
199202
{
200203
auto [schema, current_schema_id] = parseTableSchemaV2Method(metadata_object);
201-
schema_processor.addIcebergTableSchema(schema);
204+
schema_processor.addIcebergTableSchema(schema, context_);
202205
LOG_WARNING(
203206
metadata_logger,
204207
"Iceberg table schema was parsed using v2 specification, but it was impossible to parse it using v1 "
@@ -457,7 +460,7 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
457460
for (UInt32 j = 0; j < schemas->size(); ++j)
458461
{
459462
auto schema = schemas->getObject(j);
460-
persistent_components.schema_processor->addIcebergTableSchema(schema);
463+
persistent_components.schema_processor->addIcebergTableSchema(schema, local_context);
461464
}
462465
auto snapshots = metadata_object->get(f_snapshots).extract<Poco::JSON::Array::Ptr>();
463466
bool successfully_found_snapshot = false;
@@ -522,7 +525,7 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
522525
relevant_snapshot_id,
523526
configuration_ptr->getPathForRead().path);
524527
relevant_snapshot_schema_id = snapshot->getValue<Int32>(f_schema_id);
525-
addTableSchemaById(relevant_snapshot_schema_id, metadata_object);
528+
addTableSchemaById(relevant_snapshot_schema_id, metadata_object, local_context);
526529
}
527530
}
528531
if (!successfully_found_snapshot)
@@ -609,7 +612,11 @@ void IcebergMetadata::updateState(const ContextPtr & local_context, Poco::JSON::
609612
{
610613
updateSnapshot(local_context, metadata_object);
611614
}
612-
relevant_snapshot_schema_id = parseTableSchema(metadata_object, *persistent_components.schema_processor, log);
615+
relevant_snapshot_schema_id = parseTableSchema(
616+
metadata_object,
617+
*persistent_components.schema_processor,
618+
local_context,
619+
log);
613620
}
614621
}
615622

@@ -626,14 +633,17 @@ std::shared_ptr<NamesAndTypesList> IcebergMetadata::getInitialSchemaByPath(Conte
626633
: nullptr;
627634
}
628635

629-
std::shared_ptr<const ActionsDAG> IcebergMetadata::getSchemaTransformer(ContextPtr, ObjectInfoPtr object_info) const
636+
std::shared_ptr<const ActionsDAG> IcebergMetadata::getSchemaTransformer(ContextPtr context_, ObjectInfoPtr object_info) const
630637
{
631638
IcebergDataObjectInfo * iceberg_object_info = dynamic_cast<IcebergDataObjectInfo *>(object_info.get());
632639
SharedLockGuard lock(mutex);
633640
if (!iceberg_object_info)
634641
return nullptr;
635642
return (iceberg_object_info->underlying_format_read_schema_id != relevant_snapshot_schema_id)
636-
? persistent_components.schema_processor->getSchemaTransformationDagByIds(iceberg_object_info->underlying_format_read_schema_id, relevant_snapshot_schema_id)
643+
? persistent_components.schema_processor->getSchemaTransformationDagByIds(
644+
context_,
645+
iceberg_object_info->underlying_format_read_schema_id,
646+
relevant_snapshot_schema_id)
637647
: nullptr;
638648
}
639649

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,10 @@ class IcebergMetadata : public IDataLakeMetadata
8080
bool supportsSchemaEvolution() const override { return true; }
8181

8282
static Int32 parseTableSchema(
83-
const Poco::JSON::Object::Ptr & metadata_object, Iceberg::IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger);
83+
const Poco::JSON::Object::Ptr & metadata_object,
84+
Iceberg::IcebergSchemaProcessor & schema_processor,
85+
ContextPtr context_,
86+
LoggerPtr metadata_logger);
8487

8588
bool supportsUpdate() const override { return true; }
8689
bool supportsWrites() const override { return true; }
@@ -150,7 +153,7 @@ class IcebergMetadata : public IDataLakeMetadata
150153

151154
void updateState(const ContextPtr & local_context, Poco::JSON::Object::Ptr metadata_object) TSA_REQUIRES(mutex);
152155
void updateSnapshot(ContextPtr local_context, Poco::JSON::Object::Ptr metadata_object) TSA_REQUIRES(mutex);
153-
void addTableSchemaById(Int32 schema_id, Poco::JSON::Object::Ptr metadata_object) const TSA_REQUIRES(mutex);
156+
void addTableSchemaById(Int32 schema_id, Poco::JSON::Object::Ptr metadata_object, ContextPtr context_) const TSA_REQUIRES(mutex);
154157
std::optional<Int32> getSchemaVersionByFileIfOutdated(String data_path) const TSA_REQUIRES_SHARED(mutex);
155158
void initializeSchemasFromManifestList(ContextPtr local_context, ManifestFileCacheKeys manifest_list_ptr) const TSA_REQUIRES(mutex);
156159
};

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ ManifestFileContent::ManifestFileContent(
200200
const Poco::JSON::Object::Ptr & schema_object = json.extract<Poco::JSON::Object::Ptr>();
201201
Int32 manifest_schema_id = schema_object->getValue<int>(f_schema_id);
202202

203-
schema_processor.addIcebergTableSchema(schema_object);
203+
schema_processor.addIcebergTableSchema(schema_object, context);
204204

205205
for (size_t i = 0; i != partition_specification->size(); ++i)
206206
{

src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ namespace Iceberg
151151

152152
std::string IcebergSchemaProcessor::default_link{};
153153

154-
void IcebergSchemaProcessor::addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr)
154+
void IcebergSchemaProcessor::addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr, ContextPtr context_)
155155
{
156156
std::lock_guard lock(mutex);
157157

@@ -174,7 +174,7 @@ void IcebergSchemaProcessor::addIcebergTableSchema(Poco::JSON::Object::Ptr schem
174174
auto name = field->getValue<String>(f_name);
175175
bool required = field->getValue<bool>(f_required);
176176
current_full_name = name;
177-
auto type = getFieldType(field, f_type, required, current_full_name, true);
177+
auto type = getFieldType(field, f_type, context_, required, current_full_name, true);
178178
clickhouse_schema->push_back(NameAndTypePair{name, type});
179179
clickhouse_types_by_source_ids[{schema_id, field->getValue<Int32>(f_id)}] = NameAndTypePair{current_full_name, type};
180180
clickhouse_ids_by_source_names[{schema_id, current_full_name}] = field->getValue<Int32>(f_id);
@@ -275,21 +275,25 @@ DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name, Cont
275275
}
276276

277277
DataTypePtr
278-
IcebergSchemaProcessor::getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type, String & current_full_name, bool is_subfield_of_root)
278+
IcebergSchemaProcessor::getComplexTypeFromObject(
279+
const Poco::JSON::Object::Ptr & type,
280+
String & current_full_name,
281+
ContextPtr context_,
282+
bool is_subfield_of_root)
279283
{
280284
String type_name = type->getValue<String>(f_type);
281285
if (type_name == f_list)
282286
{
283287
bool element_required = type->getValue<bool>("element-required");
284-
auto element_type = getFieldType(type, f_element, element_required);
288+
auto element_type = getFieldType(type, f_element, context_, element_required);
285289
return std::make_shared<DataTypeArray>(element_type);
286290
}
287291

288292
if (type_name == f_map)
289293
{
290-
auto key_type = getFieldType(type, f_key, true);
294+
auto key_type = getFieldType(type, f_key, context_, true);
291295
auto value_required = type->getValue<bool>("value-required");
292-
auto value_type = getFieldType(type, f_value, value_required);
296+
auto value_type = getFieldType(type, f_value, context_, value_required);
293297
return std::make_shared<DataTypeMap>(key_type, value_type);
294298
}
295299

@@ -313,7 +317,7 @@ IcebergSchemaProcessor::getComplexTypeFromObject(const Poco::JSON::Object::Ptr &
313317

314318
(current_full_name += ".").append(element_names.back());
315319
scope_guard guard([&] { current_full_name.resize(current_full_name.size() - element_names.back().size() - 1); });
316-
element_types.push_back(getFieldType(field, f_type, required, current_full_name, true));
320+
element_types.push_back(getFieldType(field, f_type, context_, required, current_full_name, true));
317321
TSA_SUPPRESS_WARNING_FOR_WRITE(clickhouse_types_by_source_ids)
318322
[{schema_id, field->getValue<Int32>(f_id)}] = NameAndTypePair{current_full_name, element_types.back()};
319323

@@ -322,7 +326,7 @@ IcebergSchemaProcessor::getComplexTypeFromObject(const Poco::JSON::Object::Ptr &
322326
}
323327
else
324328
{
325-
element_types.push_back(getFieldType(field, f_type, required));
329+
element_types.push_back(getFieldType(field, f_type, context_, required));
326330
}
327331
}
328332

@@ -335,18 +339,19 @@ IcebergSchemaProcessor::getComplexTypeFromObject(const Poco::JSON::Object::Ptr &
335339
DataTypePtr IcebergSchemaProcessor::getFieldType(
336340
const Poco::JSON::Object::Ptr & field,
337341
const String & type_key,
342+
ContextPtr context_,
338343
bool required,
339344
String & current_full_name,
340345
bool is_subfield_of_root)
341346
{
342347
if (field->isObject(type_key))
343-
return getComplexTypeFromObject(field->getObject(type_key), current_full_name, is_subfield_of_root);
348+
return getComplexTypeFromObject(field->getObject(type_key), current_full_name, context_, is_subfield_of_root);
344349

345350
auto type = field->get(type_key);
346351
if (type.isString())
347352
{
348353
const String & type_name = type.extract<String>();
349-
auto data_type = getSimpleType(type_name, getContext());
354+
auto data_type = getSimpleType(type_name, context_);
350355
return required ? data_type : makeNullable(data_type);
351356
}
352357

@@ -376,7 +381,11 @@ bool IcebergSchemaProcessor::allowPrimitiveTypeConversion(const String & old_typ
376381

377382
// Ids are passed only for error logging purposes
378383
std::shared_ptr<ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDag(
379-
const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema, Int32 old_id, Int32 new_id)
384+
const Poco::JSON::Object::Ptr & old_schema,
385+
const Poco::JSON::Object::Ptr & new_schema,
386+
ContextPtr context_,
387+
Int32 old_id,
388+
Int32 new_id)
380389
{
381390
std::unordered_map<size_t, std::pair<Poco::JSON::Object::Ptr, const ActionsDAG::Node *>> old_schema_entries;
382391
auto old_schema_fields = old_schema->get(f_fields).extract<Poco::JSON::Array::Ptr>();
@@ -388,7 +397,7 @@ std::shared_ptr<ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDag(
388397
size_t id = field->getValue<size_t>(f_id);
389398
auto name = field->getValue<String>(f_name);
390399
bool required = field->getValue<bool>(f_required);
391-
old_schema_entries[id] = {field, &dag->addInput(name, getFieldType(field, f_type, required))};
400+
old_schema_entries[id] = {field, &dag->addInput(name, getFieldType(field, f_type, context_, required))};
392401
}
393402
auto new_schema_fields = new_schema->get(f_fields).extract<Poco::JSON::Array::Ptr>();
394403
for (size_t i = 0; i != new_schema_fields->size(); ++i)
@@ -397,7 +406,7 @@ std::shared_ptr<ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDag(
397406
size_t id = field->getValue<size_t>(f_id);
398407
auto name = field->getValue<String>(f_name);
399408
bool required = field->getValue<bool>(f_required);
400-
auto type = getFieldType(field, f_type, required);
409+
auto type = getFieldType(field, f_type, context_, required);
401410
auto old_node_it = old_schema_entries.find(id);
402411
if (old_node_it != old_schema_entries.end())
403412
{
@@ -407,7 +416,7 @@ std::shared_ptr<ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDag(
407416
|| field->getObject(f_type)->getValue<std::string>(f_type) == "list"
408417
|| field->getObject(f_type)->getValue<std::string>(f_type) == "map"))
409418
{
410-
auto old_type = getFieldType(old_json, "type", required);
419+
auto old_type = getFieldType(old_json, "type", context_, required);
411420
auto transform = std::make_shared<EvolutionFunctionStruct>(std::vector{type}, std::vector{old_type}, old_json, field);
412421
old_node = &dag->addFunction(transform, std::vector<const Node *>{old_node}, name);
413422

@@ -437,7 +446,7 @@ std::shared_ptr<ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDag(
437446
}
438447
else if (allowPrimitiveTypeConversion(old_type, new_type))
439448
{
440-
node = &dag->addCast(*old_node, getFieldType(field, f_type, required), name);
449+
node = &dag->addCast(*old_node, getFieldType(field, f_type, context_, required), name);
441450
}
442451
outputs.push_back(node);
443452
}
@@ -463,7 +472,10 @@ std::shared_ptr<ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDag(
463472
return dag;
464473
}
465474

466-
std::shared_ptr<const ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id)
475+
std::shared_ptr<const ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDagByIds(
476+
ContextPtr context_,
477+
Int32 old_id,
478+
Int32 new_id)
467479
{
468480
if (old_id == new_id)
469481
return nullptr;
@@ -482,7 +494,7 @@ std::shared_ptr<const ActionsDAG> IcebergSchemaProcessor::getSchemaTransformatio
482494
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", new_id);
483495

484496
return transform_dags_by_ids[{old_id, new_id}]
485-
= getSchemaTransformationDag(old_schema_it->second, new_schema_it->second, old_id, new_id);
497+
= getSchemaTransformationDag(old_schema_it->second, new_schema_it->second, context_, old_id, new_id);
486498
}
487499

488500
Poco::JSON::Object::Ptr IcebergSchemaProcessor::getIcebergTableSchemaById(Int32 id) const

src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ class IcebergSchemaProcessor : private WithContext
8484
public:
8585
explicit IcebergSchemaProcessor(ContextPtr context_) : WithContext(context_) {}
8686

87-
void addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr);
87+
void addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr, ContextPtr context_);
8888
std::shared_ptr<NamesAndTypesList> getClickhouseTableSchemaById(Int32 id);
89-
std::shared_ptr<const ActionsDAG> getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id);
89+
std::shared_ptr<const ActionsDAG> getSchemaTransformationDagByIds(ContextPtr context_, Int32 old_id, Int32 new_id);
9090
NameAndTypePair getFieldCharacteristics(Int32 schema_version, Int32 source_id) const;
9191
std::optional<NameAndTypePair> tryGetFieldCharacteristics(Int32 schema_version, Int32 source_id) const;
9292
NamesAndTypesList tryGetFieldsCharacteristics(Int32 schema_id, const std::vector<Int32> & source_ids) const;
@@ -114,10 +114,15 @@ class IcebergSchemaProcessor : private WithContext
114114
std::unordered_map<Int64, Int32> schema_id_by_snapshot TSA_GUARDED_BY(mutex);
115115

116116
NamesAndTypesList getSchemaType(const Poco::JSON::Object::Ptr & schema);
117-
DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type, String & current_full_name, bool is_subfield_of_root);
117+
DataTypePtr getComplexTypeFromObject(
118+
const Poco::JSON::Object::Ptr & type,
119+
String & current_full_name,
120+
ContextPtr context_,
121+
bool is_subfield_of_root);
118122
DataTypePtr getFieldType(
119123
const Poco::JSON::Object::Ptr & field,
120124
const String & type_key,
125+
ContextPtr context_,
121126
bool required,
122127
String & current_full_name = default_link,
123128
bool is_subfield_of_root = false);
@@ -126,7 +131,11 @@ class IcebergSchemaProcessor : private WithContext
126131
const Node * getDefaultNodeForField(const Poco::JSON::Object::Ptr & field);
127132

128133
std::shared_ptr<ActionsDAG> getSchemaTransformationDag(
129-
const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema, Int32 old_id, Int32 new_id);
134+
const Poco::JSON::Object::Ptr & old_schema,
135+
const Poco::JSON::Object::Ptr & new_schema,
136+
ContextPtr context_,
137+
Int32 old_id,
138+
Int32 new_id);
130139

131140
mutable SharedMutex mutex;
132141
};

0 commit comments

Comments
 (0)