Skip to content

Commit 92b6229

Browse files
committed
Change content type to an array in info object schema (#7193, #7675)
1 parent c38e5e8 commit 92b6229

File tree

4 files changed

+101
-8
lines changed

4 files changed

+101
-8
lines changed

docs/mirror.rst

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,9 @@ hexadecimal form of a hash of the corresponding file object's content and
6767
algorithm used to derive that hash. The content of an ``info`` object is JSON of
6868
the form ``{"$schema":"…", "content-type":…}``.
6969

70-
The ``content-type`` property contains the content type of the file, as defined
71-
for the HTTP response header of the same name [4]_.
70+
The ``content-type`` property contains a list of all content types known to be
71+
associated with the file, as defined for the HTTP response header of the same
72+
name [4]_.
7273

7374
.. [4] https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Content-Type
7475

src/azul/indexer/mirror_service.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
ABCMeta,
33
abstractmethod,
44
)
5+
import bisect
56
from functools import (
67
singledispatchmethod,
78
)
@@ -46,6 +47,9 @@
4647
from azul.auth import (
4748
Authentication,
4849
)
50+
from azul.collections import (
51+
alist,
52+
)
4953
from azul.deployment import (
5054
aws,
5155
)
@@ -466,11 +470,9 @@ def _get_info(self, file: File) -> JSON | None:
466470
return None
467471
else:
468472
json_content = json.loads(content)
469-
content_type = json_content['content-type']
470-
if content_type != file.content_type:
471-
# FIXME: Content type in mirror info objects inconsistent with index
472-
# https://github.com/DataBiosphere/azul/issues/7193
473-
log.warning('Conflicting content type %r for file %r', content_type, file)
473+
content_types = json_content['content-type']
474+
if file.content_type not in content_types:
475+
log.warning('Conflicting content type %r for file %r', content_types, file)
474476
return json_content
475477

476478
info_prefix, file_prefix = 'info', 'file'
@@ -597,6 +599,7 @@ def _(self, a: MirrorFileAction) -> Iterator[MirrorAction]:
597599
assert a.file.size is not None, R('File size unknown', a.file)
598600
if self.info_exists(a.file):
599601
log.info('File is already mirrored, skipping upload: %r', a.file)
602+
self.update_content_type(a.file)
600603
elif self.file_exists(a.file):
601604
assert False, R('File object is already present', a.file)
602605
else:
@@ -689,10 +692,25 @@ def _(self, a: FinalizeFileAction) -> Iterator[MirrorAction]:
689692

690693
def _info(self, file: File) -> JSON:
691694
return {
692-
'content-type': file.content_type,
695+
'content-type': alist(file.content_type),
693696
'$schema': str(self._schema_url_func(schema_name='info', version=1))
694697
}
695698

699+
def update_content_type(self, file: File):
700+
content_type = file.content_type
701+
702+
def update(data: bytes) -> bytes:
703+
json_content = json.loads(data)
704+
content_types = json_content['content-type']
705+
if content_type is not None:
706+
index = bisect.bisect_left(content_types, content_type)
707+
if index == len(content_types) or content_types[index] != content_type:
708+
content_types.insert(index, content_type)
709+
return json.dumps(json_content).encode()
710+
711+
key = self._info_object_key(file)
712+
self._storage.update(key, update)
713+
696714
def _put_info(self, file: File):
697715
object_key = self._info_object_key(file)
698716
info = self._info(file)

