Skip to content

Commit 704a157

Browse files
committed
RDISCROWD-8392 upgrade to boto3
1 parent f3d63e8 commit 704a157

File tree

9 files changed

+473
-265
lines changed

9 files changed

+473
-265
lines changed
Lines changed: 9 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,15 @@
1-
from copy import deepcopy
2-
import ssl
3-
import sys
4-
import time
5-
61
from flask import current_app
7-
from boto.auth_handler import AuthHandler
8-
import boto.auth
9-
10-
from boto.exception import S3ResponseError
11-
from boto.s3.key import Key
12-
from boto.s3.bucket import Bucket
13-
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
14-
from boto.provider import Provider
15-
import jwt
2+
from botocore.config import Config
3+
import boto3
164
from werkzeug.exceptions import BadRequest
175
from boto3.session import Session
186
from botocore.client import Config
197
from pybossa.cloud_store_api.base_conn import BaseConnection
208
from os import environ
219

10+
from pybossa.cloud_store_api.proxied_s3_client import ProxiedS3Client
11+
from pybossa.cloud_store_api.s3_client_wrapper import S3ClientWrapper
12+
2213

2314
def check_store(store):
2415
if not store:
@@ -60,59 +51,13 @@ def create_connection(**kwargs):
6051
proxy_url=kwargs.get("proxy_url")
6152
)
6253
if 'object_service' in kwargs:
63-
current_app.logger.info("Calling ProxiedConnection")
64-
conn = ProxiedConnection(**kwargs)
54+
current_app.logger.info("Calling ProxiedS3Client")
55+
conn = ProxiedS3Client(**kwargs)
6556
else:
66-
current_app.logger.info("Calling CustomConnection")
67-
conn = CustomConnection(**kwargs)
57+
current_app.logger.info("Calling S3ClientWrapper")
58+
conn = S3ClientWrapper(**kwargs)
6859
return conn
6960

70-
71-
class CustomProvider(Provider):
    """Provider that additionally remembers the end object service.

    Useful when the service is reached through a proxy: ``object_service``
    names the service behind it, and ``auth_headers`` is stored as given
    for later use by whoever consumes the provider.
    """

    def __init__(self, name, access_key=None, secret_key=None,
                 security_token=None, profile_name=None, object_service=None,
                 auth_headers=None):
        """Store the extra fields, then initialise the base Provider.

        ``object_service`` falls back to ``name`` when it is not given
        (or is otherwise falsy).
        """
        self.auth_headers = auth_headers
        self.object_service = object_service if object_service else name
        super().__init__(
            name,
            access_key,
            secret_key,
            security_token,
            profile_name,
        )
83-
84-
85-
class CustomConnection(S3Connection):
    """S3Connection variant using CustomProvider and CustomBucket.

    Extra keyword arguments consumed here (removed before the base class
    sees them): ``object_service``, ``auth_headers``, ``s3_ssl_no_verify``
    and ``host_suffix``.
    """

    def __init__(self, *args, **kwargs):
        """Prepare kwargs for S3Connection and apply the SSL override."""
        # Any falsy/missing calling format is replaced by path-style calling.
        kwargs['calling_format'] = (
            kwargs.get('calling_format') or OrdinaryCallingFormat()
        )

        # Credentials are only read (they must still reach S3Connection);
        # the proxy-specific options are popped.
        access_key = kwargs.get('aws_access_key_id')
        secret_key = kwargs.get('aws_secret_access_key')
        security_token = kwargs.get('security_token')
        profile_name = kwargs.get('profile_name')
        object_service = kwargs.pop('object_service', None)
        auth_headers = kwargs.pop('auth_headers', None)
        kwargs['provider'] = CustomProvider(
            'aws',
            access_key,
            secret_key,
            security_token,
            profile_name,
            object_service,
            auth_headers,
        )

        kwargs['bucket_class'] = CustomBucket

        skip_cert_check = kwargs.pop('s3_ssl_no_verify', False)
        self.host_suffix = kwargs.pop('host_suffix', '')

        super().__init__(*args, **kwargs)

        # Certificate validation is only turned off for secure connections
        # that explicitly asked for it.
        if kwargs.get('is_secure', True) and skip_cert_check:
            self.https_validate_certificates = False
            self.http_connection_kwargs['context'] = (
                ssl._create_unverified_context()
            )

    def get_path(self, path='/', *args, **kwargs):
        """Return the base class path with ``host_suffix`` prepended."""
        base_path = super().get_path(path, *args, **kwargs)
        return self.host_suffix + base_path
