Skip to content

Commit 89899b4

Browse files
committed
Paste in object append stream from apache#12914
1 parent 7df1cdd commit 89899b4

File tree

1 file changed

+190
-0
lines changed

1 file changed

+190
-0
lines changed

cpp/src/arrow/filesystem/azurefs.cc

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,196 @@ class ObjectInputFile final : public io::RandomAccessFile {
461461
int64_t content_length_ = kNoSize;
462462
std::shared_ptr<const KeyValueMetadata> metadata_;
463463
};
464+
465+
class ObjectAppendStream final : public io::OutputStream {
466+
public:
467+
ObjectAppendStream(
468+
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakePathClient>& path_client,
469+
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeFileClient>& file_client,
470+
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient>& blob_client,
471+
const bool is_hierarchical_namespace_enabled, const io::IOContext& io_context,
472+
const AzurePath& path, const std::shared_ptr<const KeyValueMetadata>& metadata)
473+
: path_client_(std::move(path_client)),
474+
file_client_(std::move(file_client)),
475+
blob_client_(std::move(blob_client)),
476+
is_hierarchical_namespace_enabled_(is_hierarchical_namespace_enabled),
477+
io_context_(io_context),
478+
path_(path) {}
479+
480+
~ObjectAppendStream() override {
481+
// For compliance with the rest of the IO stack, Close rather than Abort,
482+
// even though it may be more expensive.
483+
io::internal::CloseFromDestructor(this);
484+
}
485+
486+
Status Init() {
487+
closed_ = false;
488+
if (content_length_ != kNoSize) {
489+
DCHECK_GE(content_length_, 0);
490+
return Status::OK();
491+
}
492+
try {
493+
auto properties = path_client_->GetProperties();
494+
if (properties.Value.IsDirectory) {
495+
return ::arrow::fs::internal::NotAFile(path_.full_path);
496+
}
497+
content_length_ = properties.Value.FileSize;
498+
pos_ = content_length_;
499+
} catch (const Azure::Storage::StorageException& exception) {
500+
// new file
501+
if (is_hierarchical_namespace_enabled_) {
502+
try {
503+
file_client_->CreateIfNotExists();
504+
} catch (const Azure::Storage::StorageException& exception) {
505+
return Status::IOError(exception.RawResponse->GetReasonPhrase());
506+
}
507+
} else {
508+
std::string s = "";
509+
try {
510+
file_client_->UploadFrom(
511+
const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(s.data())), s.size());
512+
} catch (const Azure::Storage::StorageException& exception) {
513+
return Status::IOError(exception.RawResponse->GetReasonPhrase());
514+
}
515+
}
516+
content_length_ = 0;
517+
}
518+
return Status::OK();
519+
}
520+
521+
Status Abort() override {
522+
if (closed_) {
523+
return Status::OK();
524+
}
525+
path_client_ = nullptr;
526+
file_client_ = nullptr;
527+
blob_client_ = nullptr;
528+
closed_ = true;
529+
return Status::OK();
530+
}
531+
532+
// OutputStream interface
533+
534+
Status Close() override {
535+
if (closed_) {
536+
return Status::OK();
537+
}
538+
path_client_ = nullptr;
539+
file_client_ = nullptr;
540+
blob_client_ = nullptr;
541+
closed_ = true;
542+
return Status::OK();
543+
}
544+
545+
bool closed() const override { return closed_; }
546+
547+
Result<int64_t> Tell() const override {
548+
if (closed_) {
549+
return Status::Invalid("Operation on closed stream");
550+
}
551+
return pos_;
552+
}
553+
554+
Status Write(const std::shared_ptr<Buffer>& buffer) override {
555+
return DoAppend(buffer->data(), buffer->size(), buffer);
556+
}
557+
558+
Status Write(const void* data, int64_t nbytes) override {
559+
return DoAppend(data, nbytes);
560+
}
561+
562+
Status DoAppend(const void* data, int64_t nbytes,
563+
std::shared_ptr<Buffer> owned_buffer = nullptr) {
564+
if (closed_) {
565+
return Status::Invalid("Operation on closed stream");
566+
}
567+
if (is_hierarchical_namespace_enabled_) {
568+
try {
569+
auto buffer_stream = std::make_unique<Azure::Core::IO::MemoryBodyStream>(
570+
Azure::Core::IO::MemoryBodyStream(
571+
const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(data)), nbytes));
572+
if (buffer_stream->Length() == 0) {
573+
return Status::OK();
574+
}
575+
auto result = file_client_->Append(*buffer_stream, pos_);
576+
pos_ += nbytes;
577+
file_client_->Flush(pos_);
578+
} catch (const Azure::Storage::StorageException& exception) {
579+
return Status::IOError(exception.RawResponse->GetReasonPhrase());
580+
}
581+
} else {
582+
try {
583+
auto append_data = static_cast<uint8_t*>((void*)data);
584+
auto res = blob_client_->GetBlockList().Value;
585+
auto size = res.CommittedBlocks.size();
586+
std::string block_id;
587+
{
588+
block_id = std::to_string(size + 1);
589+
size_t n = 8;
590+
int precision = n - std::min(n, block_id.size());
591+
block_id.insert(0, precision, '0');
592+
}
593+
block_id = Azure::Core::Convert::Base64Encode(
594+
std::vector<uint8_t>(block_id.begin(), block_id.end()));
595+
auto block_content = Azure::Core::IO::MemoryBodyStream(
596+
append_data, strlen(reinterpret_cast<char*>(append_data)));
597+
if (block_content.Length() == 0) {
598+
return Status::OK();
599+
}
600+
blob_client_->StageBlock(block_id, block_content);
601+
std::vector<std::string> block_ids;
602+
for (auto block : res.CommittedBlocks) {
603+
block_ids.push_back(block.Name);
604+
}
605+
block_ids.push_back(block_id);
606+
blob_client_->CommitBlockList(block_ids);
607+
pos_ += nbytes;
608+
} catch (const Azure::Storage::StorageException& exception) {
609+
return Status::IOError(exception.RawResponse->GetReasonPhrase());
610+
}
611+
}
612+
content_length_ += nbytes;
613+
return Status::OK();
614+
}
615+
616+
Status Flush() override {
617+
if (closed_) {
618+
return Status::Invalid("Operation on closed stream");
619+
}
620+
if (is_hierarchical_namespace_enabled_) {
621+
try {
622+
file_client_->Flush(content_length_);
623+
} catch (const Azure::Storage::StorageException& exception) {
624+
return Status::IOError(exception.RawResponse->GetReasonPhrase());
625+
}
626+
} else {
627+
try {
628+
auto res = blob_client_->GetBlockList().Value;
629+
std::vector<std::string> block_ids;
630+
for (auto block : res.UncommittedBlocks) {
631+
block_ids.push_back(block.Name);
632+
}
633+
blob_client_->CommitBlockList(block_ids);
634+
} catch (const Azure::Storage::StorageException& exception) {
635+
return Status::IOError(exception.RawResponse->GetReasonPhrase());
636+
}
637+
}
638+
return Status::OK();
639+
}
640+
641+
protected:
642+
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakePathClient> path_client_;
643+
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeFileClient> file_client_;
644+
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> blob_client_;
645+
const bool is_hierarchical_namespace_enabled_;
646+
const io::IOContext io_context_;
647+
const AzurePath path_;
648+
649+
bool closed_ = true;
650+
int64_t pos_ = 0;
651+
int64_t content_length_ = kNoSize;
652+
};
653+
464654
} // namespace
465655

466656
// -----------------------------------------------------------------------

0 commit comments

Comments
 (0)