src/azul/service/storage_service.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
)
1919
import time
2020
from typing import (
21+
Callable,
2122
Collection,
2223
IO,
2324
TYPE_CHECKING,
@@ -115,6 +116,7 @@ def put_object(self,
115116
data: bytes,
116117
content_type: str | None = None,
117118
tagging: Tagging | None = None,
119+
etag: str | None = None,
118120
overwrite: bool = True):
119121
try:
120122
request: PutObjectRequestTypeDef
@@ -123,12 +125,55 @@ def put_object(self,
123125
request['ContentType'] = content_type
124126
if tagging is not None:
125127
request['Tagging'] = urlencode(tagging)
128+
if etag is not None:
129+
request['IfMatch'] = etag
126130
if overwrite is False:
127131
request['IfNoneMatch'] = '*'
128132
self._s3.put_object(**request)
129133
except botocore.exceptions.ClientError as e:
130134
self._handle_overwrite(e, object_key)
131135

136+
def update(self,
137+
object_key: str,
138+
updater: Callable[[bytes], bytes],
139+
max_attempts: int = 10
140+
) -> bool:
141+
"""
142+
Attempt to update the contents of an object based on its existing
143+
contents using conditional writes. Expects a callback that returns the
144+
desired contents of the object given its current contents. If the
145+
callback ever returns its argument unchanged, no further writes will be
146+
attempted and this method will return False. It returns True if a write
147+
succeeds, and raises StorageObjectNotFound if the object does not exist
148+
at any point during the update.
149+
"""
150+
for i in range(1, max_attempts + 1):
151+
response = self._get(object_key)
152+
etag = response['ETag']
153+
data = response['Body'].read()
154+
new_data = updater(data)
155+
if new_data == data:
156+
log.info('Update of %r canceled during attempt #%r/%r.',
157+
object_key, i, max_attempts)
158+
return False
159+
else:
160+
try:
161+
self.put_object(object_key=object_key, data=new_data, etag=etag)
162+
except botocore.exceptions.ClientError as e:
163+
error = e.response['Error']
164+
code, condition = error['Code'], error.get('Condition')
165+
if code == 'PreconditionFailed' and condition == 'If-Match':
166+
log.info('Conflict during attempt #%r/%r of updating %r',
167+
i, max_attempts, object_key)
168+
if i == max_attempts:
169+
raise
170+
else:
171+
raise
172+
else:
173+
log.info('Update of %r succeeded after %r attempts', object_key, i)
174+
return True
175+
assert False
176+
132177
def delete_objects(self,
133178
object_keys: Collection[str],
134179
batch_size: int = 1000

test/indexer/test_mirror_controller.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ def test_mirroring(self):
115115
with self.subTest('mirror_file', corrupted=False, info_exists=False, file_exists=False):
116116
self._test_mirror_file(file, file_message)
117117

118+
with self.subTest('mirror_file', corrupted=False, info_exists=True, file_exists=True):
119+
self._test_content_type_update(file, file_message)
120+
118121
self._s3.delete_object(Bucket=self.mirror_bucket,
119122
Key=self.service._info_object_key(file))
120123

@@ -204,6 +207,32 @@ def _test_reuploaded_file(self, file_message):
204207
self.assertTrue(R.caused(e.exception))
205208
self.assertEqual(e.exception.args[0].args[0], 'File object is already present')
206209

210+
def _test_content_type_update(self, file, file_message):
211+
for content_type in [
212+
'application/octet-stream',
213+
'application/octet-stream',
214+
'text/csv; charset="utf-8"',
215+
]:
216+
changed_message = {
217+
**file_message,
218+
'file': attrs.evolve(file, content_type=content_type).to_json()
219+
}
220+
old_content_types = self._assertContentType(file)
221+
event = self._mirror_event(changed_message)
222+
self.mirror_controller.mirror(event)
223+
new_content_types = self._assertContentType(file)
224+
if content_type in old_content_types:
225+
self.assertEqual(old_content_types, new_content_types)
226+
else:
227+
self.assertIn(content_type, new_content_types)
228+
229+
def _assertContentType(self, file) -> list[str]:
230+
info = self.file_service._get_info(file)
231+
content_types = info['content-type']
232+
self.assertIsInstance(content_types, list)
233+
self.assertEqual(sorted(set(content_types)), content_types)
234+
return content_types
235+
207236
def test_info_schema(self):
208237
client = http_client(log)
209238
file = MagicMock(content_type='text/plain')

0 commit comments

Comments
 (0)