114-
115-
11661
class CustomConnectionV2(BaseConnection):
11762
def __init__(
11863
self,
@@ -133,89 +78,3 @@ def __init__(
13378
proxies={"https": proxy_url, "http": proxy_url},
13479
),
13580
)
136-
137-
138-
class CustomBucket(Bucket):
    """Bucket whose delete also accepts a 200 response code."""

    def delete_key(self, *args, **kwargs):
        """Delete a key, ignoring an S3ResponseError whose status is 200.

        Any other S3ResponseError is re-raised unchanged. Nothing is
        returned.
        """
        try:
            super().delete_key(*args, **kwargs)
        except S3ResponseError as err:
            if err.status == 200:
                return
            raise
147-
148-
149-
class ProxiedKey(Key):
    """Key that short-circuits ``should_retry`` for any 2xx response."""

    def should_retry(self, response, chunked_transfer=False):
        """Return True for a 2xx status; otherwise defer to the base Key."""
        is_2xx = 200 <= response.status <= 299
        if not is_2xx:
            return super().should_retry(response, chunked_transfer)
        return True
155-
156-
157-
class ProxiedBucket(CustomBucket):
    """CustomBucket whose keys are created as ProxiedKey instances."""

    def __init__(self, *args, **kwargs):
        """Initialise the bucket, then switch its key class to ProxiedKey."""
        super().__init__(*args, **kwargs)
        self.set_key_class(ProxiedKey)
