Skip to content

Commit 9c711cd

Browse files
author
basil-shuman
committed
feat userver: add multipart methods support for testsuite s3api plugin
commit_hash:8846c5dddd8f795c73d2c57e611c3dbc61d5148f
1 parent 6fa227f commit 9c711cd

File tree

7 files changed

+560
-1
lines changed

7 files changed

+560
-1
lines changed

.mapping.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4814,6 +4814,9 @@
48144814
"testsuite/tests/metrics/CMakeLists.txt":"taxi/uservices/userver/testsuite/tests/metrics/CMakeLists.txt",
48154815
"testsuite/tests/metrics/conftest.py":"taxi/uservices/userver/testsuite/tests/metrics/conftest.py",
48164816
"testsuite/tests/metrics/test_metrics.py":"taxi/uservices/userver/testsuite/tests/metrics/test_metrics.py",
4817+
"testsuite/tests/s3api/CMakeLists.txt":"taxi/uservices/userver/testsuite/tests/s3api/CMakeLists.txt",
4818+
"testsuite/tests/s3api/conftest.py":"taxi/uservices/userver/testsuite/tests/s3api/conftest.py",
4819+
"testsuite/tests/s3api/test_multipart_upload.py":"taxi/uservices/userver/testsuite/tests/s3api/test_multipart_upload.py",
48174820
"testsuite/tests/sql_coverage_empty/CMakeLists.txt":"taxi/uservices/userver/testsuite/tests/sql_coverage_empty/CMakeLists.txt",
48184821
"testsuite/tests/sql_coverage_empty/conftest.py":"taxi/uservices/userver/testsuite/tests/sql_coverage_empty/conftest.py",
48194822
"testsuite/tests/sql_coverage_empty/test_coverage_empty.py":"taxi/uservices/userver/testsuite/tests/sql_coverage_empty/test_coverage_empty.py",

testsuite/pytest_plugins/pytest_userver/plugins/s3api.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,23 @@ def _mock_all(request):
3434
return mock_impl.get_object(request)
3535

3636
if request.method == 'PUT':
37+
if 'uploadId' in request.query:
38+
if 'partNumber' in request.query:
39+
return mock_impl.upload_part(request)
40+
return mockserver.make_response('Unknown or unsupported method', 404)
3741
if request.headers.get('x-amz-copy-source', None):
3842
return mock_impl.copy_object(request)
3943
return mock_impl.put_object(request)
4044

45+
if request.method == 'POST':
46+
if 'uploadId' in request.query:
47+
return mock_impl.complete_multipart_upload(request)
48+
if 'uploads' in request.query:
49+
return mock_impl.create_multipart_upload(request)
50+
4151
if request.method == 'DELETE':
52+
if 'uploadId' in request.query:
53+
return mock_impl.abort_multipart_upload(request)
4254
return mock_impl.delete_object(request)
4355

4456
if request.method == 'HEAD':

testsuite/pytest_plugins/pytest_userver/s3api.py

Lines changed: 297 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,167 @@
1+
import collections
12
from collections.abc import Mapping
3+
from collections.abc import MutableMapping
24
import dataclasses
35
import datetime as dt
46
import hashlib
7+
import os
58
import pathlib
69
import sys
10+
import xml.etree.ElementTree
711

812
import dateutil.tz as tz
913

1014

15+
class _S3Error(Exception):
    """Common base of the S3 API errors raised by the multipart upload storage.

    Every subclass provides two class attributes:

    * ``code`` -- machine-readable S3 error code;
    * ``message`` -- human-readable description, also returned by ``str()``.

    The base exists only to share ``__str__``; it is not meant to be raised itself.
    """

    code: str
    message: str

    def __str__(self):
        return self.message


class _S3NoSuchUploadError(_S3Error):
    """The upload id is unknown or does not belong to the requested key."""

    code = 'NoSuchUpload'
    message = 'The specified multipart upload does not exist.'


class _S3InvalidPartError(_S3Error):
    """A part listed in a completion request is missing or its ETag differs."""

    code = 'InvalidPart'
    message = (
        'One or more of the specified parts could not be found.'
        ' The part might not have been uploaded, or the specified'
        " ETag might not have matched the uploaded part's ETag."
    )


class _S3InvalidPartOrderError(_S3Error):
    """The parts of a completion request are not listed in ascending order."""

    code = 'InvalidPartOrder'
    message = 'The list of parts was not in ascending order. The parts list must be specified in order by part number.'


