Skip to content

Commit 8a4d7a7

Browse files
Add UnlockedBufferedOutputStream (#75) (#76)
Co-authored-by: Hongze Zhang <hongze.zhang@intel.com>
1 parent 21698d7 commit 8a4d7a7

File tree

2 files changed

+206
-0
lines changed

2 files changed

+206
-0
lines changed

cpp/src/arrow/io/buffered.cc

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,157 @@ Status BufferedOutputStream::Flush() { return impl_->Flush(); }
245245

246246
// Expose the wrapped raw stream held by the implementation object.
std::shared_ptr<OutputStream> BufferedOutputStream::raw() const {
  return impl_->raw();
}
247247

248+
// ----------------------------------------------------------------------
249+
// UnlockedBufferedOutputStream implementation
250+
251+
// Private implementation of UnlockedBufferedOutputStream.
//
// Buffer bookkeeping (buffer_data_, buffer_pos_, buffer_size_, raw_pos_,
// is_open_) and the AppendToBuffer / ResizeBuffer helpers come from
// BufferedBase. None of the methods below acquires a lock.
class UnlockedBufferedOutputStream::Impl : public BufferedBase {
 public:
  explicit Impl(std::shared_ptr<OutputStream> raw, MemoryPool* pool)
      : BufferedBase(pool), raw_(std::move(raw)) {}

  // Flush pending bytes, mark the stream as closed, then close the raw stream.
  // An error from the raw stream's Close() is reported in preference to a
  // flush error; otherwise the flush status is returned.
  Status Close() {
    if (!is_open_) {
      return Status::OK();
    }
    Status flush_status = FlushUnlocked();
    is_open_ = false;
    RETURN_NOT_OK(raw_->Close());
    return flush_status;
  }

  // Mark the stream as closed and abort the raw stream; buffered bytes are
  // not flushed.
  Status Abort() {
    if (!is_open_) {
      return Status::OK();
    }
    is_open_ = false;
    return raw_->Abort();
  }

  // Logical position: position of the raw stream plus the bytes still
  // sitting in the buffer. The raw position is cached; -1 means "unknown".
  Result<int64_t> Tell() const {
    if (raw_pos_ == -1) {
      ARROW_ASSIGN_OR_RAISE(raw_pos_, raw_->Tell());
      DCHECK_GE(raw_pos_, 0);
    }
    return raw_pos_ + buffer_pos_;
  }

  Status Write(const void* data, int64_t nbytes) { return DoWrite(data, nbytes); }

  Status Write(const std::shared_ptr<Buffer>& buffer) {
    return DoWrite(buffer->data(), buffer->size(), buffer);
  }

  // Shared write path. `buffer`, when non-null, is the owner of `data` and is
  // handed to the raw stream as-is for writes that bypass the buffer.
  Status DoWrite(const void* data, int64_t nbytes,
                 const std::shared_ptr<Buffer>& buffer = nullptr) {
    if (nbytes < 0) {
      return Status::Invalid("write count should be >= 0");
    }
    if (nbytes == 0) {
      return Status::OK();
    }

    const bool fills_or_exceeds_buffer = (nbytes + buffer_pos_ >= buffer_size_);
    if (fills_or_exceeds_buffer) {
      // Make room first: push out whatever is currently buffered.
      RETURN_NOT_OK(FlushUnlocked());
      DCHECK_EQ(buffer_pos_, 0);

      const bool bypass_buffer = (nbytes >= buffer_size_);
      if (bypass_buffer) {
        // The raw stream is about to advance, so the cached position is stale.
        raw_pos_ = -1;
        // Payload does not fit even in an empty buffer: write it directly.
        return buffer ? raw_->Write(buffer) : raw_->Write(data, nbytes);
      }
    }

    AppendToBuffer(data, nbytes);
    return Status::OK();
  }

  // Write the buffered bytes (if any) to the raw stream and reset the buffer.
  Status FlushUnlocked() {
    if (buffer_pos_ <= 0) {
      return Status::OK();
    }
    // The raw stream is about to advance, so the cached position is stale.
    raw_pos_ = -1;
    RETURN_NOT_OK(raw_->Write(buffer_data_, buffer_pos_));
    buffer_pos_ = 0;
    return Status::OK();
  }

  Status Flush() { return FlushUnlocked(); }

  // Flush, mark the stream as closed and give up ownership of the raw stream.
  // raw_ is left empty afterwards.
  Result<std::shared_ptr<OutputStream>> Detach() {
    RETURN_NOT_OK(FlushUnlocked());
    is_open_ = false;
    return std::move(raw_);
  }

  // Change the buffer capacity. Pending bytes that would no longer fit are
  // flushed to the raw stream before the buffer is resized.
  Status SetBufferSize(int64_t new_buffer_size) {
    if (new_buffer_size <= 0) {
      return Status::Invalid("Buffer size should be positive");
    }
    const bool pending_bytes_do_not_fit = (buffer_pos_ >= new_buffer_size);
    if (pending_bytes_do_not_fit) {
      RETURN_NOT_OK(FlushUnlocked());
    }
    return ResizeBuffer(new_buffer_size);
  }

  std::shared_ptr<OutputStream> raw() const { return raw_; }

 private:
  // The wrapped stream that ultimately receives the bytes.
  std::shared_ptr<OutputStream> raw_;
};
350+
351+
// Build the implementation object around the given raw stream and pool.
UnlockedBufferedOutputStream::UnlockedBufferedOutputStream(
    std::shared_ptr<OutputStream> raw, MemoryPool* pool)
    : impl_(new Impl(std::move(raw), pool)) {}
355+
356+
// Factory: wrap `raw` and size the internal buffer. Fails if the requested
// buffer size is rejected by SetBufferSize.
Result<std::shared_ptr<UnlockedBufferedOutputStream>> UnlockedBufferedOutputStream::Create(
    int64_t buffer_size, MemoryPool* pool, std::shared_ptr<OutputStream> raw) {
  // The constructor is private, so std::make_shared cannot be used here.
  std::shared_ptr<UnlockedBufferedOutputStream> stream(
      new UnlockedBufferedOutputStream(std::move(raw), pool));
  RETURN_NOT_OK(stream->SetBufferSize(buffer_size));
  return stream;
}
363+
364+
// Hand the stream to the shared close-from-destructor helper.
UnlockedBufferedOutputStream::~UnlockedBufferedOutputStream() {
  internal::CloseFromDestructor(this);
}
365+
366+
// Forward the resize request to the implementation object.
Status UnlockedBufferedOutputStream::SetBufferSize(int64_t new_buffer_size) {
  Impl* const impl = impl_.get();
  return impl->SetBufferSize(new_buffer_size);
}
369+
370+
// Current capacity of the internal buffer, as reported by the implementation.
int64_t UnlockedBufferedOutputStream::buffer_size() const {
  return impl_->buffer_size();
}
371+
372+
// Number of bytes currently held in the buffer (the implementation's
// buffer position).
int64_t UnlockedBufferedOutputStream::bytes_buffered() const {
  return impl_->buffer_pos();
}
373+
374+
// Forward to the implementation, which flushes and releases the raw stream.
Result<std::shared_ptr<OutputStream>> UnlockedBufferedOutputStream::Detach() {
  Impl* const impl = impl_.get();
  return impl->Detach();
}
377+
378+
// Forward to the implementation's Close().
Status UnlockedBufferedOutputStream::Close() {
  return impl_->Close();
}
379+
380+
// Forward to the implementation's Abort().
Status UnlockedBufferedOutputStream::Abort() {
  return impl_->Abort();
}
381+
382+
// Report the closed state tracked by the implementation object.
bool UnlockedBufferedOutputStream::closed() const {
  return impl_->closed();
}
383+
384+
// Forward to the implementation's Tell().
Result<int64_t> UnlockedBufferedOutputStream::Tell() const {
  return impl_->Tell();
}
385+
386+
// Raw-pointer overload: forward the byte range to the implementation.
Status UnlockedBufferedOutputStream::Write(const void* data, int64_t nbytes) {
  Impl* const impl = impl_.get();
  return impl->Write(data, nbytes);
}
389+
390+
// Buffer overload: forward the buffer to the implementation.
Status UnlockedBufferedOutputStream::Write(const std::shared_ptr<Buffer>& data) {
  Impl* const impl = impl_.get();
  return impl->Write(data);
}
393+
394+
// Forward to the implementation's Flush().
Status UnlockedBufferedOutputStream::Flush() {
  return impl_->Flush();
}
395+
396+
// Expose the wrapped raw stream held by the implementation object.
std::shared_ptr<OutputStream> UnlockedBufferedOutputStream::raw() const {
  return impl_->raw();
}
397+
398+
248399
// ----------------------------------------------------------------------
249400
// BufferedInputStream implementation
250401

cpp/src/arrow/io/buffered.h

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,61 @@ class ARROW_EXPORT BufferedOutputStream : public OutputStream {
8989
std::unique_ptr<Impl> impl_;
9090
};
9191

92+
93+
/// \class UnlockedBufferedOutputStream
/// \brief An output stream that accumulates writes in an internal buffer
/// before forwarding them to a raw OutputStream.
///
/// This class performs no synchronization of its own: its implementation does
/// not acquire any lock. It is therefore NOT safe to use one instance from
/// several threads at once unless the caller serializes access externally.
class ARROW_EXPORT UnlockedBufferedOutputStream : public OutputStream {
 public:
  ~UnlockedBufferedOutputStream() override;

  /// \brief Create a buffered output stream wrapping the given output stream.
  /// \param[in] buffer_size the size of the temporary write buffer; must be
  /// positive
  /// \param[in] pool a MemoryPool to use for allocations
  /// \param[in] raw another OutputStream
  /// \return the created UnlockedBufferedOutputStream
  static Result<std::shared_ptr<UnlockedBufferedOutputStream>> Create(
      int64_t buffer_size, MemoryPool* pool, std::shared_ptr<OutputStream> raw);

  /// \brief Resize internal buffer
  ///
  /// If the currently buffered bytes would not fit in the new size, they are
  /// first flushed to the raw OutputStream.
  /// \param[in] new_buffer_size the new buffer size; must be positive
  /// \return Status
  Status SetBufferSize(int64_t new_buffer_size);

  /// \brief Return the current size of the internal buffer
  int64_t buffer_size() const;

  /// \brief Return the number of remaining bytes that have not been flushed to
  /// the raw OutputStream
  int64_t bytes_buffered() const;

  /// \brief Flush any buffered writes and release the raw
  /// OutputStream. Further operations on this object are invalid
  /// \return the underlying OutputStream
  Result<std::shared_ptr<OutputStream>> Detach();

  // OutputStream interface

  /// \brief Close the buffered output stream. This implicitly closes the
  /// underlying raw output stream.
  Status Close() override;

  /// \brief Abort the stream. Buffered bytes are not flushed; the underlying
  /// raw output stream is aborted.
  Status Abort() override;

  bool closed() const override;

  /// \brief Return the logical write position (raw stream position plus the
  /// bytes still buffered).
  Result<int64_t> Tell() const override;

  /// \brief Write bytes to the stream.
  ///
  /// Not thread-safe: no lock is taken, so concurrent writers must be
  /// synchronized by the caller.
  Status Write(const void* data, int64_t nbytes) override;
  Status Write(const std::shared_ptr<Buffer>& data) override;

  /// \brief Write any buffered bytes to the raw output stream.
  ///
  /// This does not call Flush() on the raw output stream itself.
  Status Flush() override;

  /// \brief Return the underlying raw output stream.
  std::shared_ptr<OutputStream> raw() const;

 private:
  explicit UnlockedBufferedOutputStream(std::shared_ptr<OutputStream> raw,
                                        MemoryPool* pool);

  class ARROW_NO_EXPORT Impl;
  std::unique_ptr<Impl> impl_;
};
146+
92147
/// \class BufferedInputStream
93148
/// \brief An InputStream that performs buffered reads from an unbuffered
94149
/// InputStream, which can mitigate the overhead of many small reads in some

0 commit comments

Comments
 (0)