Skip to content

Commit c7e52a0

Browse files
author
xiao.dong
committed
add file metric in writer
1 parent d50a20c commit c7e52a0

File tree

6 files changed

+127
-1
lines changed

6 files changed

+127
-1
lines changed

src/iceberg/avro/avro_writer.cc

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class AvroWriter::Impl {
8585
ICEBERG_RETURN_UNEXPECTED(InitWriteContext());
8686
}
8787
// TODO(xiao.dong) convert data and write to avro
88+
// total_bytes_+= written_bytes;
8889
return {};
8990
}
9091

@@ -97,10 +98,15 @@ class AvroWriter::Impl {
9798
return {};
9899
}
99100

101+
bool Closed() const { return writer_ == nullptr; }
102+
103+
int64_t length() { return total_bytes_; }
104+
100105
private:
101106
Status InitWriteContext() { return {}; }
102107

103108
private:
109+
int64_t total_bytes_ = 0;
104110
// The schema to write.
105111
std::shared_ptr<::iceberg::Schema> write_schema_;
106112
// The avro schema to write.
@@ -120,7 +126,29 @@ Status AvroWriter::Open(const WriterOptions& options) {
120126
return impl_->Open(options);
121127
}
122128

123-
Status AvroWriter::Close() { return impl_->Close(); }
129+
Status AvroWriter::Close() {
130+
if (!impl_->Closed()) {
131+
return impl_->Close();
132+
}
133+
return {};
134+
}
135+
136+
std::shared_ptr<Metrics> AvroWriter::metrics() {
137+
if (impl_->Closed()) {
138+
// TODO(xiao.dong) implement metrics
139+
return std::make_shared<Metrics>();
140+
}
141+
return nullptr;
142+
}
143+
144+
int64_t AvroWriter::length() {
145+
if (impl_->Closed()) {
146+
return impl_->length();
147+
}
148+
return 0;
149+
}
150+
151+
std::vector<int64_t> AvroWriter::splitOffsets() { return {}; }
124152

