1818 */
1919#include " HTTPLookupService.h"
2020
21- #include < curl/curl.h>
2221#include < pulsar/Version.h>
2322
2423#include < boost/property_tree/json_parser.hpp>
2524#include < boost/property_tree/ptree.hpp>
2625
26+ #include " CurlWrapper.h"
2727#include " ExecutorService.h"
2828#include " Int64SerDes.h"
2929#include " LogUtils.h"
@@ -46,16 +46,6 @@ const static std::string ADMIN_PATH_V2 = "/admin/v2/";
4646const static std::string PARTITION_METHOD_NAME = " partitions" ;
4747const static int NUMBER_OF_LOOKUP_THREADS = 1 ;
4848
49- static inline bool needRedirection (long code) { return (code == 307 || code == 302 || code == 301 ); }
50-
51- HTTPLookupService::CurlInitializer::CurlInitializer () {
52- // Once per application - https://curl.haxx.se/mail/lib-2015-11/0052.html
53- curl_global_init (CURL_GLOBAL_ALL);
54- }
55- HTTPLookupService::CurlInitializer::~CurlInitializer () { curl_global_cleanup (); }
56-
57- HTTPLookupService::CurlInitializer HTTPLookupService::curlInitializer;
58-
5949HTTPLookupService::HTTPLookupService (ServiceNameResolver &serviceNameResolver,
6050 const ClientConfiguration &clientConfiguration,
6151 const AuthenticationPtr &authData)
@@ -182,11 +172,6 @@ Future<Result, SchemaInfo> HTTPLookupService::getSchema(const TopicNamePtr &topi
182172 return promise.getFuture ();
183173}
184174
185- static size_t curlWriteCallback (void *contents, size_t size, size_t nmemb, void *responseDataPtr) {
186- ((std::string *)responseDataPtr)->append ((char *)contents, size * nmemb);
187- return size * nmemb;
188- }
189-
190175void HTTPLookupService::handleNamespaceTopicsHTTPRequest (NamespaceTopicsPromise promise,
191176 const std::string completeUrl) {
192177 std::string responseData;
@@ -209,111 +194,61 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &
209194 uint16_t reqCount = 0 ;
210195 Result retResult = ResultOk;
211196 while (++reqCount <= maxLookupRedirects_) {
212- CURL *handle;
213- CURLcode res;
214- std::string version = std::string (" Pulsar-CPP-v" ) + PULSAR_VERSION_STR;
215- handle = curl_easy_init ();
216-
217- if (!handle) {
218- LOG_ERROR (" Unable to curl_easy_init for url " << completeUrl);
219- // No curl_easy_cleanup required since handle not initialized
220- return ResultLookupError;
221- }
222- // set URL
223- curl_easy_setopt (handle, CURLOPT_URL, completeUrl.c_str ());
224-
225- // Write callback
226- curl_easy_setopt (handle, CURLOPT_WRITEFUNCTION, curlWriteCallback);
227- curl_easy_setopt (handle, CURLOPT_WRITEDATA, &responseData);
228-
229- // New connection is made for each call
230- curl_easy_setopt (handle, CURLOPT_FRESH_CONNECT, 1L );
231- curl_easy_setopt (handle, CURLOPT_FORBID_REUSE, 1L );
232-
233- // Skipping signal handling - results in timeouts not honored during the DNS lookup
234- curl_easy_setopt (handle, CURLOPT_NOSIGNAL, 1L );
235-
236- // Timer
237- curl_easy_setopt (handle, CURLOPT_TIMEOUT, lookupTimeoutInSeconds_);
238-
239- // Set User Agent
240- curl_easy_setopt (handle, CURLOPT_USERAGENT, version.c_str ());
241-
242- // Fail if HTTP return code >=400
243- curl_easy_setopt (handle, CURLOPT_FAILONERROR, 1L );
244-
245197 // Authorization data
246198 AuthenticationDataPtr authDataContent;
247199 Result authResult = authenticationPtr_->getAuthData (authDataContent);
248200 if (authResult != ResultOk) {
249201 LOG_ERROR (" Failed to getAuthData: " << authResult);
250- curl_easy_cleanup (handle);
251202 return authResult;
252203 }
253- struct curl_slist *list = NULL ;
254- if (authDataContent->hasDataForHttp ()) {
255- list = curl_slist_append (list, authDataContent->getHttpHeaders ().c_str ());
204+
205+ CurlWrapper curl;
206+ if (!curl.init ()) {
207+ LOG_ERROR (" Unable to curl_easy_init for url " << completeUrl);
208+ return ResultLookupError;
256209 }
257- curl_easy_setopt (handle, CURLOPT_HTTPHEADER, list);
258210
259- // TLS
211+ std::unique_ptr<CurlWrapper::TlsContext> tlsContext;
260212 if (isUseTls_) {
261- if (curl_easy_setopt (handle, CURLOPT_SSLENGINE, NULL ) != CURLE_OK) {
262- LOG_ERROR (" Unable to load SSL engine for url " << completeUrl);
263- curl_easy_cleanup (handle);
264- return ResultConnectError;
265- }
266- if (curl_easy_setopt (handle, CURLOPT_SSLENGINE_DEFAULT, 1L ) != CURLE_OK) {
267- LOG_ERROR (" Unable to load SSL engine as default, for url " << completeUrl);
268- curl_easy_cleanup (handle);
269- return ResultConnectError;
270- }
271- curl_easy_setopt (handle, CURLOPT_SSLCERTTYPE, " PEM" );
272-
273- if (tlsAllowInsecure_) {
274- curl_easy_setopt (handle, CURLOPT_SSL_VERIFYPEER, 0L );
275- } else {
276- curl_easy_setopt (handle, CURLOPT_SSL_VERIFYPEER, 1L );
277- }
278-
279- if (!tlsTrustCertsFilePath_.empty ()) {
280- curl_easy_setopt (handle, CURLOPT_CAINFO, tlsTrustCertsFilePath_.c_str ());
281- }
282-
283- curl_easy_setopt (handle, CURLOPT_SSL_VERIFYHOST, tlsValidateHostname_ ? 1L : 0L );
284-
213+ tlsContext.reset (new CurlWrapper::TlsContext);
214+ tlsContext->trustCertsFilePath = tlsTrustCertsFilePath_;
215+ tlsContext->validateHostname = tlsValidateHostname_;
216+ tlsContext->allowInsecure = tlsAllowInsecure_;
285217 if (authDataContent->hasDataForTls ()) {
286- curl_easy_setopt (handle, CURLOPT_SSLCERT, authDataContent->getTlsCertificates (). c_str () );
287- curl_easy_setopt (handle, CURLOPT_SSLKEY, authDataContent->getTlsPrivateKey (). c_str () );
218+ tlsContext-> certPath = authDataContent->getTlsCertificates ();
219+ tlsContext-> keyPath = authDataContent->getTlsPrivateKey ();
288220 } else {
289- if (!tlsPrivateFilePath_.empty () && !tlsCertificateFilePath_.empty ()) {
290- curl_easy_setopt (handle, CURLOPT_SSLCERT, tlsCertificateFilePath_.c_str ());
291- curl_easy_setopt (handle, CURLOPT_SSLKEY, tlsPrivateFilePath_.c_str ());
292- }
221+ tlsContext->certPath = tlsCertificateFilePath_;
222+ tlsContext->keyPath = tlsPrivateFilePath_;
293223 }
294224 }
295225
296226 LOG_INFO (" Curl [" << reqCount << " ] Lookup Request sent for " << completeUrl);
227+ CurlWrapper::Options options;
228+ options.timeoutInSeconds = lookupTimeoutInSeconds_;
229+ options.userAgent = std::string (" Pulsar-CPP-v" ) + PULSAR_VERSION_STR;
230+ options.maxLookupRedirects = 1 ; // redirection is implemented by the outer loop
231+ auto result = curl.get (completeUrl, authDataContent->getHttpHeaders (), options, tlsContext.get ());
232+ const auto &error = result.error ;
233+ if (!error.empty ()) {
234+ LOG_ERROR (completeUrl << " failed: " << error);
235+ return ResultConnectError;
236+ }
297237
298- // Make get call to server
299- res = curl_easy_perform (handle);
300-
301- curl_easy_getinfo (handle, CURLINFO_RESPONSE_CODE, &responseCode);
238+ responseData = result.responseData ;
239+ responseCode = result.responseCode ;
240+ auto res = result.code ;
302241 LOG_INFO (" Response received for url " << completeUrl << " responseCode " << responseCode
303242 << " curl res " << res);
304243
305- // Free header list
306- curl_slist_free_all (list);
307-
244+ const auto &redirectUrl = result.redirectUrl ;
308245 switch (res) {
309246 case CURLE_OK:
310247 if (responseCode == 200 ) {
311248 retResult = ResultOk;
312- } else if (needRedirection (responseCode)) {
313- char *url = NULL ;
314- curl_easy_getinfo (handle, CURLINFO_REDIRECT_URL, &url);
315- LOG_INFO (" Response from url " << completeUrl << " to new url " << url);
316- completeUrl = url;
249+ } else if (!redirectUrl.empty ()) {
250+ LOG_INFO (" Response from url " << completeUrl << " to new url " << redirectUrl);
251+ completeUrl = redirectUrl;
317252 retResult = ResultLookupError;
318253 } else {
319254 retResult = ResultLookupError;
@@ -342,8 +277,7 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &
342277 retResult = ResultLookupError;
343278 break ;
344279 }
345- curl_easy_cleanup (handle);
346- if (!needRedirection (responseCode)) {
280+ if (redirectUrl.empty ()) {
347281 break ;
348282 }
349283 }
0 commit comments