Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/MainDistributionPipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main
with:
extension_name: httpfs
duckdb_version: cc7c620e58625f6b5f53f724b6a9645f4ee0be6f
duckdb_version: 992df353dfd8ea54dcf12eb593af3129ff128522
ci_tools_version: main

duckdb-stable-deploy:
Expand All @@ -27,7 +27,7 @@ jobs:
secrets: inherit
with:
extension_name: httpfs
duckdb_version: cc7c620e58625f6b5f53f724b6a9645f4ee0be6f
duckdb_version: 992df353dfd8ea54dcf12eb593af3129ff128522
ci_tools_version: main
deploy_latest: ${{ startsWith(github.ref, 'refs/heads/v') }}
deploy_versioned: ${{ startsWith(github.ref, 'refs/heads/v') || github.ref == 'refs/heads/main' }}
Expand All @@ -37,7 +37,7 @@ jobs:
uses: duckdb/extension-ci-tools/.github/workflows/_extension_code_quality.yml@main
with:
extension_name: httpfs
duckdb_version: cc7c620e58625f6b5f53f724b6a9645f4ee0be6f
duckdb_version: 992df353dfd8ea54dcf12eb593af3129ff128522
ci_tools_version: main
extra_toolchains: 'python3'
format_checks: 'format'
2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 220 files
4 changes: 2 additions & 2 deletions src/hffs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ static string ParseNextUrlFromLinkHeader(const string &link_header_content) {

HFFileHandle::~HFFileHandle() {};

unique_ptr<HTTPClient> HFFileHandle::CreateClient() {
return http_params.http_util.InitializeClient(http_params, parsed_url.endpoint);
string HFFileHandle::BaseUrl() const {
return parsed_url.endpoint;
}

string HuggingFaceFileSystem::ListHFRequest(ParsedHFUrl &url, HTTPFSParams &http_params, string &next_page_url,
Expand Down
54 changes: 46 additions & 8 deletions src/httpfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,29 @@ HTTPFileHandle::HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpe
}
}
}

shared_ptr<HTTPClientCache> HTTPFileSystem::GetOrCreateClientCache(const string &path) {
lock_guard<mutex> lock(client_cache_map_lock);

if (auto existing = lru_client_cache.Get(path)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get what you're doing here, and it looks correct, but we try to avoid assignment in if conditions because it's error-prone. Could you split this up?

return existing;
}

auto client_cache = make_uniq<HTTPClientCache>();
lru_client_cache.Put(path, std::move(client_cache));
return lru_client_cache.Get(path);
}

void HTTPFileHandle::InitializeClientCache(HTTPFileSystem &file_system) {
client_cache = file_system.GetOrCreateClientCache(BaseUrl());
}

string HTTPFileHandle::BaseUrl() const {
string path_out, proto_host_port;
HTTPUtil::DecomposeURL(path, path_out, proto_host_port);
return proto_host_port;
}

unique_ptr<HTTPFileHandle> HTTPFileSystem::CreateHandle(const OpenFileInfo &file, FileOpenFlags flags,
optional_ptr<FileOpener> opener) {
D_ASSERT(flags.Compression() == FileCompressionType::UNCOMPRESSED);
Expand All @@ -398,7 +421,18 @@ unique_ptr<HTTPFileHandle> HTTPFileSystem::CreateHandle(const OpenFileInfo &file
httpfs_params.bearer_token = kv_secret.TryGetValue("token", true).ToString();
}
}
return duckdb::make_uniq<HTTPFileHandle>(*this, file, flags, std::move(params));
auto handle = duckdb::make_uniq<HTTPFileHandle>(*this, file, flags, std::move(params));
return handle;
}

void HTTPFileSystem::FinalizeHandleCreate(unique_ptr<HTTPFileHandle> &handle) {
if (handle) {
FinalizeHandleCreate(*handle);
}
}

