Skip to content

Commit 92954cc

Browse files
authored
Merge pull request #2 from blink1073/read-from-storage
INTPYTHON-614 Allow data to be loaded by url in the object store
2 parents 741f730 + 919ec27 commit 92954cc

File tree

7 files changed

+212
-81
lines changed

7 files changed

+212
-81
lines changed

docs/examples.rst

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,38 @@ Combining Text and Images
9797
client.close()
9898
9999
100+
Loading Data from S3
101+
--------------------
102+
103+
If you already have data stored in S3, you can use an ``s3://`` url to load the image(s):
104+
105+
.. code-block:: python
106+
107+
import os
108+
from pymongo_voyageai import PyMongoVoyageAI
109+
110+
client = PyMongoVoyageAI(
111+
voyageai_api_key=os.environ["VOYAGEAI_API_KEY"],
112+
s3_bucket_name=os.environ["S3_BUCKET_NAME"],
113+
mongo_connection_string=os.environ["MONGODB_URI"],
114+
collection_name="test",
115+
database_name="test_db",
116+
)
117+
118+
query = "The consequences of a dictator's peace"
119+
url = "s3://my-bucket-name/readingcopy.pdf"
120+
images = client.url_to_images(url)
121+
resp = client.add_documents(images)
122+
client.wait_for_indexing()
123+
data = client.similarity_search(query, extract_images=True)
124+
125+
# We expect page 5 to be the best match.
126+
assert data[0]["inputs"][0].page_number == 5
127+
assert len(client.get_by_ids([d["_id"] for d in resp])) == len(resp)
128+
client.delete_by_ids([d["_id"] for d in resp])
129+
client.close()
130+
131+
100132
Using Async API
101133
---------------
102134

pymongo_voyageai/client.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import io
34
import logging
45
from collections.abc import Mapping, Sequence
56
from time import monotonic, sleep
@@ -206,7 +207,19 @@ def image_to_storage(self, document: ImageDocument | Image.Image) -> StoredDocum
206207
"""
207208
if isinstance(document, Image.Image):
208209
document = ImageDocument(image=document)
209-
return self._storage.save_image(document)
210+
object_name = f"{ObjectId()}.png"
211+
fd = io.BytesIO()
212+
document.image.save(fd, "png")
213+
fd.seek(0)
214+
self._storage.save_data(fd, object_name)
215+
return StoredDocument(
216+
root_location=self._storage.root_location,
217+
object_name=object_name,
218+
page_number=document.page_number,
219+
source_url=document.source_url,
220+
name=document.name,
221+
metadata=document.metadata,
222+
)
210223

211224
async def aimage_to_storage(self, document: ImageDocument | Image.Image) -> StoredDocument:
212225
"""Convert an image to a stored document.
@@ -232,7 +245,15 @@ def storage_to_image(self, document: StoredDocument | str) -> ImageDocument:
232245
document = StoredDocument(
233246
root_location=self._storage.root_location, object_name=document
234247
)
235-
return self._storage.load_image(document=document)
248+
buffer = self._storage.read_data(document.object_name)
249+
image = Image.open(buffer)
250+
return ImageDocument(
251+
image=image,
252+
source_url=document.source_url,
253+
page_number=document.page_number,
254+
metadata=document.metadata,
255+
name=document.name,
256+
)
236257

237258
async def astorage_to_image(self, document: StoredDocument | str) -> ImageDocument:
238259
"""Convert a stored document to an image document.
@@ -267,7 +288,13 @@ def url_to_images(
267288
A list of image document objects.
268289
"""
269290
return url_to_images(
270-
url, metadata=metadata, start=start, end=end, image_column=image_column, **kwargs
291+
url,
292+
storage=self._storage,
293+
metadata=metadata,
294+
start=start,
295+
end=end,
296+
image_column=image_column,
297+
**kwargs,
271298
)
272299

