Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 6 additions & 36 deletions src/CurlUtil.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,54 +46,24 @@ HandlerQueue::HandlerQueue() {
m_write_fd = filedes[1];
};

namespace {

// Simple debug function for getting information from libcurl; to enable, you
// need to recompile with GetHandle(true);
int dump_header(CURL *handle, curl_infotype type, char *data, size_t size,
void *clientp) {
(void)handle;
(void)clientp;

switch (type) {
case CURLINFO_HEADER_OUT:
printf("Header > %s\n", std::string(data, size).c_str());
break;
default:
printf("Info: %s", std::string(data, size).c_str());
break;
CURL *HandlerQueue::GetHandle() {
if (!m_handles.empty()) {
CURL *result = m_handles.top();
m_handles.pop();
return result;
}
return 0;
}

} // namespace

CURL *GetHandle(bool verbose) {
auto result = curl_easy_init();
if (result == nullptr) {
return result;
}

curl_easy_setopt(result, CURLOPT_USERAGENT, "xrootd-s3/devel");
curl_easy_setopt(result, CURLOPT_DEBUGFUNCTION, dump_header);
if (verbose)
curl_easy_setopt(result, CURLOPT_VERBOSE, 1L);

curl_easy_setopt(result, CURLOPT_BUFFERSIZE, 32 * 1024);

return result;
}

CURL *HandlerQueue::GetHandle() {
if (!m_handles.empty()) {
CURL *result = m_handles.top();
m_handles.pop();
return result;
}

return ::GetHandle(false);
}

void HandlerQueue::RecycleHandle(CURL *curl) { m_handles.push(curl); }

void HandlerQueue::Produce(HTTPRequest *handler) {
Expand Down Expand Up @@ -233,7 +203,7 @@ void CurlWorker::Run() {

auto curl = queue.GetHandle();
if (curl == nullptr) {
m_logger.Log(LogMask::Debug, "Run",
m_logger.Log(LogMask::Warning, "Run",
"Unable to allocate a curl handle");
op->Fail("E_NOMEM", "Unable to get allocate a curl handle");
continue;
Expand Down
1 change: 1 addition & 0 deletions src/S3Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ bool AmazonS3SendMultipartPart::GetEtag(std::string &result) {
AmazonS3Download::~AmazonS3Download() {}

bool AmazonS3Download::SendRequest(off_t offset, size_t size) {
m_request_start = std::chrono::steady_clock::now();
if (offset != 0 || size != 0) {
std::string range;
formatstr(range, "bytes=%lld-%lld", static_cast<long long int>(offset),
Expand Down
6 changes: 6 additions & 0 deletions src/S3Commands.hh
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ class AmazonS3Download : public AmazonRequest {

virtual bool SendRequest(off_t offset, size_t size);

// Return the elapsed time since the request was started with SendRequest().
std::chrono::steady_clock::duration getElapsedTime() const {
return std::chrono::steady_clock::now() - m_request_start;
}

protected:
virtual bool IsBlocking() { return true; }
virtual std::string_view *requestResult() override {
Expand All @@ -280,6 +285,7 @@ class AmazonS3Download : public AmazonRequest {

private:
char *m_buffer{nullptr};
std::chrono::steady_clock::time_point m_request_start;
std::string_view m_buffer_view;
};

Expand Down
89 changes: 60 additions & 29 deletions src/S3File.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) {
}

ssize_t S3File::Read(void *buffer, off_t offset, size_t size) {
return m_cache.Read(*this, static_cast<char *>(buffer), offset, size);
return m_cache.Read(static_cast<char *>(buffer), offset, size);
}

int S3File::Fstat(struct stat *buff) {
Expand Down Expand Up @@ -573,8 +573,8 @@ bool S3File::S3Cache::CouldUse(off_t req_off, size_t req_size,
}
}

void S3File::S3Cache::DownloadCaches(S3File &file, bool download_a,
bool download_b, bool locked) {
void S3File::S3Cache::DownloadCaches(bool download_a, bool download_b,
bool locked) {
if (!download_a && !download_b) {
return;
}
Expand All @@ -584,25 +584,25 @@ void S3File::S3Cache::DownloadCaches(S3File &file, bool download_a,
lk.lock();
}
if (download_a) {
m_a.Download(file);
m_a.Download(m_parent);
}
if (download_b) {
m_b.Download(file);
m_b.Download(m_parent);
}
}

ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset,
size_t size) {
if (offset >= file.content_length) {
ssize_t S3File::S3Cache::Read(char *buffer, off_t offset, size_t size) {
if (offset >= m_parent.content_length) {
return 0;
}
if (offset + static_cast<off_t>(size) > file.content_length) {
size = file.content_length - offset;
if (offset + static_cast<off_t>(size) > m_parent.content_length) {
size = m_parent.content_length - offset;
}
if (file.m_log.getMsgMask() & LogMask::Debug) {
if (m_parent.m_log.getMsgMask() & LogMask::Debug) {
std::stringstream ss;
ss << "Read request for offset=" << offset << ", size=" << size;
file.m_log.Log(LogMask::Debug, "cache", ss.str().c_str());
ss << "Read request for object=" << m_parent.m_object
<< ", offset=" << offset << ", size=" << size;
m_parent.m_log.Log(LogMask::Debug, "cache", ss.str().c_str());
}

off_t req3_off, req4_off, req5_off, req6_off;
Expand Down Expand Up @@ -637,8 +637,8 @@ ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset,
// download those bypassing the cache.
bool downloaded;
size_t bypass_size = req3_size;
std::tie(req3_off, req3_size, downloaded) =
file.DownloadBypass(req3_off, req3_size, buffer + req3_off - offset);
std::tie(req3_off, req3_size, downloaded) = m_parent.DownloadBypass(
req3_off, req3_size, buffer + req3_off - offset);
if (req3_size < 0) {
m_errors += 1;
return -1;
Expand All @@ -648,8 +648,8 @@ ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset,
m_bypass_count += 1;
}
bypass_size = req4_size;
std::tie(req4_off, req4_size, downloaded) =
file.DownloadBypass(req4_off, req4_size, buffer + req4_off - offset);
std::tie(req4_off, req4_size, downloaded) = m_parent.DownloadBypass(
req4_off, req4_size, buffer + req4_off - offset);
if (req4_size < 0) {
m_errors += 1;
return -1;
Expand All @@ -659,8 +659,8 @@ ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset,
m_bypass_count += 1;
}
bypass_size = req5_size;
std::tie(req5_off, req5_size, downloaded) =
file.DownloadBypass(req5_off, req5_size, buffer + req5_off - offset);
std::tie(req5_off, req5_size, downloaded) = m_parent.DownloadBypass(
req5_off, req5_size, buffer + req5_off - offset);
if (req5_size < 0) {
m_errors += 1;
return -1;
Expand All @@ -670,8 +670,8 @@ ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset,
m_bypass_count += 1;
}
bypass_size = req6_size;
std::tie(req6_off, req6_size, downloaded) =
file.DownloadBypass(req6_off, req6_size, buffer + req6_off - offset);
std::tie(req6_off, req6_size, downloaded) = m_parent.DownloadBypass(
req6_off, req6_size, buffer + req6_off - offset);
if (req6_size < 0) {
m_errors += 1;
return -1;
Expand All @@ -690,7 +690,7 @@ ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset,
std::unique_lock lk{m_mutex};
auto next_offset = std::max(m_a.m_off, m_b.m_off) +
static_cast<off_t>(m_cache_entry_size);
if (next_offset < file.content_length) {
if (next_offset < m_parent.content_length) {
if (!m_a.m_inprogress && m_a.m_used >= m_cache_entry_size) {
m_a.m_inprogress = true;
m_a.m_off = next_offset;
Expand All @@ -712,7 +712,7 @@ ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset,
m_prefetch_count++;
m_prefetch_bytes += m_cache_entry_size;
}
DownloadCaches(file, download_a, download_b, false);
DownloadCaches(download_a, download_b, false);
return size;
}
// At this point, the only remaining data requests must be less than the
Expand Down Expand Up @@ -914,7 +914,7 @@ ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset,
;
download_a = true;
m_a.m_inprogress = true;
if (prefetch_offset < file.content_length) {
if (prefetch_offset < m_parent.content_length) {
m_b.m_off = prefetch_offset;
prefetch_b = true;
m_b.m_inprogress = true;
Expand Down Expand Up @@ -956,7 +956,7 @@ ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset,
m_prefetch_count += 1;
m_prefetch_bytes += m_cache_entry_size;
}
DownloadCaches(file, download_a, download_b || prefetch_b, true);
DownloadCaches(download_a, download_b || prefetch_b, true);
}
return size;
}
Expand All @@ -965,28 +965,59 @@ void S3File::S3Cache::Entry::Notify() {
std::unique_lock lk(m_parent.m_mutex);
m_inprogress = false;
m_failed = !m_request->getErrorCode().empty();
if ((m_parent.m_parent.m_log.getMsgMask() & LogMask::Warning) && m_failed) {
std::stringstream ss;
auto duration_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
m_request->getElapsedTime())
.count();
ss << "Finished GET for object=" << m_parent.m_parent.m_object
<< ", offset=" << m_off << ", size=" << m_data.size()
<< ", duration_ms=" << duration_ms << "; failed with error '"
<< m_request->getErrorCode() << "'";
m_parent.m_parent.m_log.Log(LogMask::Warning, "cache",
ss.str().c_str());
} else if (m_parent.m_parent.m_log.getMsgMask() & LogMask::Debug) {
std::stringstream ss;
auto duration_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
m_request->getElapsedTime())
.count();
ss << "Finished GET for object=" << m_parent.m_parent.m_object
<< ", offset=" << m_off << ", size=" << m_data.size()
<< ", duration_ms=" << duration_ms << "; succeeded";
m_parent.m_parent.m_log.Log(LogMask::Debug, "cache", ss.str().c_str());
}
m_request = nullptr;

m_parent.m_cv.notify_all();
}

void S3File::S3Cache::Entry::Download(S3File &file) {
m_used = false;
m_data.resize(m_cache_entry_size);
m_request.reset(new AmazonS3NonblockingDownload<Entry>(
file.m_ai, file.m_object, file.m_log, m_data.data(), *this));
size_t request_size = m_cache_entry_size;
if (m_off + static_cast<off_t>(request_size) > file.content_length) {
request_size = file.content_length - m_off;
}
m_data.resize(request_size);
m_request.reset(new AmazonS3NonblockingDownload<Entry>(
file.m_ai, file.m_object, file.m_log, m_data.data(), *this));
// This function is always called with m_mutex held; however,
// SendRequest can block if the threads are all busy; the threads
// will need to grab the lock to notify of completion. So, we
// must release the lock here before calling a blocking function --
// otherwise deadlock may occur.
auto off = m_off;
m_parent.m_mutex.unlock();
if (!m_request->SendRequest(off, m_cache_entry_size)) {

if (file.m_log.getMsgMask() & LogMask::Debug) {
std::stringstream ss;
ss << "Issuing GET for object=" << file.m_object << ", offset=" << m_off
<< ", size=" << request_size;
file.m_log.Log(LogMask::Debug, "cache", ss.str().c_str());
}

if (!m_request->SendRequest(off, request_size)) {
m_parent.m_mutex.lock();
std::stringstream ss;
ss << "Failed to send GetObject command: "
Expand Down
12 changes: 8 additions & 4 deletions src/S3File.hh
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ class S3File : public XrdOssDF {
std::condition_variable m_cv; // Condition variable for notifying that
// new downloaded data is available.

S3File
&m_parent; // Reference to the S3File object that owns this cache.

// Returns `true` if the request offset would be inside the cache entry.
// The request offset is assumed to be aligned to be inside a single
// cache entry (that is, smaller than a cache entry and not spanning two
Expand All @@ -286,15 +289,16 @@ class S3File : public XrdOssDF {
// Trigger the non-blocking download into the cache entries.
// The condition variable will be notified when one of the caches
// finishes.
void DownloadCaches(S3File &file, bool download_a, bool download_b,
bool locked);
void DownloadCaches(bool download_a, bool download_b, bool locked);

// Trigger a blocking read from a given file
ssize_t Read(S3File &file, char *buffer, off_t offset, size_t size);
ssize_t Read(char *buffer, off_t offset, size_t size);

S3Cache(S3File &file) : m_parent(file) {}

// Shutdown the cache; ensure all reads are completed before
// deleting the objects.
~S3Cache();
};
S3Cache m_cache;
S3Cache m_cache{*this};
};
15 changes: 9 additions & 6 deletions src/logging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,21 @@ bool XrdHTTPServer::ConfigLog(XrdOucGatherConf &conf, XrdSysError &log) {
} else if (!strcmp(val, "error")) {
log.setMsgMask(log.getMsgMask() | LogMask::Error);
} else if (!strcmp(val, "warning")) {
log.setMsgMask(log.getMsgMask() | LogMask::Warning);
log.setMsgMask(log.getMsgMask() | LogMask::Warning |
LogMask::Error);
} else if (!strcmp(val, "info")) {
log.setMsgMask(log.getMsgMask() | LogMask::Info);
log.setMsgMask(log.getMsgMask() | LogMask::Info | LogMask::Warning |
LogMask::Error);
} else if (!strcmp(val, "dump")) {
log.setMsgMask(log.getMsgMask() | LogMask::Dump);
log.setMsgMask(log.getMsgMask() | LogMask::Dump | LogMask::Debug |
LogMask::Info | LogMask::Warning | LogMask::Error);
} else if (!strcmp(val, "debug")) {
log.setMsgMask(log.getMsgMask() | LogMask::Debug);
log.setMsgMask(log.getMsgMask() | LogMask::Debug | LogMask::Info |
LogMask::Warning | LogMask::Error);
} else if (!strcmp(val, "none")) {
log.setMsgMask(0);
} else {
log.Emsg("Config",
"scitokens.trace encountered an unknown directive:", val);
log.Emsg("Config", "trace encountered an unknown directive:", val);
return false;
}
} while ((val = conf.GetToken()));
Expand Down
Loading