Skip to content

Commit 45ba3ca

Browse files
authored
Implement Client::ReadObject() using streaming reads (#938)
Use the streaming read classes to read object media. I removed some tests because they would have become tests for "does iostream work". This fixes #819. * Fix tests in CentOS. The old curl library in CentOS seems to be less forgiving and caught a bug we had. Fixed.
1 parent 7d639be commit 45ba3ca

25 files changed

+232
-394
lines changed

google/cloud/storage/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,6 @@ set(storage_client_unit_tests
258258
list_objects_reader_test.cc
259259
object_access_control_test.cc
260260
object_metadata_test.cc
261-
object_stream_test.cc
262261
object_test.cc
263262
retry_policy_test.cc
264263
storage_client_options_test.cc

google/cloud/storage/client.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class Client {
8181
}
8282

8383
/**
84-
* Fetch the bucket metadata and return it.
84+
* Fetch the bucket metadata and return it.c
8585
*
8686
* @param bucket_name query metadata information about this bucket.
8787
* @param options a list of optional query parameters and/or request headers.
@@ -190,11 +190,12 @@ class Client {
190190
* @snippet storage_object_samples.cc read object
191191
*/
192192
template <typename... Options>
193-
ObjectReadStream Read(std::string const& bucket_name,
194-
std::string const& object_name, Options&&... options) {
193+
ObjectReadStream ReadObject(std::string const& bucket_name,
194+
std::string const& object_name,
195+
Options&&... options) {
195196
internal::ReadObjectRangeRequest request(bucket_name, object_name);
196197
request.set_multiple_options(std::forward<Options>(options)...);
197-
return ObjectReadStream(raw_client_, request);
198+
return ObjectReadStream(raw_client_->ReadObject(request).second);
198199
}
199200

200201
/**

google/cloud/storage/examples/storage_object_samples.cc

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,24 +80,27 @@ void GetObjectMetadata(gcs::Client client, int& argc, char* argv[]) {
8080
(std::move(client), bucket_name, object_name);
8181
}
8282

83-
//! [read object]
8483
void ReadObject(gcs::Client client, int& argc, char* argv[]) {
8584
if (argc < 2) {
86-
throw Usage{
87-
"insert-object <bucket-name> <object-name> <object-contents (string)>"};
85+
throw Usage{"read-object <bucket-name> <object-name>"};
8886
}
8987
auto bucket_name = ConsumeArg(argc, argv);
9088
auto object_name = ConsumeArg(argc, argv);
91-
auto stream = client.Read(bucket_name, object_name);
92-
int count = 0;
93-
while (not stream.eof()) {
94-
std::string line;
95-
std::getline(stream, line, '\n');
96-
++count;
89+
//! [read object]
90+
[](google::cloud::storage::Client client, std::string bucket_name,
91+
std::string object_name) {
92+
auto stream = client.ReadObject(bucket_name, object_name);
93+
int count = 0;
94+
while (not stream.eof()) {
95+
std::string line;
96+
std::getline(stream, line, '\n');
97+
++count;
98+
}
99+
std::cout << "The object has " << count << " lines" << std::endl;
97100
}
98-
std::cout << "The object has " << count << " lines" << std::endl;
101+
//! [read object]
102+
(std::move(client), bucket_name, object_name);
99103
}
100-
//! [read object]
101104

102105
//! [delete object]
103106
void DeleteObject(gcs::Client client, int& argc, char* argv[]) {

google/cloud/storage/internal/curl_client.cc

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -95,31 +95,19 @@ std::pair<Status, ObjectMetadata> CurlClient::GetObjectMetadata(
9595
ObjectMetadata::ParseFromString(payload.payload));
9696
}
9797

98-
std::pair<Status, ReadObjectRangeResponse> CurlClient::ReadObjectRangeMedia(
98+
std::pair<Status, std::unique_ptr<ObjectReadStreambuf>> CurlClient::ReadObject(
9999
ReadObjectRangeRequest const& request) {
100100
// Assume the bucket name is validated by the caller.
101101
CurlRequestBuilder builder(storage_endpoint_ + "/b/" + request.bucket_name() +
102102
"/o/" + request.object_name());
103103
builder.SetDebugLogging(options_.enable_http_tracing());
104104
builder.AddHeader(options_.credentials()->AuthorizationHeader());
105105
builder.AddQueryParameter("alt", "media");
106-
// For the moment, we are using range reads to read the objects (see #727)
107-
// disable decompression because range reads do not work in that case:
108-
// https://cloud.google.com/storage/docs/transcoding#range
109-
// and
110-
// https://cloud.google.com/storage/docs/transcoding#decompressive_transcoding
111-
builder.AddHeader("Cache-Control: no-transform");
112-
builder.AddHeader("Range: bytes=" + std::to_string(request.begin()) + '-' +
113-
std::to_string(request.end()));
114-
auto payload = builder.BuildRequest(std::string{}).MakeRequest();
115-
if (payload.status_code >= 300) {
116-
return std::make_pair(
117-
Status{payload.status_code, std::move(payload.payload)},
118-
internal::ReadObjectRangeResponse{});
119-
}
120-
return std::make_pair(
121-
Status(),
122-
internal::ReadObjectRangeResponse::FromHttpResponse(std::move(payload)));
106+
// TODO(#937) - use client options to configure buffer size.
107+
std::unique_ptr<CurlReadStreambuf> buf(new CurlReadStreambuf(
108+
builder.BuildDownloadRequest(std::string{}), 128 * 1024));
109+
return std::make_pair(Status(),
110+
std::unique_ptr<ObjectReadStreambuf>(std::move(buf)));
123111
}
124112

125113
std::pair<Status, std::unique_ptr<ObjectWriteStreambuf>>
@@ -132,6 +120,7 @@ CurlClient::WriteObject(InsertObjectStreamingRequest const& request) {
132120
builder.AddQueryParameter("uploadType", "media");
133121
builder.AddQueryParameter("name", request.object_name());
134122
builder.AddHeader("Content-Type: application/octet-stream");
123+
// TODO(#937) - use client options to configure buffer size.
135124
std::unique_ptr<internal::CurlStreambuf> buf(
136125
new internal::CurlStreambuf(builder.BuildUpload(), 128 * 1024));
137126
return std::make_pair(

google/cloud/storage/internal/curl_client.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ class CurlClient : public RawClient {
4848
InsertObjectMediaRequest const& request) override;
4949
std::pair<Status, ObjectMetadata> GetObjectMetadata(
5050
GetObjectMetadataRequest const& request) override;
51-
std::pair<Status, ReadObjectRangeResponse> ReadObjectRangeMedia(
52-
ReadObjectRangeRequest const& request) override;
51+
std::pair<Status, std::unique_ptr<ObjectReadStreambuf>> ReadObject(
52+
ReadObjectRangeRequest const&) override;
5353
std::pair<Status, std::unique_ptr<ObjectWriteStreambuf>> WriteObject(
5454
InsertObjectStreamingRequest const&) override;
5555
std::pair<Status, ListObjectsResponse> ListObjects(

google/cloud/storage/internal/curl_download_request.cc

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,29 +30,54 @@ namespace internal {
3030
CurlDownloadRequest::CurlDownloadRequest(std::size_t initial_buffer_size)
3131
: headers_(nullptr, &curl_slist_free_all),
3232
multi_(nullptr, &curl_multi_cleanup),
33+
closing_(false),
3334
curl_closed_(false),
3435
initial_buffer_size_(initial_buffer_size) {
3536
buffer_.reserve(initial_buffer_size);
3637
}
3738

39+
HttpResponse CurlDownloadRequest::Close() {
40+
// Set the closing_ flag to trigger a return 0 from the next read
41+
// callback, see the comments in the header file for more details.
42+
closing_ = true;
43+
// Block until that callback is made.
44+
Wait([this] { return curl_closed_; });
45+
46+
// Now remove the handle from the CURLM* interface and wait for the response.
47+
auto error = curl_multi_remove_handle(multi_.get(), handle_.handle_.get());
48+
RaiseOnError(__func__, error);
49+
50+
long http_code = handle_.GetResponseCode();
51+
return HttpResponse{http_code, std::string{}, std::move(received_headers_)};
52+
}
53+
3854
HttpResponse CurlDownloadRequest::GetMore(std::string& buffer) {
3955
handle_.FlushDebug(__func__);
40-
GCP_LOG(DEBUG) << __func__ << "(), curl.size=" << buffer_.size();
4156
Wait([this] {
4257
return curl_closed_ or buffer_.size() >= initial_buffer_size_;
4358
});
59+
GCP_LOG(DEBUG) << __func__ << "(), curl.size=" << buffer_.size()
60+
<< ", closing=" << closing_ << ", closed=" << curl_closed_;
4461
if (curl_closed_) {
4562
// Remove the handle from the CURLM* interface and wait for the response.
4663
auto error = curl_multi_remove_handle(multi_.get(), handle_.handle_.get());
4764
RaiseOnError(__func__, error);
4865

49-
buffer.swap(buffer_);
66+
buffer_.swap(buffer);
67+
buffer_.clear();
5068
long http_code = handle_.GetResponseCode();
69+
GCP_LOG(DEBUG) << __func__ << "(), size=" << buffer.size()
70+
<< ", closing=" << closing_ << ", closed=" << curl_closed_
71+
<< ", code=" << http_code;
5172
return HttpResponse{http_code, std::string{}, std::move(received_headers_)};
5273
}
5374
buffer_.swap(buffer);
75+
buffer_.clear();
5476
buffer_.reserve(initial_buffer_size_);
5577
handle_.EasyPause(CURLPAUSE_RECV_CONT);
78+
GCP_LOG(DEBUG) << __func__ << "(), size=" << buffer.size()
79+
<< ", closing=" << closing_ << ", closed=" << curl_closed_
80+
<< ", code=100";
5681
return HttpResponse{100, {}, {}};
5782
}
5883

@@ -91,11 +116,18 @@ std::size_t CurlDownloadRequest::WriteCallback(void* ptr, std::size_t size,
91116
handle_.FlushDebug(__func__);
92117
GCP_LOG(DEBUG) << __func__ << "() size=" << size << ", nmemb=" << nmemb
93118
<< ", buffer.size=" << buffer_.size();
94-
95-
buffer_.append(static_cast<char const*>(ptr), size * nmemb);
96-
if (buffer_.size() > 128 * 1024) {
119+
// This transfer is closing, just return zero, that will make libcurl finish
120+
// any pending work, and will return the handle_ pointer from
121+
// curl_multi_info_read() in PerformWork(). That is the point where
122+
// `curl_closed_` is set.
123+
if (closing_) {
124+
return 0;
125+
}
126+
if (buffer_.size() >= initial_buffer_size_) {
97127
return CURL_READFUNC_PAUSE;
98128
}
129+
130+
buffer_.append(static_cast<char const*>(ptr), size * nmemb);
99131
return size * nmemb;
100132
}
101133

google/cloud/storage/internal/curl_download_request.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class CurlDownloadRequest {
4646
logging_enabled_(rhs.logging_enabled_),
4747
handle_(std::move(rhs.handle_)),
4848
multi_(std::move(rhs.multi_)),
49+
closing_(rhs.closing_),
4950
curl_closed_(rhs.curl_closed_),
5051
initial_buffer_size_(rhs.initial_buffer_size_) {
5152
ResetOptions();
@@ -59,12 +60,16 @@ class CurlDownloadRequest {
5960
logging_enabled_ = rhs.logging_enabled_;
6061
handle_ = std::move(rhs.handle_);
6162
multi_ = std::move(rhs.multi_);
63+
closing_ = rhs.closing_;
6264
curl_closed_ = rhs.curl_closed_;
6365
initial_buffer_size_ = rhs.initial_buffer_size_;
6466
ResetOptions();
6567
return *this;
6668
}
6769

70+
bool IsOpen() const { return not curl_closed_; }
71+
HttpResponse Close();
72+
6873
/**
6974
* Wait for additional data or the end of the transfer.
7075
*
@@ -130,7 +135,16 @@ class CurlDownloadRequest {
130135
CurlMulti multi_;
131136

132137
std::string buffer_;
133-
// The curl_closed_ flag is set if the transfer is done.
138+
// Closing the handle happens in two steps.
139+
// 1. First the application (or higher-level class), calls Close(). This class
140+
// needs to notify libcurl that the transfer is terminated by returning 0
141+
// from the callback.
142+
// 2. Once that callback returns 0, this class needs to know when libcurl has finished the transfer (detected in PerformWork()).
143+
//
144+
// The closing_ flag is set when we enter step 1.
145+
bool closing_;
146+
// The curl_closed_ flag is set when we enter step 2, or when the transfer
147+
// completes.
134148
bool curl_closed_;
135149

136150
std::size_t initial_buffer_size_;

google/cloud/storage/internal/curl_streambuf.cc

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,49 @@ namespace storage {
2020
inline namespace STORAGE_CLIENT_NS {
2121
namespace internal {
2222

23+
CurlReadStreambuf::CurlReadStreambuf(CurlDownloadRequest&& download,
24+
std::size_t target_buffer_size)
25+
: download_(std::move(download)), target_buffer_size_(target_buffer_size) {
26+
// Start with an empty read area, to force an underflow() on the first
27+
// extraction.
28+
current_ios_buffer_.push_back('\0');
29+
char* data = &current_ios_buffer_[0];
30+
setg(data, data + 1, data + 1);
31+
}
32+
33+
bool CurlReadStreambuf::IsOpen() const { return download_.IsOpen(); }
34+
35+
HttpResponse CurlReadStreambuf::Close() { return download_.Close(); }
36+
37+
CurlReadStreambuf::int_type CurlReadStreambuf::underflow() {
38+
if (not IsOpen()) {
39+
current_ios_buffer_.clear();
40+
current_ios_buffer_.push_back('\0');
41+
char* data = &current_ios_buffer_[0];
42+
setg(data, data + 1, data + 1);
43+
return traits_type::eof();
44+
}
45+
46+
current_ios_buffer_.reserve(target_buffer_size_);
47+
auto response = download_.GetMore(current_ios_buffer_);
48+
if (response.status_code >= 300) {
49+
std::ostringstream os;
50+
os << "CurlDownloadRequest reports error: " << response.status_code
51+
<< ", payload=" << response.payload;
52+
google::cloud::internal::RaiseRuntimeError(os.str());
53+
}
54+
55+
if (not current_ios_buffer_.empty()) {
56+
char* data = &current_ios_buffer_[0];
57+
setg(data, data, data + current_ios_buffer_.size());
58+
return traits_type::to_int_type(*data);
59+
}
60+
current_ios_buffer_.push_back('\0');
61+
char* data = &current_ios_buffer_[0];
62+
setg(data, data + 1, data + 1);
63+
return traits_type::eof();
64+
}
65+
2366
CurlStreambuf::CurlStreambuf(CurlUploadRequest&& upload,
2467
std::size_t max_buffer_size)
2568
: upload_(std::move(upload)), max_buffer_size_(max_buffer_size) {

google/cloud/storage/internal/curl_streambuf.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_CURL_STREAMBUF_H_
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_CURL_STREAMBUF_H_
1717

18+
#include "google/cloud/storage/internal/curl_download_request.h"
1819
#include "google/cloud/storage/internal/curl_upload_request.h"
1920
#include "google/cloud/storage/internal/object_streambuf.h"
2021

@@ -23,6 +24,28 @@ namespace cloud {
2324
namespace storage {
2425
inline namespace STORAGE_CLIENT_NS {
2526
namespace internal {
27+
/**
28+
* Implement a wrapper for libcurl-based streaming downloads.
29+
*/
30+
class CurlReadStreambuf : public ObjectReadStreambuf {
31+
public:
32+
explicit CurlReadStreambuf(CurlDownloadRequest&& download,
33+
std::size_t target_buffer_size);
34+
35+
~CurlReadStreambuf() override = default;
36+
37+
HttpResponse Close() override;
38+
bool IsOpen() const override;
39+
40+
protected:
41+
int_type underflow() override;
42+
43+
private:
44+
CurlDownloadRequest download_;
45+
std::string current_ios_buffer_;
46+
std::size_t target_buffer_size_;
47+
};
48+
2649
/**
2750
* Implement a wrapper for libcurl-based streaming uploads.
2851
*/

google/cloud/storage/internal/logging_client.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,10 @@ std::pair<Status, ObjectMetadata> LoggingClient::GetObjectMetadata(
103103
return MakeCall(*client_, &RawClient::GetObjectMetadata, request, __func__);
104104
}
105105

106-
std::pair<Status, ReadObjectRangeResponse> LoggingClient::ReadObjectRangeMedia(
107-
ReadObjectRangeRequest const& request) {
108-
return MakeCall(*client_, &RawClient::ReadObjectRangeMedia, request,
109-
__func__);
106+
std::pair<Status, std::unique_ptr<ObjectReadStreambuf>>
107+
LoggingClient::ReadObject(ReadObjectRangeRequest const& request) {
108+
return MakeCallNoResponseLogging(*client_, &RawClient::ReadObject, request,
109+
__func__);
110110
}
111111

112112
std::pair<Status, std::unique_ptr<ObjectWriteStreambuf>>

0 commit comments

Comments
 (0)