162-
163-
164-
class ProxiedConnection(CustomConnection):
    """Object Store connection through proxy API. Sets the proper headers and
    creates the jwt; use the appropriate Bucket and Key classes.
    """

    def __init__(self, client_id, client_secret, object_service, *args, **kwargs):
        """Store the JWT credentials and set up the proxied connection.

        ``client_id`` becomes the JWT ``iss`` claim and ``client_secret`` is
        the HS256 signing key. ``object_service`` is forwarded to
        CustomConnection through ``kwargs``.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        kwargs['object_service'] = object_service
        super(ProxiedConnection, self).__init__(*args, **kwargs)
        self.set_bucket_class(ProxiedBucket)

    def make_request(self, method, bucket='', key='', headers=None, data='',
                     query_args=None, sender=None, override_num_retries=None,
                     retry_handler=None):
        """Add the ``jwt`` and ``x-objectservice-id`` headers, then delegate.

        All other arguments are passed to the parent ``make_request``
        unchanged, and its result is returned.
        """
        headers = headers or {}
        headers['jwt'] = self.create_jwt(method, self.host, bucket, key)
        headers['x-objectservice-id'] = self.provider.object_service.upper()
        # Log header names only: the 'jwt' value is a signed credential and
        # must not be written to application logs.
        current_app.logger.info(
            "Calling ProxiedConnection.make_request. headers %s",
            list(headers))
        return super(ProxiedConnection, self).make_request(
            method, bucket, key, headers, data, query_args, sender,
            override_num_retries, retry_handler)

    def create_jwt(self, method, host, bucket, key):
        """Return an HS256 JWT describing this request.

        The token is valid from now for 300 seconds and carries the HTTP
        method, the host, and the request path as produced by ``get_path``.
        """
        now = int(time.time())
        path = self.get_path(self.calling_format.build_path_base(bucket, key))
        current_app.logger.info("create_jwt called. method %s, host %s, bucket %s, key %s, path %s", method, host, str(bucket), str(key), str(path))
        payload = {
            'iat': now,
            'nbf': now,
            'exp': now + 300,
            'method': method,
            'iss': self.client_id,
            'host': host,
            'path': path,
            'region': 'ny'
        }
        return jwt.encode(payload, self.client_secret, algorithm='HS256')
202-
203-
204-
class CustomAuthHandler(AuthHandler):
    """Auth handler that sends custom auth headers taken from the provider."""

    capability = ['s3']

    def __init__(self, host, config, provider):
        """Refuse to authenticate unless the provider defines auth headers.

        Raises:
            boto.auth_handler.NotReadyToAuthenticate: when
                ``provider.auth_headers`` is empty or missing a value.
        """
        if not provider.auth_headers:
            raise boto.auth_handler.NotReadyToAuthenticate()
        self._provider = provider
        super().__init__(host, config, provider)

    def add_auth(self, http_request, **kwargs):
        """Copy each configured provider attribute into the request headers.

        ``auth_headers`` is iterated as ``(header_name, attribute_name)``
        pairs; the header value is read from the provider attribute.
        """
        request_headers = http_request.headers
        for header_name, attr_name in self._provider.auth_headers:
            request_headers[header_name] = getattr(self._provider, attr_name)

    def sign_string(self, *args, **kwargs):
        """Return an empty string; no string signing is performed."""
        return ''
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
from typing import Optional, Dict
2+
from urllib.parse import urlsplit, urlunsplit
3+
from botocore.exceptions import ClientError
4+
from botocore.config import Config
5+
import boto3
6+
import time
7+
import jwt
8+
import logging
9+
10+
11+
class ProxiedS3Client:
    """
    Emulates the old ProxiedConnection/ProxiedBucket/ProxiedKey behavior using boto3.

    Features:
    - Path-style addressing (OrdinaryCallingFormat equivalent)
    - Optional SSL verification disable
    - host_suffix is prepended to every request path (like get_path() override)
    - Per-request JWT header and x-objectservice-id header
    - Delete tolerant of HTTP 200 and 204
    """

    def __init__(
        self,
        *,
        client_id: str,
        client_secret: str,
        object_service: str,
        region_claim: str = "ny",  # value used in the JWT "region" claim
        host_suffix: str = "",  # prepended to every request path
        extra_headers: Optional[Dict[str, str]] = None,  # any additional headers to inject
        endpoint_url: Optional[str] = None,
        region_name: Optional[str] = None,
        profile_name: Optional[str] = None,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        aws_session_token: Optional[str] = None,
        s3_ssl_no_verify: bool = False,
        # optional logger with .info(...)
        logger: Optional[logging.Logger] = None,
    ):
        """Create the boto3 S3 client and register the request hook.

        Args:
            client_id: Used as the JWT ``iss`` claim.
            client_secret: HS256 key used to sign the JWT.
            object_service: Sent upper-cased in ``x-objectservice-id``.
            region_claim: Value of the JWT ``region`` claim.
            host_suffix: Path prefix prepended to every request path.
            extra_headers: Additional headers injected into every request.
            endpoint_url, region_name, profile_name, aws_access_key_id,
            aws_secret_access_key, aws_session_token: Passed through to
                the boto3 session/client/config.
            s3_ssl_no_verify: When true, TLS verification is disabled.
            logger: Optional logger; when set, each hooked request is logged.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.object_service = object_service
        self.region_claim = region_claim
        self.host_suffix = host_suffix or ""
        self.extra_headers = extra_headers or {}
        self.logger = logger

        # An empty/None profile name selects the default session.
        session = boto3.session.Session(profile_name=profile_name or None)

        config = Config(
            region_name=region_name,
            # OrdinaryCallingFormat equivalent
            s3={"addressing_style": "path"},
        )

        verify = False if s3_ssl_no_verify else None  # None -> default verify

        self.client = session.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
            endpoint_url=endpoint_url,
            config=config,
            verify=verify,
        )

        # One hook to: (1) prefix path, (2) add headers, (3) attach JWT
        self.client.meta.events.register(
            "before-sign.s3", self._before_sign_hook)

    # ---------------------------
    # Event hook: adjust request
    # ---------------------------
    def _prefixed_path(self, path: str) -> str:
        """Return ``path`` with ``host_suffix`` prepended, if one is set.

        Only the seam between the prefix and the path is normalised to a
        single ``/``. The rest of the path is left untouched so that object
        keys containing ``//`` are not rewritten into a different key.
        """
        if not self.host_suffix:
            return path
        return self.host_suffix.rstrip("/") + "/" + path.lstrip("/")

    def _before_sign_hook(self, request, operation_name, **kwargs):
        """
        request: botocore.awsrequest.AWSRequest
        operation_name: e.g. "GetObject", "PutObject", etc.

        Rewrites ``request.url`` with the prefixed path and sets the
        ``x-objectservice-id``, ``jwt`` and any extra headers on the request.
        """
        parts = urlsplit(request.url)

        # 1) Prefix request path with host_suffix (if set)
        request.url = urlunsplit(
            (parts.scheme, parts.netloc, self._prefixed_path(parts.path),
             parts.query, parts.fragment))

        # Recompute parts so host/path match the (possibly) updated URL
        parts = urlsplit(request.url)
        method = request.method
        host = parts.netloc
        path_for_jwt = parts.path  # include the prefixed path exactly as sent

        # 2) Build headers (x-objectservice-id + any extra)
        headers = dict(self.extra_headers)
        headers["x-objectservice-id"] = self.object_service.upper()

        # 3) Add JWT header
        headers["jwt"] = self._create_jwt(method, host, path_for_jwt)

        # Inject/override headers
        for name, value in headers.items():
            request.headers[name] = str(value)

        if self.logger:
            # Header names only; the JWT value is never logged.
            self.logger.info(
                "ProxiedS3Client before-sign: op=%s method=%s host=%s path=%s headers=%s",
                operation_name, method, host, path_for_jwt, list(headers)
            )

    def _create_jwt(self, method: str, host: str, path: str) -> str:
        """Return an HS256 JWT for the request, valid for 300 seconds."""
        now = int(time.time())
        payload = {
            "iat": now,
            "nbf": now,
            "exp": now + 300,  # 5 minutes
            "method": method,
            "iss": self.client_id,
            "host": host,
            "path": path,
            "region": self.region_claim,
        }
        token = jwt.encode(payload, self.client_secret, algorithm="HS256")
        # PyJWT may return bytes in older versions; ensure str
        return token if isinstance(token, str) else token.decode("utf-8")

    # ---------------------------
    # Convenience helpers
    # ---------------------------
    def delete_key(self, bucket: str, key: str) -> bool:
        """
        Delete object: accept HTTP 200 or 204 as success (mirrors CustomBucket).

        Returns True on success. Raises ClientError when the response status
        is neither 200 nor 204; errors raised by the client itself propagate
        unchanged.
        """
        resp = self.client.delete_object(Bucket=bucket, Key=key)
        status = resp.get("ResponseMetadata", {}).get("HTTPStatusCode", 0)
        if status not in (200, 204):
            raise ClientError(
                {"Error": {"Code": str(status), "Message": "Unexpected status"},
                 "ResponseMetadata": {"HTTPStatusCode": status}},
                operation_name="DeleteObject",
            )
        return True

    def get_object(self, bucket: str, key: str, **kwargs):
        """Call ``get_object`` on the underlying client and return its response."""
        return self.client.get_object(Bucket=bucket, Key=key, **kwargs)

    def put_object(self, bucket: str, key: str, body, **kwargs):
        """Call ``put_object`` on the underlying client with ``body`` as Body."""
        return self.client.put_object(Bucket=bucket, Key=key, Body=body, **kwargs)

    def list_objects(self, bucket: str, prefix: str = "", **kwargs):
        """Call ``list_objects_v2`` on the underlying client for ``prefix``."""
        return self.client.list_objects_v2(Bucket=bucket, Prefix=prefix, **kwargs)

    def upload_file(self, filename: str, bucket: str, key: str, **kwargs):
        """Upload a local file; all keyword arguments are sent as ExtraArgs."""
        # Uses s3transfer under the hood (built-in retries/backoff)
        return self.client.upload_file(filename, bucket, key, ExtraArgs=kwargs or {})

    def raw(self):
        """Access the underlying boto3 client if you need operations not wrapped here."""
        return self.client

0 commit comments

Comments
 (0)