void HTTPFileSystem::FinalizeHandleCreate(HTTPFileHandle &handle) {
handle.InitializeClientCache(*this);
}

unique_ptr<FileHandle> HTTPFileSystem::OpenFileExtended(const OpenFileInfo &file, FileOpenFlags flags,
Expand All @@ -408,6 +442,7 @@ unique_ptr<FileHandle> HTTPFileSystem::OpenFileExtended(const OpenFileInfo &file
if (flags.ReturnNullIfNotExists()) {
try {
auto handle = CreateHandle(file, flags, opener);
FinalizeHandleCreate(handle);
handle->Initialize(opener);
return std::move(handle);
} catch (...) {
Expand All @@ -416,6 +451,7 @@ unique_ptr<FileHandle> HTTPFileSystem::OpenFileExtended(const OpenFileInfo &file
}

auto handle = CreateHandle(file, flags, opener);
FinalizeHandleCreate(handle);

if (flags.OpenForWriting() && !flags.OpenForAppending() && !flags.OpenForReading()) {
handle->write_overwrite_mode = true;
Expand Down Expand Up @@ -928,9 +964,11 @@ void HTTPFileHandle::Initialize(optional_ptr<FileOpener> opener) {

unique_ptr<HTTPClient> HTTPFileHandle::GetClient() {
// Try to fetch a cached client
auto cached_client = client_cache.GetClient();
if (cached_client) {
return cached_client;
if (client_cache) {
auto cached_client = client_cache->GetClient();
if (cached_client) {
return cached_client;
}
}

// Create a new client
Expand All @@ -939,13 +977,13 @@ unique_ptr<HTTPClient> HTTPFileHandle::GetClient() {

unique_ptr<HTTPClient> HTTPFileHandle::CreateClient() {
// Create a new client
string path_out, proto_host_port;
HTTPUtil::DecomposeURL(path, path_out, proto_host_port);
return http_params.http_util.InitializeClient(http_params, proto_host_port);
return http_params.http_util.InitializeClient(http_params, BaseUrl());
}

void HTTPFileHandle::StoreClient(unique_ptr<HTTPClient> client) {
client_cache.StoreClient(std::move(client));
if (client_cache) {
client_cache->StoreClient(std::move(client));
}
}

HTTPFileHandle::~HTTPFileHandle() {
Expand Down
2 changes: 1 addition & 1 deletion src/include/hffs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class HFFileHandle : public HTTPFileHandle {
}
~HFFileHandle() override;

unique_ptr<HTTPClient> CreateClient() override;
string BaseUrl() const override;

protected:
ParsedHFUrl parsed_url;
Expand Down
18 changes: 16 additions & 2 deletions src/include/httpfs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "duckdb/common/case_insensitive_map.hpp"
#include "duckdb/common/file_system.hpp"
#include "http_state.hpp"
#include "duckdb/common/lru_cache.hpp"
#include "duckdb/common/pair.hpp"
#include "duckdb/common/unordered_map.hpp"
#include "duckdb/common/exception/http_exception.hpp"
Expand Down Expand Up @@ -53,7 +54,7 @@ class HTTPFileHandle : public FileHandle {
virtual void Initialize(optional_ptr<FileOpener> opener);

// We keep an http client stored for connection reuse with keep-alive headers
HTTPClientCache client_cache;
shared_ptr<HTTPClientCache> client_cache;

unique_ptr<HTTPParams> params;
HTTPFSParams &http_params;
Expand Down Expand Up @@ -104,6 +105,8 @@ class HTTPFileHandle : public FileHandle {
bool SkipBuffer() const {
return flags.DirectIO() || flags.RequireParallelAccess();
}
void InitializeClientCache(HTTPFileSystem &file_system);
virtual string BaseUrl() const;

private:
void AllocateReadBuffer(optional_ptr<FileOpener> opener);
Expand All @@ -122,7 +125,7 @@ class HTTPFileHandle : public FileHandle {

protected:
//! Create a new Client
virtual unique_ptr<HTTPClient> CreateClient();
unique_ptr<HTTPClient> CreateClient();
//! Perform a HEAD request to get the file info (if not yet loaded)
void LoadFileInfo();

Expand Down Expand Up @@ -192,6 +195,17 @@ class HTTPFileSystem : public FileSystem {
static void Verify();

optional_ptr<HTTPMetadataCache> GetGlobalCache();
shared_ptr<HTTPClientCache> GetOrCreateClientCache(const string &path);

struct NopCleanup {
void operator()(unique_ptr<int> &) {
}
};

SharedLruCache<string, HTTPClientCache, DefaultPayload> lru_client_cache {256};
mutex client_cache_map_lock;
void FinalizeHandleCreate(duckdb::unique_ptr<HTTPFileHandle> &);
void FinalizeHandleCreate(HTTPFileHandle &);

protected:
unique_ptr<FileHandle> OpenFileExtended(const OpenFileInfo &file, FileOpenFlags flags,
Expand Down
2 changes: 1 addition & 1 deletion src/include/s3fs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class S3FileHandle : public HTTPFileHandle {
atomic<bool> uploader_has_error {false};
std::exception_ptr upload_exception;

unique_ptr<HTTPClient> CreateClient() override;
string BaseUrl() const override;

//! Rethrow IO Exception originating from an upload thread
void RethrowIOError() {
Expand Down
9 changes: 6 additions & 3 deletions src/s3fs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,11 +366,11 @@ void S3FileHandle::Close() {
}
}

unique_ptr<HTTPClient> S3FileHandle::CreateClient() {
string S3FileHandle::BaseUrl() const {
auto parsed_url = S3FileSystem::S3UrlParse(path, this->auth_params);

string proto_host_port = parsed_url.http_proto + parsed_url.host;
return http_params.http_util.InitializeClient(http_params, proto_host_port);
return proto_host_port;
}

// Opens the multipart upload and returns the ID
Expand Down Expand Up @@ -710,7 +710,7 @@ ParsedS3Url S3FileSystem::S3UrlParse(string url, const S3AuthParams &params) {
auto prefix_end_pos = url.find("//") + 2;
auto slash_pos = url.find('/', prefix_end_pos);
if (slash_pos == string::npos) {
throw IOException("URL needs to contain a '/' after the host");
throw IOException("URL needs to contain a '/' after the host. Provided url is '%s'", url);
}
bucket = url.substr(prefix_end_pos, slash_pos - prefix_end_pos);
if (bucket.empty()) {
Expand Down Expand Up @@ -946,8 +946,10 @@ HTTPMetadataCacheEntry S3FileHandle::GetCacheEntry() const {
}

void S3FileHandle::Initialize(optional_ptr<FileOpener> opener) {
auto &s3fs = (S3FileSystem &)file_system;
try {
HTTPFileHandle::Initialize(opener);
s3fs.FinalizeHandleCreate(*this);
} catch (std::exception &ex) {
ErrorData error(ex);
bool refreshed_secret = false;
Expand Down Expand Up @@ -1002,6 +1004,7 @@ void S3FileHandle::Initialize(optional_ptr<FileOpener> opener) {
auth_params.SetRegion(std::move(correct_region));
}
HTTPFileHandle::Initialize(opener);
s3fs.FinalizeHandleCreate(*this);
}

if (flags.OpenForWriting()) {
Expand Down
2 changes: 1 addition & 1 deletion test/sql/json/table/internal_issue_6807.test_slow
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ CREATE TABLE T AS FROM 'https://data.gharchive.org/2023-02-08-0.json.gz';
query I
SELECT count(*) FROM duckdb_logs_parsed('HTTP') WHERE request.type = 'GET' GROUP BY request.type;
----
9
8
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change is already in a patch in the duckdb repo on v1.5-variegata. Not a problem if you add the change here too as long as CI is happy

Loading