Skip to content

Commit d797852

Browse files
authored
[releases/2.23] Catch releases 2.22.2 (#1290)
2 parents 5c27df4 + 9b4989f commit d797852

File tree

15 files changed

+1391
-80
lines changed

15 files changed

+1391
-80
lines changed

src/marqo/api/models/recommend_query.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from marqo.core.models.interpolation_method import InterpolationMethod
44
from marqo.tensor_search.models.api_models import BaseMarqoModel
5-
from pydantic.v1 import root_validator
5+
from pydantic.v1 import root_validator, Field
66
from marqo.tensor_search.models.score_modifiers_object import ScoreModifierLists
77

88

@@ -22,6 +22,8 @@ class RecommendQuery(BaseMarqoModel):
2222
attributesToRetrieve: Union[None, List[str]] = None
2323
scoreModifiers: Optional[ScoreModifierLists] = None
2424
rerankDepth: Optional[int] = None
25+
allow_missing_documents: bool = Field(default=False, alias="allowMissingDocuments")
26+
allow_missing_embeddings: bool = Field(default=False, alias="allowMissingEmbeddings")
2527

2628
@root_validator(pre=False)
2729
def validate_rerank_depth(cls, values):

src/marqo/core/models/hybrid_parameters.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,6 @@ def validate_properties(cls, values):
9999
if values.get('rankingMethod') not in [RankingMethod.Lexical, RankingMethod.Tensor]:
100100
raise ValueError("For retrievalMethod: tensor or lexical, rankingMethod must be: tensor or lexical")
101101

102-
# if tensor query is an empty dict
103-
if isinstance(values.get('queryTensor'), dict):
104-
if not len(values.get('queryTensor')):
105-
raise ValueError(
106-
"Multi-term query for queryTensor requires at least one query. Received empty dictionary"
107-
)
108-
109-
110102
return values
111103

112104
@validator('alpha')

src/marqo/core/search/hybrid_search.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,11 @@ def search(
200200
tensor_query = query
201201
lexical_query = query
202202

203-
if (tensor_query is None) != (lexical_query is None):
203+
if lexical_query is None:
204+
# We could allow queryTensor to be None as tensors might be provided with context
204205
if hybrid_parameters.retrievalMethod == RetrievalMethod.Disjunction:
205206
raise core_exceptions.InvalidArgumentError(
206-
"Either both of 'hybridParameters.queryLexical' and 'hybridParameters.queryTensor' or just 'q'"
207+
"Either 'hybridParameters.queryLexical' or just 'q'"
207208
"must be present when 'disjunction' retrieval method is used."
208209
)
209210

src/marqo/core/search/recommender.py

Lines changed: 95 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,14 @@ def __init__(self, vespa_client: VespaClient, index_management: IndexManagement,
2323
self.index_management = index_management
2424
self.inference = inference
2525

26-
def get_doc_vectors_from_ids(self,
27-
index_name: str,
28-
documents: Union[List[str], Dict[str, float]],
29-
tensor_fields: Optional[List[str]] = None) -> Dict[str, List[List[float]]]:
26+
def get_doc_vectors_from_ids(
27+
self,
28+
index_name: str,
29+
documents: Union[List[str], Dict[str, float]],
30+
tensor_fields: Optional[List[str]] = None,
31+
allow_missing_documents: bool = False,
32+
allow_missing_embeddings: bool = False
33+
) -> Dict[str, List[List[float]]]:
3034
"""
3135
This method gets documents from Vespa using their IDs, removes any unnecessary data, checks for
3236
lack of vectors, then returns a list of document vectors. Can be used internally (in recommend)
@@ -36,10 +40,17 @@ def get_doc_vectors_from_ids(self,
3640
index_name: Name of the index to search
3741
documents: A list of document IDs or a dictionary where the keys are document IDs and the values are weights
3842
tensor_fields: List of tensor fields to use for recommendation (can include text, image, audio, and video fields)
43+
allow_missing_documents: If True, will not raise an error if some document IDs are not found
44+
allow_missing_embeddings: If True, will not raise an error if some documents do not have embeddings
3945
4046
Returns:
4147
A dictionary mapping document IDs to lists of vector embeddings. This is flattened to 1 list per document
4248
ID (not separated by tensor field). Order of embeddings is not guaranteed.
49+
50+
Raises:
51+
InvalidArgumentError:
52+
- If any document IDs are not found and allow_missing_documents is False
53+
- If any document ID does not have embeddings and allow_missing_embeddings is False
4354
"""
4455

4556
# TODO - Extract search and get_docs from tensor_search and refactor this
@@ -84,55 +95,107 @@ def get_doc_vectors_from_ids(self,
8495
f'Available tensor fields: {", ".join(valid_tensor_fields)}')
8596

8697
# Use the new optimized method to get only embeddings
98+
# TODO - Consolidate these two methods into one place
8799
doc_embeddings_by_field = tensor_search.get_doc_vectors_per_tensor_field_by_ids(
88100
config.Config(self.vespa_client, inference=self.inference),
89101
index_name,
90102
document_ids,
91-
tensor_fields=tensor_fields
103+
tensor_fields=tensor_fields,
104+
allow_missing_documents=allow_missing_documents,
92105
)
93106

107+
return self._sanitize_doc_embeddins_by_field(
108+
all_documents_ids = document_ids,
109+
marqo_index=marqo_index,
110+
doc_embeddings_by_field=doc_embeddings_by_field,
111+
tensor_fields=tensor_fields,
112+
allow_missing_documents=allow_missing_documents,
113+
allow_missing_embeddings=allow_missing_embeddings,
114+
)
115+
116+
def _sanitize_doc_embeddins_by_field(
117+
self,
118+
all_documents_ids: List[str],
119+
marqo_index: MarqoIndex,
120+
doc_embeddings_by_field: Dict[str, Dict[str, List[List[float]]]],
121+
tensor_fields: Optional[List[str]],
122+
allow_missing_documents: bool,
123+
allow_missing_embeddings: bool
124+
) -> Dict[str, List[List[float]]]:
125+
"""
126+
Sanitize the document embeddings by checking for missing documents and embeddings,
127+
and flattening the structure to a simple mapping of document ID to list of embeddings.
128+
129+
If allow_missing_documents is False, raises an error if any document IDs are not found.
130+
If allow_missing_embeddings is False, raises an error if any documents do not have embeddings.
131+
132+
Documents with no embeddings are removed from the result.
133+
Args:
134+
all_documents_ids: The list of all document IDs that were requested
135+
marqo_index: The marqo index object containing metadata about the index
136+
doc_embeddings_by_field: The document embeddings by field returned from
137+
tensor_search.get_doc_vectors_per_tensor_field_by_ids
138+
tensor_fields: tensor fields to include in the result. If None, all fields are included.
139+
allow_missing_documents: If True, will not raise an error if some document IDs are not found.
140+
allow_missing_embeddings: If True, will not raise an error if some documents do not have embeddings.
141+
142+
Returns:
143+
A dictionary mapping document IDs to lists of vector embeddings.
144+
E.g.,
145+
{
146+
"doc1": [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]],
147+
"doc2": [[0.7, 0.8, 0.9]]
148+
}
149+
where each list contains embeddings from all tensor fields where order of embeddings is not preserved.
150+
151+
Raises:
152+
InvalidArgumentError: If any document IDs are not found and allow_missing_documents is False,
153+
or if any documents do not have embeddings and allow_missing_embeddings is False.
154+
"""
155+
94156
# Check that all documents were found
95-
not_found = []
96-
for doc_id in document_ids:
157+
not_found_docs = []
158+
for doc_id in all_documents_ids:
97159
if doc_id not in doc_embeddings_by_field:
98-
not_found.append(doc_id)
160+
not_found_docs.append(doc_id)
99161

100-
if len(not_found) > 0:
101-
raise InvalidArgumentError(f'The following document IDs were not found: {", ".join(not_found)}')
162+
if len(not_found_docs) > 0 and not allow_missing_documents:
163+
raise InvalidArgumentError(f'The following document IDs were not found: {", ".join(not_found_docs)}')
102164

103165
# Flatten the embeddings structure to match the expected return format
104-
# Convert from Dict[doc_id, Dict[field_name, List[List[float]]]]
166+
# Convert from Dict[doc_id, Dict[field_name, List[List[float]]]]
105167
# to Dict[doc_id, List[List[float]]]
106168
doc_vectors: Dict[str, List[List[float]]] = {}
107169
docs_without_vectors = []
108-
170+
109171
for doc_id, field_embeddings in doc_embeddings_by_field.items():
110172
vectors: List[List[float]] = []
111-
173+
112174
# Flatten all embeddings from all fields for this document
113175
for field_name, embedding_list in field_embeddings.items():
114176
# For legacy unstructured indices, field_name will be "marqo__embeddings"
115177
# and we should include all embeddings regardless of tensor_fields filter
116178
# since all embeddings are stored together in marqo__embeddings
117-
if (tensor_fields is None or
179+
if (tensor_fields is None or
118180
field_name in tensor_fields or
119181
(marqo_index.type == IndexType.Unstructured and
120182
field_name == unstructured_common.VESPA_DOC_EMBEDDINGS)):
121183
vectors.extend(embedding_list)
122-
184+
123185
doc_vectors[doc_id] = vectors
124186

125187
if len(vectors) == 0:
126188
docs_without_vectors.append(doc_id)
127189

128-
if len(docs_without_vectors) > 0:
190+
191+
if len(docs_without_vectors) > 0 and not allow_missing_embeddings:
129192
raise InvalidArgumentError(
130193
f'The following documents do not have embeddings: {", ".join(docs_without_vectors)}'
131194
)
132-
195+
for doc_id in docs_without_vectors:
196+
del doc_vectors[doc_id]
133197
return doc_vectors
134198

135-
136199
def recommend(self,
137200
index_name: str,
138201
documents: Union[List[str], Dict[str, float]],
@@ -150,7 +213,9 @@ def recommend(self,
150213
filter: str = None,
151214
attributes_to_retrieve: Optional[List[str]] = None,
152215
score_modifiers: Optional[ScoreModifierLists] = None,
153-
rerank_depth: Optional[int] = None
216+
rerank_depth: Optional[int] = None,
217+
allow_missing_documents: bool = False,
218+
allow_missing_embeddings: bool = False,
154219
):
155220
"""
156221
Recommend documents similar to the provided documents.
@@ -191,7 +256,9 @@ def recommend(self,
191256
doc_vectors = self.get_doc_vectors_from_ids(
192257
index_name=index_name,
193258
documents=documents,
194-
tensor_fields=tensor_fields
259+
tensor_fields=tensor_fields,
260+
allow_missing_documents=allow_missing_documents,
261+
allow_missing_embeddings=allow_missing_embeddings,
195262
)
196263

197264
# Save original document IDs for filtering
@@ -208,9 +275,16 @@ def recommend(self,
208275
weight = documents[document_id]
209276
else:
210277
weight = 1
278+
211279
vectors.extend(vector_list)
212280
weights.extend([weight] * len(vector_list))
213281

282+
if len(vectors) == 0:
283+
raise InvalidArgumentError(
284+
"Marqo could not collect any valid vector from the documents. "
285+
"Please check if the provided documents exist or if the documents have valid embeddings. "
286+
)
287+
214288
try:
215289
interpolated_vector = vector_interpolation.interpolate(
216290
vectors, weights
@@ -253,7 +327,7 @@ def recommend(self,
253327
attributes_to_retrieve=attributes_to_retrieve,
254328
score_modifiers=score_modifiers,
255329
processing_start=t0,
256-
rerank_depth=rerank_depth
330+
rerank_depth=rerank_depth,
257331
)
258332

259333
return results

src/marqo/tensor_search/api.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,9 @@ def recommend(query_dict: dict, index_name: str,
480480
filter=query.filter,
481481
attributes_to_retrieve=query.attributesToRetrieve,
482482
score_modifiers=query.scoreModifiers,
483-
rerank_depth=query.rerankDepth
483+
rerank_depth=query.rerankDepth,
484+
allow_missing_documents=query.allow_missing_documents,
485+
allow_missing_embeddings=query.allow_missing_embeddings,
484486
)
485487

486488

src/marqo/tensor_search/models/search.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ class SearchContextTensor(BaseModel):
6969
class SearchContextDocumentsParameters(BaseModel):
7070
tensor_fields: Optional[List[str]] = Field(None, alias='tensorFields')
7171
exclude_input_documents: bool = Field(True, alias='excludeInputDocuments')
72+
allow_missing_documents: bool = Field(False, alias='allowMissingDocuments')
73+
allow_missing_embeddings: bool = Field(False, alias='allowMissingEmbeddings')
7274

7375
@validator('tensor_fields', pre=True, always=True)
7476
def check_tensor_fields_not_empty(cls, v):

src/marqo/tensor_search/tensor_search.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -908,14 +908,16 @@ def get_query_vectors_from_jobs(
908908
) -> Dict[Qidx, List[float]]:
909909
"""
910910
Retrieve the vectorised content associated to each query from the set of batch vectorise jobs.
911-
Handles multi-modal queries, by weighting and combining queries into a single vector
911+
Handles multi-modal queries, by weighting and combining queries into a single vector.
912912
913913
Args:
914914
- queries: Original search queries.
915915
- qidx_to_job: VectorisedJobPointer for each query
916916
- job_to_vectors: inference output from each VectorisedJob
917917
- config: standard Marqo config.
918918
919+
Raises:
920+
api_exceptions.InvalidArgError: If this method cannot collect a valid vector from the query
919921
"""
920922
result: Dict[Qidx, List[float]] = defaultdict(list)
921923
for qidx, ptrs in qidx_to_job.items():
@@ -964,7 +966,9 @@ def get_query_vectors_from_jobs(
964966
context_doc_vectors = config.recommender.get_doc_vectors_from_ids(
965967
index_name=q.index.name,
966968
documents=context_documents.ids,
967-
tensor_fields=context_documents.parameters.tensor_fields
969+
tensor_fields=context_documents.parameters.tensor_fields,
970+
allow_missing_documents=context_documents.parameters.allow_missing_documents,
971+
allow_missing_embeddings= context_documents.parameters.allow_missing_embeddings
968972
)
969973

970974
# Update weights and vectors list
@@ -989,13 +993,14 @@ def get_query_vectors_from_jobs(
989993
# Use interpolation to combine all vectors
990994
vector_interpolation = from_interpolation_method(interpolation_method)
991995
with RequestMetricsStore.for_request().time(f"search.vectorise.interpolate_vectors"):
992-
merged_vector = vector_interpolation.interpolate(
993-
vectors=collected_vectors,
994-
weights=collected_weights
995-
)
996-
997-
result[qidx] = list(merged_vector)
998-
996+
if collected_vectors:
997+
merged_vector = vector_interpolation.interpolate(
998+
vectors=collected_vectors,
999+
weights=collected_weights
1000+
)
1001+
result[qidx] = list(merged_vector)
1002+
else:
1003+
result[qidx] = []
9991004
elif isinstance(q.q, str):
10001005
if q.context:
10011006
raise core_exceptions.InvalidArgumentError(
@@ -1009,6 +1014,14 @@ def get_query_vectors_from_jobs(
10091014
)
10101015
else:
10111016
raise ValueError(f"Unexpected query type: {type(q.q).__name__}")
1017+
1018+
if not result[qidx]:
1019+
raise api_exceptions.InvalidArgError(
1020+
f"Marqo could not collect any vectors from the search query but the retrieval or ranking method requires "
1021+
f"at least one valid vector. "
1022+
f"Please check the provided query, context (if any), or queryTensor(for Hybrid search) "
1023+
)
1024+
10121025
return result
10131026

10141027

@@ -1090,7 +1103,7 @@ def add_prefix_to_queries(queries: List[BulkSearchQueryEntity]) -> List[BulkSear
10901103
def run_vectorise_pipeline(config: Config, queries: List[BulkSearchQueryEntity], device: Union[Device, str],
10911104
interpolation_method: InterpolationMethod = None) -> Dict[
10921105
Qidx, List[float]]:
1093-
"""Run the query vectorisation process
1106+
"""Run the query vectorisation process. This is a pipeline used for both Tensor search and Hybrid search.
10941107
10951108
Raise:
10961109
api_exceptions.InvalidArgError: If the vectorisation process fails or if the media cannot be downloaded.
@@ -1206,6 +1219,9 @@ def _vector_text_search(
12061219
qidx_to_vectors: Dict[Qidx, List[float]] = run_vectorise_pipeline(config, queries, device, interpolation_method)
12071220
vectorised_text = list(qidx_to_vectors.values())[0]
12081221

1222+
if not vectorised_text: # pragma: no cover
1223+
raise InternalError(f"No vector is generated for the tensor query: {query}. ")
1224+
12091225
marqo_query = MarqoTensorQuery(
12101226
index_name=index_name,
12111227
vector_query=vectorised_text,
@@ -1374,6 +1390,7 @@ def get_doc_vectors_per_tensor_field_by_ids(
13741390
index_name: str,
13751391
document_ids: List[str],
13761392
tensor_fields: Optional[List[str]] = None,
1393+
allow_missing_documents: bool = False,
13771394
) -> Dict[str, Dict[str, List[List[float]]]]:
13781395
"""
13791396
Get only the embeddings for documents by their IDs.
@@ -1383,9 +1400,18 @@ def get_doc_vectors_per_tensor_field_by_ids(
13831400
index_name: Name of the index
13841401
document_ids: List of document IDs to fetch
13851402
tensor_fields: Specific tensor fields to get. If None, get all tensor fields.
1403+
allow_missing_documents: If True, will not raise an error if a document is not found
13861404
13871405
Returns:
13881406
Dict mapping document_id to field_name to list of embedding vectors
1407+
E.g.,
1408+
{
1409+
"doc_id_1": {
1410+
"field_name_1": [[0.1, 0.2, ...], ...],
1411+
"field_name_2": [[0.3, 0.4, ...], ...],
1412+
},
1413+
"doc_id_2": {"field_name_1": [[0.5, 0.6, ...], ...]}
1414+
}
13891415
"""
13901416

13911417
# We can just use the cache here since we refresh every 1s.
@@ -1438,6 +1464,9 @@ def get_doc_vectors_per_tensor_field_by_ids(
14381464
else:
14391465
# Otherwise, field is empty list
14401466
result[doc_id][marqo_tensor_field_name] = []
1467+
elif response.status == 404 and allow_missing_documents:
1468+
# If the document is not found and we are allowing missing documents, continue to next response
1469+
continue
14411470
else:
14421471
# If the response is not successful, error out
14431472
raise core_exceptions.InvalidArgumentError(

0 commit comments

Comments
 (0)