1212#include < library/cpp/http/io/stream.h>
1313#include < library/cpp/http/misc/httpcodes.h>
1414#include < library/cpp/openssl/io/stream.h>
15+ #include < library/cpp/threading/cancellation/cancellation_token.h>
1516
1617class TNetworkAddress ;
1718class IOutputStream ;
@@ -54,27 +55,31 @@ class TKeepAliveHttpClient {
5455 THttpCode DoGet (const TStringBuf relativeUrl,
5556 IOutputStream* output = nullptr ,
5657 const THeaders& headers = THeaders(),
57- THttpHeaders* outHeaders = nullptr);
58+ THttpHeaders* outHeaders = nullptr,
59+ NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default());
5860
5961 // builds post request from headers and body
6062 THttpCode DoPost (const TStringBuf relativeUrl,
6163 const TStringBuf body,
6264 IOutputStream* output = nullptr ,
6365 const THeaders& headers = THeaders(),
64- THttpHeaders* outHeaders = nullptr);
66+ THttpHeaders* outHeaders = nullptr,
67+ NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default());
6568
6669 // builds request with any HTTP method from headers and body
6770 THttpCode DoRequest (const TStringBuf method,
6871 const TStringBuf relativeUrl,
6972 const TStringBuf body,
7073 IOutputStream* output = nullptr ,
7174 const THeaders& inHeaders = THeaders(),
72- THttpHeaders* outHeaders = nullptr);
75+ THttpHeaders* outHeaders = nullptr,
76+ NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default());
7377
7478 // requires already well-formed request
7579 THttpCode DoRequestRaw (const TStringBuf raw,
7680 IOutputStream* output = nullptr ,
77- THttpHeaders* outHeaders = nullptr );
81+ THttpHeaders* outHeaders = nullptr ,
82+ NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default());
7883
7984 void DisableVerificationForHttps ();
8085 void SetClientCertificate (const TOpenSslClientIO::TOptions::TClientCert& options);
@@ -93,7 +98,8 @@ class TKeepAliveHttpClient {
9398 template <class T >
9499 THttpCode DoRequestReliable (const T& raw,
95100 IOutputStream* output,
96- THttpHeaders* outHeaders);
101+ THttpHeaders* outHeaders,
102+ NThreading::TCancellationToken cancellation);
97103
98104 TVector<IOutputStream::TPart> FormRequest (TStringBuf method, const TStringBuf relativeUrl,
99105 TStringBuf body,
@@ -166,13 +172,13 @@ class TSimpleHttpClient {
166172
167173 void EnableVerificationForHttps ();
168174
169- void DoGet (const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers = THeaders()) const ;
175+ void DoGet (const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers = THeaders(), NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default() ) const ;
170176
171177 // builds post request from headers and body
172- void DoPost (const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THeaders& headers = THeaders()) const ;
178+ void DoPost (const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THeaders& headers = THeaders(), NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default() ) const ;
173179
174180 // requires already well-formed post request
175- void DoPostRaw (const TStringBuf relativeUrl, TStringBuf rawRequest, IOutputStream* output) const ;
181+ void DoPostRaw (const TStringBuf relativeUrl, TStringBuf rawRequest, IOutputStream* output, NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default() ) const ;
176182
177183 virtual ~TSimpleHttpClient ();
178184
@@ -227,6 +233,10 @@ namespace NPrivate {
227233 return HttpIn.Get ();
228234 }
229235
236+ void Shutdown () {
237+ Socket.ShutDown (SHUT_RDWR);
238+ }
239+
230240 private:
231241 static TNetworkAddress Resolve (const TString& host, ui32 port);
232242
@@ -250,12 +260,18 @@ namespace NPrivate {
250260template <class T >
251261TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequestReliable (const T& raw,
252262 IOutputStream* output,
253- THttpHeaders* outHeaders) {
263+ THttpHeaders* outHeaders,
264+ NThreading::TCancellationToken cancellation) {
265+
254266 for (int i = 0 ; i < 2 ; ++i) {
255267 const bool haveNewConnection = CreateNewConnectionIfNeeded ();
256268 const bool couldRetry = !haveNewConnection && i == 0 ; // Actually old connection could be already closed by server,
257269 // so we should try one more time in this case.
258270 try {
271+ cancellation.Future ().Subscribe ([&](auto &) {
272+ Connection->Shutdown ();
273+ });
274+
259275 Connection->Write (raw);
260276
261277 THttpCode code = ReadAndTransferHttp (*Connection->GetHttpInput (), output, outHeaders);
@@ -265,16 +281,19 @@ TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequestReliable(const T&
265281 return code;
266282 } catch (const TSystemError& e) {
267283 Connection.Reset ();
284+ cancellation.ThrowIfCancellationRequested ();
268285 if (!couldRetry || e.Status () != EPIPE) {
269286 throw ;
270287 }
271288 } catch (const THttpReadException&) { // Actually old connection is already closed by server
272289 Connection.Reset ();
290+ cancellation.ThrowIfCancellationRequested ();
273291 if (!couldRetry) {
274292 throw ;
275293 }
276294 } catch (const std::exception&) {
277295 Connection.Reset ();
296+ cancellation.ThrowIfCancellationRequested ();
278297 throw ;
279298 }
280299 }
0 commit comments