273300
async def aurl_to_images(
@@ -464,7 +491,7 @@ def delete_many(
464491
self._expand_doc(obj, False)
465492
for inp in obj["inputs"]:
466493
if isinstance(inp, StoredDocument):
467-
self._storage.delete_image(inp)
494+
self._storage.delete_data(inp.object_name)
468495
return self._coll.delete_many(filter=filter, **kwargs).acknowledged
469496

470497
async def adelete_many(

pymongo_voyageai/storage.py

Lines changed: 54 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,43 @@
22

33
import boto3 # type:ignore[import-untyped]
44
import botocore # type:ignore[import-untyped]
5-
from bson import ObjectId
6-
from PIL import Image
7-
8-
from .document import ImageDocument, StoredDocument
95

106

117
class ObjectStorage:
12-
"""A class used store image documents."""
8+
"""A class used to store binary data."""
139

1410
root_location: str
15-
"""The root location to use in the object store."""
11+
"""The default root location to use in the object store."""
12+
13+
url_prefixes: list[str] | None
14+
"""The url prefixes used by the object store, for reading data from a url."""
1615

17-
def save_image(self, image: ImageDocument) -> StoredDocument:
18-
"""Save an image document to the object store."""
16+
def save_data(self, data: io.BytesIO, object_name: str) -> None:
17+
"""Save data to the object store."""
1918
raise NotImplementedError
2019

21-
def load_image(self, document: StoredDocument) -> ImageDocument:
22-
"""Load an image document from the object store."""
20+
def read_data(self, object_name: str) -> io.BytesIO:
21+
"""Read data from the object store."""
2322
raise NotImplementedError
2423

25-
def delete_image(self, document: StoredDocument) -> None:
26-
"""Remove an image document from the object store."""
24+
def load_url(self, url: str) -> io.BytesIO:
25+
"""Load data from a url."""
2726
raise NotImplementedError
2827

29-
def close(self) -> None:
30-
"""Close the object store."""
28+
def delete_data(self, object_name: str) -> None:
29+
"""Delete data from the object store."""
3130
raise NotImplementedError
3231

32+
def close(self):
33+
"""Close the object store."""
34+
pass
35+
3336

3437
class S3Storage(ObjectStorage):
3538
"""An object store using an S3 bucket."""
3639

40+
url_prefixes = ["s3://"]
41+
3742
def __init__(
3843
self,
3944
bucket_name: str,
@@ -50,35 +55,26 @@ def __init__(
5055
self.client = client or boto3.client("s3", region_name=region_name)
5156
self.root_location = bucket_name
5257

53-
def save_image(self, image: ImageDocument) -> StoredDocument:
54-
object_name = str(ObjectId())
55-
fd = io.BytesIO()
56-
image.image.save(fd, "png")
57-
fd.seek(0)
58-
self.client.upload_fileobj(fd, self.root_location, object_name)
59-
return StoredDocument(
60-
root_location=self.root_location,
61-
object_name=object_name,
62-
page_number=image.page_number,
63-
source_url=image.source_url,
64-
name=image.name,
65-
metadata=image.metadata,
66-
)
67-
68-
def load_image(self, document: StoredDocument) -> ImageDocument:
58+
def save_data(self, data: io.BytesIO, object_name: str) -> None:
59+
"""Save data to the object store."""
60+
self.client.upload_fileobj(data, self.root_location, object_name)
61+
62+
def read_data(self, object_name: str) -> io.BytesIO:
63+
"""Read data using the object store."""
64+
buffer = io.BytesIO()
65+
self.client.download_fileobj(self.root_location, object_name, buffer)
66+
return buffer
67+
68+
def load_url(self, url: str) -> io.BytesIO:
69+
"""Load data from a url."""
70+
bucket, _, object_name = url.replace("s3://", "").partition("/")
6971
buffer = io.BytesIO()
70-
self.client.download_fileobj(document.root_location, document.object_name, buffer)
71-
image = Image.open(buffer)
72-
return ImageDocument(
73-
image=image,
74-
source_url=document.source_url,
75-
page_number=document.page_number,
76-
metadata=document.metadata,
77-
name=document.name,
78-
)
79-
80-
def delete_image(self, document: StoredDocument) -> None:
81-
self.client.delete_object(Bucket=document.root_location, Key=document.object_name)
72+
self.client.download_fileobj(bucket, object_name, buffer)
73+
return buffer
74+
75+
def delete_data(self, object_name: str) -> None:
76+
"""Delete data from the object store."""
77+
self.client.delete_object(Bucket=self.root_location, Key=object_name)
8278

8379
def close(self) -> None:
8480
self.client.close()
@@ -87,26 +83,25 @@ def close(self) -> None:
8783
class MemoryStorage(ObjectStorage):
8884
"""An in-memory object store"""
8985

86+
url_prefixes = ["file://"]
87+
9088
def __init__(self) -> None:
9189
self.root_location = "foo"
92-
self.storage: dict[str, ImageDocument] = dict()
90+
self.storage: dict[str, io.BytesIO] = dict()
9391

94-
def save_image(self, image: ImageDocument) -> StoredDocument:
95-
object_name = str(ObjectId())
96-
self.storage[object_name] = image
97-
return StoredDocument(
98-
root_location=self.root_location,
99-
name=image.name,
100-
object_name=object_name,
101-
source_url=image.source_url,
102-
page_number=image.page_number,
103-
)
92+
def save_data(self, data: io.BytesIO, object_name: str) -> None:
93+
"""Save data to the object store."""
94+
self.storage[object_name] = data
10495

105-
def load_image(self, document: StoredDocument) -> ImageDocument:
106-
return self.storage[document.object_name]
96+
def read_data(self, object_name: str) -> io.BytesIO:
97+
"""Read data using the object store."""
98+
return self.storage[object_name]
10799

108-
def delete_image(self, document: StoredDocument) -> None:
109-
self.storage.pop(document.object_name, None)
100+
def load_url(self, url: str) -> io.BytesIO:
101+
"""Load data from a url."""
102+
with open(url.replace("file://", ""), "rb") as fid:
103+
return io.BytesIO(fid.read())
110104

111-
def close(self):
112-
pass
105+
def delete_data(self, object_name: str) -> None:
106+
"""Delete data from the object store."""
107+
self.storage.pop(object_name, None)

pymongo_voyageai/utils.py

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from PIL import Image
66

77
from .document import ImageDocument
8+
from .storage import ObjectStorage, S3Storage
89

910
try:
1011
import fitz # type:ignore[import-untyped]
@@ -17,13 +18,13 @@
1718
INTERVAL = 1
1819

1920

20-
def pdf_url_to_images(
21-
url: str, start: int | None = None, end: int | None = None, zoom: float = 1.0
21+
def pdf_data_to_images(
22+
pdf_stream: io.BytesIO, start: int | None = None, end: int | None = None, zoom: float = 1.0
2223
) -> list[Image.Image]:
23-
"""Extract images from a pdf url.
24+
"""Extract images from a pdf byte stream.
2425
2526
Args:
26-
url: The url to load the images from.
27+
pdf_stream: The BytesIO object to load the images from.
2728
start: The start frame to use for the images.
2829
end: The end frame to use for the images.
2930
zoom: The zoom factor to apply to the images.
@@ -33,14 +34,8 @@ def pdf_url_to_images(
3334
"""
3435
if fitz is None:
3536
raise ValueError("pymongo-voyageai requires PyMuPDF to read pdf files") from None
36-
# Ensure that the URL is valid
37-
if not url.startswith("http") and url.endswith(".pdf"):
38-
raise ValueError("Invalid URL")
3937

4038
# Open the PDF from the in-memory byte stream
41-
with urllib.request.urlopen(url) as response:
42-
pdf_data = response.read()
43-
pdf_stream = io.BytesIO(pdf_data)
4439
pdf = fitz.open(stream=pdf_stream, filetype="pdf")
4540

4641
images = []
@@ -57,7 +52,6 @@ def pdf_url_to_images(
5752
# Convert pixmap to PIL Image
5853
img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
5954
images.append(img)
60-
print("out of loop")
6155

6256
# Close the document
6357
pdf.close()
@@ -67,6 +61,7 @@ def pdf_url_to_images(
6761

6862
def url_to_images(
6963
url: str,
64+
storage: ObjectStorage | None = None,
7065
metadata: dict[str, Any] | None = None,
7166
start: int = 0,
7267
end: int | None = None,
@@ -77,6 +72,7 @@ def url_to_images(
7772
7873
Args:
7974
url: The url to load the images from.
75+
storage: The storage object which can be used to load data from custom urls.
8076
metadata: A set of metadata to associate with the images.
8177
start: The start frame to use for the images.
8278
end: The end frame to use for the images.
@@ -90,14 +86,35 @@ def url_to_images(
9086
basename = url[i:]
9187
i = basename.rfind(".")
9288
name = basename[:i]
89+
90+
source = None
91+
# Prefer to use our storage object to read the file data.
92+
if storage and storage.url_prefixes:
93+
for pattern in storage.url_prefixes:
94+
if url.startswith(pattern):
95+
source = storage.load_url(url)
96+
break
97+
# For parquet files that are not loaded by the storage object, let pandas handle the download.
98+
if source is None and url.endswith(".parquet"):
99+
source = url
100+
# For s3 files that are not loaded by the storage object, create a temp S3Storage object.
101+
if source is None and url.startswith("s3://"):
102+
storage = S3Storage("")
103+
source = storage.load_url(url)
104+
storage.close()
105+
# For all other files, use the native download.
106+
if source is None:
107+
with urllib.request.urlopen(url) as response:
108+
source = io.BytesIO(response.read())
109+
93110
if url.endswith(".parquet"):
94111
try:
95112
import pandas as pd
96113
except ImportError:
97114
raise ValueError("pymongo-voyageai requires pandas to read parquet files") from None
98115
if image_column is None:
99116
raise ValueError("Must supply and image field to read a parquet file")
100-
column = pd.read_parquet(url, **kwargs)[image_column][start:end]
117+
column = pd.read_parquet(source, **kwargs)[image_column][start:end]
101118
for idx, item in enumerate(column.tolist()):
102119
image = Image.open(io.BytesIO(item["bytes"]))
103120
images.append(
@@ -110,7 +127,7 @@ def url_to_images(
110127
)
111128
)
112129
elif url.endswith(".pdf"):
113-
for idx, img in enumerate(pdf_url_to_images(url, start=start, end=end, **kwargs)):
130+
for idx, img in enumerate(pdf_data_to_images(source, start=start, end=end, **kwargs)):
114131
images.append(
115132
ImageDocument(
116133
image=img,
@@ -121,9 +138,7 @@ def url_to_images(
121138
)
122139
)
123140
else:
124-
with urllib.request.urlopen(url) as response:
125-
image_data = response.read()
126-
image = Image.open(io.BytesIO(image_data))
141+
image = Image.open(source)
127142
if "transparency" in image.info and image.mode != "RGBA":
128143
image = image.convert("RGBA")
129144
images.append(ImageDocument(image=image, name=name, source_url=url, metadata=metadata))

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ addopts = ["-ra", "--showlocals", "--strict-markers", "--strict-config"]
7070
xfail_strict = true
7171
filterwarnings = [
7272
"error",
73+
"module:datetime.datetime.utcnow:DeprecationWarning", # from boto3
7374
"module:builtin type Swig:DeprecationWarning", # from pymupdf
7475
"module:builtin type swig:DeprecationWarning", # from pymupdf
7576
]

0 commit comments

Comments
 (0)