Skip to content

Commit 75ca9da

Browse files
feat: Add universal HTTP/HTTPS proxy configuration support to Python CDK
- Add ProxyConfig class with support for HTTP/HTTPS proxies, authentication, and SSL certificates - Integrate proxy configuration into HttpClient with session configuration logic - Update HttpRequester to extract proxy config from connector configuration - Update HttpStream to accept optional proxy configuration parameter - Add comprehensive unit tests for proxy functionality - Maintain full backward compatibility with existing connectors - Support environment variable fallback for proxy configuration - Handle secure certificate storage using temporary files with restricted permissions Co-Authored-By: AJ Steers <[email protected]>
1 parent e44362a commit 75ca9da

File tree

6 files changed

+362
-2
lines changed

6 files changed

+362
-2
lines changed

airbyte_cdk/sources/declarative/requesters/http_requester.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from airbyte_cdk.sources.streams.call_rate import APIBudget
2727
from airbyte_cdk.sources.streams.http import HttpClient
2828
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
29+
from airbyte_cdk.sources.streams.http.proxy_config import ProxyConfig
2930
from airbyte_cdk.sources.types import Config, EmptyString, StreamSlice, StreamState
3031
from airbyte_cdk.utils.mapping_helpers import (
3132
combine_mappings,
@@ -104,6 +105,8 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
104105
else:
105106
backoff_strategies = None
106107

108+
proxy_config = ProxyConfig.from_config(self.config)
109+
107110
self._http_client = HttpClient(
108111
name=self.name,
109112
logger=self.logger,
@@ -114,6 +117,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
114117
backoff_strategy=backoff_strategies,
115118
disable_retries=self.disable_retries,
116119
message_repository=self.message_repository,
120+
proxy_config=proxy_config,
117121
)
118122

119123
@property

airbyte_cdk/sources/streams/http/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,12 @@
66
from .exceptions import UserDefinedBackoffException
77
from .http import HttpStream, HttpSubStream
88
from .http_client import HttpClient
9+
from .proxy_config import ProxyConfig
910

10-
__all__ = ["HttpClient", "HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
11+
__all__ = [
12+
"HttpClient",
13+
"HttpStream",
14+
"HttpSubStream",
15+
"ProxyConfig",
16+
"UserDefinedBackoffException",
17+
]

airbyte_cdk/sources/streams/http/http.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
ResponseAction,
3535
)
3636
from airbyte_cdk.sources.streams.http.http_client import HttpClient
37+
from airbyte_cdk.sources.streams.http.proxy_config import ProxyConfig
3738
from airbyte_cdk.sources.types import Record, StreamSlice
3839
from airbyte_cdk.sources.utils.types import JsonType
3940

@@ -52,7 +53,10 @@ class HttpStream(Stream, CheckpointMixin, ABC):
5253
)
5354

