diff --git a/.gitignore b/.gitignore index 567609b..6f31401 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ build/ +.vscode/ diff --git a/src/HTTPCommands.cc b/src/HTTPCommands.cc index 070286a..0379bf5 100644 --- a/src/HTTPCommands.cc +++ b/src/HTTPCommands.cc @@ -1,6 +1,6 @@ /*************************************************************** * - * Copyright (C) 2024, Pelican Project, Morgridge Institute for Research + * Copyright (C) 2025, Pelican Project, Morgridge Institute for Research * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You may @@ -277,6 +277,15 @@ size_t HTTPRequest::ReadCallback(char *buffer, size_t size, size_t n, void *v) { return CURL_READFUNC_ABORT; } + if (payload->m_parent.m_log.getMsgMask() & LogMask::Dump) { + payload->m_parent.m_log.Log( + LogMask::Dump, "ReadCallback", + ("sentSoFar=" + std::to_string(payload->sentSoFar) + + " data.size=" + std::to_string(payload->data.size()) + + " final=" + std::to_string(payload->final)) + .c_str()); + } + if (payload->sentSoFar == static_cast(payload->data.size())) { payload->sentSoFar = 0; if (payload->final) { @@ -778,19 +787,30 @@ void HTTPRequest::ProcessCurlResult(CURL *curl, CURLcode rv) { HTTPUpload::~HTTPUpload() {} -bool HTTPUpload::SendRequest(const std::string &payload, off_t offset, - size_t size) { - if (offset != 0 || size != 0) { - std::string range; - formatstr(range, "bytes=%lld-%lld", static_cast(offset), - static_cast(offset + size - 1)); - headers["Range"] = range.c_str(); - } - +bool HTTPUpload::SendRequest(const std::string &payload) { httpVerb = "PUT"; + expectedResponseCode = 201; return SendHTTPRequest(payload); } +bool HTTPUpload::StartStreamingRequest(const std::string_view payload, + off_t object_size) { + httpVerb = "PUT"; + expectedResponseCode = 201; + headers["Content-Type"] = "binary/octet-stream"; + return sendPreparedRequest(hostUrl, payload, object_size, false); +} + +bool HTTPUpload::ContinueStreamingRequest(const std::string_view payload, + off_t object_size, bool final) { + // Note that despite the fact that final gets passed through here, + // in reality the way that curl determines whether the data transfer is + // done is by seeing if the total amount of data sent is equal to the + // expected size of the entire payload, stored in m_object_size. + // See HTTPRequest::ReadCallback for more info + return sendPreparedRequest(hostUrl, payload, object_size, final); +} + void HTTPRequest::Init(XrdSysError &log) { if (!m_workers_initialized) { for (unsigned idx = 0; idx < CurlWorker::GetPollThreads(); idx++) { @@ -837,3 +857,46 @@ bool HTTPHead::SendRequest() { } // --------------------------------------------------------------------------- + +int HTTPRequest::HandleHTTPError(const HTTPRequest &request, XrdSysError &log, + const char *operation, const char *context) { + auto httpCode = request.getResponseCode(); + if (httpCode) { + std::stringstream ss; + ss << operation << " failed: " << request.getResponseCode() << ": " + << request.getResultString(); + if (context) { + ss << " (context: " << context << ")"; + } + log.Log(LogMask::Warning, "HTTPRequest::HandleHTTPError", + ss.str().c_str()); + + switch (httpCode) { + case 404: + return -ENOENT; + case 500: + return -EIO; + case 403: + return -EPERM; + case 401: + return -EACCES; + case 400: + return -EINVAL; + case 503: + return -EAGAIN; + default: + return -EIO; + } + } else { + std::stringstream ss; + ss << "Failed to send " << operation + << " command: " << request.getErrorCode() << ": " + << request.getErrorMessage(); + if (context) { + ss << " (context: " << context << ")"; + } + log.Log(LogMask::Warning, "HTTPRequest::HandleHTTPError", + ss.str().c_str()); + return -EIO; + } +} diff --git a/src/HTTPCommands.hh b/src/HTTPCommands.hh index 7294702..ac74a0a 100644 --- a/src/HTTPCommands.hh +++ b/src/HTTPCommands.hh @@ -1,6 +1,6 @@ /*************************************************************** * - * Copyright (C) 2024, Pelican Project, Morgridge Institute for Research + * Copyright (C) 2025, Pelican Project, Morgridge Institute for Research * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You may @@ -95,6 +95,19 @@ class HTTPRequest { return m_timeout_duration; } + // Handle HTTP request errors and convert them to appropriate POSIX error codes. + // This function can be used anywhere HTTP requests are made to provide + // consistent error handling. + // + // - request: The HTTPRequest object that was used for the request + // - log: The logger instance for error reporting + // - operation: A string describing the operation being performed (for logging) + // - context: Additional context information (for logging) + // + // Returns: A POSIX error code (-ENOENT, -EIO, -EPERM, etc.) or 0 if no error + static int HandleHTTPError(const HTTPRequest &request, XrdSysError &log, + const char *operation, const char *context = nullptr); + protected: // Send the request to the HTTP server. // Blocks until the request has completed. @@ -295,8 +308,23 @@ class HTTPUpload : public HTTPRequest { virtual ~HTTPUpload(); - virtual bool SendRequest(const std::string &payload, off_t offset, - size_t size); + virtual bool SendRequest(const std::string &payload); + + // Start a streaming request. + // + // - payload: The payload contents when uploading. + // - object_size: Size of the entire upload payload. + bool StartStreamingRequest(const std::string_view payload, + off_t object_size); + + // Continue a streaming request. + // + // - payload: The payload contents when uploading. + // - object_size: Size of the entire upload payload. + // - final: True if this is the last or only payload for the request. False + // otherwise. + bool ContinueStreamingRequest(const std::string_view payload, + off_t object_size, bool final); protected: std::string object; diff --git a/src/HTTPFile.cc b/src/HTTPFile.cc index dff96ee..42a9091 100644 --- a/src/HTTPFile.cc +++ b/src/HTTPFile.cc @@ -1,6 +1,6 @@ /*************************************************************** * - * Copyright (C) 2024, Pelican Project, Morgridge Institute for Research + * Copyright (C) 2025, Pelican Project, Morgridge Institute for Research * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You may @@ -30,8 +30,8 @@ #include #include +#include #include -#include #include #include #include @@ -93,6 +93,48 @@ int parse_path(const std::string &storagePrefixStr, const char *pathStr, } int HTTPFile::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) { + if (m_is_open) { + m_log.Log(LogMask::Warning, "HTTPFile::Open", + "File already open:", path); + return -EBADF; + } + if (Oflag & O_CREAT) { + m_log.Log(LogMask::Info, "HTTPFile::Open", + "File opened for creation:", path); + } + if (Oflag & O_APPEND) { + m_log.Log(LogMask::Info, "HTTPFile::Open", + "File opened for append:", path); + } + if (Oflag & (O_RDWR | O_WRONLY)) { + m_write = true; + m_log.Log(LogMask::Debug, "HTTPFile::Open", + "File opened for writing:", path); + m_write_lk.reset(new std::mutex); + } + // get the expected file size; only relevant for O_RDWR | O_WRONLY + char *asize_char; + if ((asize_char = env.Get("oss.asize"))) { + off_t result{0}; + auto [ptr, ec] = std::from_chars( + asize_char, asize_char + strlen(asize_char), result); + if (ec == std::errc() && ptr == asize_char + strlen(asize_char)) { + if (result < 0) { + m_log.Log(LogMask::Warning, "HTTPFile::Open", + "Opened file has oss.asize set to a negative value:", + asize_char); + return -EIO; + } + m_object_size = result; + } else { + std::stringstream ss; + ss << "Opened file has oss.asize set to an unparseable value: " + << asize_char; + m_log.Log(LogMask::Warning, "HTTPFile::Open", ss.str().c_str()); + return -EIO; + } + } + auto configured_hostname = m_oss->getHTTPHostName(); auto configured_hostUrl = m_oss->getHTTPHostUrl(); const auto &configured_url_base = m_oss->getHTTPUrlBase(); @@ -117,13 +159,21 @@ int HTTPFile::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) { if (!Oflag) { struct stat buf; - return Fstat(&buf); + auto rv = Fstat(&buf); + if (rv < 0) { + return rv; + } } + m_is_open = true; return 0; } ssize_t HTTPFile::Read(void *buffer, off_t offset, size_t size) { + if (!m_is_open) { + m_log.Log(LogMask::Warning, "HTTPFile::Read", "File not open"); + return -EBADF; + } HTTPDownload download(m_hostUrl, m_object, m_log, m_oss->getToken()); m_log.Log( LogMask::Debug, "HTTPFile::Read", @@ -131,11 +181,8 @@ ssize_t HTTPFile::Read(void *buffer, off_t offset, size_t size) { m_hostname.c_str(), m_object.c_str()); if (!download.SendRequest(offset, size)) { - std::stringstream ss; - ss << "Failed to send GetObject command: " << download.getResponseCode() - << "'" << download.getResultString() << "'"; - m_log.Log(LogMask::Warning, "HTTPFile::Read", ss.str().c_str()); - return 0; + return HTTPRequest::HandleHTTPError(download, m_log, "GET", + m_object.c_str()); } const std::string &bytes = download.getResultString(); @@ -170,29 +217,8 @@ int HTTPFile::Fstat(struct stat *buff) { // than code 200. If xrootd wants us to distinguish between // these cases, head.getResponseCode() is initialized to 0, so // we can check. - auto httpCode = head.getResponseCode(); - if (httpCode) { - std::stringstream ss; - ss << "HEAD command failed: " << head.getResponseCode() << ": " - << head.getResultString(); - m_log.Log(LogMask::Warning, "HTTPFile::Fstat", ss.str().c_str()); - switch (httpCode) { - case 404: - return -ENOENT; - case 500: - return -EIO; - case 403: - return -EPERM; - default: - return -EIO; - } - } else { - std::stringstream ss; - ss << "Failed to send HEAD command: " << head.getErrorCode() << ": " - << head.getErrorMessage(); - m_log.Log(LogMask::Warning, "HTTPFile::Fstat", ss.str().c_str()); - return -EIO; - } + return HTTPRequest::HandleHTTPError(head, m_log, "HEAD", + m_object.c_str()); } std::string headers = head.getResultString(); @@ -251,20 +277,111 @@ int HTTPFile::Fstat(struct stat *buff) { } ssize_t HTTPFile::Write(const void *buffer, off_t offset, size_t size) { - HTTPUpload upload(m_hostUrl, m_object, m_log, m_oss->getToken()); + if (!m_is_open) { + m_log.Log(LogMask::Warning, "HTTPFile::Write", "File not open"); + return -EBADF; + } + + if (!m_write_lk) { + return -EBADF; + } + std::lock_guard lk(*m_write_lk); + + // Small object optimization as in S3File::Write() + if (!m_write_offset && m_object_size == static_cast(size)) { + HTTPUpload upload(m_hostUrl, m_object, m_log, m_oss->getToken()); + std::string payload((char *)buffer, size); + if (!upload.SendRequest(payload)) { + return HTTPRequest::HandleHTTPError(upload, m_log, "PUT", + m_object.c_str()); + } else { + m_write_offset += size; + m_log.Log(LogMask::Debug, "HTTPFile::Write", + "Creation of small object succeeded", + std::to_string(size).c_str()); + return size; + } + } + // If we don't have an in-progress upload, start one + if (!m_write_op) { + if (offset != 0) { + m_log.Log(LogMask::Error, "HTTPFile::Write", + "Out-of-order write detected; HTTP " + "requires writes to be in order"); + m_write_offset = -1; + return -EIO; + } + m_write_op.reset( + new HTTPUpload(m_hostUrl, m_object, m_log, m_oss->getToken())); + std::string payload((char *)buffer, size); + if (!m_write_op->StartStreamingRequest(payload, m_object_size)) { + return HTTPRequest::HandleHTTPError( + *m_write_op, m_log, "PUT streaming start", m_object.c_str()); + } else { + m_write_offset += size; + m_log.Log(LogMask::Debug, "HTTPFile::Write", + "First write request succeeded", + std::to_string(size).c_str()); + return size; + } + } + // Validate continuing writing at offset + if (offset != static_cast(m_write_offset)) { + std::stringstream ss; + ss << "Requested write offset at " << offset + << " does not match current file descriptor offset at " + << m_write_offset; + m_log.Log(LogMask::Warning, "HTTPFile::Write", ss.str().c_str()); + return -EIO; + } + // Continue the write std::string payload((char *)buffer, size); - if (!upload.SendRequest(payload, offset, size)) { - m_log.Emsg("Open", "upload.SendRequest() failed"); - return -ENOENT; + if (!m_write_op->ContinueStreamingRequest(payload, m_object_size, false)) { + return HTTPRequest::HandleHTTPError( + *m_write_op, m_log, "PUT streaming continue", m_object.c_str()); } else { - m_log.Emsg("Open", "upload.SendRequest() succeeded"); - return 0; + m_write_offset += size; + m_log.Log(LogMask::Debug, "HTTPFile::Write", + "Continued request succeeded", std::to_string(size).c_str()); } + return size; } int HTTPFile::Close(long long *retsz) { - m_log.Emsg("Close", "Closed our HTTP file"); + if (!m_is_open) { + m_log.Log(LogMask::Error, "HTTPFile::Close", + "Cannot close. URL isn't open"); + return -EBADF; + } + m_is_open = false; + // If we opened the object in write mode but did not actually write + // anything, make a quick zero-length file. + if (m_write && !m_write_offset) { + HTTPUpload upload(m_hostUrl, m_object, m_log, m_oss->getToken()); + if (!upload.SendRequest("")) { + return HTTPRequest::HandleHTTPError( + upload, m_log, "PUT zero-length", m_object.c_str()); + } else { + m_log.Log(LogMask::Debug, "HTTPFile::Close", + "Creation of zero-length succeeded"); + return 0; + } + } + + if (m_write && m_object_size == -1) { + // if we didn't get a size, we need to explicitly close the upload + if (!m_write_op->ContinueStreamingRequest("", 0, true)) { + return HTTPRequest::HandleHTTPError( + *m_write_op, m_log, "PUT streaming close", m_object.c_str()); + } else { + m_log.Log(LogMask::Debug, "HTTPFile::Write", + "PUT streaming close succeeded"); + } + } + + m_log.Log(LogMask::Debug, "HTTPFile::Close", + "Closed HTTP file:", m_object.c_str()); return 0; } diff --git a/src/HTTPFile.hh b/src/HTTPFile.hh index 68642e4..28b010c 100644 --- a/src/HTTPFile.hh +++ b/src/HTTPFile.hh @@ -18,6 +18,7 @@ #pragma once +#include "HTTPCommands.hh" #include "HTTPFileSystem.hh" #include "XrdOss/XrdOss.hh" #include "XrdOuc/XrdOucEnv.hh" @@ -25,7 +26,9 @@ #include "XrdSec/XrdSecEntityAttr.hh" #include "XrdVersion.hh" +#include #include +#include int parse_path(const std::string &hostname, const char *path, std::string &object); @@ -103,6 +106,17 @@ class HTTPFile : public XrdOssDF { std::string m_hostname; std::string m_hostUrl; std::string m_object; + // Whether the file was opened in write mode + bool m_write{false}; + // Whether the file is open + bool m_is_open{false}; + // Expected size of the completed object; -1 if unknown. + off_t m_object_size{-1}; + off_t m_write_offset{0}; + std::unique_ptr m_write_lk; + // The in-progress operation for a multi-part upload; its lifetime may be + // spread across multiple write calls. + std::unique_ptr m_write_op; size_t content_length; time_t last_modified; diff --git a/test/http_tests.cc b/test/http_tests.cc index 4429f53..bd8748f 100644 --- a/test/http_tests.cc +++ b/test/http_tests.cc @@ -24,10 +24,13 @@ #include #include +#include #include #include #include +#include #include +#include std::string g_ca_file; std::string g_config_file; @@ -60,16 +63,15 @@ void parseEnvFile(const std::string &fname) { TEST(TestHTTPFile, TestXfer) { XrdSysLogger log; - HTTPFileSystem fs(&log, g_config_file.c_str(), nullptr); struct stat si; - auto rc = fs.Stat("/hello_world.txt", &si); + XrdOucEnv env; + auto rc = fs.Stat("/hello_world.txt", &si, 0, &env); ASSERT_EQ(rc, 0); ASSERT_EQ(si.st_size, 13); - auto fh = fs.newFile(); - XrdOucEnv env; + std::unique_ptr fh(fs.newFile()); rc = fh->Open("/hello_world.txt", O_RDONLY, 0700, env); ASSERT_EQ(rc, 0); @@ -82,10 +84,125 @@ TEST(TestHTTPFile, TestXfer) { ASSERT_EQ(fh->Close(), 0); } +TEST(TestHTTPFile, TestWriteZeroByteFile) { + XrdSysLogger log; + HTTPFileSystem fs(&log, g_config_file.c_str(), nullptr); + + XrdOucEnv env; + std::unique_ptr fh(fs.newFile()); + // Create a 0-byte file + auto rc = + fh->Open("/empty_file.txt", O_WRONLY | O_CREAT | O_TRUNC, 0644, env); + ASSERT_EQ(rc, 0); + + // Close the file immediately (0 bytes written) + ASSERT_EQ(fh->Close(), 0); + + // Verify the file exists and has 0 size + struct stat si; + rc = fs.Stat("/empty_file.txt", &si, 0, &env); + ASSERT_EQ(rc, 0); + ASSERT_EQ(si.st_size, 0); +} + +TEST(TestHTTPFile, TestWriteSmallFile) { + XrdSysLogger log; + HTTPFileSystem fs(&log, g_config_file.c_str(), nullptr); + + XrdOucEnv env; + std::unique_ptr fh(fs.newFile()); + auto rc = + fh->Open("/test_write.txt", O_WRONLY | O_CREAT | O_TRUNC, 0644, env); + ASSERT_EQ(rc, 0); + + // Write some test data + const char test_data[] = "This is a test file for writing operations."; + const size_t data_size = strlen(test_data); + auto write_res = fh->Write(test_data, 0, data_size); + ASSERT_EQ(write_res, static_cast(data_size)); + + ASSERT_EQ(fh->Close(), 0); + + // Verify the file was written correctly + struct stat si; + rc = fs.Stat("/test_write.txt", &si, 0, &env); + ASSERT_EQ(rc, 0); + ASSERT_EQ(si.st_size, data_size); + + // Read back the file to verify content + std::unique_ptr read_fh(fs.newFile()); + rc = read_fh->Open("/test_write.txt", O_RDONLY, 0700, env); + ASSERT_EQ(rc, 0); + + char read_buf[256]; + auto read_res = read_fh->Read(read_buf, 0, data_size); + ASSERT_EQ(read_res, static_cast(data_size)); + ASSERT_EQ(memcmp(read_buf, test_data, data_size), 0); + + ASSERT_EQ(read_fh->Close(), 0); +} + +TEST(TestHTTPFile, TestWriteLargeFile) { + XrdSysLogger log; + HTTPFileSystem fs(&log, g_config_file.c_str(), nullptr); + + XrdOucEnv env; + std::unique_ptr fh(fs.newFile()); + auto rc = fh->Open("/test_large_file.txt", O_WRONLY | O_CREAT | O_TRUNC, + 0644, env); + ASSERT_EQ(rc, 0); + + // Generate 2 MB of test data + const size_t file_size = 2 * 1024 * 1024; // 2 MB + std::vector test_data(file_size); + // Fill with a repeating pattern for easy verification + for (size_t i = 0; i < file_size; i++) { + test_data[i] = static_cast(i % 256); + } + // Write the data in chunks to test streaming upload + const size_t chunk_size = 64 * 1024; // 64 KB chunks + size_t total_written = 0; + for (size_t offset = 0; offset < file_size; offset += chunk_size) { + size_t current_chunk_size = std::min(chunk_size, file_size - offset); + auto write_res = + fh->Write(&test_data[offset], offset, current_chunk_size); + ASSERT_EQ(write_res, static_cast(current_chunk_size)); + total_written += current_chunk_size; + } + ASSERT_EQ(total_written, file_size); + ASSERT_EQ(fh->Close(), 0); + + // Verify the file was written correctly + struct stat si; + rc = fs.Stat("/test_large_file.txt", &si, 0, &env); + ASSERT_EQ(rc, 0); + ASSERT_EQ(si.st_size, file_size); + + // Read back the file to verify content + std::unique_ptr read_fh(fs.newFile()); + rc = read_fh->Open("/test_large_file.txt", O_RDONLY, 0700, env); + ASSERT_EQ(rc, 0); + + // Read the data in chunks + std::vector read_buf(file_size); + size_t total_read = 0; + for (size_t offset = 0; offset < file_size; offset += chunk_size) { + size_t current_chunk_size = std::min(chunk_size, file_size - offset); + auto read_res = + read_fh->Read(&read_buf[offset], offset, current_chunk_size); + ASSERT_EQ(read_res, static_cast(current_chunk_size)); + total_read += current_chunk_size; + } + ASSERT_EQ(total_read, file_size); + ASSERT_EQ(memcmp(read_buf.data(), test_data.data(), file_size), 0); + + ASSERT_EQ(read_fh->Close(), 0); +} + class TestHTTPRequest : public HTTPRequest { public: XrdSysLogger log{}; - XrdSysError err{&log, "TestHTTPR3equest"}; + XrdSysError err{&log, "TestHTTPRequest"}; TestHTTPRequest(const std::string &url) : HTTPRequest(url, err, nullptr) {} }; diff --git a/test/xrdhttp-setup.sh b/test/xrdhttp-setup.sh index 0449c5e..6ef8ebf 100755 --- a/test/xrdhttp-setup.sh +++ b/test/xrdhttp-setup.sh @@ -161,7 +161,7 @@ EOF cat > $XROOTD_CONFIGDIR/authdb <