Skip to content

Commit 9526add

Browse files
committed
[37] Add TPC worker pool
Squashed commit of the following: commit 5cd92a9b52f3bbe9afba0c036578d984aa9eee64 Author: Rahul Chauhan <rahul.chauhan@cern.ch> Date: Tue Aug 12 11:31:29 2025 +0200 Use vorg as part of the queue identifier as well as worker and request labels Also fix nomenclature of identifier and labels: A queue has a identifier (unique) Workers and Requets have labels (not uniquet) that map to a queue Signed-off-by: Rahul Chauhan <rahul.chauhan@cern.ch> commit 980a186dd22e1d65b99323070fa5ad2344204726 Author: Rahul Chauhan <omrahulchauhan@gmail.com> Date: Fri Aug 8 12:03:02 2025 +0200 Make max_transfers and max_pending requests configurable Signed-off-by: Rahul Chauhan <rahul.chauhan@cern.ch> commit 555d0cb0ed8a61ad31e86a18e8b835cff4220978 Author: Rahul Chauhan <rahul.chauhan@cern.ch> Date: Wed Jul 2 19:06:42 2025 +0200 Update remote connection list from curl Signed-off-by: Rahul Chauhan <rahul.chauhan@cern.ch> commit 4527336f710d44043a0e74704bc9d5f80850bafa Author: Rahul Chauhan <omrahulchauhan@gmail.com> Date: Tue Jun 24 11:43:08 2025 +0200 Implement cancellation in TPC Pool Signed-off-by: Rahul Chauhan <rahul.chauhan@cern.ch> commit 0dff3aa82ba5dfb9bb05baca107958cb1a0651a0 Author: Rahul Chauhan <omrahulchauhan@gmail.com> Date: Thu May 22 10:24:00 2025 +0200 Refactor PMarkManager and TPCRequestManager to add SciTag integration Signed-off-by: Rahul Chauhan <rahul.chauhan@cern.ch> commit 51966f6c8308f19c8f28194fcf2f62c0729507bc Author: Rahul Chauhan <omrahulchauhan@gmail.com> Date: Wed May 7 10:22:55 2025 +0200 Change ownership of curl socket callbacks Signed-off-by: Rahul Chauhan <rahul.chauhan@cern.ch> commit 202977a0b5c2a8af3992462668a978b18e6eed66 Author: Brian Bockelman <bbockelman@morgridge.org> Date: Sun Apr 20 10:21:10 2025 +0200 Framework for worker pool for TPC requests Co-authored-by: Rahul Chauhan <rahul.chauhan@cern.ch> Signed-off-by: Rahul Chauhan <rahul.chauhan@cern.ch> commit 2d71d8dbd0f0a430bd82d89738956369689132f0 Author: Rahul Chauhan <omrahulchauhan@gmail.com> Date: Wed Jun 25 10:56:02 2025 +0200 Add multistream transfers to the test Signed-off-by: Rahul Chauhan <rahul.chauhan@cern.ch> commit f3538717e641a40b44fc463a9d3bfee173d5c66d Author: Rahul Chauhan <omrahulchauhan@gmail.com> Date: Tue Jun 24 11:42:35 2025 +0200 Add test for cancellations Signed-off-by: Rahul Chauhan <rahul.chauhan@cern.ch> commit 4150bc9b64637446c7f6e6492f206257114a7e3a Author: Rahul Chauhan <omrahulchauhan@gmail.com> Date: Thu May 15 10:23:32 2025 +0200 Add scitag flow configuration to test Add scitag flow query string in tests Signed-off-by: Rahul Chauhan <rahul.chauhan@cern.ch> commit b0abd58592e5daee9f8f324f79dd8bdb96eadfec Author: Rahul Chauhan <omrahulchauhan@gmail.com> Date: Tue May 6 17:37:37 2025 +0200 Updated perform_http_tpc to correctly detect transfer success or failure by checking the last line of the chunked reponse, since the HTTP 201 code is always returned regardless of actual status. Signed-off-by: Rahul Chauhan <rahul.chauhan@cern.ch>
1 parent f02805c commit 9526add

15 files changed

+859
-264
lines changed

src/XrdHttpTpc/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ add_library(${XrdHttpTPC} MODULE
2222
XrdHttpTpcConfigure.cc
2323
XrdHttpTpcMultistream.cc
2424
XrdHttpTpcPMarkManager.cc XrdHttpTpcPMarkManager.hh
25+
XrdHttpTpcPool.cc XrdHttpTpcPool.hh
2526
XrdHttpTpcState.cc XrdHttpTpcState.hh
2627
XrdHttpTpcStream.cc XrdHttpTpcStream.hh
2728
XrdHttpTpcTPC.cc XrdHttpTpcTPC.hh

src/XrdHttpTpc/XrdHttpTpcConfigure.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,24 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv)
9696
} else {
9797
m_first_timeout = 2*m_timeout;
9898
}
99+
} else if (!strcmp("tpc.max_active_transfers_per_queue", val)) {
100+
if (!(val = Config.GetWord())) {
101+
Config.Close();
102+
m_log.Emsg("Config", "tpc.max_active_transfers_per_queue value not specified.");
103+
return false;
104+
}
105+
int max_workers;
106+
if (XrdOuca2x::a2i(m_log, "tpc.max_active_transfers_per_queue", val, &max_workers, 1, 1000)) return false;
107+
m_request_manager.SetMaxWorkers(static_cast<unsigned>(max_workers));
108+
} else if (!strcmp("tpc.max_waiting_transfers_per_queue", val)) {
109+
if (!(val = Config.GetWord())) {
110+
Config.Close();
111+
m_log.Emsg("Config", "tpc.max_waiting_transfers_per_queue value not specified.");
112+
return false;
113+
}
114+
int max_pending_ops;
115+
if (XrdOuca2x::a2i(m_log, "tpc.max_waiting_transfers_per_queue", val, &max_pending_ops, 1, 1000)) return false;
116+
m_request_manager.SetMaxIdleRequests(static_cast<unsigned>(max_pending_ops));
99117
}
100118
}
101119
Config.Close();