class _S3EntityTooSmallError(_S3Error):
    """The completed upload would produce an empty object."""

    code = 'EntityTooSmall'
    message = 'Your proposed upload is smaller than the minimum allowed object size.'
49+
50+
51+
class _S3ClientError(Exception):
    """Invalid client request that is described by a free-form text only.

    ``str()`` of the exception is exactly the text passed to the constructor.
    """

    def __init__(self, msg: str):
        # Forward the text to ``Exception`` as well: otherwise ``args`` stays
        # empty and ``repr()`` / tracebacks do not show what went wrong.
        super().__init__(msg)
        self._msg = msg

    def __str__(self):
        return self._msg
57+
58+
59+
@dataclasses.dataclass
class _S3UploadPart:
    """A single uploaded part of a multipart upload."""

    # Raw payload of the part.
    data: bytearray
    # Attributes of the part; the upload storage fills in
    # 'ETag', 'Last-Modified' and 'Size'.
    meta: Mapping[str, str]
63+
64+
65+
@dataclasses.dataclass
class _S3Upload:
    """State of one in-progress multipart upload."""

    # Parts uploaded so far, indexed by part number.
    parts: MutableMapping[int, _S3UploadPart]
    # Attributes of the upload: 'Key' and 'UploadId' plus the metadata
    # supplied when the upload was created.
    meta: Mapping[str, str]
69+
70+
71+
class _S3BucketUploadStorage:
    """In-memory registry of the in-progress multipart uploads of one bucket.

    Uploads are indexed by a generated upload id. Every operation on an
    existing upload also verifies that the object key matches the key the
    upload was created for.

    The class is intentionally not a dataclass: it declares no fields, so the
    generated ``__eq__`` would make all storages compare equal.
    """

    def __init__(self):
        self._storage: dict[str, _S3Upload] = {}

    @staticmethod
    def _generate_upload_id():
        """Return a random upload id: 15 random bytes in hexadecimal form."""
        return os.urandom(15).hex()

    @staticmethod
    def _generate_etag(data):
        """Return the hexadecimal MD5 digest of ``data``."""
        return hashlib.md5(data).hexdigest()

    def _find_upload(self, key: str, upload_id: str):
        """Return the upload registered under ``upload_id`` for ``key``.

        Raises:
            _S3NoSuchUploadError: the id is unknown or was created for another key.
        """
        upload = self._storage.get(upload_id)
        if upload is None or upload.meta['Key'] != str(pathlib.Path(key)):
            raise _S3NoSuchUploadError()
        return upload

    def create_multipart_upload(self, key: str, user_defined_meta: Mapping[str, str] | None = None):
        """Register a new upload for ``key`` and return its metadata.

        The returned mapping holds 'Key', 'UploadId' and every entry of
        ``user_defined_meta``; the same mapping is kept as the upload's ``meta``.
        """
        upload_id = _S3BucketUploadStorage._generate_upload_id()
        upload_meta = {
            'Key': str(pathlib.Path(key)),
            'UploadId': upload_id,
        }
        if user_defined_meta:
            upload_meta.update(user_defined_meta)

        self._storage[upload_id] = _S3Upload(parts={}, meta=upload_meta)
        return upload_meta

    def abort_multipart_upload(self, key: str, upload_id: str):
        """Forget the upload together with its parts and return the removed upload.

        Raises:
            _S3NoSuchUploadError: the id is unknown or was created for another key.
        """
        self._find_upload(key, upload_id)
        return self._storage.pop(upload_id)

    # Historical misspelling of the method name, kept for existing callers.
    abort_multipart_uplod = abort_multipart_upload

    def upload_part(
        self,
        key: str,
        upload_id: str,
        part_number: int,
        data: bytearray,
        last_modified: dt.datetime | str | None = None,
    ):
        """Store ``data`` as part ``part_number`` of the upload and return the stored part.

        A part uploaded again under the same number replaces the previous one.

        Args:
            key: object key the upload was created for.
            upload_id: id returned by ``create_multipart_upload``.
            part_number: number of the part, from 1 to 10000 inclusive.
            data: payload of the part.
            last_modified: value for the 'Last-Modified' attribute; a datetime
                is converted with ``isoformat()``, ``None`` means "now".

        Raises:
            _S3ClientError: ``part_number`` is out of range.
            _S3NoSuchUploadError: the id is unknown or was created for another key.
        """
        if not 1 <= part_number <= 10000:
            raise _S3ClientError('partNumber value is expected to be between 1 and 10000')

        upload = self._find_upload(key, upload_id)

        if last_modified is None:
            # Timezone is needed for RFC 3339 timeformat used by S3
            last_modified = dt.datetime.now(tz.tzlocal()).isoformat()
        elif isinstance(last_modified, dt.datetime):
            last_modified = last_modified.isoformat()

        meta = {
            'ETag': self._generate_etag(data),
            'Last-Modified': last_modified,
            # Payload length in bytes; sys.getsizeof() would report the memory
            # footprint of the Python object instead.
            'Size': str(len(data)),
        }

        new_part = _S3UploadPart(data, meta)
        upload.parts[part_number] = new_part
        return new_part

    def complete_multipart_upload(self, key: str, upload_id: str, parts_to_complete: list):
        """Concatenate the parts of the upload and remove it from the registry.

        Args:
            key: object key the upload was created for.
            upload_id: id returned by ``create_multipart_upload``.
            parts_to_complete: mappings with 'PartNumber' and 'ETag' entries,
                in ascending part number order; the list has to name every
                uploaded part with its ETag.

        Returns:
            ``{'Data': <merged payload>, 'Upload': <the finished upload>}``.

        Raises:
            _S3NoSuchUploadError: the id is unknown or was created for another key.
            _S3InvalidPartOrderError: part numbers are not strictly ascending.
            _S3InvalidPartError: the list does not match the uploaded parts
                (unknown or missing part, or a different ETag).
            _S3EntityTooSmallError: the merged payload is empty.
        """
        upload = self._find_upload(key, upload_id)

        part_numbers = [part['PartNumber'] for part in parts_to_complete]
        if any(prev >= cur for prev, cur in zip(part_numbers, part_numbers[1:])):
            raise _S3InvalidPartOrderError()

        uploaded_parts = [
            {'PartNumber': part_number, 'ETag': upload.parts[part_number].meta['ETag']}
            for part_number in sorted(upload.parts)
        ]
        if uploaded_parts != parts_to_complete:
            raise _S3InvalidPartError()

        merged_data = bytearray().join(upload.parts[part_number].data for part_number in part_numbers)
        if not merged_data:
            raise _S3EntityTooSmallError()

        self._storage.pop(upload_id)
        return {'Data': merged_data, 'Upload': upload}
