Skip to content

Commit c3b3559

Browse files
fix(milvus): Enhanced Milvus VectorDB Instrumentation for Improved search Monitoring (#2815)
1 parent a40b28b commit c3b3559

File tree

2 files changed

+275
-2
lines changed

2 files changed

+275
-2
lines changed

packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/wrapper.py

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from opentelemetry.instrumentation.utils import (
66
_SUPPRESS_INSTRUMENTATION_KEY,
77
)
8-
from opentelemetry.semconv_ai import Events
8+
from opentelemetry.semconv_ai import Events, EventAttributes
99
from opentelemetry.semconv_ai import SpanAttributes as AISpanAttributes
1010

1111

@@ -55,9 +55,13 @@ def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
5555
_set_create_collection_attributes(span, kwargs)
5656

5757
return_value = wrapped(*args, **kwargs)
58+
5859
if to_wrap.get("method") == "query":
5960
_add_query_result_events(span, return_value)
6061

62+
if to_wrap.get("method") == "search":
63+
_add_search_result_events(span, return_value)
64+
6165
return return_value
6266

6367

@@ -198,7 +202,9 @@ def _set_search_attributes(span, kwargs):
198202
count_or_none(kwargs.get("output_fields")),
199203
)
200204
_set_span_attribute(
201-
span, AISpanAttributes.MILVUS_SEARCH_SEARCH_PARAMS, kwargs.get("search_params")
205+
span,
206+
AISpanAttributes.MILVUS_SEARCH_SEARCH_PARAMS,
207+
_encode_include(kwargs.get("search_params")),
202208
)
203209
_set_span_attribute(
204210
span, AISpanAttributes.MILVUS_SEARCH_TIMEOUT, kwargs.get("timeout")
@@ -211,6 +217,19 @@ def _set_search_attributes(span, kwargs):
211217
_set_span_attribute(
212218
span, AISpanAttributes.MILVUS_SEARCH_ANNS_FIELD, kwargs.get("anns_field")
213219
)
220+
_set_span_attribute(
221+
span,
222+
AISpanAttributes.MILVUS_SEARCH_PARTITION_NAMES,
223+
_encode_partition_name(kwargs.get("partition_names")),
224+
)
225+
query_vectors = kwargs.get("data", [])
226+
vector_dims = [len(vec) for vec in query_vectors]
227+
228+
_set_span_attribute(
229+
span,
230+
AISpanAttributes.MILVUS_SEARCH_QUERY_VECTOR_DIMENSION,
231+
_encode_include(vector_dims),
232+
)
214233

215234

216235
@dont_throw
@@ -248,6 +267,60 @@ def _add_query_result_events(span, kwargs):
248267
span.add_event(name=Events.DB_QUERY_RESULT.value, attributes=element)
249268

250269

270+
@dont_throw
271+
def _add_search_result_events(span, kwargs):
272+
273+
all_distances = []
274+
total_matches = 0
275+
276+
single_query = len(kwargs) == 1
277+
278+
def set_query_stats(query_idx, distances, match_ids):
279+
"""Helper function to set per-query stats in the span."""
280+
281+
_set_span_attribute(
282+
span,
283+
f"{AISpanAttributes.MILVUS_SEARCH_RESULT_COUNT}_{query_idx}",
284+
len(distances),
285+
)
286+
287+
def set_global_stats():
288+
"""Helper function to set global stats for a single query."""
289+
_set_span_attribute(
290+
span, AISpanAttributes.MILVUS_SEARCH_RESULT_COUNT, total_matches
291+
)
292+
293+
for query_idx, query_results in enumerate(kwargs):
294+
295+
query_distances = []
296+
query_match_ids = []
297+
298+
for match in query_results:
299+
distance = float(match["distance"])
300+
query_distances.append(distance)
301+
all_distances.append(distance)
302+
total_matches += 1
303+
query_match_ids.append(match["id"])
304+
305+
span.add_event(
306+
Events.DB_SEARCH_RESULT.value,
307+
attributes={
308+
EventAttributes.DB_SEARCH_RESULT_QUERY_ID.value: query_idx,
309+
EventAttributes.DB_SEARCH_RESULT_ID.value: match["id"],
310+
EventAttributes.DB_SEARCH_RESULT_DISTANCE.value: str(distance),
311+
EventAttributes.DB_SEARCH_RESULT_ENTITY.value: _encode_include(
312+
match["entity"]
313+
),
314+
},
315+
)
316+
317+
if not single_query:
318+
set_query_stats(query_idx, query_distances, query_match_ids)
319+
320+
if single_query:
321+
set_global_stats()
322+
323+
251324
@dont_throw
252325
def _set_upsert_attributes(span, kwargs):
253326
_set_span_attribute(
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
import os
2+
import random
3+
4+
import pymilvus
5+
import pytest
6+
from opentelemetry.semconv_ai import Events, SpanAttributes, EventAttributes
7+
8+
path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "milvus.db")
9+
milvus = pymilvus.MilvusClient(uri=path)
10+
11+
12+
@pytest.fixture
13+
def collection():
14+
collection_name = "Colors"
15+
milvus.create_collection(collection_name=collection_name, dimension=5)
16+
yield collection_name
17+
milvus.drop_collection(collection_name=collection_name)
18+
19+
20+
def insert_data(collection):
21+
colors = [
22+
"green",
23+
"blue",
24+
"yellow",
25+
"red",
26+
"black",
27+
"white",
28+
"purple",
29+
"pink",
30+
"orange",
31+
"grey",
32+
]
33+
data = [
34+
{
35+
"id": i,
36+
"vector": [random.uniform(-1, 1) for _ in range(5)],
37+
"color": random.choice(colors),
38+
"tag": random.randint(1000, 9999),
39+
}
40+
for i in range(1000)
41+
]
42+
data += [
43+
{
44+
"id": 1000,
45+
"vector": [random.uniform(-1, 1) for _ in range(5)],
46+
"color": "brown",
47+
"tag": 1234,
48+
},
49+
{
50+
"id": 1001,
51+
"vector": [random.uniform(-1, 1) for _ in range(5)],
52+
"color": "brown",
53+
"tag": 5678,
54+
},
55+
{
56+
"id": 1002,
57+
"vector": [random.uniform(-1, 1) for _ in range(5)],
58+
"color": "brown",
59+
"tag": 9101,
60+
},
61+
]
62+
for i in data:
63+
i["color_tag"] = "{}_{}".format(i["color"], i["tag"])
64+
milvus.insert(collection_name=collection, data=data)
65+
66+
67+
def test_milvus_single_vector_search(exporter, collection):
68+
insert_data(collection)
69+
70+
query_vectors = [
71+
[random.uniform(-1, 1) for _ in range(5)], # Random query vector for the search
72+
]
73+
search_params = {"radius": 0.5, "metric_type": "COSINE", "index_type": "IVF_FLAT"}
74+
milvus.search(
75+
collection_name=collection,
76+
data=query_vectors,
77+
anns_field="vector",
78+
search_params=search_params,
79+
output_fields=["color_tag"],
80+
limit=3,
81+
timeout=10,
82+
)
83+
84+
# Get finished spans
85+
spans = exporter.get_finished_spans()
86+
span = next(span for span in spans if span.name == "milvus.search")
87+
88+
# Check the span attributes related to search
89+
assert span.attributes.get(SpanAttributes.VECTOR_DB_VENDOR) == "milvus"
90+
assert span.attributes.get(SpanAttributes.VECTOR_DB_OPERATION) == "search"
91+
assert (
92+
span.attributes.get(SpanAttributes.MILVUS_SEARCH_COLLECTION_NAME) == collection
93+
)
94+
assert span.attributes.get(SpanAttributes.MILVUS_SEARCH_OUTPUT_FIELDS_COUNT) == 1
95+
assert span.attributes.get(SpanAttributes.MILVUS_SEARCH_LIMIT) == 3
96+
assert span.attributes.get(SpanAttributes.MILVUS_SEARCH_TIMEOUT) == 10
97+
assert span.attributes.get(SpanAttributes.MILVUS_SEARCH_ANNS_FIELD) == "vector"
98+
assert (
99+
span.attributes.get(SpanAttributes.MILVUS_SEARCH_QUERY_VECTOR_DIMENSION)
100+
== "[5]"
101+
)
102+
distances = []
103+
ids = []
104+
105+
events = span.events
106+
107+
for event in events:
108+
assert event.name == Events.DB_SEARCH_RESULT.value
109+
_id = event.attributes.get(EventAttributes.DB_SEARCH_RESULT_ID.value)
110+
distance = event.attributes.get(EventAttributes.DB_SEARCH_RESULT_DISTANCE.value)
111+
112+
assert isinstance(_id, int)
113+
assert isinstance(distance, str)
114+
115+
# Collect the distances and IDs for further computation
116+
distances.append(
117+
float(distance)
118+
) # Convert the distance to a float for computation
119+
ids.append(_id)
120+
121+
# Now compute dynamic stats from the distances
122+
total_matches = len(events)
123+
124+
assert (
125+
span.attributes.get(SpanAttributes.MILVUS_SEARCH_RESULT_COUNT) == total_matches
126+
)
127+
128+
129+
def test_milvus_multiple_vector_search(exporter, collection):
130+
insert_data(collection)
131+
132+
query_vectors = [
133+
[random.uniform(-1, 1) for _ in range(5)], # Random query vector for the search
134+
[random.uniform(-1, 1) for _ in range(5)], # Another query vector
135+
[
136+
random.uniform(-1, 1) for _ in range(5)
137+
], # Another query vector (you can add more as needed)
138+
]
139+
search_params = {"radius": 0.5, "metric_type": "COSINE", "index_type": "IVF_FLAT"}
140+
milvus.search(
141+
collection_name=collection,
142+
data=query_vectors,
143+
anns_field="vector",
144+
search_params=search_params,
145+
output_fields=["color_tag"],
146+
limit=3,
147+
timeout=10,
148+
)
149+
150+
# Get finished spans
151+
spans = exporter.get_finished_spans()
152+
span = next(span for span in spans if span.name == "milvus.search")
153+
154+
# Check the span attributes related to search
155+
assert span.attributes.get(SpanAttributes.VECTOR_DB_VENDOR) == "milvus"
156+
assert span.attributes.get(SpanAttributes.VECTOR_DB_OPERATION) == "search"
157+
assert (
158+
span.attributes.get(SpanAttributes.MILVUS_SEARCH_COLLECTION_NAME) == collection
159+
)
160+
assert span.attributes.get(SpanAttributes.MILVUS_SEARCH_OUTPUT_FIELDS_COUNT) == 1
161+
assert span.attributes.get(SpanAttributes.MILVUS_SEARCH_LIMIT) == 3
162+
assert span.attributes.get(SpanAttributes.MILVUS_SEARCH_TIMEOUT) == 10
163+
assert span.attributes.get(SpanAttributes.MILVUS_SEARCH_ANNS_FIELD) == "vector"
164+
assert (
165+
span.attributes.get(SpanAttributes.MILVUS_SEARCH_QUERY_VECTOR_DIMENSION)
166+
== "[5, 5, 5]"
167+
)
168+
169+
distances_dict = {}
170+
ids_dict = {}
171+
172+
events = span.events
173+
for event in events:
174+
assert event.name == Events.DB_SEARCH_RESULT.value
175+
query_idx = event.attributes.get(
176+
EventAttributes.DB_SEARCH_RESULT_QUERY_ID.value
177+
)
178+
_id = event.attributes.get(EventAttributes.DB_SEARCH_RESULT_ID.value)
179+
distance = event.attributes.get(EventAttributes.DB_SEARCH_RESULT_DISTANCE.value)
180+
181+
assert isinstance(_id, int)
182+
assert isinstance(distance, str)
183+
184+
distance = float(distance)
185+
186+
if query_idx not in distances_dict:
187+
distances_dict[query_idx] = []
188+
ids_dict[query_idx] = []
189+
190+
distances_dict[query_idx].append(distance)
191+
ids_dict[query_idx].append(_id)
192+
193+
for query_idx in distances_dict:
194+
distances = distances_dict[query_idx]
195+
196+
total_matches = len(distances)
197+
198+
count_key = f"{SpanAttributes.MILVUS_SEARCH_RESULT_COUNT}_{query_idx}"
199+
200+
assert span.attributes.get(count_key) == total_matches

0 commit comments

Comments
 (0)