6
6
from typing import Any
7
7
8
8
from bson import ObjectId
9
+ from langchain_core .runnables .config import run_in_executor
9
10
from langchain_mongodb .index import create_vector_search_index
10
11
from langchain_mongodb .pipelines import vector_search_stage
11
12
from langchain_mongodb .utils import make_serializable
@@ -105,6 +106,17 @@ def image_to_storage(self, document: ImageDocument | Image.Image) -> StoredDocum
105
106
document = ImageDocument (image = document )
106
107
return self ._storage .save_image (document )
107
108
109
async def aimage_to_storage(self, document: ImageDocument | Image.Image) -> StoredDocument:
    """Async counterpart of `image_to_storage`.

    The synchronous `image_to_storage` method is handed to `run_in_executor`
    (with `None` as its executor/config argument) and the result is awaited.

    Args:
        document: The image document, or raw image object, to store.

    Returns:
        The stored document produced by `image_to_storage`.
    """
    stored = await run_in_executor(None, self.image_to_storage, document)
    return stored
119
+
108
120
def storage_to_image (self , document : StoredDocument | str ) -> ImageDocument :
109
121
"""Convert a stored document to an image document.
110
122
@@ -120,6 +132,17 @@ def storage_to_image(self, document: StoredDocument | str) -> ImageDocument:
120
132
)
121
133
return self ._storage .load_image (document = document )
122
134
135
async def astorage_to_image(self, document: StoredDocument | str) -> ImageDocument:
    """Async counterpart of `storage_to_image`.

    The synchronous `storage_to_image` method is handed to `run_in_executor`
    (with `None` as its executor/config argument) and the result is awaited.

    Args:
        document: The stored document, or the name of the stored object, to load.

    Returns:
        The image document produced by `storage_to_image`.
    """
    loaded = await run_in_executor(None, self.storage_to_image, document)
    return loaded
145
+
123
146
def url_to_images (
124
147
self ,
125
148
url : str ,
@@ -145,6 +168,38 @@ def url_to_images(
145
168
url , metadata = metadata , start = start , end = end , image_column = image_column , ** kwargs
146
169
)
147
170
171
async def aurl_to_images(
    self,
    url: str,
    metadata: dict[str, Any] | None = None,
    start: int = 0,
    end: int | None = None,
    image_column: str | None = None,
    **kwargs: Any,
) -> list[ImageDocument]:
    """Async counterpart of `url_to_images`: extract images from a url.

    Args:
        url: The url to load the images from.
        metadata: A set of metadata to associate with the images.
        start: The start frame to use for the images.
        end: The end frame to use for the images.
        image_column: The name of the column used to store the image data, for parquet files.
        **kwargs: Extra keyword arguments forwarded unchanged to `url_to_images`.

    Returns:
        A list of image document objects.
    """
    # Named options go first, caller extras last. The two groups cannot share a
    # key because the named parameters above capture their own keywords.
    options: dict[str, Any] = {
        "metadata": metadata,
        "start": start,
        "end": end,
        "image_column": image_column,
        **kwargs,
    }
    images = await run_in_executor(None, self.url_to_images, url, **options)
    return images
202
+
148
203
def add_documents (
149
204
self ,
150
205
inputs : Sequence [str | Image .Image | Document | Sequence [str | Image .Image | Document ]],
@@ -230,6 +285,30 @@ def add_documents(
230
285
self ._coll .bulk_write (operations )
231
286
return output_docs
232
287
288
async def aadd_documents(
    self,
    inputs: Sequence[str | Image.Image | Document | Sequence[str | Image.Image | Document]],
    ids: list[str] | None = None,
    batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
    **kwargs: Any,
) -> list[dict[str, Any]]:
    """Async counterpart of `add_documents`: add multimodal documents to the vectorstore.

    Args:
        inputs: The inputs to add to the vectorstore; each entry is a single item
            or a sequence of items (text, image or document).
        ids: Optional list of unique ids that will be used as index in VectorStore.
            See note on ids in add_texts.
        batch_size: Number of documents to insert at a time.
            Tuning this may help with performance and sidestep MongoDB limits.
        **kwargs: Additional keyword args, forwarded unchanged to `add_documents`.

    Returns:
        A list of documents with their associated input documents.
    """
    forwarded: dict[str, Any] = {"ids": ids, "batch_size": batch_size, **kwargs}
    added = await run_in_executor(None, self.add_documents, inputs, **forwarded)
    return added
311
+
233
312
def delete_by_ids (
234
313
self , ids : list [str | ObjectId ], delete_stored_objects : bool = True , ** kwargs : Any
235
314
) -> bool :
@@ -248,6 +327,23 @@ def delete_by_ids(
248
327
{"_id" : {"$in" : oids }}, delete_stored_objects = delete_stored_objects , ** kwargs
249
328
)
250
329
330
async def adelete_by_ids(
    self, ids: list[str | ObjectId], delete_stored_objects: bool = True, **kwargs: Any
) -> bool:
    """Async counterpart of `delete_by_ids`: delete documents by ids.

    Args:
        ids: List of ids to delete.
        delete_stored_objects: Whether to delete the associated stored objects.
        **kwargs: Other keyword arguments, forwarded unchanged to `delete_by_ids`.

    Returns:
        bool: True if deletion is successful, False otherwise.
    """
    succeeded = await run_in_executor(
        None,
        self.delete_by_ids,
        ids,
        delete_stored_objects=delete_stored_objects,
        **kwargs,
    )
    return succeeded
346
+
251
347
def delete_many (
252
348
self , filter : Mapping [str , Any ], delete_stored_objects : bool = True , ** kwargs : Any
253
349
) -> bool :
@@ -269,11 +365,32 @@ def delete_many(
269
365
self ._storage .delete_image (inp )
270
366
return self ._coll .delete_many (filter = filter , ** kwargs ).acknowledged
271
367
368
async def adelete_many(
    self, filter: Mapping[str, Any], delete_stored_objects: bool = True, **kwargs: Any
) -> bool:
    """Async counterpart of `delete_many`: delete documents using a filter.

    The synchronous `delete_many` method is handed to `run_in_executor`
    (with `None` as its executor/config argument) and the result is awaited.

    Args:
        filter: Query filter selecting the documents to delete.
        delete_stored_objects: Whether to delete the associated stored objects.
        **kwargs: Other keyword arguments passed to the collection's `delete_many` method.

    Returns:
        bool: The value returned by `delete_many`, i.e. the `acknowledged` flag of
        the collection's delete result.
    """
    return await run_in_executor(
        None, self.delete_many, filter, delete_stored_objects=delete_stored_objects, **kwargs
    )
384
+
272
385
def close(self) -> None:
    """Release the resources held by this instance.

    The client reached through `self._coll.database.client` is closed first,
    then the storage backend in `self._storage`.
    """
    db_client = self._coll.database.client
    db_client.close()
    self._storage.close()
276
389
390
async def aclose(self) -> None:
    """Async counterpart of `close`: close the client, cleaning up resources.

    The synchronous `close` method is handed to `run_in_executor` (with `None`
    as its executor/config argument) and awaited.
    """
    outcome = await run_in_executor(None, self.close)
    return outcome
393
+
277
394
def get_by_ids (
278
395
self , ids : Sequence [str | ObjectId ], extract_images : bool = True
279
396
) -> list [dict [str , Any ]]:
@@ -294,6 +411,21 @@ def get_by_ids(
294
411
docs .append (doc )
295
412
return docs
296
413
414
async def aget_by_ids(
    self, ids: Sequence[str | ObjectId], extract_images: bool = True
) -> list[dict[str, Any]]:
    """Async counterpart of `get_by_ids`: get a list of documents by id.

    Args:
        ids: List of ids to search for.
        extract_images: Whether to extract the stored documents into image documents.

    Returns:
        A list of matching documents, where the `inputs` is a list of stored documents
        or image documents.
    """
    matches = await run_in_executor(
        None,
        self.get_by_ids,
        ids,
        extract_images=extract_images,
    )
    return matches
428
+
297
429
def wait_for_indexing (self , timeout : int = TIMEOUT , interval : int = INTERVAL ) -> None :
298
430
"""Wait for the search index to update to account for newly added embeddings."""
299
431
n_docs = self ._coll .count_documents ({})
@@ -306,6 +438,12 @@ def wait_for_indexing(self, timeout: int = TIMEOUT, interval: int = INTERVAL) ->
306
438
307
439
raise TimeoutError (f"Failed to embed, insert, and index texts in { timeout } s." )
308
440
441
async def await_for_indexing(self, timeout: int = TIMEOUT, interval: int = INTERVAL) -> None:
    """Async counterpart of `wait_for_indexing`.

    Waits for the search index to update to account for newly added embeddings.

    Args:
        timeout: Limit passed to `wait_for_indexing`; its error message reports it
            in seconds.
        interval: Passed through to `wait_for_indexing` (presumably the pause
            between checks; confirm against that method).

    Raises:
        TimeoutError: Propagated from `wait_for_indexing` when indexing does not
            finish in time.
    """
    wait_options = {"timeout": timeout, "interval": interval}
    finished = await run_in_executor(None, self.wait_for_indexing, **wait_options)
    return finished
446
+
309
447
def similarity_search (
310
448
self ,
311
449
query : str ,
@@ -379,6 +517,53 @@ def similarity_search(
379
517
docs .append (res )
380
518
return docs
381
519
520
async def asimilarity_search(
    self,
    query: str,
    k: int = 4,
    pre_filter: dict[str, Any] | None = None,
    post_filter_pipeline: list[dict[str, Any]] | None = None,
    oversampling_factor: int = 10,
    include_scores: bool = False,
    include_embeddings: bool = False,
    extract_images: bool = False,
    **kwargs: Any,
) -> list[dict[str, Any]]:
    """Async counterpart of `similarity_search`.

    Returns the documents most similar to the given query.

    Args:
        query: Input text of semantic query.
        k: The number of documents to return. Defaults to 4.
        pre_filter: List of MQL match expressions comparing an indexed field.
        post_filter_pipeline: (Optional) Pipeline of MongoDB aggregation stages
            to filter/process results after $vectorSearch.
        oversampling_factor: Multiple of k used when generating number of candidates
            at each step in the HNSW Vector Search.
        include_scores: If True, the query score of each result
            will be included in metadata.
        include_embeddings: If True, the embedding vector of each result
            will be included in metadata.
        extract_images: If True, the stored documents will be converted to image documents.
        **kwargs: Additional arguments, forwarded unchanged to `similarity_search`.

    Returns:
        List of documents most similar to the query and their scores, where the `inputs`
        is a list of stored documents or image documents.
    """
    # Named options go first, caller extras last. The two groups cannot share a
    # key because the named parameters above capture their own keywords.
    search_options: dict[str, Any] = {
        "k": k,
        "pre_filter": pre_filter,
        "post_filter_pipeline": post_filter_pipeline,
        "oversampling_factor": oversampling_factor,
        "include_scores": include_scores,
        "include_embeddings": include_embeddings,
        "extract_images": extract_images,
        **kwargs,
    }
    hits = await run_in_executor(None, self.similarity_search, query, **search_options)
    return hits
566
+
382
567
def _expand_doc (self , obj : dict [str , Any ], extract_images : bool = True ) -> dict [str , Any ]:
383
568
for idx , inp in enumerate (list (obj ["inputs" ])):
384
569
if inp ["type" ] == DocumentType .storage :
0 commit comments