5455
def __init__(
55-
self, authenticator: Optional[AuthBase] = None, api_budget: Optional[APIBudget] = None
56+
self,
57+
authenticator: Optional[AuthBase] = None,
58+
api_budget: Optional[APIBudget] = None,
59+
proxy_config: Optional[ProxyConfig] = None,
5660
):
5761
self._exit_on_rate_limit: bool = False
5862
self._http_client = HttpClient(
@@ -64,6 +68,7 @@ def __init__(
6468
use_cache=self.use_cache,
6569
backoff_strategy=self.get_backoff_strategy(),
6670
message_repository=InMemoryMessageRepository(),
71+
proxy_config=proxy_config,
6772
)
6873

6974
# There are three conditions that dictate if RFR should automatically be applied to a stream

airbyte_cdk/sources/streams/http/http_client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
RequestBodyException,
4141
UserDefinedBackoffException,
4242
)
43+
from airbyte_cdk.sources.streams.http.proxy_config import ProxyConfig
4344
from airbyte_cdk.sources.streams.http.rate_limiting import (
4445
http_client_default_backoff_handler,
4546
rate_limit_default_backoff_handler,
@@ -91,9 +92,11 @@ def __init__(
9192
error_message_parser: Optional[ErrorMessageParser] = None,
9293
disable_retries: bool = False,
9394
message_repository: Optional[MessageRepository] = None,
95+
proxy_config: Optional[ProxyConfig] = None,
9496
):
9597
self._name = name
9698
self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
99+
self._proxy_config = proxy_config
97100
if session:
98101
self._session = session
99102
else:
@@ -105,6 +108,10 @@ def __init__(
105108
pool_connections=MAX_CONNECTION_POOL_SIZE, pool_maxsize=MAX_CONNECTION_POOL_SIZE
106109
),
107110
)
111+
112+
if self._proxy_config:
113+
self._session = self._proxy_config.configure_session(self._session)
114+
108115
if isinstance(authenticator, AuthBase):
109116
self._session.auth = authenticator
110117
self._logger = logger
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
#
2+
#
3+
4+
import base64
5+
import os
6+
import tempfile
7+
from dataclasses import dataclass
8+
from pathlib import Path
9+
from typing import Dict, Optional, Tuple
10+
11+
import requests
12+
from requests.auth import HTTPProxyAuth
13+
14+
15+
@dataclass
16+
class ProxyConfig:
17+
"""Configuration for HTTP/HTTPS proxy settings."""
18+
19+
enabled: bool = False
20+
http_proxy: Optional[str] = None
21+
https_proxy: Optional[str] = None
22+
no_proxy: Optional[str] = None
23+
proxy_username: Optional[str] = None
24+
proxy_password: Optional[str] = None
25+
verify_ssl: bool = True
26+
ca_certificate: Optional[str] = None
27+
client_certificate: Optional[str] = None
28+
client_key: Optional[str] = None
29+
30+
def get_proxies(self) -> Dict[str, str]:
31+
"""Get proxy configuration for requests library."""
32+
if not self.enabled:
33+
return {}
34+
35+
proxies = {}
36+
if self.http_proxy:
37+
proxies["http"] = self.http_proxy
38+
if self.https_proxy:
39+
proxies["https"] = self.https_proxy
40+
41+
return proxies
42+
43+
def get_proxy_auth(self) -> Optional[HTTPProxyAuth]:
44+
"""Get proxy authentication if configured."""
45+
if self.proxy_username and self.proxy_password:
46+
return HTTPProxyAuth(self.proxy_username, self.proxy_password)
47+
return None
48+
49+
def configure_session(self, session: requests.Session) -> requests.Session:
50+
"""Configure a requests session with proxy settings."""
51+
if not self.enabled:
52+
return session
53+
54+
proxies = self.get_proxies()
55+
if proxies:
56+
session.proxies.update(proxies)
57+
58+
proxy_auth = self.get_proxy_auth()
59+
if proxy_auth:
60+
session.auth = proxy_auth
61+
62+
if not self.verify_ssl:
63+
session.verify = False
64+
elif self.ca_certificate:
65+
ca_cert_path = self._write_temp_cert(self.ca_certificate, "ca_cert")
66+
session.verify = ca_cert_path
67+
68+
if self.client_certificate and self.client_key:
69+
client_cert_path = self._write_temp_cert(self.client_certificate, "client_cert")
70+
client_key_path = self._write_temp_cert(self.client_key, "client_key")
71+
session.cert = (client_cert_path, client_key_path)
72+
73+
return session
74+
75+
def _write_temp_cert(self, cert_content: str, cert_type: str) -> str:
76+
"""Write certificate content to a secure temporary file."""
77+
try:
78+
cert_bytes = base64.b64decode(cert_content)
79+
except Exception as e:
80+
raise ValueError(f"Invalid base64 certificate content for {cert_type}: {e}")
81+
82+
fd, temp_path = tempfile.mkstemp(suffix=f"_{cert_type}.pem", prefix="airbyte_proxy_")
83+
try:
84+
os.fchmod(fd, 0o600) # Restrict to owner only
85+
os.write(fd, cert_bytes)
86+
finally:
87+
os.close(fd)
88+
89+
return temp_path
90+
91+
@classmethod
92+
def from_config(cls, config: Dict) -> Optional["ProxyConfig"]:
93+
"""Create ProxyConfig from connector configuration."""
94+
proxy_section = config.get("proxy", {})
95+
96+
if not proxy_section.get("enabled", False):
97+
http_proxy = os.getenv("HTTP_PROXY") or os.getenv("http_proxy")
98+
https_proxy = os.getenv("HTTPS_PROXY") or os.getenv("https_proxy")
99+
no_proxy = os.getenv("NO_PROXY") or os.getenv("no_proxy")
100+
101+
if http_proxy or https_proxy:
102+
return cls(
103+
enabled=True,
104+
http_proxy=http_proxy,
105+
https_proxy=https_proxy,
106+
no_proxy=no_proxy,
107+
)
108+
return None
109+
110+
return cls(
111+
enabled=proxy_section.get("enabled", False),
112+
http_proxy=proxy_section.get("http_proxy"),
113+
https_proxy=proxy_section.get("https_proxy"),
114+
no_proxy=proxy_section.get("no_proxy"),
115+
proxy_username=proxy_section.get("proxy_username"),
116+
proxy_password=proxy_section.get("proxy_password"),
117+
verify_ssl=proxy_section.get("verify_ssl", True),
118+
ca_certificate=proxy_section.get("ca_certificate"),
119+
client_certificate=proxy_section.get("client_certificate"),
120+
client_key=proxy_section.get("client_key"),
121+
)

0 commit comments

Comments
 (0)