Skip to content

Commit 1829a42

Browse files
committed
all integration work
1 parent 8c7ba08 commit 1829a42

File tree

3 files changed

+10
-8
lines changed

3 files changed

+10
-8
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1085,7 +1085,7 @@ void IcebergMetadata::addDeleteTransformers(
10851085
}
10861086
/// Then we read the content of the delete file.
10871087
auto mutable_columns_for_set = block_for_set.cloneEmptyColumns();
1088-
std::unique_ptr<ReadBuffer> data_read_buffer = createReadBuffer(delete_file_object, object_storage, local_context, log);
1088+
std::unique_ptr<ReadBuffer> data_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log);
10891089
CompressionMethod compression_method = chooseCompressionMethod(delete_file.file_path, "auto");
10901090
auto delete_format = FormatFactory::instance().getInput(
10911091
delete_file.file_format,

src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,15 +167,19 @@ std::optional<DeleteFileWriteResultWithStats> writeDataFiles(
167167
Field cur_value;
168168
col_data_filename.column->get(i, cur_value);
169169

170+
String original_path = cur_value.safeGet<String>();
170171
String path_without_namespace;
171-
if (cur_value.safeGet<String>().starts_with(configuration->getNamespace()))
172-
path_without_namespace = cur_value.safeGet<String>().substr(configuration->getNamespace().size());
173172

174-
if (!path_without_namespace.starts_with(configuration->getPathForRead().path))
173+
if (original_path.starts_with(configuration->getNamespace()))
174+
path_without_namespace = original_path.substr(configuration->getNamespace().size());
175+
else
176+
path_without_namespace = original_path;
177+
178+
if (!path_without_namespace.empty() && !path_without_namespace.starts_with(configuration->getPathForRead().path))
175179
{
176180
if (path_without_namespace.starts_with('/'))
177181
path_without_namespace = path_without_namespace.substr(1);
178-
else
182+
else if (!path_without_namespace.empty())
179183
path_without_namespace = "/" + path_without_namespace;
180184
}
181185
col_data_filename_without_namespaces->insert(path_without_namespace);

src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ void IcebergPositionDeleteTransform::initializeDeleteSources()
6767
{
6868
/// Skip position deletes that do not match the data file path.
6969
if (position_deletes_object.reference_data_file_path.has_value()
70-
&& position_deletes_object.reference_data_file_path != iceberg_data_path)
70+
&& position_deletes_object.reference_data_file_path.value() != iceberg_data_path)
7171
continue;
7272

7373
/// Resolve the position delete file path to get the correct storage and key
@@ -171,10 +171,8 @@ void IcebergBitmapPositionDeleteTransform::initialize()
171171
while (auto delete_chunk = delete_source->read())
172172
{
173173
int position_index = getColumnIndex(delete_source, IcebergPositionDeleteTransform::positions_column_name);
174-
int filename_index = getColumnIndex(delete_source, IcebergPositionDeleteTransform::data_file_path_column_name);
175174

176175
auto position_column = delete_chunk.getColumns()[position_index];
177-
auto filename_column = delete_chunk.getColumns()[filename_index];
178176

179177
for (size_t i = 0; i < delete_chunk.getNumRows(); ++i)
180178
{

0 commit comments

Comments
 (0)