163+
164+
11165
@dataclasses.dataclass
12166
class S3Object:
13167
data: bytearray
@@ -70,10 +224,13 @@ def delete_object(self, key) -> S3Object | None:
70224

71225

72226
class S3HandleMock:
227+
_s3_xml_nss = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}
228+
73229
def __init__(self, mockserver, s3_mock_storage, mock_base_url):
    """Remember the collaborators and start with no multipart uploads.

    Args:
        mockserver: object whose ``make_response`` builds the HTTP responses.
        s3_mock_storage: object storage, indexed by bucket name.
        mock_base_url: base URL of the mock.
    """
    self._mockserver, self._base_url = mockserver, mock_base_url
    self._storage = s3_mock_storage
    # Bucket name -> multipart uploads of that bucket, created on first access.
    self._uploads = collections.defaultdict(_S3BucketUploadStorage)
77234

78235
def _get_bucket_name(self, request):
79236
return request.headers['Host'].split('.')[0]
@@ -137,6 +294,39 @@ def _generate_get_objects_xml(
137294
</ListBucketResult>
138295
"""
139296

297+
@staticmethod
298+
def _generate_error_response_xml(code: str, message: str, resource: str):
299+
return (
300+
'<?xml version="1.0" encoding="UTF-8"?>'
301+
'<Error>'
302+
f'<Code>{code}</Code>'
303+
f'<Message>{message}</Message>'
304+
f'<Resource>{resource}</Resource>'
305+
f'<RequestId>{os.urandom(15).hex()}</RequestId>'
306+
'</Error>'
307+
)
308+
309+
@staticmethod
310+
def _parse_complete_multipart_xml_body(request_body: str):
311+
xml_root_node = xml.etree.ElementTree.fromstring(request_body)
312+
if xml_root_node is None or xml_root_node.tag != f'{{{S3HandleMock._s3_xml_nss["s3"]}}}CompleteMultipartUpload':
313+
raise _S3ClientError('missing CompleteMultipartUpload in request body')
314+
315+
parts_to_complete = []
316+
for xml_part in xml_root_node.findall('s3:Part', S3HandleMock._s3_xml_nss):
317+
xml_part_number = xml_part.find('s3:PartNumber', S3HandleMock._s3_xml_nss)
318+
if xml_part_number is None or not xml_part_number.text:
319+
raise _S3ClientError('missing CompleteMultipartUpload.Part.PartNumber')
320+
part_number_value = int(xml_part_number.text)
321+
322+
xml_etag = xml_part.find('s3:ETag', S3HandleMock._s3_xml_nss)
323+
if xml_etag is None or not xml_etag.text:
324+
raise _S3ClientError('missing CompleteMultipartUpload.Part.ETag')
325+
326+
parts_to_complete.append({'ETag': xml_etag.text, 'PartNumber': part_number_value})
327+
328+
return parts_to_complete
329+
140330
def get_object(self, request):
141331
key = self._extract_key(request)
142332

@@ -161,7 +351,7 @@ def put_object(self, request):
161351
user_defined_meta = {}
162352
for meta_key, meta_value in request.headers.items():
163353
# https://docs.amazonaws.cn/en_us/AmazonS3/latest/userguide/UsingMetadata.html
164-
if meta_key.startswith('x-amz-meta-'):
354+
if meta_key.startswith('x-amz-meta-') or meta_key in ['Content-Type', 'Content-Disposition']:
165355
user_defined_meta[meta_key] = meta_value
166356

167357
meta = bucket_storage.put_object(key, data, user_defined_meta)
@@ -234,3 +424,109 @@ def get_object_head(self, request):
234424
200,
235425
headers=s3_object.meta,
236426
)
427+
428+
def create_multipart_upload(self, request):
    """Handle a request that starts a multipart upload.

    Registers a new upload for the requested key in the bucket taken from the
    request and answers 200 with an ``InitiateMultipartUploadResult`` document
    carrying the bucket, the key and the generated upload id.
    """
    key = self._extract_key(request)
    bucket_name = self._get_bucket_name(request)
    bucket_uploads = self._uploads[bucket_name]

    # Headers remembered as the metadata of the upload, see
    # https://docs.amazonaws.cn/en_us/AmazonS3/latest/userguide/UsingMetadata.html
    stored_headers = ('Content-Type', 'Content-Disposition')
    user_defined_meta = {
        header: value
        for header, value in request.headers.items()
        if header.startswith('x-amz-meta-') or header in stored_headers
    }

    upload_meta = bucket_uploads.create_multipart_upload(key, user_defined_meta)
    response_body = ''.join((
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<InitiateMultipartUploadResult>',
        f'<Bucket>{bucket_name}</Bucket>',
        f'<Key>{key}</Key>',
        f'<UploadId>{upload_meta["UploadId"]}</UploadId>',
        '</InitiateMultipartUploadResult>',
    ))
    # Some clients (e.g. AWS SDK for C++) parse any non-empty body as XML.
    return self._mockserver.make_response(response_body, 200)
450+
451+
def abort_multipart_upload(self, request):
    """Handle a request that aborts a multipart upload.

    Answers 204 with an empty body when the upload was removed, or 404 with an
    S3 ``<Error>`` document when the upload does not exist.
    """
    key = self._extract_key(request)
    upload_id = request.query['uploadId']
    bucket_uploads = self._uploads[self._get_bucket_name(request)]

    try:
        bucket_uploads.abort_multipart_uplod(key, upload_id)
    except _S3NoSuchUploadError as exc:
        # https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
        # #API_AbortMultipartUpload_Errors
        resource = f'{request.path}?uploadId={upload_id}'
        error_body = S3HandleMock._generate_error_response_xml(exc.code, exc.message, resource)
        return self._mockserver.make_response(error_body, 404)
    else:
        # Some clients (e.g. AWS SDK for C++) parse any non-empty body as XML,
        # so the body is left empty.
        return self._mockserver.make_response('', 204)
467+
468+
def upload_part(self, request):
    """Handle a request that uploads one part of a multipart upload.

    The upload id and the part number come from the ``uploadId`` and
    ``partNumber`` query parameters, the payload from the request body.

    Responses:
        200 with an ``ETag`` header -- the part was stored;
        400 with a plain-text body -- the part number is not an integer or is
            out of the accepted range;
        404 with an S3 ``<Error>`` document -- the upload does not exist.
    """
    key = self._extract_key(request)
    bucket_name = self._get_bucket_name(request)
    upload_id = request.query['uploadId']
    try:
        part_number = int(request.query['partNumber'])
    except ValueError:
        # A non-numeric part number is a client mistake, not a failure of the mock.
        return self._mockserver.make_response(
            'partNumber value is expected to be an integer between 1 and 10000', 400
        )
    bucket_uploads = self._uploads[bucket_name]
    data = request.get_data()
    try:
        uploaded_part = bucket_uploads.upload_part(key, upload_id, part_number, data)
    except _S3ClientError as exc:
        return self._mockserver.make_response(str(exc), 400)
    except _S3NoSuchUploadError as exc:
        # https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html
        response_body = S3HandleMock._generate_error_response_xml(
            exc.code,
            exc.message,
            f'{request.path}?uploadId={upload_id}',
        )
        return self._mockserver.make_response(response_body, 404)

    return self._mockserver.make_response(status=200, headers={'ETag': uploaded_part.meta['ETag']})
489+
490+
def complete_multipart_upload(self, request):
    """Handle a request that completes a multipart upload.

    Parses the parts list from the request body, merges the listed parts and
    stores the result as an object of the bucket.

    Responses:
        200 with a ``CompleteMultipartUploadResult`` document; the metadata of
            the stored object is sent as the response headers;
        404 with an S3 ``<Error>`` document -- the upload does not exist;
        400 with an S3 ``<Error>`` document -- the parts list was rejected;
        400 with a plain-text body -- the request body is invalid.
    """
    key = self._extract_key(request)
    bucket_name = self._get_bucket_name(request)
    bucket_uploads = self._uploads[bucket_name]
    bucket_storage = self._storage[bucket_name]
    upload_id = request.query['uploadId']

    s3_errors = (
        _S3NoSuchUploadError,
        _S3InvalidPartError,
        _S3InvalidPartOrderError,
        _S3EntityTooSmallError,
    )
    try:
        request_body = request.get_data().decode()
        parts_to_complete = S3HandleMock._parse_complete_multipart_xml_body(request_body)
        completed_upload = bucket_uploads.complete_multipart_upload(key, upload_id, parts_to_complete)
    except _S3ClientError as exc:
        return self._mockserver.make_response(str(exc), 400)
    except s3_errors as exc:
        # https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
        # Only a missing upload is "not found"; the other errors are bad requests.
        status = 404 if isinstance(exc, _S3NoSuchUploadError) else 400
        error_body = S3HandleMock._generate_error_response_xml(
            exc.code,
            exc.message,
            f'{request.path}?uploadId={upload_id}',
        )
        return self._mockserver.make_response(error_body, status)

    meta = bucket_storage.put_object(key, completed_upload['Data'], completed_upload['Upload'].meta)
    response_body = ''.join((
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<CompleteMultipartUploadResult>',
        f'<Location>{request.path}</Location>',
        f'<Bucket>{bucket_name}</Bucket>',
        f'<Key>{key}</Key>',
        f'<ETag>{meta["ETag"]}</ETag>',
        '</CompleteMultipartUploadResult>',
    ))
    return self._mockserver.make_response(response_body, status=200, headers=meta)

testsuite/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ project(userver-testsuite-tests)
33
add_subdirectory(chaos)
44
add_subdirectory(config)
55
add_subdirectory(metrics)
6+
# add_subdirectory(s3api) TODO: TAXICOMMON-10381
67
add_subdirectory(sql_coverage_empty)
78
add_subdirectory(tskv)
89
# add_subdirectory(sql_coverage_full) TODO: TAXICOMMON-10381
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Register the pytest suite of this directory as a CTest test.
# pytest is started from the pytest_plugins directory.
add_test(NAME "${PROJECT_NAME}-s3api"
         COMMAND "${TESTSUITE_PYTHON_BINARY}" -m pytest ${CMAKE_CURRENT_SOURCE_DIR}
         WORKING_DIRECTORY "${USERVER_ROOT_DIR}/testsuite/pytest_plugins")

# CTest interprets TIMEOUT in seconds.
set_tests_properties("${PROJECT_NAME}-s3api" PROPERTIES TIMEOUT 600)

testsuite/tests/s3api/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pytest_plugins = ['pytest_userver.plugins.core', 'pytest_userver.plugins.s3api']

0 commit comments

Comments
 (0)