src/XrdHttpTpc/XrdHttpTpcMultistream.cc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,6 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
273273
}
274274

275275
// Notify the packet marking manager that the transfer will start after this point
276-
rec.pmarkManager.startTransfer();
277276

278277
// Create the multi-handle and add in the current transfer to it.
279278
MultiCurlHandler mch(handles, m_log);
@@ -344,8 +343,6 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
344343
break;
345344
}
346345

347-
rec.pmarkManager.beginPMarks();
348-
349346

350347
// Harvest any messages, looking for CURLMSG_DONE.
351348
CURLMsg *msg;

src/XrdHttpTpc/XrdHttpTpcPMarkManager.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ PMarkManager::SocketInfo::SocketInfo(int fd, const struct sockaddr * sockP) {
3232
client.addrInfo = static_cast<XrdNetAddrInfo*>(&netAddr);
3333
}
3434

35-
PMarkManager::PMarkManager(XrdHttpExtReq & req, TPC::TpcType tpcType) : mPmark(req.pmark), mReq(req), mTransferWillStart(false), mTpcType(tpcType) {}
35+
PMarkManager::PMarkManager(XrdHttpExtReq &req, TPC::TpcType tpcType)
36+
: mPmark(req.pmark), mSciTag(req.mSciTag), mResource(req.resource.c_str()), mTransferWillStart(false), mTpcType(tpcType) {}
37+
38+
PMarkManager::PMarkManager(XrdNetPMark *pmark, int sciTag, const TPC::TpcType tpcType)
39+
: mPmark(pmark), mSciTag(sciTag), mResource(NULL), mTransferWillStart(false), mTpcType(tpcType) {}
3640

3741
void PMarkManager::addFd(int fd, const struct sockaddr * sockP) {
3842
if(isEnabled() && mTransferWillStart) {
@@ -56,7 +60,7 @@ bool PMarkManager::connect(int fd, const struct sockaddr *sockP, size_t sockPLen
5660
}
5761

5862
bool PMarkManager::isEnabled() const {
59-
return mPmark && (mReq.mSciTag >= 0);
63+
return mPmark && (mSciTag >= 0);
6064
}
6165

6266
void PMarkManager::startTransfer() {
@@ -71,7 +75,7 @@ void PMarkManager::beginPMarks() {
7175
if(mPmarkHandles.empty()) {
7276
// Create the first pmark handle
7377
std::stringstream ss;
74-
ss << "scitag.flow=" << mReq.mSciTag
78+
ss << "scitag.flow=" << mSciTag
7579
// One has to consider that this server is the client side of a normal HTTP PUT/GET. But unlike normal HTTP PUT and GET requests where clients
7680
// do not emit a firefly, this server WILL emit a firefly.
7781
//
@@ -85,7 +89,7 @@ void PMarkManager::beginPMarks() {
8589
// that I do on behalf of the remote server... Hence why if the tpc transfer is Push, the pmark.appname will be equal to http-get.
8690
<< "&" << "pmark.appname=" << ((mTpcType == TPC::TpcType::Pull) ? "http-put" : "http-get");
8791
SocketInfo & sockInfo = mSocketInfos.front();
88-
auto pmark = mPmark->Begin(sockInfo.client, mReq.resource.c_str(), ss.str().c_str(), "http-tpc");
92+
auto pmark = mPmark->Begin(sockInfo.client, mResource, ss.str().c_str(), "http-tpc");
8993
if(!pmark) {
9094
return;
9195
}

src/XrdHttpTpc/XrdHttpTpcPMarkManager.hh

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ public:
6666
XrdSecEntity client;
6767
};
6868

69-
PMarkManager(XrdHttpExtReq & req, const TPC::TpcType type);
69+
PMarkManager(XrdHttpExtReq &req, const TPC::TpcType type);
70+
71+
PMarkManager(XrdNetPMark *pmark, int sciTag, const TPC::TpcType type);
7072

7173
/**
7274
* Will connect the socket attached to the file descriptor within a certain timeout and add the file descriptor to the.
@@ -128,9 +130,11 @@ private:
128130
// The map of socket FD and packet marking handles
129131
std::map<int,std::unique_ptr<XrdNetPMark::Handle>> mPmarkHandles;
130132
// The instance of the packet marking functionality
131-
XrdNetPMark * mPmark;
132-
// The XrdHttpTPC request information
133-
XrdHttpExtReq & mReq;
133+
XrdNetPMark *mPmark;
134+
// SciTag provided by the user; extracted from the request
135+
int mSciTag;
136+
// Path to the resource
137+
const char *mResource;
134138
// Is true when startTransfer(...) has been called
135139
bool mTransferWillStart;
136140
// Is true if this transfer is a HTTP TPC PULL transfer, false otherwise

0 commit comments

Comments
 (0)