125153
void AvroWriter::Register() {
126154
static WriterFactoryRegistry avro_writer_register(

src/iceberg/avro/avro_writer.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer {
3737

3838
Status Write(ArrowArray data) final;
3939

40+
std::shared_ptr<Metrics> metrics() final;
41+
42+
int64_t length() final;
43+
44+
std::vector<int64_t> splitOffsets() final;
45+
4046
/// \brief Register this Avro writer implementation.
4147
static void Register();
4248

src/iceberg/file_writer.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include "iceberg/arrow_c_data.h"
3030
#include "iceberg/file_format.h"
31+
#include "iceberg/metrics.h"
3132
#include "iceberg/result.h"
3233
#include "iceberg/schema.h"
3334
#include "iceberg/type_fwd.h"
@@ -66,6 +67,19 @@ class ICEBERG_EXPORT Writer {
6667
///
6768
/// \return Status of write results.
6869
virtual Status Write(ArrowArray data) = 0;
70+
71+
/// \brief Get the file statistics.
72+
virtual std::shared_ptr<Metrics> metrics() = 0;
73+
74+
/// \brief Get the file length.
75+
virtual int64_t length() = 0;
76+
77+
/// \brief Get the file length.
78+
/// Returns a list of recommended split locations, if applicable, null otherwise.
79+
/// When available, this information is used for planning scan tasks whose boundaries
80+
/// are determined by these offsets. The returned list must be sorted in ascending order
81+
/// Only valid after the file is closed.
82+
virtual std::vector<int64_t> splitOffsets() = 0;
6983
};
7084

7185
/// \brief Factory function to create a writer of a specific file format.

src/iceberg/manifest_writer.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,16 @@ namespace iceberg {
3535
class ICEBERG_EXPORT ManifestWriter {
3636
public:
3737
virtual ~ManifestWriter() = default;
38+
39+
/// \brief Write manifest entries to file
40+
/// \param entries List of manifest entries to write.
41+
/// \return Status::OK() if all entries were written successfully
3842
virtual Status WriteManifestEntries(
3943
const std::vector<ManifestEntry>& entries) const = 0;
4044

45+
/// \brief Close writer and flush to storage.
46+
virtual void Close() = 0;
47+
4148
/// \brief Creates a writer for a manifest file.
4249
/// \param manifest_location Path to the manifest file.
4350
/// \param file_io File IO implementation to use.
@@ -51,8 +58,15 @@ class ICEBERG_EXPORT ManifestWriter {
5158
class ICEBERG_EXPORT ManifestListWriter {
5259
public:
5360
virtual ~ManifestListWriter() = default;
61+
62+
/// \brief Write manifest file list to mainifest list file.
63+
/// \param files List of manifest files to write.
64+
/// \return Status::OK() if all files were written successfully
5465
virtual Status WriteManifestFiles(const std::vector<ManifestFile>& files) const = 0;
5566

67+
/// \brief Close writer and flush to storage.
68+
virtual void Close() = 0;
69+
5670
/// \brief Creates a writer for the manifest list.
5771
/// \param manifest_list_location Path to the manifest list file.
5872
/// \param file_io File IO implementation to use.

src/iceberg/manifest_writer_internal.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class ManifestWriterImpl : public ManifestWriter {
3535

3636
Status WriteManifestEntries(const std::vector<ManifestEntry>& entries) const override;
3737

38+
void Close() override {}
39+
3840
private:
3941
std::shared_ptr<Schema> schema_;
4042
std::unique_ptr<Writer> writer_;
@@ -49,6 +51,8 @@ class ManifestListWriterImpl : public ManifestListWriter {
4951

5052
Status WriteManifestFiles(const std::vector<ManifestFile>& files) const override;
5153

54+
void Close() override {}
55+
5256
private:
5357
std::shared_ptr<Schema> schema_;
5458
std::unique_ptr<Writer> writer_;

src/iceberg/metrics.h

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/metrics.h
23+
24+
#include <unordered_map>
25+
26+
#include "iceberg/expression/literal.h"
27+
#include "iceberg/iceberg_export.h"
28+
29+
namespace iceberg {
30+
31+
/// \brief Iceberg file format metrics
32+
class ICEBERG_EXPORT Metrics {
33+
public:
34+
Metrics() = default;
35+
36+
Metrics(int64_t row_count, std::unordered_map<int64_t, int64_t> column_sizes = {},
37+
std::unordered_map<int64_t, int64_t> value_counts = {},
38+
std::unordered_map<int64_t, int64_t> null_value_counts = {},
39+
std::unordered_map<int64_t, int64_t> nan_value_counts = {},
40+
std::unordered_map<int64_t, Literal> lower_bounds = {},
41+
std::unordered_map<int64_t, Literal> upper_bounds = {})
42+
: row_count_(row_count),
43+
column_sizes_(std::move(column_sizes)),
44+
value_counts_(std::move(value_counts)),
45+
null_value_counts_(std::move(null_value_counts)),
46+
nan_value_counts_(std::move(nan_value_counts)),
47+
lower_bounds_(std::move(lower_bounds)),
48+
upper_bounds_(std::move(upper_bounds)) {}
49+
50+
private:
51+
int64_t row_count_ = 0;
52+
std::unordered_map<int64_t, int64_t> column_sizes_;
53+
std::unordered_map<int64_t, int64_t> value_counts_;
54+
std::unordered_map<int64_t, int64_t> null_value_counts_;
55+
std::unordered_map<int64_t, int64_t> nan_value_counts_;
56+
std::unordered_map<int64_t, Literal> lower_bounds_;
57+
std::unordered_map<int64_t, Literal> upper_bounds_;
58+
};
59+
60+
} // namespace iceberg

0 commit comments

Comments
 (0)