Skip to content

Commit 2dfa617

Browse files
author
xiao.dong
committed
Use gzread instead of the streaming API
1 parent b391f75 commit 2dfa617

File tree

1 file changed

+23
-51
lines changed

1 file changed

+23
-51
lines changed

src/iceberg/table_metadata.cc

Lines changed: 23 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -154,68 +154,40 @@ Result<MetadataFileCodecType> TableMetadataUtil::CodecFromFileName(
154154
return MetadataFileCodecType::kNone;
155155
}
156156

157-
class GZipDecompressor {
158-
public:
159-
GZipDecompressor() : initialized_(false) {}
160-
161-
~GZipDecompressor() {
162-
if (initialized_) {
163-
inflateEnd(&stream_);
164-
}
157+
Result<std::string> DecompressGZIPFile(const std::string& filepath) {
158+
gzFile file = gzopen(filepath.c_str(), "rb");
159+
if (!file) {
160+
return IOError("Failed to open gzip file:{} ", filepath);
165161
}
166162

167-
Status Init() {
168-
int ret = inflateInit2(&stream_, 15 + 32);
169-
if (ret != Z_OK) {
170-
return IOError("inflateInit2 failed, result:{}", ret);
171-
}
172-
initialized_ = true;
173-
return {};
163+
const int CHUNK_SIZE = 32768; // 32KB chunks
164+
char buffer[CHUNK_SIZE];
165+
std::string result;
166+
int bytes_read;
167+
168+
while ((bytes_read = gzread(file, buffer, CHUNK_SIZE)) > 0) {
169+
result.append(buffer, bytes_read);
174170
}
175171

176-
Result<std::string> Decompress(const std::string& compressed_data) {
177-
if (compressed_data.empty()) {
178-
return {};
179-
}
180-
if (!initialized_) {
181-
ICEBERG_RETURN_UNEXPECTED(Init());
182-
}
183-
stream_.avail_in = static_cast<uInt>(compressed_data.size());
184-
stream_.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(compressed_data.data()));
185-
186-
// TODO(xiao.dong) magic buffer 16k, can we get a estimated size from compressed data?
187-
std::vector<char> outBuffer(32 * 1024);
188-
std::string result;
189-
int ret = 0;
190-
do {
191-
outBuffer.resize(outBuffer.size());
192-
stream_.avail_out = static_cast<uInt>(outBuffer.size());
193-
stream_.next_out = reinterpret_cast<Bytef*>(outBuffer.data());
194-
ret = inflate(&stream_, Z_NO_FLUSH);
195-
if (ret != Z_OK && ret != Z_STREAM_END) {
196-
return IOError("inflate failed, result:{}", ret);
197-
}
198-
result.append(outBuffer.data(), outBuffer.size() - stream_.avail_out);
199-
} while (ret != Z_STREAM_END);
200-
return result;
172+
int err;
173+
const char* error_msg = gzerror(file, &err);
174+
if (err != Z_OK) {
175+
gzclose(file);
176+
return IOError("Error during gzip decompression:{} ", std::string(error_msg));
201177
}
202178

203-
private:
204-
bool initialized_ = false;
205-
z_stream stream_;
206-
};
179+
gzclose(file);
180+
return result;
181+
}
207182

208183
Result<std::unique_ptr<TableMetadata>> TableMetadataUtil::Read(
209184
FileIO& io, const std::string& location, std::optional<size_t> length) {
210185
ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location));
211-
212-
ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length));
186+
std::string content;
213187
if (codec_type == MetadataFileCodecType::kGzip) {
214-
auto gzip_decompressor = std::make_unique<GZipDecompressor>();
215-
ICEBERG_RETURN_UNEXPECTED(gzip_decompressor->Init());
216-
auto result = gzip_decompressor->Decompress(content);
217-
ICEBERG_RETURN_UNEXPECTED(result);
218-
content = result.value();
188+
ICEBERG_ASSIGN_OR_RAISE(content, DecompressGZIPFile(location));
189+
} else {
190+
ICEBERG_ASSIGN_OR_RAISE(content, io.ReadFile(location, length));
219191
}
220192

221193
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(content));

0 commit comments

Comments
 (0)