Skip to content

Commit a469832

Browse files
committed
fix: milvus setup
Fixed a bug where the milvus pod seems to be ready but api still not available.
1 parent 53c1021 commit a469832

File tree

3 files changed

+90
-6
lines changed

3 files changed

+90
-6
lines changed

nuvolaris/milvus_admin_client.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@
1818
import logging
1919

2020
import nuvolaris.config as cfg
21-
# from pymilvus import MilvusClient, connections, db
2221
from nuvolaris.milvus_simple_client import MilvusSimpleClient as MilvusClient
2322

2423

2524
class MilvusAdminClient:
2625
"""
27-
Simple Milvus Client used to perform Mivlus administration Tasks
26+
Simple Milvus Client used to perform Milvus administration Tasks
2827
"""
2928
def __init__(self, db_name="default"):
3029
self.admin_username = cfg.get("milvus.admin.user", "MILVUS_ROOT_USER", "root")

nuvolaris/milvus_standalone.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
#
18+
import time
1819

1920
import kopf, logging
2021
import nuvolaris.kube as kube
@@ -89,6 +90,13 @@ def create(owner=None):
8990
kube.apply(mspec)
9091
util.wait_for_pod_ready(
9192
r"{.items[?(@.metadata.labels.app\.kubernetes\.io\/instance == 'nuvolaris-milvus')].metadata.name}")
93+
94+
milvus_api_host = cfg.get("milvus.host", "MILVUS_API_HOST", "nuvolaris-milvus")
95+
milvus_api_port = cfg.get("milvus.host", "MILVUS_API_PORT", "19530")
96+
97+
logging.info("*** waiting for milvus api to be available")
98+
util.wait_for_http(f"http://{milvus_api_host}:{milvus_api_port}", up_statuses=[200,401], timeout=30)
99+
92100
res = create_default_milvus_database(data)
93101
logging.info("*** created a milvus standalone instance")
94102

nuvolaris/util.py

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,22 @@
1616
# under the License.
1717
#
1818
# this module wraps utilities functions
19-
import nuvolaris.kube as kube
2019
import logging
21-
import time, random, math, os
22-
import nuvolaris.config as cfg
20+
import math
21+
import random
22+
import time
2323
import uuid
24-
import nuvolaris.apihost_util as apihost_util
2524
from base64 import b64decode, b64encode
25+
from typing import List, Union
26+
from urllib.parse import urlparse
27+
28+
import urllib3
29+
from urllib3.exceptions import NewConnectionError, MaxRetryError, ProtocolError
30+
31+
import nuvolaris.apihost_util as apihost_util
32+
import nuvolaris.config as cfg
33+
import nuvolaris.kube as kube
34+
2635

2736
# Implements truncated exponential backoff from
2837
# https://cloud.google.com/storage/docs/retry-strategy#exponential-backoff
@@ -190,6 +199,74 @@ def wait_for_pod_ready(pod_name_jsonpath, timeout="600s", namespace="nuvolaris")
190199
except Exception as e:
191200
logging.error(e)
192201

202+
203+
def status_matches(code: int, allowed: List[Union[int, str]]) -> bool:
204+
"""Check if the status code matches any allowed pattern."""
205+
for pattern in allowed:
206+
if isinstance(pattern, int) and code == pattern:
207+
return True
208+
if isinstance(pattern, str) and len(pattern) == 3 and pattern.endswith("XX"):
209+
if int(pattern[0]) == code // 100:
210+
return True
211+
return False
212+
213+
def status_matches(code: int, allowed: List[Union[int, str]]) -> bool:
214+
"""Check if the status code matches any allowed pattern."""
215+
for pattern in allowed:
216+
if isinstance(pattern, int) and code == pattern:
217+
return True
218+
if isinstance(pattern, str) and len(pattern) == 3 and pattern.endswith("XX"):
219+
if int(pattern[0]) == code // 100:
220+
return True
221+
return False
222+
223+
def wait_for_http(url: str, timeout: int = 60, up_statuses: List[Union[int, str]] = [200]):
224+
"""Wait until an HTTP endpoint becomes available with an accepted status code.
225+
226+
Args:
227+
url (str): Full URL to check (e.g. http://milvus:9091/healthz)
228+
timeout (int): Total seconds to wait before giving up.
229+
up_statuses (List[Union[int, str]]): Status codes or patterns considered as 'UP'.
230+
231+
Raises:
232+
TimeoutError: If the endpoint doesn't respond with a valid status within the timeout.
233+
"""
234+
parsed = urlparse(url)
235+
scheme = parsed.scheme
236+
host = parsed.hostname
237+
port = parsed.port or (443 if scheme == "https" else 80)
238+
path = parsed.path or "/"
239+
240+
if scheme == "https":
241+
conn = urllib3.connectionpool.HTTPSConnectionPool(host, port=port,
242+
timeout=urllib3.util.Timeout(connect=5.0, read=5.0),
243+
retries=False)
244+
else:
245+
conn = urllib3.connectionpool.HTTPConnectionPool(host, port=port,
246+
timeout=urllib3.util.Timeout(connect=5.0, read=5.0),
247+
retries=False)
248+
249+
deadline = time.time() + timeout
250+
251+
while time.time() < deadline:
252+
try:
253+
response = conn.request("GET", path)
254+
if status_matches(response.status, up_statuses):
255+
logging.info(f"Service is up: {url} (status {response.status})")
256+
return
257+
else:
258+
logging.warning(f"Service responded with {response.status}, not in {up_statuses}. Waiting...")
259+
except (NewConnectionError, MaxRetryError):
260+
logging.warning(f"Cannot connect to {url}, retrying...")
261+
except ProtocolError as e:
262+
if "Connection reset by peer" in str(e):
263+
logging.warning("Connection reset by peer. Sleeping 2 seconds...")
264+
time.sleep(2)
265+
continue
266+
else:
267+
logging.error(f"Protocol error: {e}")
268+
time.sleep(1)
269+
193270
# return mongodb configuration parameter with default valued if not configured
194271
def get_mongodb_config_data():
195272
data = {

0 commit comments

Comments
 (0)